Flink Kafka 端到端 Exactly

Stella981
• 阅读 1099

摘要: 本文基于 Flink 1.9.0 和 Kafka 2.3 版本,对 Flink kafka 端到端 Exactly-Once 进行分析及 notifyCheckpointComplete 顺序,主要内容分为以下两部分:

1.Flink-kafka 两阶段提交源码分析

  • TwoPhaseCommitSinkFunction 分析

2.Flink 中 notifyCheckpointComplete 方法调用顺序

  • 定义

  • 样例

  • operator 调用 notifyCheckpointComplete

  • 对 Exactly-Once 语义的影响

Tips:Flink 中文社区征稿啦,感兴趣的同学可点击「阅读原文」了解详情~

Flink-kafka 两阶段提交源码分析

FlinkKafkaProducer 实现了 TwoPhaseCommitSinkFunction,也就是两阶段提交。关于两阶段提交的原理,可以参见《An Overview of End-to-End Exactly-Once Processing in Apache Flink》,本文不再赘述两阶段提交的原理,但是会分析 FlinkKafkaProducer 源码中是如何实现两阶段提交的,并保证了在结合 Kafka 的时候做到端到端的 Exactly Once 语义的。

https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

TwoPhaseCommitSinkFunction 分析

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>


 





 





  


  

 

TwoPhaseCommitSinkFunction 实现了 CheckpointedFunction 和 CheckpointListener 接口,首先就是在 initializeState 方法中开启事务,对于 Flink sink 的两阶段提交,第一阶段就是执行 CheckpointedFunction#snapshotState 当所有 task 的 checkpoint 都完成之后,每个 task 会执行 CheckpointedFunction#notifyCheckpointComplete 也就是所谓的第二阶段。

FlinkKafkaProducer 第一阶段分析

Flink Kafka 端到端 Exactly

@Override


    
    
    
    
      
 
     
     
     
   
     

    
    
    

这部分代码的核心在于:

  1. 先执行 preCommit 方法,EXACTLY_ONCE 模式下会调 flush,立即将数据发送到指定的 topic,这时如果消费这个 topic,需要指定 isolation.level 为 read_committed 表示消费端应用不可以看到未提交的事物内的消息。

    @Override

注意第一次调用的 send 和 flush 的事务都是在 initializeState 方法中开启事务。

transaction.producer.send(record, callback);


 





 






  




transaction.producer.flush();


 





 





  


  

 
  1. pendingCommitTransactions 保存了每个 checkpoint 对应的事务,并为下一次 checkpoint 创建新的 producer 事务,即 currentTransactionHolder = beginTransactionInternal();下一次的 send 和 flush 都会在这个事务中。也就是说第一阶段每一个 checkpoint 都有自己的事务,并保存在 pendingCommitTransactions 中。

FlinkKafkaProducer 第二阶段分析

Flink Kafka 端到端 Exactly

当所有 checkpoint 都完成后,会进入第二阶段的提交。

@Override


 





 





  


  

 

这一阶段会将 pendingCommitTransactions 中的事务全部提交。

@Override


 





 





  


  

 

这时消费端就能看到 read_committed 的数据了,至此整个 producer 的流程全部结束。

Exactly-Once 分析

当输入源和输出都是 kafka 的时候,Flink 之所以能做到端到端的 Exactly-Once 语义,主要是因为第一阶段 FlinkKafkaConsumer 会将消费的 offset 信息通过checkpoint 保存,所有 checkpoint 都成功之后,第二阶段 FlinkKafkaProducer 才会提交事务,结束 producer 的流程。这个过程中很大程度依赖了 kafka producer 事务的机制。

Flink 中 notifyCheckpointComplete 方法调用顺序

定义

notifyCheckpointComplete 方法在 CheckpointListener 接口中定义。

