Apache Flink 框架浅析

Stella981
• 阅读 677

集团关于Blink的相关使用文档已经十分齐全,这里不准备再过多赘述。这篇文章准备对Blink所基于的Apache社区开源产品--Flink的架构做一些浅显分析。

一:Flink历史、基本架构及分布式部署

历史

Flink项目最早开始于2010年由柏林技术大学、柏林洪堡大学、哈索普拉特纳研究所共同合作研发的"Stratosphere: Information Management on the Cloud"(平流层:云上的信息管理) 项目,Flink最开始是作为该项目一个分布式执行引擎的Fork,到2014年成为Apache基金会下的一个项目,2014年底成为Apache顶级项目。每年一次的Flink Forward是关于Apache Flink最盛大的年度会议。

基本架构

Flink是原生的流处理系统,提供high level的API。Flink也提供 API来像Spark一样进行批处理,但两者处理的基础是完全不同的。Flink把批处理当作流处理中的一种特殊情况。在Flink中,所有的数据都看作流,是一种很好的抽象,因为这更接近于现实世界。

Apache Flink 框架浅析

Flink的基本架构图

Flink 的主要架构与Spark接近,都基于Master-Slave 的主从模式,从执行顺序上讲:

1:集群启动,启动JobManager 和多个TaskManager;

2:Flink Program程序提交代码,经由优化器/任务图生成器,生成实际需执行的Job,传递至Client;

3:Client将submit提交任务(本质上是发送包含了任务信息的数据流)至JobManager;

4:JobManager分发任务到各个真正执行计算任务的Worker----TaskManager;

5:TaskManager开始执行计算任务,并且定时汇报心跳信息和统计信息给JobManager,TaskManager之间则以流的形式进行数据传输;

在以上步骤中,步骤2与Flink集群之间可以不存在归属关系,即我们可以在任何机器上提交作业,只要它与JobManager相通。Job提交之后,Client甚至可以直接结束进程,都不会影响任务在分布式集群的执行。

Client:

当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。Client会将用户提交的Flink程序组装一个JobGraph, 并且是以JobGraph的形式提交的。一个JobGraph是一个Flink Dataflow,它由多个JobVertex组成的DAG。所以,一个JobGraph包含了一个Flink程序的如下信息:JobID、Job名称、配置信息、一组JobVertex(实际的任务operators)等。

JobManager:

JobManager是Flink系统的协调者,它负责接收Flink Job,调度组成Job的多个Task的执行。同时,JobManager还负责收集Job的状态信息,并管理Flink集群中从节点TaskManager。主要包括:

RegisterTaskManager——在Flink集群启动的时候,TaskManager会向JobManager注册,如果注册成功,则JobManager会向TaskManager回复消息AcknowledgeRegistration;

SubmitJob——Flink程序内部通过Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息;

CancelJob——请求取消一个Flink Job的执行,CancelJob消息中包含了Job的ID,如果成功则返回消息CancellationSuccess,失败则返回消息CancellationFailure;

UpdateTaskExecutionState——TaskManager会向JobManager请求更新ExecutionGraph中的ExecutionVertex的状态信息,即向JobManager汇报operator具体的执行状态,更新成功则返回true;

其他还包括RequestNextInputSplit、JobStatusChanged;

TaskManager:

TaskManager也是一个Actor(掌管者),它是实际负责执行计算的Worker,在其上执行Flink Job的一组Task。它在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。TaskManager从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游(任务上存在依赖关系的上游处理节点)建立 Netty 连接,接收数据并处理。每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。

TaskManager端可以分成两个阶段:

注册阶段——TaskManager会向JobManager注册,发送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以进行初始化过程;

可操作阶段——该阶段TaskManager可以接收并处理与Task有关的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager无法连接到JobManager,这是TaskManager就失去了与JobManager的联系,会自动进入“注册阶段”,只有完成注册才能继续处理Task相关的消息。

基于Yarn层面的结构

Apache Flink 框架浅析

1: Clinet 客户端上传包含Flink和HDFS配置的jars至HDFS,因为YARN客户端需要访问Hadoop的配置以连接YARN资源管理器和HDFS;2: Clinet客户端请求一个YARN容器作为资源管理器-Resource Manager,作用是启动ApplicationMaster;

3: RM分配第一个container去运行AM--AppplicationMaster;

4: AM启动,开始负责资源的监督和管理;

