Flink总结

Stella981
• 阅读 675

参考文章:Flink总结

1. Flink简介

Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理领域的一颗炙手可热的新星

1.1 flink技术栈

Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。

Flink总结

  • 可提供准确的结果产出,即使遇到乱序数据、迟到数据;
  • 有状态可容错(轻量级),可以无感知地从失败中恢复并保持exactly-once的语义(也可以降级为at-least-once进一步降低消息处理延时);
  • 可以大规模地运行在成千上万个节点上并保持高吞吐、低延迟,可以standalone模式运行,也可以在YARN和Mesos等资源管理平台上运行;
  • 灵活地支持多种基于时间、数量、会话的窗口;
  • savepoint提供了状态管理机制;

1.3 flink提供的API

Flink提供了不同层次的API用于streaming/batch应用的开发,如下图所示:

Flink总结

  1. 最底层的抽象仅提供状态流(stateful streaming),它通过处理函数嵌入到DataStream API中。
  2. 实践中用Core API比较多,这些流式的API提供了通用的构建入口用于数据处理,像各种用户自定义的transformation、join、aggregation、window、state等。
    1. core API 分为 DataSet API和 DataStream API
    2. DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
    3. DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
  3. Table API是以表为中心的声明式DSL(领域特定语言),当这些Table表示的是stream时,Table是动态变化的。Table API遵循扩展的关系模型,提供了包括select、project、join、group-by、aggregate等操作。
  4. Flink提供的最高层级的API是SQL,它在语义和表达能力上与Table API是类似的。

1.4 统一的批处理与流处理系统

在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据项目一般会被设计为只能处理其中一种任务,例如Apache Storm、Apache Smaza只支持流处理任务,而Aapche MapReduce、Apache Tez、Apache Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,看似一个特例,实则不然——Spark Streaming采用了一种micro-batch的架构,即把输入的数据流切分成细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Apache Storm、Apache Smaza等完全流式的数据处理方式完全不同。通过其灵活的执行引擎,Flink能够同时支持批处理任务与流处理任务。

在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式。对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求。Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型。Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量。同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量。

Flink总结

在统一的流式执行引擎基础上,Flink同时支持了流计算和批处理,并对性能(延迟、吞吐量等)有所保障。相对于其他原生的流处理与批处理系统,并没有因为统一执行引擎而受到影响从而大幅度减轻了用户安装、部署、监控、维护等成本。

2. Flink的容错机制

对于一个分布式系统来说,单个进程或是节点崩溃导致整个Job失败是经常发生的事情,在异常发生时不会丢失用户数据并能自动恢复才是分布式系统必须支持的特性之一。本节主要介绍Flink流处理系统任务级别的容错机制。

批处理系统比较容易实现容错机制,由于文件可以重复访问,当某个任务失败后,重启该任务即可。但是到了流处理系统,由于数据源是无限的数据流,从而导致一个流处理任务执行几个月的情况,将所有数据缓存或是持久化,留待以后重复访问基本上是不可行的。Flink基于分布式快照与可部分重发的数据源实现了容错。用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。Flink的分布式快照实现借鉴了Chandy和Lamport在1985年发表的一篇关于分布式快照的论文,其实现的主要思想如下:

按照用户自定义的分布式快照间隔时间,Flink会定时在所有数据源中插入一种特殊的快照标记消息,这些快照标记消息和其他消息一样在DAG中流动,但是不会被用户定义的业务逻辑所处理,每一个快照标记消息都将其所在的数据流分成两部分:本次快照数据和下次快照数据。

Flink总结

快照标记消息沿着DAG流经各个操作符,当操作符处理到快照标记消息时,会对自己的状态进行快照,并存储起来。当一个操作符有多个输入的时候,Flink会将先抵达的快照标记消息及其之后的消息缓存起来,当所有的输入中对应该次快照的快照标记消息全部抵达后,操作符对自己的状态快照并存储,之后处理所有快照标记消息之后的已缓存消息。操作符对自己的状态快照并存储可以是异步与增量的操作,并不需要阻塞消息的处理。

Flink总结

当所有的Data Sink(终点操作符)都收到快照标记信息并对自己的状态快照和存储后,整个分布式快照就完成了,同时通知数据源释放该快照标记消息之前的所有消息。若之后发生节点崩溃等异常情况时,只需要恢复之前存储的分布式快照状态,并从数据源重发该快照以后的消息就可以了。

Exactly-Once是流处理系统需要支持的一个非常重要的特性,它保证每一条消息只被流处理系统处理一次,许多流处理任务的业务逻辑都依赖于Exactly-Once特性。相对于At-Least-Once或是At-Most-Once, Exactly-Once特性对流处理系统的要求更为严格,实现也更加困难。Flink基于分布式快照实现了Exactly-Once特性。

