Kafka 消费线程模型在中通消息服务运维平台的应用

Stella981
• 阅读 666

最近有些朋友问到 Kafka 消费者消费相关的问题,如下:

Kafka 消费线程模型在中通消息服务运维平台的应用

以上问题看出来这位朋友刚接触 Kafka,我们都知道 Kafka 相对 RocketMQ 来说,消费端是非常 “原生” 的,不像 RocketMQ 将消费线程模型都封装好,用户不用关注内部消费细节。

Kafka 的消费类 KafkaConsumer 是非线程安全的,意味着无法在多个线程中共享 KafkaConsumer 对象,因此创建 Kafka 消费对象时,需要用户自行实现消费线程模型,常见的消费线程模型如下:

1、每个线程维护一个 KafkaConsumer

Kafka 消费线程模型在中通消息服务运维平台的应用

从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大。

2、单 KafkaConsumer 实例 + 多 worker 线程

Kafka 消费线程模型在中通消息服务运维平台的应用

当 KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性,在公司内部使用的多线程消费模型就是用的单 KafkaConsumer 实例 + 多 worker 线程模型。

中通消息服务运维平台(ZMS)使用的 Kafka 消费线程模型是第二种:单 KafkaConsumer 实例 + 多 worker 线程。

以下我们来分析 ZMS 是如何实现单 KafkaConsumer 实例 + 多 worker 线程的消费线程模型的。

com.zto.consumer.KafkaConsumerProxy#addUserDefinedProperties

Kafka 消费线程模型在中通消息服务运维平台的应用

KafkaConsumerProxy 对 KafkaConsumer 进行了一层封装处理,是 ZMS 对外提供的 Kafka 消费对象,在创建一个 KafkaConsumerProxy 对象时,会进行以上属性赋值的具体操作,其中会根据用户配置进行消费线程的设置,从图中可看出,是否顺序消费对创建的线程池也是不一样的,ZMS 为什么会这么做呢?

单 KafkaConsumer 实例 + 多 worker 线程消费线程模型,由于消费逻辑是利用多线程进行消费的,因此并不能保证其消息的消费顺序,如果我们需要在 Kafka 中实现顺序消费,那么需要保证同一类消息放入同一个线程当中,我用如下图表示:

Kafka 消费线程模型在中通消息服务运维平台的应用

但需要注意的是,以上仅仅是保证正常情况下能够实现顺序消费,如果期间出现重平衡等异常情况,就会导致消费顺序被打乱,不过本身像 RocketMQ 一样是不能保证严格的顺序消费,对于能容忍消息短暂乱序的业务来说,这是一个不错的实现方式。

com.zto.consumer.KafkaConsumerProxy#register

Kafka 消费线程模型在中通消息服务运维平台的应用

以上,ZMS 每注册一个 KafkaConsumerProxy,都会使用新的线程去处消费 KafkaConsumer,前面也说过了 KafkaConsumer 是非线程安全的。

com.zto.consumer.KafkaConsumerProxy#submitRecords

Kafka 消费线程模型在中通消息服务运维平台的应用

以上是 ZMS 实现多线程消费逻辑的核心,ZMS 会对用消息分区和线程池列表缓存进行取模,从而使得相同分区的消息会被分配到相同线程池中执行,对于顺序消费来说至关重要,前面我也说了,当用户配置了顺序消费时,每个线程池只会分配一个线程,如果相同分区的消息分配到同一个线程池中执行,也就意味着相同分区的消息会串行执行,实现消息消费的顺序性。

以上就是 ZMS Kafka 消费线程模型的简单分析。

最后附上 ZMS 的 GitHub 地址:

https://github.com/ZTO-Express/zms

欢迎大家提出宝贵意见。

本文分享自微信公众号 - 后端进阶(objcoding)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
9小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(