Kafka重要知识点之消费组概念

Stella981
• 阅读 416

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

Kafka重要知识点之消费组概念

Kafka重要知识点之消费组概念

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

Kafka重要知识点之消费组概念

Kafka重要知识点之消费组概念

暴走大数据

点击右侧关注,暴走大数据!

Kafka重要知识点之消费组概念

在kafka中,某些Topic的主题拥有数百万甚至数千万的消息量,如果仅仅靠个消费者进程消费,那么消费速度会非常慢,所以我们需要使用使用kafka提供的消费组功能,同一个消费组的多个消费者就能分布到多个物理机器上以加速消费

每个消费者组都会有一个独一无二的消费者组id来标记自己。每一个消费者group可能有一个或者多个消费者,对于当前消费组来说,topic中每条数据只要被消费组内任何一个消费者消费一次,那么这条数据就可以认定被当前消费组消费成功。

总而言之,kafka的消费组有如下三个特征

  1. 每个消费组有一个或者多个消费者

  2. 每个消费组拥有一个唯一性的标识id

  3. 消费组在消费topic的时候,topic的每个partition只能分配给一个消费者

Kafka消费组消费的示例代码如下所示

使用如下命令创建一个具有8个partition的HelloKafka

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 8 --topic HelloKafka

如下是kafka集群消费的示例代码

