Flink01

Stella981
• 阅读 575

1. 什么是Flink?

1.1 4代大数据计算引擎

第一代: MapReducer

批处理 Mapper, Reducer

Hadoop的MapReducer将计算分为两个阶段, 分别为Map和Reducer. 对于上层应用来说, 就不得不想方设法去拆分算法, 甚至于不得不在上层应用实现多个Job的串联, 以完成一个完整的算法, 例如迭代计算.

第二代: DAG框架 (Tez) + MapReducer

批处理  1个Tez = MR (1) + MR (2) + ... + MR (n)  相比MR效率有所提升

第三代: Spark

批处理, 流处理, SQL高层API支持  自带DAG  内存迭代计算, 性能较之前大幅提升

第四代: Flink

批处理, 流处理, SQL高层API支持  自带DAG  流式计算性能更高, 可靠性更高

1.2 Flink概述:

官网: https://flink.apache.org/

Stateful Computations over Data Streams, 即数据流上的有状态的计算.

  1. 分布式计算

  2. 支持批处理, 即处理静态的数据集, 历史的数据集

  3. 支持流处理, 即实时的处理一些实时数据流

  4. 支持复杂事件处理, Flink CEP

logo介绍: 在德语中, Flink一词表示快速和灵巧, 松鼠具有快速和灵巧的特点. 柏林的松鼠是红棕色的, Flink的松鼠logo尾巴的颜色与Apache软件基金会的logo颜色相呼应.

1.3 流的划分:

  1. 有界流: 对源源不断的流进行边界划分, 有界流的处理也可以成为批处理.

  2. 无界流: 只有开始没有结束. 必须连续的处理无界流数据, 不能等待数据到达了再去全部处理, 因为数据是无界的并且永远不会结束数据注入. 处理无界流数据往往要求事件注入的时候有一定的顺序性,

1.4 其他特点:

  1. 性能优秀 (尤其在流计算领域)

  2. 高扩展性

  3. 支持容错

  4. 纯内存式的计算引擎, 做了内存管理方面的大量优化

  5. 支持 eventtime 的处理

  6. 支持超大状态的Job

  7. 支持 exactly-once 的处理

1.5 性能对比:

纵坐标是秒, 横坐标是次数

Flink01

结论: Spark和Flink全部都运行在Hadoop Yarn上, 性能为Flink > Spark > Hadoop (MR) , 迭代次数越多越明显.

性能上, Flink优于Spark和Hadoop最主要的原因是Flink支持增量迭代, 具有对迭代自动优化的功能.

SparkStreaming和Flink对比

SparkStreaming         Flink

定义      弹性的分布式数据集,        真正的流计算, 就想Storm一样,  但Flink同时

并非真正的实时计算      支持有限的数据流计算 (批处理) 和无限数据流计算 (流处理)

高容错     沉重             非常轻量级

内存管理    JVM相关操作暴露给用户    Flink在JVM中实现的是自己的内存管理

程序调优    只有SQL有自动优化机制    自动的优化一些场景,比如避免一些昂贵的操作 (如shuffle和sorts), 还有一些中间缓存

2. Flink集群:

2.1 Flink支持多种安装模式:

local (本地) -- 单机模式, 一般不使用

standalone -- 独立模式, Flink自带集群, 开发测试环境使用

yarn -- 计算资源统一由Hadoop YARN管理, 生产测试环境使用

2.2 三种模式架构 (本地模式, Standalone模式, YARN模式):

本地模式:

Flink01

  1. Flink程序需要提交给JobClient

  2. JobClient将作业提交给JobManager

  3. JobManager负责协调资源分配和作业执行. 资源分配完成后, 任务将提交给相应的TaskManager

4) TaskManager启动一个线程以开始执行, TaskManager会向JobManager报告状态更改. 例如开始执行, 正在进行或已完成

  1. 作业执行完成后, 结果将发送回客户端 (JobClient)

Standalone集群:

Flink01

  1. Client客户端提交任务给JobManager

2) JobManager负责Flink集群计算资源管理,并分发任务给TaskManager执行

3) TaskManager定期向JobManager汇报状态

  1. TaskManager启动一个线程以开始执行, TaskManager会向JobManager报告状态更改, 例如开始执行, 正在进行将已完成

  2. 多个TaskManager会进行交换中间结果

  3. 作业执行完成后, 结果将发送回客户端 (JobClient)

StandaloneHA集群:

Flink01

YARN集群:

