前言
本篇文章主要是从作业提交到最后获取到作业结果,
从源码的角度,
但是不涉及源码进行的分析.
其目的是读完本篇文章,
你将对作业的基本流程有个清晰的认识。
当然如果你阅读过源码,
那么读起来应该会比较舒服,
否则可能会有一定不适,
因为本文写的不是那么有逻辑~~~
1.任务提交过程
- 首先,我们知道,一个action算子是触发一个job生成的地方,
当遇见action算子,会执行sparkcontext的runjob方法,
最后会交给dagSchedule的submitjob,
这里会创建一个jobwaiter对象,并发送一个JobSubmitted消息进行作业任务的执行,
同时 waiter.awaitResult() 会等待作业执行结果的返回:成功或者失败。
到这里,我们对于作业应该有个基本的认识了,
那么接下来我们再来深入一点,这个作业submit之后发生了什么呢?
2.划 分 调 度 阶 段
spark是资源调度是粗粒度的,我们这里不讨论资源申请,
当我们提交一个任务之后(此时资源应该都是在集群中申请好了),
Spark首先会对我们的作业任务划分调度阶段,
而这个调度阶段的划分是由 DAGScheduler 负责的,
其调度是基于stage的,那么下面我们看看stage是怎么划分的。一个application中的rdd集合相互依赖形成了一个依赖树,
DAGScheduler 通过其 getParentStages 方法会从最后一个 finalrdd 开始,
判断整颗树中RDD之间的依赖是否有 宽依赖ShuffleDependency,
如果没有,就只生成一个stage,
如果有,调用 getAncestorShuffleDepend,使用广度优先遍历整个依赖树,
当遇到 ShuffleDependency 的时候,就会通过newOrUsedShuffleStag生成一个个stage,
并划分为两个调度阶段,这样一个job也就被划分成了一个或者多个stage了。到这里我们的作业已经被划分成了一个个stage了,
接下来就看看stage是怎么被提交的吧。。。
3.提 交 调 度 阶 段
前面我们提到了JobSubmitted消息,
那么这个消息实际上会触发 DAGScheduler 的 handleJobSubmitted 方法,
首先该方法会在生成 finalStage 的同时建立起所有调度阶段的依赖关系(至于怎么建立的,我们后面慢慢深入),
然后通过 fmalStage 生成一个作业实例ActiveJob,
然后在submitStage(finalStage)开始提交作业。在作业提交调度阶段开始时,
在 submitStage 方法中调用 getMissingParentStages 方法获取finalStage 父调度阶段,
如果不存在父调度阶段,则使用 submitMissingTasks(stage) 方法提交执行;
如果存在父调度阶段,则把该调度阶段存放到 waitingStages 列表中,
同时递归调用 submitStage,
直到找到没有父stage的stage调用 submitMissingTasks(stage),
将该阶段提交去执行。
这样一次调度任务就发送到Excutor开始执行了。当Excutor的task执行完成时发通知消息 CompleteEvent,
会调用到 DAGschedule的handleTaskCompletion更新状态,
并且判断该 task 所属的 stage 是否所有任务都已经完成,
如果完成,则扫描等待运行调度阶段列表,检查它们的父调度阶段是否存在未完成,
如果不存在则表明该调度阶段准备就绪,生成实例并提交运行。(至于其中失败重试的机制不做讨论)到此,stage提交的基本情况我们已经了解,
但是对于一个了解spark的人来说,我们熟悉的task还没有出现,
接下来,我们就来看看stage的task的执行流程吧。
4.提 交 任 务
前面我们说到提交 stage 的方法 submitStage 进行Stage 的提交,
该方法内部会调用到 DAGScheduler 的 submitMissingTasks 方法对每个stage 的 task 进行提交,
其task生成规则如下:
首先根据每个 stage 最后一个rdd的 Partition 个数拆分对应个数的 task ,
这些 task 组成一个任务集 taskset 提交到 TaskScheduler 进行处理。
对于 ResultStage (作业中最后的stage)生 成 ResultTask ,
对 于 ShuffleMapStage 生成 ShuffleMapTask 。当 TaskScheduler 收到发送过来的任务集时,
在 submitTasks 方法中(在 TaskSchedulerlmpl 类中进行实现),
构建一个 TaskSetManager 的实例,用于管理这个任务集的生命周期,
并通过 schedulableBuilder 的 addTaskSetManager 放入系统的调度池中,进行调度。接下来就是将调度池中待调度的任务发往Excutor。
通过调用 SchedulerBackend 的 reviveOffers ,
向 DriverEndPoint 终端点发送 ReviveOffers 消息,
会触发 SchedulerBackend 的 makeOffers 方法,
开始进行资源的检查和分配
该方法首先会获取集群中可用的 Executor ,
并通过 TaskSchedulerlmpl 的 resourceOffers 按照就近原则对进行资源的分配,
并划分 PROCESS _ LOCAL、 NODE LOCAL、 NO PREF 、 RACK_LOCAL和 ANY 五个等级。
这时候每个Task就知道自己将要去往的Excutor在哪里了,
可以直接进行 launchtask 操作,
这样就把分配好资源的 task 一个个发送到 Worker 节点上的 CoarseGrainedExecutorBackend ,
真正将任务提交到了 Excutor。至此,我们的 task 算是正式提交到excutor准备执行了。
5.执 行 任 务
当 CoarseGrainedExecutorBackend(excutor的守护进程) 接收到 LaunchTask 消息时,
会调用 Executor 的 launchTask 方法进行处理。
在 Executor 的 launchTask 方法中,
初始化一个 TaskRunner 来封装任务,
它用于管理任务运行时的细节,
再把 TaskRumier 对象放入到 ThreadPool (线程池)中去执行。
在 TaskRunner 的 run 方法里,
首先会对发送过来的 Task 本身以及它所依赖的 Jar 等文件的反序列,
然后对反序列化的任务调用 Task 的 runTask 方法。
由于 Task 本身是一个抽象类,
具体的 TaskRunner 方法是由它的两个子类 ShuffleMapTask 和 RedultTask 来实现的。对于 ShuffleMapTask 而言,它的计算结果会写到 BlockManager 之中,
最终返回给 DAGScheduler 的是一个 MapStatus 对象。
该对象中保存着了 ShuffleMapTask 的运算结果存储到BlockManager 里的相关存储信息,
而不是计算结果本身,
这些存储信息将会成为下一阶段的任务需要获得的输入数据时的依据。对于 ResultTask 的 runTask 方法而言,
它最终返回的是最后的计算结果。至此,task计算结束,下面我们看看计算的结果是怎么处理的。
6.获 取 执 行 结 果
Excutor 端对结果数据进行处理,
根据处理的Task类型不一样是有不一样的处理方式ShuffleTask
将结果封装成一个 MatStatus 对象,
该 MatStatus 会记录结果的位置信息 和 文件大小,
Driver端的 TaskSchedule 会将 MatStatus 注册到
MapOutputTrackerMaster的 mapStatuses 中进行保存,
当下游的 stage 需要数据的时候,
由其MapOutputTrackerWorker向MapOutputTrackerMaster 查找,
以获取其所需要处理数据的信息。
同时也需要判断该 Stage 的 task 是否已经全部完成,
如果完成,那么将开始下一轮的Stage任务。
ResultTask
如果是使用了类似 Collect 等需要将数据拉回Driver端的算子,
则需要根据结果的大小有不同的策略。(1) 生成结果大小大于1GB结果直接丢弃,
该配置项可以通过 spark . driver.maxResultSize进行设置。(2) 生成结果大小在[128 MB -200 KB,1 GB] :
如果生成的结果大于等于(128 MB -200 KB )时,
会把该结果以taskld 为编号存入到 BlockManager 中,
然后把该编号通过 Netty 发送给 Driver终端点,
该阈值是 Netty 框架传输的最大值spark.akka.frameSize
(默认为128 MB)和 Netty 的预留空间 reservedSizeBytes (200 KB ) 差值。(3) 生成结果大小在(0 , 128 MB -200 KB):
通过 Netty 直接发送到 Driver 终端点。对于(2)的结果,Diver 端收到一个 IndirectTaskResult 的结果,
需要通过 sparkEnv.blockManager.getRemoteBytes(blockld)来获取结果;对于(3)的结果,Diver 端收到一个 DirectTaskResult ,
那么结果就无需远程获取了。如果不需要拉回Driver端
其结果则直接是我们所写的算子决定的,
只需要通知Driver端,
Diver 端会查看当前作业的Task是不是全部完成,
如果完成,
那么作业也就完成了,
Driver 会清除作业依赖的资源,
并发送消息给系统监听总线告知作业执行完毕。
以上是成功消息的处理,如果是失败的任务,
并且在 TaskSchedulerImpl 重试 3 次后还是失败,
那么会 将消息失败的任务通知 DAGScheduler ,
DAGScheduler 会对整个 Stage 进行4次重试,
如果还是失败,那么整个任务就失败了
总结
当我们提交一个job,
首先会被 DAGScheduler 通过宽窄依赖解析成一个个 stage,
然后按顺序将 stage 以 taskset 的形式提交给 TaskScheduler ,
TaskScheduler 将 taskset 构建成 TaskSetManager 对象管理,
并按照调度系统给定的策略向 Executor 提交任务,
Executor 将接受的到 task 以 taskrunner 的方式执行计算出结果,
并储存到 BlockManager ,
然后向 TaskScheduler 返回一个记录了结果信息的MapStatus对象,
并注册到 driver 端的 MapOutputTrackerMaster,
然后进行下一轮的 stage 调度 (如果是ResultTask执行结果,那么数据是我们算子决定了他最后会落地在哪的)
本文同步分享在 博客“code_solve”(JianShu)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。