5: Job Manager和AM运行在同一个容器里,都成功启动后,AM知道job管理器(它拥有的主机)的地址;

6: Job Manager为Task Manager生成一个新的Flink配置, 这样task可连接Job Manager;

7: AM容器可以作为Flink的web接口服务,YARN代码的所有端口是分配的临时端口, 这可让用户并行执行多个yarn会话;

8: AM启动分配到的容器,这些容器作为Flink的Task Manager,将会从HDFS下载jar和更新配置,集群Run,可接收Job;

Flink集群的HA方案:

在Flink的基本架构图中,我们发现这一Master-Slave模式存在单点问题,即:JobManager这个点万一down掉,整个集群也就全完了。Flink一共提供了三种部署模式:Local、Standalone、YARN,除第一种为本地单机模式外,后两者都为集群模式。对于Standalone和YARN,Flink提供了HA机制避免上述单点失败问题,使得集群能够从失败中恢复。

YARN模式:

上段中介绍到Yarn层面的机构,注意到Flink的JobManager与YARN的Application Master(简称AM)是在同一个进程下的。YARN的ResourceManager对AM有监控,当AM异常时,YARN会将AM重新启动,启动后,所有JobManager的元数据从HDFS恢复。但恢复期间,旧的业务不能运行,新的业务不能提交。ZooKeeper(Apache ZooKeeper™)上还是存有JobManager的元数据,比如运行Job的信息,会提供给新的JobManager使用。对于TaskManager的失败,由JobManager上Akka的DeathWatch机制监听处理。当TaskManager失败后,重新向YARN申请容器,创建TaskManager。

Standalone模式:

对于Standalone模式的集群,可以启动多个JobManager,然后通过ZooKeeper选举出leader作为实际使用的JobManager。该模式下可以配置一个主JobManager(Leader JobManager)和多个备JobManager(Standby JobManager),这能够保证当主JobManager失败后,备的某个JobManager可以承担主的职责。下图为主备JobManager的恢复过程。

Apache Flink 框架浅析

二:Flink的流式计算架构

分层栈

Apache Flink 框架浅析

Deployment层:

本地、集群,以及商用的云模式,不再赘述;

runtime层:

Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务;

API层:

API层主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API. 简单来说,DataSet和DataStream都是包含了重复项数据的immutable集合,不同的是,在DataSet里,数据是有限的,而对于DataStream,元素的数量可以是无限的。对程序而言,最初的数据集合来源是Flink program 中的源数据,如双11支付数据大屏的线上实时数据来源;然后通过filter、map、flatmap等API,可以对它们进行转换,从而由初始数据集合派生出新集合。注意,集合是immutable的,只可派生出新的,不能修改原有的;

Libraries层:

Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。

三:特性分析

高吞吐&低延迟

简单来说,Flink在流式计算上相比于Spark Streaming & Storm,突出的优势主要是高吞吐&低延迟,如下图所示:

Apache Flink 框架浅析

支持 Event Time 和乱序事件

Apache Flink 框架浅析

Flink 支持了流处理和 Event Time 语义的窗口机制。 在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;
  • Processing time :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。

许多流处理场景中,事件发生的时间和事件到达待处理的消息队列时间有各种延迟:

  1. 各种网络延迟;
  2. 数据流消费者导致的队列阻塞和反压影响;
  3. 数据流毛刺,即,数据波动;
  4. 事件生产者(移动设备、传感器等)离线;

上述诸多原因会导致队列中的消息频繁乱序。事件发生的时间和事件到达待处理的消息队列时间的不同随着时间在不断变化,这常被称为时间偏移(event time skew),表示成:“processing time – event time”。

Apache Flink 框架浅析

对大部分应用来讲,基于事件的创建时间分析数据比基于事件的处理时间分析数据要更有意义。Flink允许用户定义基于事件时间(event time)的窗口,而不是处理时间。

Flink使用事件时间 clock来跟踪事件时间,其是以watermarks来实现的。watermarks是Flink 源流基于事件时间点生成的特殊事件。 T 时间点的watermarks意味着,小于 T 的时间戳的事件不会再到达。Flink的所有操作都基于watermarks来跟踪事件时间。

状态计算的exactly-once和容错机制

流程序可以在计算过程中维护自定义状态。

Apache Flink 框架浅析