在一个企业中, 为了最大化的利用集群资源, 一般都会在一个集群中同时运行多种类型的Workload, 因此Flink也支持在Yarn上面运行, Flink on Yarn的前提是: HDFS, YARN均启动

Flink运行在YARN上, 可以使用yarn-session来快速提交作业到YARN集群

Flink01

Flink01

  1. Client上传jar包和配置文件到HDFS集群上

  2. 向ResourceManager申请资源和请求AppMaster容器

  3. Yarn分配资源AppMaster容器, 并启动JobManager

JobManager和ApplicationMaster运行在同一个container上. 一旦他们被成功启动, AppMaster就知道JobManager的地址 (AM它自己所在的机器) . 它就会为TaskManager生成一个新的

Flink配置文件 (他们就可以连接到JobManager) . 这个配置文件也被上传到HDFS上. 此外, AppMaster容器也提供了Flink的web服务接口. YARN所分配的所有端口都是临时端口, 这允许用户并行执行多个Flink.

  1. 申请worker资源, 启动TaskManager

5) TaskManager从HDFS上下载所需要的jar包和配置文件

yarn-session提供两种模式: 会话模式和分离模式

    • 会话模式
      • 使用Flink中的yarn-session(yarn客户端),会启动两个必要服务JobManagerTaskManager

      • 客户端通过yarn-session提交作业

      • yarn-session会一直启动,不停地接收客户端提交的作业

      • 有大量的小作业,适合使用这种方式

    • 分离模式
      • 直接提交任务给YARN

      • 大作业,适合使用这种方式

3. Flink架构介绍:

3.1 Flink基石

四大基石:

    • CheckPoint: 基于Chandy-Lamport算法实现了分布式一致性快照, 提供了一致性的语义
    • State: 丰富的StateAPI. ValueState, ListState, MapState, BroadCastState
    • Time: 实现了WaterMark机制, 乱序数据处理, 迟到数据摒容忍
    • Window: 开箱即用的滚动, 滑动, 会话窗口以及灵活的自定义窗口
  1. Checkpoint机制: Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义

  2. Watermark机制: 能够支持基于事件的事件的处理,或者说基于系统时间的处理,能够容忍数据的迟到、容忍乱序的数据。

3.2 组件栈

Flink是一个分层架构的系统, 每一层所包含的组件都提供了特定的抽象, 用来服务于上层组件. Flink分层的组件栈图:

Flink01

部署层Deploy: Flink支持本地运行, 能在独立集群或者在被Yarn管理的集群上运行, 也能部署在云上.

运行时Core: Runtime提供了支持Flink计算的全部核心实现, 为上层API提供基础服务.

API: DataStream, DataSet, Table, SQL API.

扩展库Libraries: Flink还包括用于复杂事件处理, 机器学习, 图形处理和Apache Storm兼容性的专用代码库.

3.3 Flink数据流编程模型抽象级别

Flink提供了不同的抽象级别以开发流式或批处理应用

Flink01

最底层提供了有状态流, 他将通过过程函数 (Process Function) 嵌入到DataStream API中. 他允许用户可以自由的处理来自一个或多个流数据的事件, 并使用一致, 容错的状态. 除此之外, 用户可以注册事件时间和处理事件回调, 从而使程序可以实现复杂的计算.

DataStream / DataSet API是Flink提供的核心API, DataSet处理有界的数据集, DataStream处理有界或者无界的数据流. 用户可以通过各种方法 (map / flatmap / window / keyby / sum / max / min / avg / join等) 将数据进行转换和计算

Table API是以表为中心的声明式DSL, 其中表可能会动态变化 (在表达数据时) . Table API提供了例如select, project, join, groupby, aggregate等操作, 使用起来却更加简洁, 代码量更少. 你可以在表与DataStream / DataSet之间无缝切换, 也允许程序将Table API与DataStream以及DataSet混合使用.

Flink提供的最高层级的抽象是SQL, 这一层抽象在语法与表达能力上与Table API类似, 但是是以SQL查询表达式的形式表现程序, SQL抽象与Table API交互密切, 同时SQL查询可以直接在Table API定义的表上执行.

3.4 Flink程序结构

Flink程序的基本构建块是转换 (注意: Flink的DataSet API中使用的DataSet也是内部流) . 从概念上讲, 流是 (可能永无止境的) 数据记录流, 而转换是将一个或多个流作为一个或多个流的操作. 输入, 并产生一个或多个输出流.