相对于其他流处理系统的容错方案,Flink基于分布式快照的方案在功能和性能方面都具有很多优点,包括:

  • 低延迟。由于操作符状态的存储可以异步,所以进行快照的过程基本上不会阻塞消息的处理,因此不会对消息延迟产生负面影响。

  • 高吞吐量。当操作符状态较少时,对吞吐量基本没有影响。当操作符状态较多时,相对于其他的容错机制,分布式快照的时间间隔是用户自定义的,所以用户可以权衡错误恢复时间和吞吐量要求来调整分布式快照的时间间隔。

  • 与业务逻辑的隔离。Flink的分布式快照机制与用户的业务逻辑是完全隔离的,用户的业务逻辑不会依赖或是对分布式快照产生任何影响。

  • 错误恢复代价。分布式快照的时间间隔越短,错误恢复的时间越少,与吞吐量负相关。

3. flink窗口机制

对于流处理系统来说,流入的消息不存在上限,所以对于聚合或是连接等操作,流处理系统需要对流入的消息进行分段,然后基于每一段数据进行聚合或是连接。消息的分段即称为窗口,流处理系统支持的窗口有很多类型,最常见的就是时间窗口,基于时间间隔对消息进行分段处理。本节主要介绍Flink流处理系统支持的各种时间窗口。

对于目前大部分流处理系统来说,时间窗口一般是根据Task所在节点的本地时钟进行切分,这种方式实现起来比较容易,不会产生阻塞。但是可能无法满足某些应用需求,比如:

消息本身带有时间戳,用户希望按照消息本身的时间特性进行分段处理。

由于不同节点的时钟可能不同,以及消息在流经各个节点的延迟不同,在某个节点属于同一个时间窗口处理的消息,流到下一个节点时可能被切分到不同的时间窗口中,从而产生不符合预期的结果。

Flink总结

Flink支持3种类型的时间窗口,分别适用于用户对于时间窗口不同类型的要求:

  • Process Time。根据Task所在节点的本地时钟来切分的时间窗口。

  • Event Time。消息自带时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的所有消息一定会被正确处理。由于消息可能乱序流入Task,所以Task需要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的所有消息都被处理,才可以释放,如果乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。

  • Ingress Time。有时消息本身并不带有时间戳信息,但用户依然希望按照消息而不是节点时钟划分时间窗口,例如避免上面提到的第二个问题,此时可以在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,之后处理的流程与Event Time相同。Ingress Time可以看成是Event Time的一个特例,由于其在消息源处时间戳一定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不会很高,因此对Flink分布式系统的吞吐量和延迟的影响也会更小。

4. Event Time时间窗口的实现

Flink借鉴了Google的MillWheel项目,通过WaterMark来支持基于Event Time的时间窗口。

当操作符通过基于Event Time的时间窗口来处理数据时,它必须在确定所有属于该时间窗口的消息全部流入此操作符后才能开始数据处理。但是由于消息可能是乱序的,所以操作符无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入,Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的WaterMark,插入到消息流中输出到Flink流处理系统中,Flink操作符按照时间窗口缓存所有流入的消息,当操作符处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口数据进行处理并发送到下一个操作符节点,然后也将WaterMark发送到下一个操作符节点。

为了保证能够处理所有属于某个时间窗口的消息,操作符必须等到大于这个时间窗口的WaterMark之后才能开始对该时间窗口的消息进行处理,相对于基于Operator Time的时间窗口,Flink需要占用更多内存,且会直接影响消息处理的延迟时间。对此,一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果即可,无需缓存该时间窗口的所有消息。

对于基于Event Time时间窗口的操作符来说,流入WaterMark的时间戳与当前节点的时钟一致是最简单理想的状况,但是在实际环境中是不可能的,由于消息的乱序以及前面节点处理效率的不同,总是会有某些消息流入时间大于其本身的时间戳,真实WaterMark时间戳与理想情况下WaterMark时间戳的差别称为Time Skew

Flink总结

Time Skew决定了该WaterMark与上一个WaterMark之间的时间窗口所有数据需要缓存的时间,Time Skew时间越长,该时间窗口数据的延迟越长,占用内存的时间也越长,同时会对流处理系统的吞吐量产生负面影响。

5. Flink反压

反压的意思是当某一个task的处理性能跟不上输入速率的时候,其输入端的Buffer就会被填满,当输入端Buffer被填满的时候就会导致TCP的读取被暂停。TCP的读取被暂停之后,就会导致上游输出端的Buffer池越积越多,因为下游此时已经不再进行消费。

当上游输出端的Buffer池也堆满的时候, TCP通道就会被关闭,其内部所有的TCP channel也会被关闭。从而上游task就会逐级的向上游进行反压,这是整体的反压流程,所以说Flink以前的反压机制是比较原生态、比较粗暴的,因为其控制力度很大,整个TCP中一旦某一个Task性能跟不上,就会把整个TCP连接关掉。如下图所示:

Flink总结