Apache Flink 提供了可以恢复数据流应用到一致状态的容错机制。确保在发生故障时,程序的每条记录只会作用于状态一次(exactly-once),不过也可以降级为至少一次(at-least-once)。这一容错机制通过持续创建分布式数据流的快照来实现。对于状态占用空间小的流应用,这些快照非常轻量,可以高频率创建而对性能影响很小。流计算应用的状态保存在一个可配置的环境,如:master 节点或者 HDFS上。

在遇到程序故障时(如机器、网络、软件等故障),Flink 停止分布式数据流。系统重启所有 operator ,重置其到最近成功的 checkpoint。输入重置到相应的状态快照位置。保证被重启的并行数据流中处理的任何一个 record 都不是 checkpoint 状态之前的一部分。

为了能保证容错机制生效,数据源(例如消息队列或者broker)需要能重放数据流。Apache Kafka 有这个特性,Flink 中 Kafka 的 connector 利用了这个功能。集团的TT系统也有同样功能。

Apache Flink 框架浅析

Flink 分布式快照的核心概念之一就是数据栅栏(barrier)。如上图所示,这些 barrier 被插入到数据流中,作为数据流的一部分和数据一起向下流动。Barrier 不会干扰正常数据,数据流严格有序。一个 barrier 把数据流分割成两部分:一部分进入到当前快照,另一部分进入下一个快照。每一个 barrier 都带有快照 ID,并且 barrier 之前的数据都进入了此快照。Barrier 不会干扰数据流处理,所以非常轻量。多个不同快照的多个 barrier 会在流中同时出现,即多个快照可能同时创建。

Barrier 在数据源端插入,当 snapshot N 的 barrier 插入后,系统会记录当前 snapshot 位置值N (用Sn表示)。例如,在 Apache Kafka 中,这个变量表示某个分区中最后一条数据的偏移量。这个位置值 Sn 会被发送到一个称为 Checkpoint Coordinator 的模块(即 Flink 的 JobManager).

然后 barrier 继续往下流动,当一个 operator 从其输入流接收到所有标识 snapshot N 的 barrier 时,它会向其所有输出流插入一个标识 snapshot N 的 barrier。当 sink operator(DAG 流的终点)从其输入流接收到所有 barrier N 时,它向Checkpoint Coordinator 确认 snapshot N 已完成。当所有 sink 都确认了这个快照,快照就被标识为完成。

高度灵活的流式窗口Window

Flink支持在时间窗口,统计窗口,session 窗口,以及数据驱动的窗口,窗口(Window)可以通过灵活的触发条件来定制,以支持复杂的流计算模式。

Apache Flink 框架浅析

来自云邪的描述 ——:“在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。”

窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百个元素)。一种经典的窗口分类可以分成:翻滚窗口(Tumbling Window),滚动窗口(Sliding Window),和会话窗口(Session Window)。

带反压(BackPressure)的连续流模型

数据流应用执行的是不间断的(常驻)operators。

Flink streaming 在运行时有着天然的流控:慢的数据 sink 节点会反压(backpressure)快的数据源(sources)。

Apache Flink 框架浅析

反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃。

Flink的反压:

如果你看到一个task的back pressure告警(比如,high),这意味着生产数据比下游操作算子消费的速度快。Record的在你工作流的传输方向是向下游,比如从source到sink,而back pressure正好是沿着反方向,往上游传播。

举个简单的例子,一个工作流,只有source到sink两个步骤。假如你看到source端有个告警,这意味着sink消费数据速率慢于生产者的生产数据速率。Sink正在向上游进行back pressure。

绝妙的是,在Spark Streaming和Storm是棘手问题的BackPressure,在Flink中并不成问题。简单来说,Flink无需进行反压,因为系统接收数据的速率和处理数据的速率是自然匹配的。系统接收数据的前提是接收数据的Task必须有空闲可用的Buffer,该数据被继续处理的前提是下游Task也有空闲可用的Buffer。因此,不存在系统接受了过多的数据,导致超过了系统处理的能力。这有点像Java线程中的通用阻塞队列: 一个较慢的接受者会降低发送者的发送速率,因为一旦队列满了(有界队列)发送者会被阻塞。

原文链接

本文为阿里云原创内容,未经允许不得转载。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
List的Select 和Select().tolist()
List<PersondelpnewList<Person{newPerson{Id1,Name"小明1",Age11,Sign0},newPerson{Id2,Name"小明2",Age12,
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这