作者:京东科技 徐拥
入门
1、什么是kafka?
apache Kafka is a distributed streaming platform. What exactly dose that mean?
Apache Kafka 是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)
2、kafka全景图:
3、Kafka的版本演进:
4、kafka选型:
Apache Kafka:也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。(如果你仅仅需要一个消息引擎系统亦或是简单的流处理应用场景,同时需要对系统有较大把控度,那么我推荐你使用 Apache Kafka)
Confluent Kafka :Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。(如果你需要用到 Kafka 的一些高级特性,那么推荐你使用 Confluent Kafka。)
CDH/HDP Kafka:大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。(如果你需要快速地搭建消息引擎系统,或者你需要搭建的是多框架构成的数据平台且 Kafka 只是其中一个组件,那么我推荐你使用这些大数据云公司提供的 Kafka)
5、Kafka的基本概念:
6、Kafka的基本结构:
7、Kafka的集群结构:
8、kafka的应用场景(用户注册/异步):
9、kafka队列模式---点对点:
10、kafka队列模式---发布/订阅:
11、kafka构成角色:
1、broker:
消息格式: 主题 - 分区 - 消息 、主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份
这样设计的原因是:不使用多topic做负载均衡,意义在于对业务屏蔽该逻辑。业务只需要对topic进行发送,指定负载均衡策略即可 同时 topic分区是实现负载均衡以及高吞吐量的关键
Topic的创建流程
2、Producer:
发送消息流程
3、Consumer:
Kafka消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。消费者组的设计是对消费者进行的一个横向伸缩,用于解决消费者消费数据的速度跟不上生产者生产数据的速度的问题,通过增加消费者,让它们分担负载,分别处理部分分区的消息
4、Consumer Group:
它是kafka提供的具有可扩展且可容错的消费者机制
特性:
1、 Consumer Group下可以有一个或多个 Consumer实例;
2、在一个Katka集群中,Group ID标识唯一的一个Consumer Group;
3、 Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的 某个Consumer实例消费。
Consumer Group 两大模型:
1、如果所有实例都属于同一个Group,那么它实现的是消息队列模型;
2、如果所有实例分别属于不同的GrouD,那么它实现的就是发布/订阅模型。
12、Kafka的工作流程:
13、Kafka常用命令:
进阶
14、Kafka的文件存储机制—log:
15、Kafka的文件存储机制—分片/索引:
16、Kafka的文件存储机制—index/log:
17、kafka 如何支持百万QPS?
顺序读写 :
生产者写入数据和消费者读取数据都是顺序读写的
Batch Data(数据批量处理):
当消费者(consumer)需要消费数据时,首先想到的是消费者需要一条,kafka发送一条,消费者再要一条kafka再发送一条。但实际上 Kafka 不是这样做的,Kafka 耍小聪明了。Kafka 把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候 Kafka 直接把文件发送给消费者。比如说100万条消息放在一个文件中可能是10M的数据量,如果消费者和Kafka之间网络良好,10MB大概1秒就能发送完,既100万TPS,Kafka每秒处理了10万条消息。
MMAP(内存映射文件):
MMAP也就是内存映射文件,在64位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射,完成映射之后对物理内存的操作会被同步到硬盘上。
通过MMAP技术进程可以像读写硬盘一样读写内存(逻辑内存),不必关心内存的大小,因为有虚拟内存兜底。这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。也有一个很明显的缺陷,写到MMAP中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘。
Zero Copy(零拷贝):
如果不使用零拷贝技术,消费者(consumer)从Kafka消费数据,Kafka从磁盘读数据然后发送到网络上去,数据一共发生了四次传输的过程。其中两次是 DMA 的传输,另外两次,则是通过 CPU 控制的传输。
第一次传输:从硬盘上将数据读到操作系统内核的缓冲区里,这个传输是通过 DMA 搬运的。
第二次传输:从内核缓冲区里面的数据复制到分配的内存里面,这个传输是通过 CPU 搬运的。
第三次传输:从分配的内存里面再写到操作系统的 Socket 的缓冲区里面去,这个传输是由 CPU 搬运的。
第四次传输:从 Socket 的缓冲区里面写到网卡的缓冲区里面去,这个传输是通过 DMA 搬运的。
实际上在kafka中只进行了两次数据传输,如下图:
第一次传输:通过 DMA从硬盘直接读到操作系统内核的读缓冲区里面。
第二次传输:根据 Socket 的描述符信息直接从读缓冲区里面写入到网卡的缓冲区里面。
我们可以看到同一份数据的传输次数从四次变成了两次,并且没有通过 CPU 来进行数据搬运,所有的数据都是通过 DMA 来进行传输的。没有在内存层面去复制(Copy)数据,这个方法称之为零拷贝(Zero-Copy)。
无论传输数据量的大小,传输同样的数据使用了零拷贝能够缩短 65%的时间,大幅度提升了机器传输数据的吞吐量,这也是Kafka能够支持百万TPS的一个重要原因
18、压缩:
特性:
节省网络传输带宽以及 Kafka Broker 端的磁盘占用。
生产者配置 :
compression.type
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producerproducer = new KafkaProducer<>(props)
broker开启压缩:
Broker 端也有一个参数叫 compression.type 默认值为none,这意味着发送的消息是未压缩的。否则,您指定支持的类型:gzip、snappy、lz4或zstd。 Producer 端压缩、Broker 端保持、Consumer 端解压缩。
broker何时压缩:
情况一:Broker 端指定了和 Producer 端不同的压缩算法。(风险:可能会发生预料之外的压缩 / 解压缩操作,表现为 Broker 端 CPU 使用率飙升)
想象一个对话:
Producer 说:“我要使用 GZIP 进行压缩。
Broker 说:“不要,我这边接收的消息必须使用配置的 lz4 进行压缩
情况二: Broker 端发生了消息格式转换 (风险:涉及额外压缩/解压缩,且 Kafka 丧失 Zero Copy 特性)
Kafka 共有两大类消息格式,社区分别称之为 V1 版本和 V2 版本
为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩
消息何时解压缩:
Consumer:收到到压缩过的消息会解压缩还原成之前的消息。
broker:收到producer的消息 压缩算法和自己的不一致/兼容新老版本的消息格式
压缩算法对比:
以 Kafka 为例,吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;压缩比方面,zstd > LZ4 > GZIP > Snappy;
具体到物理资源,使用 Snappy 算法占用的网络带宽最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
在 CPU 使用率方面,各个算法表现得差不多,只是在压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU;
19、Exactly-Once(ACK应答机制):
1、At Least Once
最少发送一次,Ack级别为-1,保证数据不丢失
2、At Most Once
最多发送一次,Ack级别为1,保证数据不重复
3、幂等性
保证producer发送的数据在broker只持久化一条
4、Exactly Once(0.11版本)
At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将Producer的参数中 enable.idompotence设置为 true即可。 Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。
20、producer如何获取metadata:
1:在创建KafkaProducer实例时 第一步:生产者应用会在后台创建并启动一个名为Sender的线程,
2:该Sender线程开始运行时,首先会创建与Broker的连接。 第二步:此时不知道要连接哪个Broker,kafka会通过METADATA请求获取集群的元数据,连接所有的Broker。
3:Producer 通过 metadata.max.age.ms定期更新元数据,在连接多个broker的情况下,producer的InFlightsRequests中维护着每个broker的等待回复消息的队列,等待数量越少说明broker处理速度越快,负载越小,就会发到哪个broker上
21、kafka真的会丢消息吗?
kafka最优配置:
Producer:
如果是Java客户端 建议使用 producer.send(msg, callback) ,callback(回调)它能准确地告诉你消息是否真的提交成功了。
设置 acks = all。acks 是 Producer 的参数,如果设置成 all,需要所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
设置 retries 为一个较大的值。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
Consumer:
消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。
broker :
设置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
设置 replication.factor >= 3,目前防止消息丢失的主要机制就是冗余。
设置 min.insync.replicas > 1,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
22、kafka Replica:
本质就是一个只能追加写消息的提交日志。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用
3个特性:
第一,在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
第二,Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
第三,当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
意义: 方便实现“Read-your-writes”
(1)含义:当使用生产者API向Kafka成功写入消息后,马上使用消息者API去读取刚才生产的消息。 (2)如果允许追随者副本对外提供服务,由于副本同步是异步的,就可能因为数据同步时间差,从而使客户端看不到最新写入的消息。 B :方便实现单调读(Monotonic Reads) (1)单调读:对于一个消费者用户而言,在多处消息消息时,他不会看到某条消息一会存在,一会不存在。 (2)如果允许追随者副本提供读服务,由于消息是异步的,则多个追随者副本的状态可能不一致。若客户端每次命中的副本不同,就可能出现一条消息一会看到,一会看不到
23、ISR(In-Sync Replica Set)LEO&HW 机制:
HW(High Watermark)是所有副本中最小的LEO。
比如: 一个分区有3个副本,一个leader,2个follower。producer向leader写了10条消息,follower1从leader处拷贝了5条消息,follower2从leader处拷贝了3条消息,那么leader副本的LEO就是10,HW=3;follower1副本的LEO就是5
HW作用:保证消费数据的一致性和副本数据的一致性 通过HW机制。leader处的HW要等所有follower LEO都越过了才会前移
ISR: 所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)
1、Follower故障:
当follower挂掉之后,会被踢出ISR;
当follower恢复后,会读取本地磁盘记录的HW,然后截掉HW之后的部分,从HW开始从leader继续同步数据,当该follower的LEO大于等于该partition的HW的时候,就是它追上leader的时候,会被重新加入到ISR中
2、Leader故障:
当leader故障之后,会从follower中选出新的leader,为保证多个副本之间的数据一致性,其余的follower会将各自HW之后的部分截掉(新leader如果没有那部分数据 follower就会截掉造成数据丢失),重新从leader开始同步数据,但是只能保证副本之间的数据一致性,并不能保证数据不重复或丢失。
24、Consumer分区分配策略:
自定义分区策略:
你需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
List
//随机
//return ThreadLocalRandom.current().nextInt(partitions.size());
//按消息键保序策略
//return Math.abs(key.hashCode()) % partitions.size();
//指定条件
return partitions.stream().filter(Predicate(指定条件))).map(PartitionInfo::partition).findAny().get();
}
25、kafka中一个不为人知的topic:
consumer_offsets:
老版本的Kafka会把位移信息保存在Zk中 ,但zk不适用于高频的写操作,这令zk集群性能严重下降,在新版本中将位移数据作为一条条普通的Kafka消息,提交至内部主题(_consumer_offsets)中保存,实现高持久性和高频写操作。
位移主题每条消息内容格式:Group ID,主题名,分区号
当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。也可以手动创建 分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50 副本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3
思考:
只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息,就算没有新消息进来 也会通过定时任务重复写相同位移 最终撑爆磁盘?
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据,这个后台线程叫 Log Cleaner,对相同的key只保留最新的一条消息。
26、Consumer Group Rebalance:
术语简介:
Rebalance :就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。
Coordinator:它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。
Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。
如何确定Coordinator位置 :partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount) 比如(abs(627841412 % 50)=12 Coordinator就在 partitionId=12的Leader 副本所在的 Broker)。
Rebalance的危害:
Rebalance 影响 Consumer 端 TPS 这期间不会工作
Rebalance 很慢 Consumer越多 Rebalance时间越长
Rebalance 效率不高 需要所有成员参与
触发 Rebalance场景:
组成员数量发生变化
订阅主题数量发生变化
订阅主题的分区数发生变化
如何避免 Rebalance:
设置 session.timeout.ms = 15s (session连接时间 默认10)
设置 heartbeat.interval.ms = 2s(心跳时间)
max.poll.interval.ms (取决你一批消息处理时长 默认5分钟)
要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
27、Kafka 拦截器:
Kafka 拦截器分为生产者拦截器和消费者拦截器,可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。
例:生产者Interceptor
部分图文资料地址
-- 极客时间 Kafka 核心技术与实战