Apache Flink on K8s:四种运行模式,我该选择哪种?

Stella981
• 阅读 1549

1. 前言

Apache Flink 是一个分布式流处理引擎,它提供了丰富且易用的API来处理有状态的流处理应用,并且在支持容错的前提下,高效、大规模的运行此类应用。通过支持事件时间(event-time)、计算状态(state)以及恰好一次(exactly-once)的容错保证,Flink迅速被很多公司采纳,成为了新一代的流计算处理引擎。

2020 年 2 月 11 日,社区发布了 Flink 1.10.0 版本, 该版本对性能和稳定性做了很大的提升,同时引入了 native Kubernetes 的特性。对于 Flink 的下一个稳定版本,社区在 2020 年 4 月底冻结新特性的合入,预计在 2020 年 5 月中旬会推出 Flink 1.11,在新版本中将重点引入新特性,以扩容 Flink 的使用场景。

Kubernetes 项目源自 Google 内部 Borg 项目,基于 Borg 多年来的优秀实践和其超前的设计理念,并凭借众多豪门、大厂的背书,时至今日,Kubernetes 已经成长为容器管理领域的事实标准。在大数据及相关领域,包括 Spark,Hive,Airflow,Kafka 等众多知名产品正在迁往 Kubernetes,Apache Flink 也是其中一员。

Flink 选择 Kubernetes 作为其底层资源管理平台,原因包括两个方面:

1)Flink 特性:流式服务一般是常驻进程,经常用于电信网质量监控、商业数据即席分析、实时风控和实时推荐等对稳定性要求比较高的场景;

2)Kubernetes 优势:为在线业务提供了更好的发布、管理机制,并保证其稳定运行,同时 Kubernetes 具有很好的生态优势,能很方便的和各种运维工具集成,如 prometheus 监控,主流的日志采集工具等;同时 K8S 在资源弹性方面提供了很好的扩缩容机制,很大程度上提高了资源利用率。

在 Flink 的早期发行版 1.2 中,已经引入了 Flink Session 集群模式,用户得以将 Flink 集群部署在 Kubernetes 集群之上。

随着 Flink 的逐渐普及,越来越多的 Flink 任务被提交在用户的集群中,用户发现在 session 模式下,任务之间会互相影响,隔离性比较差,因此在 Flink 1.6 版本中,推出了 Per Job 模式,单个任务独占一个 Flink 集群,很大的程度上提高了任务的稳定性。

在满足了稳定性之后,用户觉得这两种模式,没有做到资源按需创建,往往需要凭用户经验来事先指定 Flink 集群的规格,在这样的背景之下,native session 模式应用而生,在 Flink 1.10 版本进入 Beta 阶段,我们增加了 native per job 模式,在资源按需申请的基础上,提高了应用之间的隔离性。

本文根据 Flink 在 Kubernetes 集群上的运行模式的趋势,依次分析了这些模式的特点,并在最后介绍了 Flink operator 方案及其优势。

2. Flink运行模式

本文首先分析了 Apache Flink 1.10 在 Kubernetes 集群上已经GA(生产可用)的两种部署模式,然后分析了处于 Beta 版本的 native session 部署模式和即将在 Flink 1.11 发布的 native per-job 部署模式,最后根据这些部署模式的利弊,介绍了当前比较 native kubernetes 的部署方式,flink-operator。

我们正在使用的 Flink 版本已经很好的支持了 native session 和 native per-job 两种模式,在 flink-operator 中,我们也对这两种模式也做了支持。

接下来将按照以下顺序分析了 Flink 的运行模式,读者可以结合自身的业务场景,考量适合的 Flink 运行模式。

  • Flink session 模式
  • Flink per-job 模式
  • Flink native session 模式
  • Flink native per-job 模式

这四种部署模式的优缺点对比,可以用如下表格来概括,更多的内容,请参考接下来的详细描述。

Apache Flink on K8s:四种运行模式,我该选择哪种?

2.1 Session Cluster 模式

2.1.1 原理简介

Session 模式下,Flink 集群处于长期运行状态,当集群的Master组件接收到客户端提交的任务后,对任务进行分析并处理。用户将Flink集群的资源描述文件提交到 Kubernetes 之后,Flink 集群的 FlinkMaster 和 TaskManager 会被创建出来,如下图所示,TaskManager 启动后会向 ResourceManager 模块注册,这时 Flink Session 集群已经准备就绪。当用户通过 Flink Clint 端提交了 Job 任务时,Dispatcher 收到该任务请求,将请求转发给 JobMaster,由 JobMaster 将任务分配给具体的 TaskManager。