public class GroupConsumer {    private KafkaConsumer<String, String> consumer;    private final int id;    public GroupConsumer(int id) {        this.id = id;        Properties props = new Properties();        props.put("client.id", "client-" + id);        //zookeeper 配置        props.put("bootstrap.servers", KafkaConfig.SERVER);        //group 代表一个消费组        props.put("group.id", KafkaConfig.GROUP);        //zk连接超时        props.put("zookeeper.session.timeout.ms", "4000");        props.put("zookeeper.sync.time.ms", "200");        //关闭自动提交        props.put("enable.auto.commit", "false");        props.put("auto.offset.reset", "earliest");//        props.put("auto.commit.interval.ms", "1000");//        props.put("auto.offset.reset", "smallest");        //序列化类        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");        consumer = new KafkaConsumer<String, String>(props);        consumer.subscribe(Collections.singleton(KafkaConfig.TOPIC));    }    public void consume() {        while (true) {            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));            for (ConsumerRecord<String, String> record : records) {                System.out.printf("id = %d , partition = %d , offset = %d, key = %s, value = %s%n", id, record.partition(), record.offset(), record.key(), record.value());            }            consumer.commitSync();        }    }    public static void main(String[] args) throws InterruptedException {        for (int i = 0; i < 8; i++) {            final int id = i;            new Thread() {                @Override                public void run() {                    new GroupConsumer(id).consume();                }            }.start();        }        TimeUnit.SECONDS.sleep(Long.MAX_VALUE);    }}

Kafka是如何保证每个主题的每条消息只能被消费者组的一个消费者消费呢?取决于两个因素,一个是partition管理,另一个是对offset的管理

1. Partition分配

一个kafka主题会有多个分区,分配partition需要保证每个分区都有消费者消费,topic的每个分区只能分配给某个消费组下的一个消费者,这样的话也能保证每个partition的顺序消费,如果分区数目比较多那么一个消费者会被分配到多个分区,比如第一张图Consumer Group A,如果分区数目比较少,但是消费者数目比较多,某些消费者就会处于空闲状态,比如第二张图的Consumer5。

Kafka重要知识点之消费组概念

Kafka重要知识点之消费组概念

2. 分配策略

在实例代码中还涉及到了分区分配策略参数partition.assignment.strategy,kafka自带三种扩展策略

Kafka重要知识点之消费组概念

默认情况下,该参数的值为RangeAssignor,它的目的是尽量保证将分区平均分配给消费者。

2.1 RangeAssignor

RangeAssignor在分配partition的时候,它首先会对Consumer进行排序,排序的依据是字典序。举一个例子,比如现在有一个消费组,它包含3个消费者c1,c2,c3。该消费组订阅了一个包含8个partition的Order Topic,那么分配结果如下

C1:Partition0, Partition1,Partition2C2:Partition3, Partition4,Partition5C3:Partition6, Partition7

这个分配结果会导致字典序前部的消费者分配到的partition数量过多,如果该消费组订阅的主题很多,C1和C2的负载会越来越高,最终会导致消费者不能及时消费。

2.2 RoundRobinAssignor

相比之下,RoundRobinAssignor分配策略就公平的多,它对应的参数是org.apache.kafka.clients.consumer.RoundRobinAssignor。它的原理是将消费者以及消费者订阅的所有Topic按照字典序进行排序,还是举一个例子,现在有一个消费组,它包含有3个消费者C1,C2,C3,他们都订阅了一个partition数目为4的Topic1,另一个partition为3的Topic2,那么分配结果如下

C1:Topic1-P0 Topic1-P3,Topic2-2C2:Topic1-P1, Topic2-P0,C3:Topic1-P2, Topic2-P1

如果这个消费组订阅了partition数目为4的Topic1,一个partition为3的Topic2,一个partition为5的Topic3,那么分配结果如下

C1:Topic1-P0 Topic1-P3,Topic2-2,Topic3-2C2:Topic1-P1, Topic2-P0,Topic3-0,Topic3-3C3:Topic1-P2, Topic2-P1,Topic3-1,Topic3-4

可以看到Topic的余数partition负载不会始终加在C1消费者上。

2.3 StickyAssignor

第三种分配策略为StickyAssignor,它对应的分配策略配置参数是org.apache.kafka.clients.consumer.StickyAssignor

我们思考一种场景,如果某个消费组包含3个消费者,C0,C1,C2,消费组订阅了3个partion的Topic 1和Topic2,那么使用StickyAssignor分配的结果如下

C1:Topic1-P0 Topic2-P0,C2:Topic1-P1, Topic2-P1,C3:Topic1-P2, Topic2-P2

这个分配结果和轮训分配好像一样,但是如果C1消费者宕机触发partition分配的重平衡,那么分配结果就发生了变化

C2:Topic1-P1, Topic2-P1,Topic1-P0C3:Topic1-P2, Topic2-P2,Topic1-P1

可以看到C2,C3原先持有的分区不变,但是StickyAssignor将C1的负载均衡的分配给了C2和C3,这很符合粘性分配的字面意思

2. 消费组offset管理

2.1 手动提交

上述的消费代码笔者使用如下参数关闭自动提交

props.put("enable.auto.commit", “false”);

在一批消息消费完成之后,不要忘了提交offset,否则会导致消费者重复消费相同的消息。,消费者在被关闭的时候,消费者也会自动提交offset,所以如果我们判断消费者完成消费,我们可以使用try-finally关闭消费者。手动提交offset有两种方式,同步和异步方式。

同步提交

consumer.commitSync();

所谓的同步,指的是Consumer会一直等待提交offset成功,在此期间不能继续拉取以及消费消息,如果提交失败, consumer会一直重复尝试提交,直到超时,默认的时间是60秒

异步提交

consumer.commitAsync();

异步提交不会阻塞消费者线程,提交失败的时候不会进行重试,但是我们可以为异步提交创建一个监听器,在提交失败的时候进行重试,下面的代码是注册监听器的代码

consumer.commitAsync(new RetryOffsetCommitCallback(consumer));

offset失败重试回调监听器

public class RetryOffsetCommitCallback implements OffsetCommitCallback {    private static Logger LOGGER = LoggerFactory.getLogger(RetryOffsetCommitCallback.class);    private KafkaConsumer<String, String> consumer;    public RetryOffsetCommitCallback(KafkaConsumer<String, String> consumer) {        this.consumer = consumer;    }    @Override    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {        if (exception != null) {            LOGGER.info(exception.getMessage(), exception);            consumer.commitAsync(offsets, this);        }    }}

2.2 自动提交

在大多数情况下,为了避免消息被重复消费,我们使用自动提交机制,我们可以通过如下参数进行配置

props.put("enable.auto.commit",”true”);props.put("auto.commit.interval.ms",”1000”);

在上述参数配置情况下,Consumer会以每秒一次的频率定期的持久化offset,看到这笔者有一个疑问,如果消费者意外宕机,那么距离上一次提交的offset又会被重新消费,如果业务和钱相关,那么就会有大麻烦,所以消费者消费消息的时候,需要实现幂等性,关于幂等性的话题,笔者未来写一篇文章介绍如何实现消费幂等性。

除了以固定频率提交offset之外,kafka在关闭consumer的时候也会提交offset

consumer.close()

旧版本的kafka会将消费偏移提交到Zookeeper中,提交的路径如下

/consumers/ConsumerGroup/offsets/TestTopic/0

其中ConsumerGroup代表具体的消费组,而TestTopic代表消费主题,末尾的数字代表分区号。但是Zookeeper作为分布式协调系统,不适合作为频繁读写工具。于是新版本的kafka将消费位移存储在kafka内部的主题_consumer_offsets中。

在一个大型系统中,会有非常多的消费组,如果这些消费组同时提交位移,Broker服务器会有比较大的负载,所以kafka的_consumer_offsets拥有50个分区,这样_consumer_offsets的分区就能均匀分布到不同的机器上,即使多个消费组同时提交offset,负载也能均匀的分配到不同的机器上

消费者在提交位移的时候,消费者将位移提交到哪个分区呢?消费者是通过如下公式确定partition的

Math.abs(hash(groupID)) % numPartitions

消费者会定期向这个partition提交位移,那么同一个消费组,同一个Topic,同一个partition提交的位移会不会越来越多呢?答案是不会,kafka有压缩机制,会定期压缩_consumer_offsets,压缩的依据是消息message中包含的key(即groupID+topic+分区id),kafka会合并相同的key,,只留下最新消费组。

2.3 手动提交VS自动提交

上述文章花了很长的时间介绍了手动提交和自动提交,那么我们如何选择呢?自动提交的优点是实现简单,但是消息可能会发生丢失,举一个场景,比如Consumer从broker拉取了500条消息,此时正在消费100条,但是自动提交机制可能就将offset提交了,如果此时Consumer宕机,那么当前的ConsumerGroup还有400条消息就再也消费不到了。如果消息特别重要绝对不允许丢失,那么应该使用手动提交offset。

Kafka重要知识点之消费组概念

Kafka重要知识点之消费组概念

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|import_bigdata

欢迎点赞+收藏+转发朋友圈素质三连

Kafka重要知识点之消费组概念

文章不错?点个【在看】吧!** 👇**

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
3年前
ClickHouse大数据领域企业级应用实践和探索总结
点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源!(https://oscimg.oschina.net/oscnet/bb00e5f54a164cb9827f1dbccdf87443.jpg)!(https://oscimg.oschina.net/oscnet/dc8da835ff1b4
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
11小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(