Spark 3.0 on Kubernetes 的模式与最新发展

Stella981
• 阅读 1508

Spark 3.0发布后,对Kubernetes的原生支持得到大幅增强,从而方便了Spark在云原生环境中的快速部署和运行实例的管理。这里探讨Spark  on Kubernetes 生态的现状与挑战。

1. Standalone 模式

Spark 运行在 Kubernetes 集群上的第一种可行方式是将 Spark 以 Standalone 模式运行,但是很快社区就提出使用 Kubernetes 原生 Scheduler 的运行模式,也就是 Native 的模式。

2. Kubernetes Native 模式

Native 模式简而言之就是将 Driver 和 Executor Pod 化,用户将之前向 YARN 提交 Spark 作业的方式提交给 Kubernetes 的 apiserver,提交命令如下:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

其中 master 就是 kubernetes 的 apiserver 地址。提交之后整个作业的运行方式如下,先将 Driver 通过 Pod 启动起来,然后 Driver 会启动 Executor 的 Pod。这些方式很多人应该都了解了,就不赘述了,详细信息可以参考:https://spark.apache.org/docs/latest/running-on-kubernetes.html

Spark 3.0 on Kubernetes 的模式与最新发展

3. Spark Operator

除了这种直接向 Kubernetes Scheduler 提交作业的方式,还可以通过 Spark Operator 的方式来提交。Operator 在 Kubernetes 中是一个里程碑似的产物。在 Kubernetes 刚面世的时候,关于有状态的应用如何部署在 Kubernetes 上一直都是官方不愿意谈论的话题,直到 StatefulSet 出现。StatefulSet 为有状态应用的部署实现了一种抽象,简单来说就是保证网络拓扑和存储拓扑。但是状态应用千差万别,并不是所有应用都能抽象成 StatefulSet,强行适配反正加重了开发者的心智负担。

然后 Operator 出现了。我们知道 Kubernetes 给开发者提供了非常开放的一种生态,你可以自定义 CRD,Controller 甚至 Scheduler。而 Operator 就是 CRD + Controller 的组合形式。开发者可以定义自己的 CRD,比如我定义一种 CRD 叫 EtcdCluster 如下:

apiVersion: "etcd.database.coreos.com/v1beta2"
kind: "EtcdCluster"
metadata:
  name: "example-etcd-cluster"
spec:
  size: 3
  version: "3.1.10"
  repository: "quay.io/coreos/etcd"

提交到 Kubernetes 之后 Etcd 的 Operator 就针对这个 yaml 中的各个字段进行处理,最后部署出来一个节点规模为 3 个节点的 etcd 集群。你可以在 github 的这个 repo:https://github.com/operator-framework/awesome-operators 中查看目前实现了 Operator 部署的分布式应用。

Google 云平台,也就是 GCP 在 github 上面开源了 Spark 的 Operator,repo 地址:GoogleCloudPlatform/spark-on-k8s-operator。Operator 部署起来也是非常的方便,使用 Helm Chart 方式部署如下,你可以简单认为就是部署一个 Kubernetes 的 API Object (Deployment)。

$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
$ helm install incubator/sparkoperator --namespace spark-operator

这个 Operator 涉及到的 CRD 如下:

ScheduledSparkApplication
|__ ScheduledSparkApplicationSpec
    |__ SparkApplication
|__ ScheduledSparkApplicationStatus

|__ SparkApplication
|__ SparkApplicationSpec
    |__ DriverSpec
        |__ SparkPodSpec
    |__ ExecutorSpec
        |__ SparkPodSpec
    |__ Dependencies
    |__ MonitoringSpec
        |__ PrometheusSpec
|__ SparkApplicationStatus
    |__ DriverInfo

如果我要提交一个作业,那么我就可以定义如下一个 SparkApplication 的 yaml,关于 yaml 里面的字段含义,可以参考上面的 CRD 文档。

apiVersion: sparkoperator.k8s.io/v1beta1
kind: SparkApplication
metadata:
  ...
spec:
  deps: {}
  driver:
    coreLimit: 200m
    cores: 0.1
    labels:
      version: 2.3.0
    memory: 512m
    serviceAccount: spark
  executor:
    cores: 1
    instances: 1
    labels:
      version: 2.3.0
    memory: 512m
  image: gcr.io/ynli-k8s/spark:v2.4.0
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
  mainClass: org.apache.spark.examples.SparkPi
  mode: cluster
  restartPolicy:
      type: OnFailure
      onFailureRetries: 3
      onFailureRetryInterval: 10
      onSubmissionFailureRetries: 5
      onSubmissionFailureRetryInterval: 20
  type: Scala
status:
  sparkApplicationId: spark-5f4ba921c85ff3f1cb04bef324f9154c9
  applicationState:
    state: COMPLETED
  completionTime: 2018-02-20T23:33:55Z
  driverInfo:
    podName: spark-pi-83ba921c85ff3f1cb04bef324f9154c9-driver
    webUIAddress: 35.192.234.248:31064
    webUIPort: 31064
    webUIServiceName: spark-pi-2402118027-ui-svc
    webUIIngressName: spark-pi-ui-ingress
    webUIIngressAddress: spark-pi.ingress.cluster.com
  executorState:
    spark-pi-83ba921c85ff3f1cb04bef324f9154c9-exec-1: COMPLETED
  LastSubmissionAttemptTime: 2018-02-20T23:32:27Z

提交作业。