Flink应用程序结构:

Flink01

Source: 数据源, Flink在流处理和批处理上的source大概有4类: 基于本地集合的source; 基于文件的source; 基于网络套接字的source; 自定义的source. 自定义的source常见的有Apache Kafka, RabbitMQ等, 当然也可以自己定义source.

Transformation: 数据转换的各种操作, 有map / flatmap / filter / keyby / reducer / fold / aggregations / window / windowall / union / window join / spilt / select 等, 操作很多, 可以将数据转换计算成你想要的数据.

Sink: 接收器, Flink将转换计算后的数据发送的地点, 你可能需要存储下来, Flink常见的sink大概有如下几类: 写入文件, 打印出来, 写入socket, 自定义的sink. 自定义的sink常见的有Apache kafka, RabbitMQ, MySQL, ElasticSearch, Apache Cassandra, Hadoop FileSystem 等, 同理你也可以定义自己的sink.

3.5 Flink并行数据流

Flink程序在执行的时候, 会被映射成一个Streaming Dataflow, 一个Streaming Dataflow是由一组Stream和Transformation Operator组成的. 在启动时从一个或多个Source Operation开始, 结束于一个或多个Sink Operation.

Flink程序本质上是并行的和分布式的, 在执行过程中, 一个流 (Stream) 包含一个或多个流分区, 而每一个operation包含一个或多个operation子任务. 操作子任务间彼此独立, 在不同的线程中执行, 甚至是不同的机器或不同的容器上. operator子任务的数量是这一特定operation的并行度. 相同程序中的不同operation有不同级别的并行度.

Flink01

一个Stream可以被分成多个Stream的分区, 也就是Stream Partition. 一个Operation也可以被分为多个Operation Subtask. 如上图中, Source被分成Source1和Source2, 他们分别为Source的Operation Subtask. 每一个Operation Subtask都是在不同的线程当中独立执行的. 一个Operation的并行度, 就等于Operation Subtask的个数. 上图Source的并行度为2, 而一个Stream的并行度就等于他生成的Operation的并行度.

数据在两个Operation之间传递的时候有两种模式:

One to One 模式: 两个Operation用此模式传递的时候, 会保持数据的分区数和数据的排序, 如上图中的Source1到Map1, 他就保留的Source的分区特性, 以及分区元素处理的有序性.

Redistrbuting (重新分配) 模式: 这种模式会改变数据的分区数. 每个一个Operation Subtask会根据选择transformation把数据发送到不同的目标Subtasks, 比如keyBy () 会通过hashcode重新分区, broadcast () 和rebalance ()方法会随机重新分区.

3.6 Task和Operation Chain

Flink的所有操作都称之为Operation, 客户端在提交任务的时候会对Operation进行优化操作, 能进行合并的Operation会被合并为一个Operation, 合并后的Operation成为Operator Chain, 实际上就是一个执行链, 每个执行链会在TaskManager上一个独立的线程中执行.

Flink01

3.7 任务调度与执行

Flink01

  1. Flink执行executor会自动根据代码生成DAG数据流图

  2. ActorSystem创建Actor将数据流图发送给JobManager中的Actor

  3. JobManager会不断接收TaskManager的心跳信息, 从而可以获取到有效的TaskManager

  4. JobManager通过调度器在TaskManager中调度执行Task (在Flink中, 最小的调度单元就是Task, 对应就是一个线程)

  5. 在程序执行过程中, Task与Task之间是可以进行数据传输的

Flink中的执行图可以分成四层, StreamGraph -> JobGraph -> ExecutionGraph -> 无物理执行图

StreamGraph: 是根据用户通过Stream API编写的代码生成的最初的图. 用来表示程序的拓扑结构.

JobGraph: StreamGraph经过优化后生成了JobGraph, 提交给JobManager的数据结构. 主要的优化为, 将多个符合条件的节点Chain在一起作为一个节点, 这样可以减少数据在节点之间流动所需要的序列化 / 反序列化 / 传输消耗.

ExecutionGraph: JobManager根据JobGraph生成ExecutionGraph. ExecutionGraph是JobGraph的并行化版本, 是调度层最核心的数据结构.

物理执行图: JobManager根据ExecutionGraph对Job进行调度后, 在各个TaskManager上部署Task后形成的"图" , 并不是一个具体的数据结构.

Job Client:

主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回.

Job Client不是Flink程序执行的内部部分, 但他是任务执行的起点.

