Kafka 架构设计

Stella981
• 阅读 766

Kafka 总体架构

Kafka 架构设计

kafka的总体架构还是遵循消息中间件的架构,即产生消息->存储消息->消费消息。

Kafka 生产者设计

Kafka 架构设计

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了 两个线程即主线程Sender 线程,以及一个缓冲区RecordAccumulator

  • RecordAccumulator:消息发送的内存缓冲区域,当该区域满了一后,生产者要么被阻塞,要么会抛出异常;RecordAccumulator 内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。
  • main 线程:将消息发送给 RecordAccumulator,发送过程中消息会经历拦截器,序列化,分区器;
  • Sender 线程:不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
  • batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据,默认16kb。
  • linger.ms:如果数据迟迟未达到 batch.size sender 等待 linger.time 之后就会发送数据。
  • buffer.memory:RecordAccumulator 缓冲区大小 默认32M。

这样设计带来的好处就是可以提升生产者的吞吐量:

  1. 主线程直接将消息发送到缓冲区,这样主线程的发送会执行得非常的快。
  2. Sender线程可以从缓冲区里面批量获并发送取数据到kafka broker,这样可以减少网络开销。

分区原则

Kafka 的生产者发送消息是面向分区的,也就是说在发送消息的时候必须知道消息是发送到那个分区之中,具体分区规则如下:

  1. 创建ProducerRecord对象时,指定 partition 的情况下,直接将指明的值直接作为 partiton 值,作为该消息对饮分区;
  2. 创建ProducerRecord对象时,没有指定 partition 值但有 key 的情况下,那么将通过 key 的 hash 值与 topic 的 partition 总数进行取余得到该消息对应的 partition 值;
  3. 创建ProducerRecord对象时,既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

生产者的数据可靠性保证

生产者的数据可靠性保证主要是通过ack机制和重试机制来保证的。kafka生产者向kafka集群发送数据时,当消息达到kafka集群里面的分区后会向kafka发送ack(acknowledgement 确认收到),如果生产者收到 ack,就会进行下一轮的发送,否则重新发送数据。为了兼顾新能和数据可靠性,kafka的应答机制有三个级别,分别是:

  • 1:当消息写到 leader 成功后向生产者返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据,默认是这个级别;
  • 0:不向生产者返回ack,这种情况下生产者延迟最低,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
  • -1(all):当ISR中的所有副本都成功写入消息之后才像生产者返回ack。这种方式数据可靠,但是延迟最高;如果ISR副本同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复

Kafka Cluster 设计

Kafka 架构设计

一台kafka服务器就是一个Broker,多个broker配置同一组zk就构成了一个kafka Cluster。

在Kafka Broker内部消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的,以此来屏蔽底层的复杂逻辑,类似我们常用的分库分表组件。

  • topic 是逻辑上的概念,而真实存储数据的partition(分区),分区是物理隔离的,可以分布在不同的机器上,一个分区只能被一个消费者组中的一个消费者消费,分区机制极大的提升了kafka的读写性能;
  • 每个分区单独维护offset,所以kafka只能保证单分区内消息的顺序性;
  • 每个分区有一个leader和多个follower(副本),leader负责处理生产者和消费者请求,follower只是备份数据,当leader挂掉,默认会选择ISR中的副本中offset最大的follower,将其提升成leader,副本机制使得kafka具备高可用性和安全性(数据安全),即使其中某些broker挂掉,kafka也能提供服务;
  • kafka的所有消息都是持久化到磁盘的,这使得kafka消息具备了持久性;
  • kafka还采用了顺序写和零拷贝技术,使得kafka单机写性能很高。同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s;

分区与副本

为了提升kafka的可用性和数据的可靠性,kafka引入了副本机制,每一个分区存在一个leader副本和多个follower副本,follower副本分散在不同的机器当中。kafka与生产者和消费者交互都是通过leader副本来完成的,follower副本的作用只是备份。

当生产者发出的一条消息首先会被写入分区的leader副本,然后等待ISR集合中的所有 follower 副本同步完leader副本数据,在后会更新分区的HW,进而消费者可以消费到这条消息。

ISR

ISR(in-sync replica set)是ISR 是指与leader 副本保持同步状态的副本集合,也leader副本本身,在ack机制设置为-1的情况下,需要保证每条消息必须完全同步到ISR集合中所有副本才会返回ACK。ISR中默认包含leader分区和所有 follower 分区,当follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由更 replica.lag.time.max.ms 参数设定。当Leader 发生故障之后,就会从 ISR 中选举新的 leader。

AR 是指所有的副本

HW、LEO

kafka中的每一条消息都有一个offset,对消息的处理大多都是基于offset的。一个ISR中存在多个副本,而所有副本所保存的数据可能是不一样的,这个时候每个副本中的offset可能是不一样的,如图:

Kafka 架构设计

  • LEO(High Watermark):指的是每个副本最大的 offset。
  • HW(Log End Offset):指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

