Spark 之SparkContext 源码精读3

Stella981
• 阅读 411

书接上文,SparkContext的CoarseGrainedSchedulerBackend已创建完毕,并且Driver也可以通过DriverEndpoint发消息了。

让咱们再回到CoarseGrainedSchedulerBackend的子类,SparkDeployScheduler.start方法中。至此super.start执行完毕。

继续往下看,获取driverUrl,也就是前述Rpc的地址;

第87行,创建了一个

// SparkDeploySchedulerBackend.scala line 52
override def start() {
  super.start()
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
    RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // line 87
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // line 91 
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  // line 93 
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

然后就是一些赋值操作

在第81行,特别关键,创建了Command对象,进入Command定义,一看吓一跳啊。第一个参数名是mainClass,顾名思义,就是入口类。

// org.apache.spark.deploy.Command line 22
private[spark] case class Command(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}

先按住兴奋,回到SparkDeploySchedulerBackend.start继续往下看。至91行,将上面创建的Command及maxCores、sc.executorMemory,coresPerExecutors等一些参数,一起创建了ApplicationDescription。感觉里真相越来越近了。继续下去。

第93行,用上面创建的ApplicationDescription及master【Array[String]】等其他参数,一起创建了AppClient。注意,这个构造中,倒数第二个参数是将this传递进去了。

// AppClient.scala line 33
/**
 * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
 * an app description, and a listener for cluster events, and calls back the listener when various
 * events occur.
 *
 * @param masterUrls Each url should look like spark://host:port.
 */
private[spark] class AppClient(
    rpcEnv: RpcEnv,
    masterUrls: Array[String],
    appDescription: ApplicationDescription,
    listener: AppClientListener,
    conf: SparkConf)

注释很清晰的说明了,AppClient担任App和App所部署到的Spark集群沟通的角色。有master url ,app description,监听集群事件的监听器以及与各种监听到的集群的事件对应的回调。其中,倒数第二个参数是:AppClientListener,是一个trait。

// AppClientListener.scala 
/**
 * Callbacks invoked by deploy client when various events happen. There are currently four events:
 * connecting to the cluster, disconnecting, being given an executor, and having an executor
 * removed (either due to failure or due to revocation).
 * 
 * Users of this API should *not* block inside the callback methods.
 */
private[spark] trait AppClientListener {
  def connected(appId: String): Unit

  /** Disconnection may be a temporary state, as we fail over to a new Master. */
  def disconnected(): Unit

  /** An application death is an unrecoverable failure condition. */
  def dead(reason: String): Unit

  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)

  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}

注释很清晰的说明了,处理4件事,连接上集群、与集群断开连接、新增一个Executor,一个Executor被移除。

而实际在代码中,还有一个dead回调。是对断开连接的补充,断开连接是临时状态,若故障转移成功,则又回到连接状态,若失败,则dead。

看到这,再看之前传入的参数,是SparkDeploySchedulerBackend将自实例传入,一定是他或他的父类实现了AppClientListener。

// SparkDeploySchedulerBackend.scala 
private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with AppClientListener
  with Logging

果然如此,实现了AppClientListener。

思路回到创建SparkDeploySchedulerBackend的 第94行。执行了AppClient.start

// AppClient.scala line 281
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}

又创建了ClientEndpoint。

// AppClient.scala line 57
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
  with Logging

请回忆前面的内容,是否有点眼熟?没错,Endpoint,实现了ThreadSafeRpcEndpoint。有印象吗?没错,是DriverEndpoint

同样的RpcEndpoint的生命周期是什么?复习下,没错,构造-> onStart ->receive* ->onStop。

了解下onStart,顾名思义,registerWithMaster,向Master注册。进入registerWithMaster瞅瞅。

// AppClient.scala line 85
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}

进入registerWithMaster瞅瞅。

结合下面的代码和注释,很清晰的知道,Driver以多线程异步的方式向Master注册,一旦任意一个注册成功,则其他注册取消。若注册超时 ${REGISTRATION_TIMEOUT_SECONDS},会重试${REGISTRATION_RETRIES}次。

// AppClient.scala line 125
/**
 * Register with all masters asynchronously. It will call `registerWithMaster` every
 * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
 * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
 *
 * 
 */
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      Utils.tryOrExit {
        if (registered.get) {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}

// AppClient.scala line 96
/**
 *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef =
          rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
// line 109
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}

具体看下消息的发收,AppClient的109 行,向master发送了一个RegisterApplication类型的消息。

// DeployMessages.scala line 108
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
  extends DeployMessage

接收者是Master,回忆前面的RpcEndpoint,接收消息是receive*

// Master.scala line 244
case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
// line 251
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
// line 253
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
}

将Application注册->持久化信息以备恢复->发一个RegisteredApplication消息给Driver->资源调度。

// Master.scala line 795
private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}

简单的数据设置。master持久化driver后续会细细分析。

持久化成功后,Master会发送一条RegisteredApplication类型的消息给Driver,理解为你让我办的事,妥了。

// DeployMessages.scala line 121
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

Driver接收到消息

// AppClient.scala line 160
case RegisteredApplication(appId_, masterRef) =>
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)

Driver收到消息后,简单的做了些设置。理解为 "哦"。

至此,Driver向Master注册已经成功。

其实,本篇博客另一个关键的信息在于,展示了部署类消息都在 org.apache.spark.deploy.DeployMessages.scala。可以通过不同类型的消息,寻找对应的发送者和接收者,将流程贯穿起来。

如有理解错误,请大家回复我修改。谢谢。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这