配置参考
然后不废话,直接贴最终的关键配置:
# 想实现消息队列中保存2小时的消息,那么配置应该像这样:
log.roll.hours=1
log.retention.hours=2
log.segment.delete.delay.ms=0
Kafka日志存储机制分析
首先先贴一下对我帮助很大的一篇文档: https://www.zybuluo.com/jewes/note/64450 。这篇文章详细阐述了kafka的日志存储机制,建议深入阅读。
然后我简述一下。我们将对照kafka的broker配置简单说明一下: http://kafka.apache.org/documentation.html#brokerconfigs
Kafka的持久化策略更像redis
—— 数据都在内存中,定期flush到硬盘上持久化存储,以保证重启的时候数据不丢。flush策略由log.flush.*
这些properties控制。
每个topic可以存储于多个partition
,每个partition
在kafka的log目录下表现为topicname-id
这样的文件夹,如mytopic-0
。kafka队列中的内容会按照日志
的形式持久化到硬盘上。每个日志文件称为“段”(segment
)。
Kafka清理队列中过期message
的方式实际上就是删除过期的segment
,这种表现形式十分类似于日志滚动
。因此,控制kafka队列中消息的保存时间的方式实际上就是日志文件定期分段,然后定期清理掉过期的段文件。
与控制分段策略相关的几个properties:
log.roll.{hours,ms} —— 日志滚动的周期时间(小时,毫秒,log.roll.ms优先级更高),到达指定周期时强制生成一个新的segment。
log.segment.bytes —— 每个segment的最大容量上限(默认1GB)。到达指定容量时会强制生成一个新的segment。
与过期segment处理策略相关的几个properties:
cleanup.policy={compact,delete} —— 过期segment处理算法,默认delete。
log.retention.{hours,minutes,ms} —— 日志保留时间(小时,分钟,毫秒。优先级依次升高),超出保留时间的日志执行cleanup.policy定义的操作
log.segment.delete.delay.ms —— 删除日志文件前的保留一段时间。默认60000。
log.retention.check.interval.ms —— log checker的检测是否需要删除文件的周期。默认300000。
现在解释一下开头的配置段的含义——每小时滚动一个日志文件,日志删除(cleanup.policy默认为delete)时间为2小时,