Job Client负责接受用户的程序代码, 然后创建数据流, 将数据流提交给Job Manager以便进一步执行. 执行完成后, Job Client将结果返回给用户

JobManager:

主要职责是调度工作并协调任务做检查点.

集群中至少要有一个master, master负责调度Task, 协调checkpoint和容错.

高可用设置的话可以有多个master, 但要保证一个是leader, 其他是standby.

Job Manager包含Actor System, Scheduler, CheckPoint协调器三个重要组件.

Job Manager从客户端接收到任务以后, 首先生成优化过的执行计划, 再调度到TaskManager中执行.

TaskManager:

主要职责是从JobManager接收任务, 并部署和启动任务, 接收上游的数据并处理.

Task Manager是在JVM中的一个或多个线程中执行任务的工作节点.

TaskManager在创建之初就设置好了Slot, 每个Slot可以执行一个任务.

3.8任务槽 (Task-Slot) 和槽共享 (Slot Sharing)

Flink01

每个TaskManager是一个JVM进程, 可以在不同的线程中执行一个或多个子任务.

为了控制一个TaskManager能接收多个Task, TaskManager通过Task Slot来进行控制 (一个TaskManager至少有一个Task Slot).

每个Task Slot表示TaskManager拥有资源的一个固定大小的子集.

Flink将进程的内存进行了划分到多个Slot中.

图中有2个TaskManager, 每个TaskManager有3个Slot, 每个Slot占有1 / 3的内存.

内存被划分到不同的Slot之后可以获得如下好处:

  1. TaskManager最多能同时并发执行的任务是可以控制的,  那就是3个, 因为不能超过Slot的数量.

  2. Slot有独占的内存空间, 这样在一个TaskManager中可以运行多个不同的作业, 作业之间不受影响.

槽共享 (Slot Sharing)

  1. Flink默认允许同一个Job下的Subtask可以共享Slot.

  2. 这样可以使得同一个Slot运行整个Job的流水线  (pipleline)

  3. 槽共享可以获得如下好处:

a. 只需计算Job中最高并行度 (parallelism) 的Task Slot, 只要这个满足, 其他的Job也能满足.

b. 资源分配更加公平, 如果有比较空闲的Slot可以将更多的任务分配给他. 图中若没有任务槽共享, 负载不高的Source / Map等Subtask将会占据许多资源, 而负载较高的窗口Subtask则会缺乏资源.

c. 有了任务槽共享, 可以提高分槽资源你的利用率. 同时他还可以保障TaskManager给Subtask的分配的Slot方案更加公平.

  1. 经验上讲Slot的数量与CPU-core的数量一致为好. 但考虑到超线程, 可以让slotNumber = 2 * cpuCore

Flink01

3.9 Flink应用场景

Flink01

阿里在Flink的应用主要包含四个模块: 实时监控, 实时报表, 流数据分析, 实时仓库

实时监控:

    • 用户行为预警, app crash预警, 服务器攻击预警
    • 对用户行为或者相关事件进行实时监测和分析, 基于风控规则进行预警

实时报表:

    • 双11, 双12等活动直播大屏
    • 对外数据产品: 生意参谋等
    • 数据化运营

流数据分析:

    • 实时计算相关指标反馈及时调整决策
    • 内容投放, 无线智能推送, 实时个性化推荐等

实时仓库:

    • 数据实时清洗, 归并, 结构化
    • 数仓的补充和优化

4. 思考题:

问: 假设你是一个电商公司, 经常搞运营活动, 但收效甚微, 经过细致排查, 发现原来是羊毛党在薅平台的羊毛, 把补给用户的补贴都薅走了, 钱花了不少, 效果却没达到. 我们应该怎么办呢?

答: 你可以做一个实时的异常检测系统,监控用户的高危行为,及时发现高危行为并采取措施,降低损失。

系统流程:

  1. 用户的行为经由app上报或web日志记录下来, 发送到一个消息队列里去
  2. 然后流计算订阅消息队列, 过滤出感兴趣的行为, 比如: 购买, 领券, 浏览等
  3. 流计算把这个行为特征化
  4. 流计算通过UDF调用外部一个风险模型, 判断这次行为是否有问题 (单次行为)
  5. 流计算里通过CEP功能, 跨多条记录分析用户行为 (比如用户先做了a, 又做了b, 又做了3次c) , 整体识别是否有风险
  6. 综合风险模型和CEP的结果, 产出预警信息
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
6小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(