基于图遍历的Flink任务画布模式下零代码开发实现方案

京东云开发者
• 阅读 54

作者:京东物流 吴云涛

前言

提交一个DataSteam 的 Flink应用,需要经过 StreamGraph、JobGraph、ExecutionGraph 三个阶段的转换生成可成执行的有向无环图(DAG),并在 Flink 集群上运行。而提交一个 Flink SQL 应用,其执行流程也类似,只是多了一步使用 flink-table-planer 模块从SQL转换成 StreamGraph 的过程。以下是利用Flink的 StreamGraph 通过低代码的方式,来实现StreamGraph的生成,并最终实现 Flink 程序零代码开发的解决方案。

一、Flink 相关概念

在Flink程序中,每个算子被称作Operator,通过各个算子的处理最终得到期望的加工后数据。比如下面这段程序中,增加了Source, Fiter, Map, Sink 4个算子。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream dataStream = env.addSource(new FlinkKafkaConsumer("topic"));

DataStream filteredStream = dataStream.filter(new FilterFunction() {
    @Override
    public boolean filter(Object value) throws Exception {return true;}
});

DataStream mapedStream = filteredStream.map(new MapFunction() {
    @Override
    public Object map(Object value) throws Exception {return value;}
});

mapedStream.addSink(new DiscardingSink());
env.execute("test-job");

StreamGraph

Flink的逻辑执行图,描述了整个流处理任务的流程和数据流转递规则,包括了数据源(Source)、转换算子(Transform)、数据目的端(Sink)等元素,以及它们之间的依赖关系和传输规则。StreamGraph是通过Flink的API或者DSL来构建的向无环图(DAG),它与JobGraph之间是一一对应的关系。StreamGraph中的顶点称为streamNode,是用来表示Operator算子的类,包含了算子uid、并行度,是否共享slot(SlotSharingGroup)等信息。边称作streamEdge。通过StreamingJobGraphGenerator类生成JobGraph。\

基于图遍历的Flink任务画布模式下零代码开发实现方案

JobGraph

StreamGraph 经过 flink-optimizer 模块优化后生成 JobGraph。生成 JobGraph 时,会将多个满足条件的算子chain 链接到一起作为一个顶点(JobVertex), 在运行时对应1个 Task。Task 是 Flink 程序的基本执行单元,任务调度时将Task分配到TaskManager上执行。
基于图遍历的Flink任务画布模式下零代码开发实现方案

ExecutionGraph

物理执行图是由JobGraph转换而来,描述了整个流处理任务的物理执行细节,包括了任务的调度、任务的执行顺序、任务之间的数据传输、任务的状态管理等。Task会在步骤中拆分为多个SubTask。对应Task中的每个并行度。
基于图遍历的Flink任务画布模式下零代码开发实现方案

Physical Graph

PhysicalGraph是在执行时的ExecutionGraph。ExecutionGraph中的每一个顶点ExecutionJobVertex都对应一个或多个顶点ExecutionVertex,它们是物理执行图中的节点。

二、画布模式实现思路

实现流程

首先,我们采用画布模式(拖拉拽方式)来实现Flink程序的组装,将极大程度上方便我们复用部分加工的算子,最终实现零代码的Flink应用开发。我们通过绘图的方式,直接将内置的算子绘制在图标上。如下所示:
基于图遍历的Flink任务画布模式下零代码开发实现方案

  1. 构建有向无环图(DAG),并持久化。通过拖拉拽的方式(画布模式)构建你的Flink应用,后端的持久化存储采用邻接表方式。我们在 mysql 关系数据库中将 Node(算子:Source、Sink、中间加工逻辑算子)存储到 flink_node 表中;将边存到一张 flink_realation 表中。
  2. 重新组将Flink作业
    要组装以上画布模式的Flink应用,首先需要初始化好 StreamExecutionEnvironment 相关参数,其次将上述表中的 flink_node 和flink_edge 转化为DataStream,并将转化出的 DataStream 合理地拼接成一个 DataStream API Flink 应用程序。
    在将flink_node、flink_edge转为为DataStream时选择何种遍历算法来组装呢?我们知道有向无环图的遍历最常用的有:深度优先遍历(DFS)和广度优先遍历(BFS)。这里我们采用了BFS算法+层序遍历的方式,BFS便于在组装的过程中将已visit到的node节点拼装到其parent 的节点上。

总结

在实际的实现过程中,遇到的问题往往比以上复杂很多。比如需要将更多的信息存储在node节点和edge边上。node上需要存储并行度、算子处理前后的表schema等;edge需要存储keyby的字段、上下游之间的数据shuffle的方式等等。此外在内置的算子无法满足用户需求时,还需要考虑如何友好的支持自定义算子(UDF)的嵌入等问题。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
C语言数据结构之图的基本操作
本博文是是博主在学习数据结构图的这一章知识时做的一些总结,代码运行环境:visualstudio2017纯C语言,当然掌握了方法,你也可以试着用其它的语言来实现同样的功能。下面的程序主要实现了对有向图,有向网,无向图,无向网,无向图的深度优先遍历,广度优先遍历,有向无环图的拓扑排序功能等。主要代码实现如下:1pragmao
Stella981 Stella981
3年前
Apache Flink结合Apache Kafka实现端到端的一致性语义
本次分享来自阿里巴巴的工程师在ApacheKafkaxApacheFlink·北京会议上的分享,关于Apache Flink结合ApacheKafka实现端到端的一致性语义的原理。2017年12月Apache Flink社区发布了1.4版本。该版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSin
Stella981 Stella981
3年前
Flink整合Oozie Shell Action 提交任务带Kerberos认证
最近这段时间一直在忙新集群迁移,上了最新的cdh6.3.0于是Flink提交遇到了许多的问题,还好有clouderaLicense有了原厂的帮助和社区的伙伴,问题解决起来快了不少。集群具体情况是CDH6.3.0Flink1.8.1,整个数据平台全部组件都上了kerberos和ldap因为要过认证,所以任务提交方法我们选择统一Oozie提交任务
Stella981 Stella981
3年前
Flink架构,源码及debug
序    工作中用Flink做批量和流式处理有段时间了,感觉只看Flink文档是对FlinkProgramRuntime的细节描述不是很多,程序员还是看代码最简单和有效。所以想写点东西,记录一下,如果能对别人有所帮助,善莫大焉。    说一下我的工作,在一个项目里我们在FlinkSQL基础上构建了一个SQLEngine,
Stella981 Stella981
3年前
Apache Flink 结合 Kafka 构建端到端的 Exactly
文章目录:1.ApacheFlink应用程序中的ExactlyOnce语义2.Flink应用程序端到端的ExactlyOnce语义3.示例Flink应用程序启动预提交阶段4.在Flink中实现两阶段提交Operator5.总结ApacheFlink(https://www.os
Stella981 Stella981
3年前
Flink使用RestApi
flink是一个非常好用的流任务计算框架,这次我们来试用flink的restApi来提交任务.主要阐述几个常用的restapi,包括上传jar包,查询jar包,提交任务,查询任务,删除任务等,其它的比如删除jar包,查询jobmanager,查询taskmanager等等,类推就可以得出了,不在这里进行重复介绍了1,上传
京东云开发者 京东云开发者
11个月前
Flink State 状态原理解析 | 京东物流技术团队
一、FlinkState概念State用于记录Flink应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的Flink应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如Join、窗口聚合场景。Flink应用运行中会保存状态信息到