$ kubectl apply -f spark-pi.yaml

对比来看 Operator 的作业提交方式似乎显得更加的冗长复杂,但是这也是一种更 kubernetes 化的 api 部署方式,也就是 Declarative API,声明式 API。

4. 挑战

基本上,目前市面的大部门公司都是使用上面两种方式来做 Spark on Kubernetes 的,但是我们也知道在 Spark Core 里面对 Kubernetes 的这种 Native 方式支持其实并不是特别成熟,还有很多可以改善的地方,下面简单举例几个地方:

1.scheduler 差异

资源调度器可以简单分类成集中式资源调度器和两级资源调度器。两级资源调度器有一个中央调度器负责宏观资源调度,对于某个应用的调度则由下面分区资源调度器来做。两级资源调度器对于大规模应用的管理调度往往能有一个良好的支持,比如性能方面,缺点也很明显,实现复杂。其实这种设计思想在很多地方都有应用,比如内存管理里面的 tcmalloc 算法,Go 语言的内存管理实现。大数据的资源调度器 Mesos/Yarn,某种程度上都可以归类为两级资源调度器。

集中式资源调度器对于所有的资源请求进行响应和决策,这在集群规模大了之后难免会导致一个单点瓶颈,毋庸置疑。但是 Kubernetes 的 scheduler 还有一点不同的是,它是一种升级版,一种基于共享状态的集中式资源调度器。Kubernetes 通过将整个集群的资源缓存到 scheduler 本地,在进行资源调度的时候在根据缓存的资源状态来做一个 “乐观” 分配(assume + commit)来实现调度器的高性能。

Kubernetes 的默认调度器在某种程度上并不能很好的 match Spark 的 job 调度需求,对此一种可行的技术方案是再提供一种 custom scheduler 或者直接重写,比如 Spark on Kubernetes Native 方式的参与者之一的大数据公司 Palantir 就开源了他们的 custom scheduler,github repo: https://github.com/palantir/k8s-spark-scheduler

2. Shuffle 处理

由于 Kubernetes 的 Executor Pod 的 Shuffle 数据是存储在 PV 里面,一旦作业失败就需要重新挂载新的 PV 从头开始计算。针对这个问题,Facebook 提出了一种 Remote Shuffle Service 的方案,简单来说就是将 Shuffle 数据写在远端。直观感受上来说写远端怎么可能比写本地快呢?而写在远端的一个好处是 Failover 的时候不需要重新计算,这个特性在作业的数据规模异常大的时候比较有用。

3. 集群规模

基本上现在可以确定的是 Kubernetes 会在集群规模达到五千台的时候出现瓶颈,但是在很早期的时候 Spark 发表论文的时候就声称 Spark Standalone 模式可以支持一万台规模。Kubernetes 的瓶颈主要体现在 master 上,比如用来做元数据存储的基于 raft 一致性协议的 etcd 和 apiserver 等。对此在刚过去的 2019 上海 KubeCon 大会上,阿里巴巴做了一个关于提高 master 性能的 session: 了解 Kubernetes Master 的可扩展性和性能,感兴趣的可以自行了解。

4.Pod 驱逐(Eviction)问题

在 Kubernetes 中,资源分为可压缩资源(比如 CPU)和不可压缩资源(比如内存),当不可压缩资源不足的时候就会将一些 Pod 驱逐出当前 Node 节点。国内某个大厂在使用 Spark on kubernetes 的时候就遇到因为磁盘 IO 不足导致 Spark 作业失败,从而间接导致整个测试集都没有跑出来结果。如何保证 Spark 的作业 Pod (Driver/Executor) 不被驱逐呢?这就涉及到优先级的问题,1.10 之后开始支持。但是说到优先级,有一个不可避免的问题就是如何设置我们的应用的优先级?常规来说,在线应用或者 long-running 应用优先级要高于 batch job,但是显然对于 Spark 作业来说这并不是一种好的方式。

5. 作业日志

Spark on Yarn 的模式下,我们可以将日志进行 aggregation 然后查看,但是在 Kubernetes 中暂时还是只能通过 Pod 的日志查看,这块如果要对接 Kubernetes 生态的话可以考虑使用 fluentd 或者 filebeat 将 Driver 和 Executor Pod 的日志汇总到 ELK 中进行查看。

6.Prometheus 生态

Prometheus 作为 CNCF 毕业的第二个项目,基本是 Kubernetes 监控的标配,目前 Spark 并没有提供 Prometheus Sink。而且 Prometheus 的数据读取方式是 pull 的方式,对于 Spark 中 batch job 并不适合使用 pull 的方式,可能需要引入 Prometheus 的 pushgateway。

5. 结语

被称为云上 OS 的 Kubernetes 是 Cloud Native 理念的一种技术承载与体现,但是如何通过 Kubernetes 来助力大数据应用还是有很多可以探索的地方。欢迎交流。

6. 参考:

  1. https://spark.apache.org/docs/latest/running-on-kubernetes.html
  2. https://medium.com/palantir/spark-scheduling-in-kubernetes-4976333235f3
  3. https://github.com/palantir/k8s-spark-scheduler
  4. Kubecon: 了解 Kubernetes Master 的可扩展性和性能
  5. Kubecon: SIG-Scheduling Deep Dive
  6. GoogleCloudPlatform/spark-on-k8s-operator
  7. https://github.com/operator-framework/awesome-operators
  8. https://github.com/coreos/etcd-operator
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这