可靠性分析

越多的副本数越能够保证数据的可靠性,不过副本数越多那么需要同步的数据也就越多,这也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。一般而言,设置副本数为3即可满足绝大多数场景对可靠性的要求,而对可靠性要求更高的场景下,可以适当增大这个数值,比如国内部分银行在使用 Kafka 时就会设置副本数为 5,acks设置为-1。

由于副本机制和ack机制的存在,所以即使Kafka其中的某些节点挂掉后,Kafka依然可以对外提供服务:

  • follower 故障: follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,然后从 HW 开始向 leader 进行同步。 等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重 新加入 ISR 了。

  • leader 故障: leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Kafka文件存储机制

刚刚说了kafka的数据是存放在分区中的,我们通过查看文件server.properties中的log.dirs配置我们就可以找到数据文件的存放地址。

# A comma separated list of directories under which to store log files
log.dirs=/kafka/kafka-logs-5d429197312c

Kafka 架构设计

通过文件我们可以发现数据kafka的数据命名方式是topic-partition的方式,如:test_cluster_topic-0。进入test_cluster_topic-0目录查看数据文件:

bash-4.4# cd test_cluster_topic-0/
bash-4.4# ls
00000000000000000000.index      00000000000013825171.log
00000000000000000000.log        00000000000013825171.snapshot
00000000000000000000.timeindex  00000000000013825171.timeindex
00000000000010000000.snapshot   leader-epoch-checkpoint
00000000000013825171.index
bash-4.4# 
  • 00000000000000000000.index:基于offset的索引文件。
  • 00000000000000000000.timeindex:基于时间的索引文件。
  • 00000000000000000000.log:真实的数据文件。

partition中的数据采用了分片和索引机制,文件命名就是以该文件中最小的offset来命名,如00000000000000000000就表示文件中起始的offset值是0。生产者的消息会追加的方式添加到日志文件中。

Kafka 架构设计

数据索引机制

Kfka的采用了稀疏索引的方式来构建索引,即它并不保证每一条消息在索引文件中都有相应索引。默认情况下当写入4kb数据时就会增加一个索引,我们可以通过broker参数log.index.interval.bytes = 4096的值来改变索引的稀疏度,结构如图:

index固定是8位,前4位(relativeOffset)表示相对偏移量,后四位(position)表示物理地址。

标红部分表示基础偏移量(baseOffset),那么绝对偏移量offset = baseOffset + relativeOffset

Kafka 架构设计

那kafka又是如何来定位一条数据的呢?这里以查找31为例:

  1. 根据我们需要查找的偏移量定位到那个索引文件。
  2. 计算相对偏移量 31-0=31
  3. 在索引文件中,通过二分查查找发,找到不大于31的最大索引,即29。
  4. 最后根据物理地址261开始,从.log文件中顺序查到到31的数据项。

因为这里的baseOffset为0,所以图里面的relativeOffset就是绝对偏移量。

Kafka 消费者设计

Kafka 架构设计

kafka消费者分为消费者组和消费者。一个消费者组中包含多个消费者。一个topic中的一条消息会被所有消费者组消费一次,但是在同一个消费者组中,正常情况下一条消息只会被一个消费者消费。

每一个分区只能被一个消费组中的一个消费者所消费。

从上面的图我们可以看出,如果想增加消费能力,我们必须同时增加消费者数量和topic的分区数量。每次增加消费者,都会触发消费者消费分区的重新分配。

分区分配策略

RangeAssignor分配策略

RangeAssignor 分配策略是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。

这种方式在分配时存在一个跨度,所以会导致分区分配不均匀,进而出现某些消费者过载的情况。

假设消费组内有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

C0: t0p0、t0p1、t1p0、t1p1
C1: t0p2、t1p2

C0过载。

RoundRobinAssignor分配策略

RoundRobinAssignor分配策略是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

这种方式,如果同一个消费者组中的消费者订阅信息是一样的时候,分区分配还是非常均匀的。但是当消费者的订阅信息不一样时,进行分区分配时就是不完全的轮询,这样就有可能发生分配不均匀的情况。

假设消费组内有3个消费者(C0、C1和C2),它们共订阅了3个主题(t0、t1、t2),这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:

C0: t0p0
C1: t1p0
C2: t1p1、t2p0、t2p1、t2p2

C2过载。

StickyAssignor分配策略

它在RoundRobinAssignor基础上新增了两个目的:

  1. 分区的分配要尽可能均匀。
  2. 分区的分配尽可能与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。

假设消费组内有3个消费者(C0、C1和C2),它们共订阅了3个主题(t0、t1、t2),这3个主题分别有1、2、3个分区,即整个消费组订阅了t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区。具体而言,消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2,那么最终的分配结果为:

C0: t0p0
C1: t1p0、t1p1
C2: t2p0、t2p1、t2p2

做到尽量均匀。

Kafka的特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展。
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
  • 高并发:支持数千个客户端同时读写。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(