Apache Flink on K8s:四种运行模式,我该选择哪种?

2.1.2 特点分析

这种类型的 Flink 集群,FlinkMaster 和 TaskManager 是以Kubernetes deployment的形式长期运行在 Kubernetes 集群中。在提交作业之前,必须先创建好 Flink session 集群。多个任务可以同时运行在同一个集群内,任务之间共享 K8sResourceManager 和 Dispatcher,但是 JobMaster 是单独的。这种方式比较适合运行短时作业、即席查询、任务提交频繁、或者对任务启动时长比较敏感的场景。

优点:作业提交的时候,FlinkMaster 和 TaskManager已经准备好了,当资源充足时,作业能够立即被分配到 TaskManager 执行,无需等待 FlinkMaster,TaskManager,Service 等资源的创建;

缺点:1)需要在提交 Job 任务之前先创建 Flink 集群,需要提前指定 TaskManager 的数量,但是在提交任务前,是难以精准把握具体资源需求的,指定的多了,会有大量 TaskManager 处于闲置状态,资源利用率就比较低,指定的少了,则会有任务分配不到资源,只能等集群中其他作业执行完成后,释放了资源,下一个作业才会被正常执行。

  1. 隔离性比较差,多个 Job 任务之间存在资源竞争,互相影响;如果一个 Job 异常导致 TaskManager crash 了,那么所有运行在这个 TaskManager 上的 Job 任务都会被重启;进而,更坏的情况是,多个 Jobs 任务的重启,大量并发的访问文件系统,会导致其他服务的不可用;最后一点是,在 Rest interface 上是可以看到同一个 session 集群里其他人的 Job 任务。

2.2 Per Job Cluster 模式

顾名思义,这种方式会专门为每个 Job 任务创建一个单独的 Flink 集群,当资源描述文件被提交到 Kubernetes 集群, Kubernetes 会依次创建 FlinkMaster Deployment、TaskManagerDeployment 并运行任务,任务完成后,这些 Deployment 会被自动清理。

Apache Flink on K8s:四种运行模式,我该选择哪种?

2.2.1 特点分析

优点:隔离性比较好,任务之间资源不冲突,一个任务单独使用一个 Flink 集群;相对于 Flink session 集群而且,资源随用随建,任务执行完成后立刻销毁资源,资源利用率会高一些;

缺点:需要提前指定 TaskManager 的数量,如果 TaskManager 指定的少了会导致作业运行失败,指定的多了仍会降低资源利用率;资源是实时创建的,用户的作业在被运行前,需要先等待以下过程:

· Kubernetes scheduler为FlinkMaster和 TaskManager 申请资源并调度到宿主机上进行创建;

· Kubernetes kubelet拉取FlinkMaster、TaskManager 镜像,并创建出FlinkMaster、TaskManager容器;

· TaskManager启动后,向Flink ResourceManager 注册。

这种模式比较适合对启动时间不敏感、且长时间运行的作业。不适合对任务启动时间比较敏感的场景。

2.3 Native Session Cluster 模式

2.3.1 原理分析

Apache Flink on K8s:四种运行模式,我该选择哪种?

  1. Flink提供了Kubernetes模式的入口脚本 kubernetes-session.sh,当用户执行了该脚本之后,Flink 客户端会生成 Kubernets 资源描述文件,包括 FlinkMaster Service,FlinkMasterDeloyment,Configmap,Service并设置了owner reference,在 Flink 1.10 版本中,是将 FlinkMaster Service 作为其他资源的 Owner,也就意味着在删除 Flink 集群的时候,只需要删除 FlinkMaster service,其他资源则会被以及联的方式自动删除;

  2. Kubernetes 收到来自 Flink 的资源描述请求后,开始创建FlinkMaster Service,FlinkMaster Deloyment,以及 Configmap 资源,从图中可以看到,伴随着 FlinkMaster 的创建,Dispatch 和K8sResMngr 组件也同时被创建了,这里的 K8sResMngr 就是 Native 方式的核心组件,正是这个组件去和 Kubernetes API server 进行通信,申请 TaskManager 资源;当前,用户已经可以向Flink 集群提交任务请求了;

  3. 用户通过 Flink client 向 Flink 集群提交任务,flink client 会生成 Job graph,然后和 jar 包一起上传;当任务提交成功后,JobSubmitHandler 收到了请求并提交给 Dispatcher并生成 JobMaster, JobMaster 用于向 KubernetesResourceManager 申请 task 资源;

  4. Kubernetes-Resource-Manager 会为 taskmanager 生成一个新的配置文件,包含了 service 的地址,这样当 Flink Master 异常重建后,能保证 taskmanager 通过 Service 仍然能连接到新的 Flink Master;

  5. TaskManager 创建成功后注册到 slotManager,这时 slotManager向TaskManager 申请 slots,TaskManager 提供自己的空闲 slots,任务被部署并运行;