/**


 





 





  


  

 

简单说这个方法的含义就是在 checkpoint 做完之后,JobMaster 会通知 task 执行这个方法,例如在 FlinkKafkaProducer 中 notifyCheckpointComplete 中做了事务的提交。

样例

下面的程序会被分为两个 task,task1 是 Source: Example Source 和 task2 是 Map -> Sink: Example Sink。

DataStream<KafkaEvent> input = env.addSource(

■ operator 调用 notifyCheckpointComplete

根据上面的例子,task1 中只有一个 source 的 operator,但是 task2 中有两个operator,分别是 map 和 sink。

在 StreamTask 中,调用 task 的 notifyCheckpointComplete 方法。

@Override

其中关键的部分就是:

for (StreamOperator<?> operator : operatorChain.getAllOperators()) {


 





 





  


  

 

operator 的调用顺序取决于 allOperators 变量,可以看到源码中的注释,operator 是以逆序存放的。

/**


 

也就是说上面客户端的代码,虽然先调用了 map 后调用的 sink,但是实际执行的时候,确实先调用 sink 的 notifyCheckpointComplete 方法,后调用 map 的。

对 Exactly-Once 语义的影响

上面的例子,是先执行 source 的 notifyCheckpointComplete 方法,再执行 sink 的 notifyCheckpointComplete 方法。但是如果把 .keyBy("word") 去掉,那么只会有一个 task,所有 operator 逆序执行,也就是先调用 sink 的 notifyCheckpointComplete 方法再调用 source 的。

为了方便理解整个流程,下文只考察并发度为1的情况,不考虑部分 subtask 成功部分不成功的情况。

Tips: 以下讨论的都是基于 kafka source 和 sink

■ 先 sink 后 source

sink 成功之后 source 执行之前

sink 成功之前

checkpoint 恢复

exactly-once

__consumer_offsets 恢复

重复消费

sink 成功之后 source 执行之前,表示 sink 的 notifyCheckpointComplete 方法执行成功了,但是在执行 source 的 notifyCheckpointComplete 方法之前任务失败。

sink 成功之前,表示 sink 的 notifyCheckpointComplete 方法执行失败,提交事务失败。

  • 测试用例

测试代码主体架构如下:

DataStream<KafkaEvent> input = env.addSource(


 

测试环境采用的是 Flink 1.9.0 Standalone Cluster 模式,一个 JobManager,一个TaskManager,默认只保存一个 checkpoint。

模拟异常的方法,通过 kill -9 杀掉 JobManager 和 TaskManager 进程。

  1. 在 FlinkKafkaProducer#commit 方法第一行设置断点,当程序走到这个断点的时候 kill -9 杀掉 JobManager 和 TaskManager 进程,模拟 sink 的notifyCheckpointComplete 方法执行失败的场景;

  2. 监控1,通过 bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 监控 producer 是否 flush 数据;监控2,通过 bin/kafka-console-consumer.sh --topic bar --bootstrap-server 10.1.236.66:9092 --isolation-level read_committed 监控 producer 的事务是否成功提交;监控3,通过 bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /tmp/Consumer.properties 监控 consumer 的offset 是否提交到 kafka;

  3. 发送数据一条数据 a,5,1572845161023,当走到断点的时候,说明 consumer 的 checkpoint 已经生成,但是还没有将 offset 提交到 kafka,也就是checkpoint 认为 offset 已经成功发送,但是 kafka 认为并没有发送,监控1有数据,监控2和监控3都没有数据。kill -9 杀掉 JobManager 和 TaskManager进程;

  4. 重新启动,并提交作业,不指定 checkpoint 路径。监控1,2,3,都有数据,所以这种情况,监控2,只收到了一次数据,也就是 exactly-once。这时候监控3收到的数据为:partition0 的 offset=37,partition1 的 offset=43,partition2 的 offset=39;

  5. 同样1-3步骤,发送数据一条数据 b,6,1572845161023,第4步,启动作业的时候通过-s指定要恢复的 checkpoint 路径,启动后监控1,2都没有数据,但是监控3的数据为:partition0 的 offset=37,partition1 的 offset=43,partition2 的 offset=40,再查看 task 的日志 FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {KafkaTopicPartition{topic='foo', partition=0}=36, KafkaTopicPartition{topic='foo', partition=1}=42, KafkaTopicPartition{topic='foo', partition=2}=39}.,说明 checkpoint 认为上一次 partition2 的 offset=39 已经成功消费,所以恢复之后向 kafka 发送的offset 为 40。这样就导致了 partition2 的 offset=39 这条数据丢失。

同样的方法可以测试 sink 成功之后 source 执行之前的场景,只是这时候需要将断点设置在 TwoPhaseCommitSinkFunction#notifyCheckpointComplete 方法的最后一行,这样就会发现故障之前,监控1,2都是有数据的,监控3没有数据。不指定 checkpoint 路径恢复,监控1,2都会收到数据,这样就导致了重复消费。如果指定 checkpoint 路径消费,那么监控1,2就不会收到数据,保证了 exactly-once。

  • 原因分析

产生上面情况的原因主要就是因为 checkpoint 存储的 offset 和 kafka 中的 offset 不一致导致的。

■ 先 source 后 sink

需要说明的一点这个场景的两个 task 实际是并行的,并没有绝对的先后关系,只是会有这种前后关系的可能。

source 成功之后 sink 执行之前

source 成功之前

checkpoint 恢复

丢数据

__consumer_offsets 恢复

丢数据

source 成功之后 sink 执行之前,表示 source 的 notifyCheckpointComplete 方法执行成功了,但是在执行 sink 的 notifyCheckpointComplete 方法之前任务失败。

source 成功之前,表示 source 的 notifyCheckpointComplete 方法执行失败,提交事务失败。

  • 测试用例

模拟 source 成功之后 sink 执行之前:

  1. 需要在上面的用例中加入 keyby 算子,确保生成两个 task,监控3收到数据的时候说明 consumer 的 notifyCheckpointComplete 方法已经执行完。在FlinkKafkaProducer#commit 方法第一行设置断点,当程序走到这个断点并且监控3收到数据的时候,kill -9 杀掉 JobManager 和 TaskManager 进程,模拟 sink 执行 notifyCheckpointComplete 方法失败的场景;

  2. 这时候重启作业,checkpoint 和 kafka 中 offset 已经是一致的了,无论是从checkpoint 还是 kafka,都是一样的。所以 source 认为已经成功消费了,不会再读上次的 offset,都会导致数据丢失。

source 成功之前:

对于在 source 之前程序就挂掉,相当于所有的 operator 都没有执行notifyCheckpointComplete 方法,但是 source 的 checkpoint 已经做过了,只是没有将 offset 发送到 kafka,这样只有从 __consumer_offsets 恢复才能保证不丢数据。

 

小结:本节通过一种极端的测试场景希望让读者可以更深入的理解 Flink 中的  Exactly-Once 语义。在程序挂了以后需要排查是什么原因和什么阶段导致的,才能通过合适的方式恢复作业。在实际的生产环境中,会有重试或者更多的方式保证高可用,也建议保留多个 checkpoint,以便业务上可以恢复正确的数据。

作者介绍:

吴鹏 ,亚信科技资深工程师,Apache Flink Contributor。先后就职于中兴,IBM,华为。目前在亚信科技负责实时流处理引擎产品的研发。

▼ 更多技术文章 ▼

Flink 生态:一个案例快速上手 PyFlink

Demo: 基于 Flink SQL 构建流式应用

Flink 1.10 Native Kubernetes 原理与实践

从开发到生产上线,如何确定集群大小?

在 Flink 算子中使用多线程如何保证不丢数据?

Flink 1.10 和 Hive 3.0 性能对比(附 Demo 演示 PPT)

Flink on Zeppelin (3) - Streaming 篇

Flink on Zeppelin (2) - Batch 篇

Flink on Zeppelin (1) - 入门篇


关注 Flink 中文社区,获取更多技术干货

Flink Kafka 端到端 Exactly

你也「 在看 」吗? 👇

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这