右下角的task虽然处理跟不上了,但上面的task仍然可以继续进行处理。左边这些上游数据可以继续发给右上角的task进行处理。但是由于现在整个的TCP连接都被关闭,导致右上角task同样收不到数据,整体吞吐量实际上是下降的趋势。为了优化这个功能就需要做到更加细密度的流控,目前是关闭整个TCP连接,优化措施就是需要对TCP channel进行控制,当某个task处理不过来时只需要该Task对应的TCP channel,其它TCP channel不受影响。优化实现方式就是基于信用的流控。

基于信用的流控的核心思想就是基于信用额度的消费。比如银行做贷款,为了防止坏账太多,它会对每一个人评估其信用额度,当发放贷款时贷款不会超过这个人能承受的额度。基于这种方式,它能够一方面不会产生太多坏账,另一方面可以充分地把银行的资金利用起来。基于信用的流控就是基于这种思想,Flink中所谓的信用额度,就是指这个下游消费端的可用的Buffer数。如下图:

Flink总结

该图左边是指发送端,有四个输出的队列,每个队列里面的方块代表输出Buffer,即准备丢给下游处理的Buffer。右边是消费端,消费端也有四个队列,这四个队列里面也有一些Buffer块,这些Buffer块是空闲的Buffer,准备用来接收上游发给自己的数据。

上面提到基于数据的流控中所谓的信用就是指这个消费端它可用的Buffer数,代表当前还能够消费多少数据,消费端首先会向上游反馈当前的信用是多少, producer端只会向信用额度大于0的下游进行发送,对于信用额度如果为0的就不再发送数据。这样整个网络的利用率便得到了很大的提升,不会发生某些Buffer被长时间的停留在网络的链路上的情况。

基于信用的流控主要有以下两方面的优化提升:

  • 一个是当某一个task发生反压处理跟不上的时候,不会发生所有的task都卡住,这种做法使吞吐量得到了很大的提升,在阿里内部用双11大屏作业进行测试,这种新的流控算法会得到20%的提升;
  • 另一个是基于事件的I/O,Flink在网络端写数据时会先往一个Buffer块里面写数据,这个Buffer块是一个32K的长度的单位,即32K的大小,当这个Buffer块被填满的时候就会输出到网络里面,或者如果数据流比较慢,没办法很快填满的话,那么会等待一个超时,默认一个100毫秒,即如果100毫秒内还没被填满那么这个Buffer也会被输出到网络里面。此时若是在以前版本中Flink延迟可能是在100毫秒以内,最差的情况下是到100毫秒,因为需要到100毫秒等这个Buffer发出去。
    如果要得到更低的延时,现在的做法就会将这个Buffer直接加入到输出的队列,但是还是保持继续往这个Buffer块里面写数据,当网络里面有容量时这个Buffer块便会立刻被发出去,如果网络现在也比较繁忙,那就继续填充这个Buffer,这样吞吐也会比较好一点。基于这种算法,Flink的延时几乎是完美的,可以看到它的曲线基本上是低于10毫秒的,这也充分利用了网络的容量,几乎对吞吐没有影响。

6. Flink程序与Streaming Dataflow

Flink程序的基本元素包括:

  • stream:由连续不断的data record组成的数据流。
  • transformation:是一种转换操作,作用在一个或多个stream上,输出一个或多个stream。

每个Flink程序可以映射为一个streaming dataflow,这个dataflow由stream和transformation operator组成。每个dataflow是一个DAG,从一个或多个source开始,结束于一个或多个sink。

Flink程序/Streaming dataflow的结构如下图所示:

Flink总结

一个标准Flink程序的组成:

  • 获取一个执行环境(StreamExecutionEnvironment用于流处理,ExecutionEnvironment用于批处理),执行环境可以决定将下面的计算放在本地jvm运行还是提交到Flink集群中运行;
  • 加载初始数据;
  • 在数据上指定需要执行的transformation;
  • 指定将计算结果写到哪里;
  • 触发程序执行;

7. 并行的dataflow

Flink程序在实际运行中是并行的、分布式的:

  • 一个stream会被拆分为一个或多个stream partitions。
  • 一个transformation operator可以拆分为一个或多个operator subtask(subtask的数量称为这个operator的并行度)。每个operator subtask和其它的operator subtask相互独立,并运行在不同的线程中(甚至在不同的机器上)Flink程序中的source、sink也都属于transformation operator。
  • 一个transformation operator中的operator subtask个数就是这个operator的并行度;
  • 一个stream的并行度为对应的producing operator(从数据源读数据的operator)的个数
  • 一个程序中的不同operator可能会有不同的并行度。

一个dataflow的运行结构如下图所示:

Flink总结

流中的数据在不同operator之间的传递方式有两种:

  • one-to-one:像上图的Source[1] -> map[1]。
  • redistributing:像上图的map[1] -> keyBy()/window()/apply() [1]和[2]。
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
List的Select 和Select().tolist()
List<PersondelpnewList<Person{newPerson{Id1,Name"小明1",Age11,Sign0},newPerson{Id2,Name"小明2",Age12,
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这