2.3.2. 特点分析

之前我们提到的两种部署模式,在 Kubernetes 上运行 Flink 任务是需要事先指定好 TaskManager 的数量,但是大部分情况下,用户在任务启动前是无法准确的预知该任务所需的 TaskManager 数量和规格。

指定的多了会资源浪费,指定的少了会导致任务的执行失败。最根本的原因,就是没有 Native 的使用 Kubernetes 资源,这里的 Native,可以理解为 Flink 直接与 Kuberneter 通信来申请资源。

这种类型的集群,也是在提交任务之前就创建好了,不过只包含了 FlinkMaster 及其 Entrypoint(Service),当任务提交的时候,Flink client 会根据任务计算出并行度,进而确定出所需 TaskManager 的数量,然后 Flink 内核会直接向 Kubernetes API server 申请 taskmanager,达到资源动态创建的目的。

  • 优点:相对于前两种集群而言,taskManager 的资源是实时的、按需进行的创建,对资源的利用率更高,所需资源更精准。
  • 缺点:taskManager 是实时创建的,用户的作业真正运行前, 与 Per Job集群一样, 仍需要先等待 taskManager 的创建, 因此对任务启动时间比较敏感的用户,需要进行一定的权衡。

2.4 Native Per Job 模式

在当前的 Apache Flink 1.10 版本里,Flink native per-job 特性尚未发布,预计在后续的 Flink 1.11 版本中提供,我们可以提前一览 native per job 的特性。

2.4.1 原理分析

Apache Flink on K8s:四种运行模式,我该选择哪种?

当任务被提交后,同样由 Flink 来向 kubernetes 申请资源,其过程与之前提到的 native session 模式相似,不同之处在于:

  1. Flink Master是随着任务的提交而动态创建的;
  2. 用户可以将 Flink、作业 Jar 包和 classpath 依赖打包到自己的镜像里;
  3. 作业运行图由 Flink Master 生成,所以无需通过 RestClient 上传 Jar 包(图 2 步骤 3)。

2.4.2. 特点分析

native per-job cluster 也是任务提交的时候才创建 Flink 集群,不同的是,无需用户指定 TaskManager 资源的数量,因为同样借助了 Native 的特性,Flink 直接与 Kubernetes 进行通信并按需申请资源。

  • 优点:资源按需申请,适合一次性任务,任务执行后立即释放资源,保证了资源的利用率;
  • 缺点:资源是在任务提交后开始创建,同样意味着对于提交任务后对延时比较敏感的场景,需要一定的权衡;

3.1 简介

分析以上四种部署模式,我们发现,对于 Flink 集群的使用,往往需要用户自行维护部署脚本,向 Kubernetes 提交各种所需的底层资源描述文件(Flink Master,TaskManager,配置文件,Service)。

在 session cluster 下,如果集群不再使用,还需要用户自行删除这些的资源,因为这类集群的资源使用了 Kubernetes 的垃圾回收机制 owner reference,在删除 Flink 集群的时候,需要通过删除资源的 Owner 来进行及联删除,这对于不熟悉 Kubernetes 的 Flink 用户来说,就显得不是很友好了。

而通过 Flink-operator,我们可以把 Flink 集群描述成 yaml 文件,这样,借助 Kubernetes 的声明式特性和协调控制器,我们可以直接管理 Flink 集群及其作业,而无需关注底层资源如 Deployment,Service,ConfigMap 的创建及维护。

当前 Flink 官方还未给出 flink-operator 方案,不过 GoogleCloudPlatform 提供了一种基于 kubebuilder 构建的 flink-operator方案。接下来,将介绍 flink-operator 的安装方式和对 Flink 集群的管理示例。

