Kafka 原理详解
1 kakfa基础概念说明
Broker:消息服务器,就是我们部署的一个kafka服务
Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存储
Replica:消息的副本,即备份消息,存储在其他的broker上,当leader挂掉之后,可以从存有副本的broker中选举leader,实现了高可用
Topic:一个消息投递目标的名称,这个目标可以理解为一个消息归类或者递目标的名称。对于每一个Topic,kafka会为其维护一个如下图所示的分区的日志文件
log:每个partition分区是一个有序的,不可修改的,消息组成的对了,这些消息是不断的appended到这个commit log上的。这些partitions之中的每个消息都会被赋予一个叫做offset的顺序id编号,用来在partition中唯一性的标识这个消息。
kafka集群会保存一个时间段内所有被发布出来的信息,无论这个消息是否已经被消费过,这个时间段可以进行配置。
kafka的性能与数据流不相干,所以保存大量的消息数据不会造成性能问题
kafka关注的每个消费者的元数据信息也只有消费者的offset。这个offset由消费者控制,通常情况下当消费者读取信息时,这个数值是线性递增的,实际上消费者可以控制这个值,以获取较早实际的信息。
对log进行分区的目的是:
- 这可以让log的伸缩能力超过单台服务器上限,每个独立的partition的大小受单台服务器的性能限制,但一个topic可以有很多partition,从而它可以处理任意大小的数据
- 在并行处理这方面可以作为一个独立的单元
分布式:log的partition被分布到kafka集群中,每个服务器负责处理彼此共享的partition的一部分数据和请求,每个partition被赋值成指定的份数散布在机器之中提供故障转移能力,对于每个partition都有一个服务器作为它的leader,其他服务器则作为followers,leader负责处理关于这个partition所有的读写请求,followers则被动的复制leader。每个服务器作为某些partition的leader的同时也作为其他服务器的followers,从而实现集群的负载均衡。
Producer:生产者,将数据发布到指定的topic的partition上,这个选择策略可以配置
Consumer:消费者,kafka提供了一个consumer group的模式,一个组的所有消费者视为同一个抽象的消费者,每个消费者都有自己的消费组名称标示
消息通信通常有两种模式:
队列模式,一组消费者可能从一个服务器读取消息,每个消息被发送给了其中一个消费者,在kafka中,如果所有的消费者都处于同一个组,则这个结构就是队列模式
订阅模式,消息被广播给了所有的消费者,在kafka中,如果所有消费者都处于不同的组,则这个结构就是订阅模式
Guarantees:生产者发送Topic某个partition的消息都被有序的追加到之前发送的消息之后,对于特定的消费者,它观察到的消息的顺序与消息保存到log中的顺序一致,对于一个复制N份的topic,系统能保证在N-1台服务器失效的情况下不丢失任何已提交到log中的信息
kafka提供的消息顺序保证机制:
传统的消息队列在服务器上有序的保存消息,当有多个消费者的时候消息也是按序发送消息。但是因为消息投递到消费者的过程是异步的,所以消息到达消费者的顺序可能是乱序的。这就意味着在并行计算的场景下,消息的有序性已经丧失了。消息系统通常采用一个“排他消费者”的概念来规避这个问题,但这样就意味着失去了并行处理的能力。Kafka 在这一点上做的更优秀。Kafka 有一个 Topic 中按照 partition 并行的概念,这使它即可以提供消息的有序性担保,又可以提供消费者之间的负载均衡。这是通过将 Topic 中的partition 绑定到消费者组中的具体消费者实现的。通过这种方案我们可以保证消费者是某个partition 唯一消费者,从而完成消息的有序消费。因为 Topic有多个 partition所以在消费者实例之间还是负载均衡的。注意,虽然有以上方案,但是如果想担保消息的有序性那么我们就不能为一个partition 注册多个消费者了。
2 Kafka设计思想
2.1 持久化
kafka的消息是存储在硬盘上的,因为“磁盘慢”这个普遍性的认知,常常使人们怀疑一个这样的持久化结构是否能提供所需的性能。但实际上磁盘因为使用的方式不同,它可能比人们预想的慢很多也可能比人们预想的快很多;而且一个合理设计的磁盘文件结构常常可以使磁盘运行得和网络一样快。
kafka的设计是将所有的数据被直接写入文件系统上一个可暂不执行磁盘 flush 操作的持久化日志文件中。实际上这意味着这些数据是被传送到了内核的页缓存上。
我们在上一节讨论了磁盘性能。 一旦消除了磁盘访问模式不佳的情况,该类系统性能低下的主要原因就剩下了两个:
大量的小型 I/O 操作(小包问题),以及过多的字节拷贝(ZeroCopy)
小型的 I/O 操作发生在客户端和服务端之间以及服务端自身的持久化操作中。
为了避免这种情况,我们的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。服务器一次性的将多个消息快依次追加到日志文件中, Consumer 也是每次获取多个大型有序的消息块。这个简单的优化对速度有着数量级的提升。批处理允许更大的网络数据包,更大的顺序读写磁盘操作,连续的内存块等等,所有这些都使 KafKa 能将随机性突发性的消息写操作变成顺序性的写操作最终流向消费者。
另一个低效率的操作是字节拷贝,在消息量少时,这不是什么问题。但是在高负载的情况下,影响就不容忽视。为了避免这种情况,我们让 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。
broker 维护的消息日志本身就是一个文件目录,每个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被 producer 和 consumer 共用。保持这种通用格式可以对一些很重要的操作进行优化:持久化日志块的网络传输。 现代的 unix 操作系统提供了高度优化的数据路径,用于将数据从 pagecache 转移到 socket 网络连接中;在 Linux 中系统调用sendfile 做到这一点。
为了理解 sendfile 的意义,首先要了解数据从文件到套接字的一般数据传输路径:
- 操作系统从磁盘读取数据到内核空间的 pagecache
- 应用程序读取内核空间的数据到用户空间的缓冲区
- 应用程序将数据(用户空间的缓冲区)写回内核空间的套接字缓冲区(内核空间)
- 操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
这显然是低效的,有四次 copy 操作和两次系统调用。使用 sendfile 方法,可以允许操作系统将数据从 pagecache 直接发送到网络,这样避免重复数据复制。所以这种优化方式,只需要最后一步的 copy 操作,将数据复制到 NIC 缓冲区。我们预期的使用场景是一个 topic 被多个消费者消费。使用 zero-copy (零拷贝)优化,数据仅仅会被复制到 pagecache 一次,在后续的消费过程中都可以复用,而不是保存在内存中在每次消费时再复制到内核空间。这使得消息能够以接近网络连接的速度被消费。
pagecache 和 sendfile 的组合使用意味着,在一个 Kafka 集群中,大多数的(紧跟生产者的)consumer 消费时,将看不到磁盘上的读取活动,因为数据完全由缓存提供。
2.2 端到端的批量压缩
Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 consumer 消费时解压缩。
Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议。
2.3 The Producer
生产者直接发送数据到主分区的服务器上,不需要经过任何中间路由。
客户端控制消息发送数据到哪个分区,这个可以实现随机的负载均衡方式,或者使用一些特定语义的分区函数。我们有提供特定分区的接口让用于根据指定的键值进行 hash 分区(当然也有选项可以重写分区函数),例如,如果使用用户 ID 作为 key,则用户相关的所有数据都会被分发到同一个分区上。这允许消费者在消费数据时做一些特定的本地化处理。这样的分区风格经常被设计用于一些对本地处理比较敏感的消费者。
批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如 64k 或 10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的 IO 操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。
2.4 The Consumer
Kafka consumer 通过向 broker 发出一个“fetch”请求来获取它想要消费的 partition。consumer 的每个请求都在 log 中指定了对应的 offset,并接收从该位置开始的一大块数据。并且可以在需要的时候通过回退到该位置再次消费对应的数据。
持续追踪已经被消费的内容是消息系统的关键性能点之一。
Kafka 使用完全不同的方式解决消息丢失问题。Kafka 的 topic 被分割成了一组完全有序的partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的consumer 组中的一个 consumer 消费。这意味着 partition 中 每一个consumer 的位置仅仅是一个数字,即下一条要消费的消息的 offset。这使得被消费的消息的状态信息相当少,每partition只需要一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。
这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。 例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了,那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据。
离线数据加载
可伸缩的持久化特性允许 consumer 只进行周期性的消费,例如批量数据加载,周期性将数据加载到诸如 Hadoop 和关系型数据库之类的离线系统中。
在 Hadoop 的应用场景中,我们通过将数据加载分配到多个独立的 map 任务来实现并行化,每一个 map 任务负责一个 node/topic/partition,从而达到充分并行化。Hadoop 提供了任务管理机制,失败的任务可以重新启动而不会有重复数据的风险,只需要简单的从原来的位置重启即可。
2.5 消息交付语义
Kafka 可以提供的消息交付语义保证有:
At most once - 消息可能会丢失但绝不重传
At least once - 消息可以重传但绝不丢失
Exactly once - 这可能是用户真正想要的,每条消息只被传递一次
2.6 Replication
Kafka 允许 topic 的 partition 拥有若干副本,你可以在 server 端配置 partition 的副本数量。
当集群中的节点出现故障时,能自动进行故障转移,保证数据的可用性。
创建副本的单位是 topic 的 partition ,正常情况下,每个分区都有一个 leader 和零或多个followers 。总的副本数是包含 leader 的总和。所有的读写操作都由 leader 处理,一般partition 的数量都比 broker 的数量多的多,各分区的 leader 均匀的分布在 brokers 中。所有的 followers 节点都同步 leader 节点的日志,日志中的消息和偏移量都和 leader 中的一致。(当然,在任何给定时间,leader 节点的日志末尾时可能有几个消息尚未被备份完成)。
Followers 节点就像普通的 consumer 那样从 leader 节点那里拉取消息并保存在自己的日志文件中。Followers 节点可以从 leader 节点那里批量拉取消息日志到自己的日志文件中。
与大多数分布式系统一样,自动处理故障需要精确定义节点 “alive” 的概念。Kafka 判断节点是否存活有两种方式:
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接。
- 如果节点是个 follower ,它必须能及时的同步 leader 的写操作,并且延时不能太久。
我们认为满足这两个条件的节点处于 “in sync” 状态,区别于 “alive” 和 “failed” 。Leader 会追踪所有 “in sync” 的节点。如果有节点挂掉了,或是写超时,或是心跳超时,leader 就会把它从同步副本列表中移除。 同步超时和写超时的时间由 replica.lag.time.max.ms 配置确定。
分布式系统中,我们只尝试处理 “fail/recover” 模式的故障,即节点突然停止工作,然后又恢复(节点可能不知道自己曾经挂掉)的状况。Kafka 没有处理所谓的 “Byzantine” 故障,即一个节点出现了随意响应和恶意响应(可能由于 bug 或 非法操作导致)。
当所有的分区上 in sync repicas 都应用到 log 上时,消息可以认为是 “committed”,只有committed 消息才会给 consumer。这意味着 consumer 不需要担心潜在因为 leader 失败而丢失消息。而对于 producer 来说,可以依据 latency 和 durability 来权衡选择是否等待消息被committed ,这个行动由 producer 使用的 acks 设置来决定。
在所有时间里,Kafka 保证只要有至少一个同步中的节点存活,提交的消息就不会丢失。
节点挂掉后,经过短暂的故障转移后,Kafka 将仍然保持可用性,但在网络分区( networkpartitions )的情况下可能不能保持可用性。
2.7 可靠性和持久性的保证
向 Kafka 写数据时,producers 设置 ack 是否提交完成,
0:不等待 broker 返回确认消息,
1: leader 保存成功返回或,
-1(all): 所有备份都保存成功返回。
请注意。设置 “ack = all” 并不能保证所有的副本都写入了消息。默认情况下,当 acks = all 时,只要 ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置 acks = all 时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。尽管这确保了分区的最大可用性,但是对于偏好数据持久性而不是可用性的一些用户,可能不想用这种策略,因此,我们提供了两个 topic 配置,可用于优先配置消息数据持久性:
- 禁用 unclean leader 选举机制 - 如果所有的备份节点都挂了,分区数据就会不可用,直到最近的 leader 恢复正常。这种策略优先于数据丢失的风险,参看上一节的 uncleanleader 选举机制。
- 指定最小的 ISR 集合大小,只有当 ISR 的大小大于最小值,分区才能接受写入操作,以防止仅写入单个备份的消息丢失造成消息不可用的情况,这个设置只有在生产者使acks = all 的情况下才会生效,这至少保证消息被 ISR 副本写入。此设置是一致性和可用性 之间的折衷,对于设置更大的最小 ISR 大小保证了更好的一致性,因为它保证将消息被写入了更多的备份,减少了消息丢失的可能性。但是,这会降低可用性,因为如果ISR 副本的数量低于最小阈值,那么分区将无法写入。
2.8 备份管理
上面关于 replicated logs 的讨论仅仅局限于单一 log ,比如一个 topic 分区。但是 Kafka 集群需要管理成百上千个这样的分区。我们尝试轮流的方式来在集群中平衡分区来避免在小节点上处理大容量的 topic。
同样关于 leadership 选举的过程也同样的重要,这段时间可能是无法服务的间隔。一个原始的 leader 选举实现是当一个节点失败时会在所有的分区节点中选主。相反,我们选用 broker之一作为 “controller”, 这个 controller 检测 broker 失败,并且为所有受到影响的分区改变leader。这个结果是我们能够将许多需要变更 leadership 的通知整合到一起,让选举过程变得更加容易和快速。如果 controller 失败了,存活的 broker 之一会变成新的 controller。
2.9 日志压缩
日志压缩可确保 Kafka 始终至少为单个 topic partition 的数据日志中的每个 message key 保留最新的已知值。这样的设计解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。接下来让我们深入讨论这些在使用过程中的更多细节,阐述在这个过程中它是如何进行日志压缩的。
迄今为止,我们只介绍了简单的日志保留方法(当旧的数据保留时间超过指定时间、日志文件大小达到设置大小后就丢弃)。这样的策略非常适用于处理那些暂存的数据,例如记录每条消息之间相互独立的日志。然而在实际使用过程中还有一种非常重要的场景 – 根据 key 进行数据变更(例如更改数据库表内容),使用以上的方式显然不行。
让我们来讨论一个关于处理这样流式数据的具体的例子。假设我们有一个 topic,里面的内容包含用户的 email 地址;每次用户更新他们的 email 地址时,我们发送一条消息到这个topic,这里使用用户 Id 作为消息的 key 值。现在,我们在一段时间内为 id 为 123 的用户发送一些消息,每个消息对应 email 地址的改变。
日志压缩为我们提供了更精细的保留机制,所以我们至少保留每个 key 的最后一次更新(例如:bill@gmail.com)。这样我们保证日志包含每一个 key 的最终值而不只是最近变更的完整快照。这意味着下游的消费者可以获得最终的状态而无需拿到所有的变化的消息信息。
- 数据库更改订阅。通常需要在多个数据系统设置拥有一个数据集,这些系统中通常有一个是某种类型的数据库(无论是 RDBMS 或者新流行的 key-value 数据库)。 例如,你可能有一个数据库,缓存,搜索引擎集群或者 Hadoop 集群。每次变更数据库,也同时需要变更缓存、搜索引擎以及 hadoop 集群。 在只需处理最新日志的实时更新的情况下,你只需要最近的日志。但是,如果你希望能够重新加载缓存或恢复搜索失败的节点,你可能需要一个完整的数据集。
- 事件源。 这是一种应用程序设计风格,它将查询处理与应用程序设计相结合,并使用变更的日志作为应用程序的主要存储。
- 日志高可用。 执行本地计算的进程可以通过注销对其本地状态所做的更改来实现容错,以便另一个进程可以重新加载这些更改并在出现故障时继续进行。 一个具体的例子就是在流查询系统中进行计数,聚合和其他类似“group by”的操作。实时流处理框架Samza,使用这个特性 正是出于这一原因。
日志压缩机制是更细粒度的、每个记录都保留的机制,而不是基于时间的粗粒度。这个理念是选择性删除那些有更新的变更的记录的日志。这样最终日志至少包含每个 key 的记录的最后一个状态。
这种保留策略可以针对每一个 topci 进行设置,遮掩一个集群中,可以让部分 topic 通过时间和大小保留日志,另一些可以通过压缩策略保留。
日志压缩基础
这是一个高级别的日志逻辑图,展示了 kafka 日志的每条消息的 offset 逻辑结构。
Log head 中包含传统的 Kafka 日志,它包含了连续的 offset 和所有的消息。日志压缩增加了处理 tail Log 的选项。上图展示了日志压缩的的 Log tail 的情况。tail 中的消息保存了初次写入时的 offset。 即使该 offset 的消息被压缩,所有 offset 仍然在日志中是有效的。在这个场景中,无法区分和下一个出现的更高 offset 的位置。如上面的例子中,36、37、38 是属于相同位置的,从他们开始读取日志都将从 38 开始。
压缩也允许删除。通过消息的 key 和空负载(null payload)来标识该消息可从日志中删除。这个删除标记将会引起所有之前拥有相同 key 的消息被移除(包括拥有 key 相同的新消息)。但是删除标记比较特殊,它将在一定周期后被从日志中删除来释放空间。这个时间点被称为“delete retention point”,如上图。
压缩操作通过后台周期性的拷贝日志段来完成。清除操作不会阻塞读取,并且可以被配置不超过一定 IO 吞吐来避免影响 Producer 和 Consumer。实际的日志段压缩过程有点像这样:
日志压缩的保障措施:
- 任何滞留在日志 head 中的所有消费者能看到写入的所有消息;这些消息都是有序的offset。 topic 使用 min.compaction.lag.ms 来保障消息写入之前必须经过的最小时间长度,才能被压缩。 这限制了一条消息在 Log Head 中的最短存在时间。
- 消息始终保持有序。压缩永远不会重新排序消息,只是删除了一些。
- 消息的 Offset 永远不会变更。这是消息在日志中的永久标志。
- 任何从头开始处理日志的 Consumer 至少会拿到每个 key 的最终状态。另外,只要Consumer 在小于 Topic 的 delete.retention.ms 设置(默认 24 小时)的时间段内到达Log head,将会看到所有删除记录的所有删除标记。换句话说,因为移除删除标记和读取是同时发生的,Consumer 可能会因为落后超过 delete.retention.ms 而导致错过删除
标记。
Log压缩的细节:
日志压缩由 log cleaner 执行,log cleaner 是一个后台线程池,它会 recopy 日志段文件,移除那些 key 存在于 Log Head 中的记录。每个压缩线程工作的步骤如下:
- 选择 log head 与 log tail 比率最高的日志
- 在 head log 中为每个 key 最后 offset 创建一个简单概要
- 从日志的开始到结束,删除那些在日志中最新出现的 key 的旧值。新的、干净的日志会被立即提交到日志中,所以只需要一个额外的日志段空间(不是日志的完整副本)
- 日志 head 的概念本质上是一个空间密集的 hash 表,每个条目使用 24 个字节。所以如果有 8G 的整理缓冲区,则能迭代处理大约 336G 的 log head (假设消息大小为 1k)
配置Log Cleaner
log.cleaner.enable=true
这会启动清理线程池。如果要开启特定 topic 的清理功能,需要开启特定的 log-specific 属性
log.cleanup.policy=compact
这个可以通过创建 topic 时配置或者之后使用 topic 命令实现。
2.10 配额
producers 和 consumer 可能会产生和消费大量的消息从而导致独占 broker 资源,进而引起网络饱和,对其他 client 和 broker 造成 DOS 攻击。资源的配额保护可以有效的防止这些问题,大型的多租户集群中,因为一小部分表现不佳的客户端降低了良好的用户体验,这种情况下非常需要资源的配额保护。实际情况中,当把 Kafka 当做一种服务提供时,可以根据客户端和服务端的契约对 API 调用做限制。
默认情况下,每个唯一的客户端分组在集群上配置一个固定的限额,这个限额是基于每台服务器的 (quota.producer.default, quota.consumer.default),每个客户端能发布或获取每台服务器都的最大速率,我们按服务器 (broker) 定义配置,而不是按整个集群定义,是因为如果是集群范围需要额外的机制来共享配额的使用情况,这会导致配额机制的实现比较难。
覆盖 client-ids 默认的配额是可行的。这个机制类似于每一个 topic 日志的配置覆盖。client-id覆盖会被写到 ZooKeeper,这个覆盖会被所有的 broker 读取并且迅速加载生效。这样使得我们可以不需要重启集群中的机器而快速的改变配额。
本文同步分享在 博客“yingziisme”(CSDN)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。