当 Fink operator 部署至 Kubernetes 集群后, FlinkCluster 资源和 Flink Controller 被创建。其中 FlinkCluster 用于描述 Flink 集群,如 JobMaster 规格、TaskManager 和 TaskSlot 数量等;Flink Controller 实时处理针对 FlinkCluster 资源的 CRUD 操作,用户可以像管理内置 Kubernetes 资源一样管理 Flink 集群。

例如,用户通过 yaml 文件描述期望的 Flink 集群并向 Kubernetes 提交,Flink controller 分析用户的 yaml,得到 FlinkCluster CR,然后调用 API server 创建底层资源,如JobMaster Service, JobMaster Deployment,TaskManager Deployment。

Apache Flink on K8s:四种运行模式,我该选择哪种?

通过使用 Flink Operator,有如下优势:

1. 管理 Flink 集群更加便捷

flink-operator 更便于我们管理 Flink 集群,我们不需要针对不同的 Flink 集群维护 Kubenretes 底层各种资源的部署脚本,唯一需要的,就是 FlinkCluster 的一个自定义资源的描述文件。创建一个 Flink session 集群,只需要一条 kubectl apply 命令即可,下图是 Flink Session集群的 yaml 文件,用户只需要在该文件中声明期望的 Flink 集群配置,flink-operator 会自动完成 Flink 集群的创建和维护工作。如果创建 Per Job 集群,也只需要在该 yaml 中声明 Job 的属性,如 Job 名称,Jar 包路径即可。通过 flink-operator,上文提到的四种 Flink 运行模式,分别对应一个 yaml 文件即可,非常方便。

apiVersion: flinkoperator.k8s.io/v1beta1kind: FlinkClustermetadata:  name: flinksessioncluster-samplespec:  image:    name: flink:1.10.0    pullPolicy: IfNotPresent  jobManager:    accessScope: Cluster    ports:      ui: 8081    resources:      limits:        memory: "1024Mi"        cpu: "200m"  taskManager:    replicas: 1    resources:      limits:        memory: "2024Mi"        cpu: "200m"    volumes:      - name: cache-volume        emptyDir: {}    volumeMounts:      - mountPath: /cache        name: cache-volume  envVars:    - name: FOO      value: bar  flinkProperties:    taskmanager.numberOfTaskSlots: "1"

2. 声明式

通过执行脚本命令式的创建 Flink 集群各个底层资源,需要用户保证资源是否依次创建成功,往往伴随着辅助的检查脚本。借助 flink operator 的控制器模式,用户只需声明所期望的 Flink 集群的状态,剩下的工作全部由 Flink operator 来保证。在 Flink 集群运行的过程中,如果出现资源异常,如 JobMaster 意外停止甚至被删除,Flink operator 都会重建这些资源,自动的修复 Flink 集群。

3. 自定义保存点

用户可以指定 autoSavePointSeconds 和保存路径,Flink operator 会自动为用户定期保存快照。

4. 自动恢复

流式任务往往是长期运行的,甚至 2-3 年不停止都是常见的。在任务执行的过程中,可能会有各种各样的原因导致任务失败。用户可以指定任务重启策略,当指定为 FromSavePointOnFailure,Flink operator 自动从最近的保存点重新执行任务。

5. sidecar containers

sidecar 容器也是 Kubernetes 提供的一种设计模式,用户可以在 TaskManager Pod 里运行 sidecar 容器,为 Job 提供辅助的自定义服务或者代理服务。

6. Ingress 集成

用户可以定义 Ingress 资源,flink operator 将会自动创建 Ingress 资源。云厂商托管的 Kubernetes 集群一般都有 Ingress 控制器,否则需要用户自行实现 Ingress controller。

7. Prometheus 集成

通过在 Flink 集群的 yaml 文件里指定 metric exporter 和 metric port,可以与 Kubernetes 集群中的 Prometheus 进行集成。

最后

通过本文,我们了解了 Flink 在 Kubernetes 上运行的不同模式,其中 Native 模式在资源按需申请方面比较突出,借助 kubernetes operator,我们可以将 Flink 集群当成Kubernetes原生的资源一样进行 CRUD 操作。限于篇幅,本文主要分析了 Flink 在 Kubernetes 上的运行模式的区别,后续将会有更多的文章来对 Flink 在 Kubernetes 上的最佳实践进行描述,敬请期待。

参考文档

Kubernetes native integration

https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit#heading=h.thxqqaj3vxmz

Flink operator 使用文档

https://github.com/tkestack/flink-on-k8s-operator/tree/nativePerJob

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这