Picking up from the previous post: SparkContext's CoarseGrainedSchedulerBackend has been created, and the Driver can already send messages through its DriverEndpoint.
Let's return to the subclass of CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, and its start method. At this point super.start() has finished executing.
Reading on, we first obtain driverUrl, which is the RPC address described earlier; then, at line 87, a Command object is created.
// SparkDeploySchedulerBackend.scala line 52
override def start() {
  super.start()
  launcherBackend.connect()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
    RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // line 87
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // line 91
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  // line 93
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
What follows is a series of simple assignments.
Line 87 is particularly important: it creates a Command object. Jump into the definition of Command and the first glance is already revealing: its first parameter is named mainClass, which, as the name suggests, is the entry class of the process to be launched.
// org.apache.spark.deploy.Command line 22
private[spark] case class Command(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}
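The mainClass handed in above is org.apache.spark.executor.CoarseGrainedExecutorBackend, and the arguments are the --executor-id / --hostname / ... pairs whose values are {{...}} placeholders. As a side note, here is a minimal, self-contained sketch of how such placeholders could be filled in on the Worker side before the executor JVM is launched; the helper and all concrete values below are hypothetical, not Spark's actual code.

// Illustrative sketch only, NOT Spark source: filling in the {{...}} placeholders.
object PlaceholderSubstitutionSketch {
  // Replace every "{{KEY}}" placeholder appearing in the argument list with its value.
  def substitute(args: Seq[String], values: Map[String, String]): Seq[String] =
    args.map(arg => values.foldLeft(arg) { case (acc, (k, v)) => acc.replace(k, v) })

  def main(argv: Array[String]): Unit = {
    val templateArgs = Seq(
      "--driver-url", "spark://CoarseGrainedScheduler@192.168.1.10:50000", // hypothetical driverUrl
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val filled = substitute(templateArgs, Map(
      "{{EXECUTOR_ID}}" -> "0",
      "{{HOSTNAME}}" -> "worker-1",
      "{{CORES}}" -> "2",
      "{{APP_ID}}" -> "app-20160101120000-0000",
      "{{WORKER_URL}}" -> "spark://Worker@worker-1:35000"))
    // Roughly what the Worker ends up executing (heavily simplified):
    println(("java -cp <classpath> org.apache.spark.executor.CoarseGrainedExecutorBackend"
      +: filled).mkString(" "))
  }
}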
Hold the excitement for a moment and go back to SparkDeploySchedulerBackend.start. At line 91, the Command created above, together with maxCores, sc.executorMemory, coresPerExecutor and a few other parameters, is used to build an ApplicationDescription. We are getting closer to the truth.
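For orientation, ApplicationDescription is just a small value object bundling exactly what was computed above. The stand-in below is a simplified sketch with descriptive (not the real) field names; see org.apache.spark.deploy.ApplicationDescription in your Spark version for the actual definition.

// Simplified stand-in, NOT the real ApplicationDescription: field names are descriptive only.
case class SimplifiedAppDescription(
    appName: String,                // sc.appName
    maxCores: Option[Int],          // overall core cap for the app, if any
    executorMemoryMB: Int,          // sc.executorMemory
    command: Command,               // how to launch CoarseGrainedExecutorBackend (see above)
    appUiUrl: String,               // appUIAddress
    coresPerExecutor: Option[Int])  // spark.executor.cores, if set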
At line 93, the ApplicationDescription created above, the masters (an Array[String]) and the remaining parameters are used to construct an AppClient. Note that the second-to-last argument of this constructor is this.
// AppClient.scala line 33
/**
 * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
 * an app description, and a listener for cluster events, and calls back the listener when various
 * events occur.
 *
 * @param masterUrls Each url should look like spark://host:port.
 */
private[spark] class AppClient(
    rpcEnv: RpcEnv,
    masterUrls: Array[String],
    appDescription: ApplicationDescription,
    listener: AppClientListener,
    conf: SparkConf)
The comment says it clearly: AppClient is the interface through which an application talks to the Spark deploy cluster it runs on. It takes the master URLs, the app description and a listener for cluster events, and calls back into that listener when various events occur. The second-to-last parameter is an AppClientListener, which is a trait.
// AppClientListener.scala
/**
 * Callbacks invoked by deploy client when various events happen. There are currently four events:
 * connecting to the cluster, disconnecting, being given an executor, and having an executor
 * removed (either due to failure or due to revocation).
 *
 * Users of this API should *not* block inside the callback methods.
 */
private[spark] trait AppClientListener {
  def connected(appId: String): Unit

  /** Disconnection may be a temporary state, as we fail over to a new Master. */
  def disconnected(): Unit

  /** An application death is an unrecoverable failure condition. */
  def dead(reason: String): Unit

  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)

  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
The comment spells it out: there are four events to handle - connecting to the cluster, disconnecting from it, being given an executor, and having an executor removed.
In the code there is actually a fifth callback, dead, which complements disconnected: disconnection is a temporary state, because the client may fail over to a new Master; if the failover succeeds we return to the connected state, and if not, the application is dead.
With that in mind, look again at the argument passed in earlier: SparkDeploySchedulerBackend handed over its own instance, so it (or one of its parent classes) must implement AppClientListener.
// SparkDeploySchedulerBackend.scala
private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with AppClientListener
  with Logging
Sure enough, it mixes in AppClientListener.
Now back to SparkDeploySchedulerBackend.start, line 94, where client.start(), i.e. AppClient.start, is invoked.
// AppClient.scala line 281
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
It creates yet another endpoint: a ClientEndpoint.
// AppClient.scala line 57
private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint
  with Logging
Does this look familiar from the previous posts? It should: it is another endpoint implementing ThreadSafeRpcEndpoint, just like DriverEndpoint.
And what is the lifecycle of an RpcEndpoint? Quick review: constructor -> onStart -> receive* -> onStop.
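The sketch below is not Spark's RpcEnv (whose API is private[spark]); it is just a self-contained toy that makes the constructor -> onStart -> receive* -> onStop ordering concrete. All names in it are made up.

// Toy model of the RpcEndpoint lifecycle, for illustration only.
trait ToyEndpoint {
  def onStart(): Unit = {}
  def receive: PartialFunction[Any, Unit]
  def onStop(): Unit = {}
}

class ToyEnv {
  // Drives an endpoint through the same order as Spark's dispatcher:
  // construct first, then onStart, then receive per message, then onStop.
  def run(endpoint: ToyEndpoint, messages: Seq[Any]): Unit = {
    endpoint.onStart()
    messages.foreach(m => if (endpoint.receive.isDefinedAt(m)) endpoint.receive(m))
    endpoint.onStop()
  }
}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val ep = new ToyEndpoint {                               // 1. constructor
      override def onStart(): Unit = println("onStart")      // 2. onStart (e.g. registerWithMaster(1))
      override def receive: PartialFunction[Any, Unit] = {   // 3. receive*, one call per message
        case msg => println(s"receive: $msg")
      }
      override def onStop(): Unit = println("onStop")        // 4. onStop
    }
    new ToyEnv().run(ep, Seq("RegisteredApplication(app-..., masterRef)"))
  }
}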
Let's look at onStart first. It does exactly what its name suggests: it calls registerWithMaster to register with the Master.
// AppClient.scala line 85
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}
Let's step into registerWithMaster.
The code and the comment below make it clear: the Driver registers with all Masters asynchronously on a thread pool, and as soon as any one registration succeeds, the remaining attempts are cancelled. If registration has not succeeded within REGISTRATION_TIMEOUT_SECONDS seconds, it retries, up to REGISTRATION_RETRIES times. (A compact, self-contained toy of this pattern follows the two code blocks below.)
// AppClient.scala line 125
/**
 * Register with all masters asynchronously. It will call `registerWithMaster` every
 * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
 * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
 */
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      Utils.tryOrExit {
        if (registered.get) {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          registerMasterFutures.get.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
// AppClient.scala line 96
/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef =
          rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        // line 109
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
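To isolate the pattern from Spark's surrounding machinery, here is the promised toy: one registration attempt per Master is submitted to a pool, and a timer later either cancels everything on success, gives up after too many retries, or cancels the current round and retries. The Master addresses, timeouts and retry counts below are made up for illustration, and this is a sketch of the idea, not Spark's implementation.

// Self-contained toy of "try all masters, cancel on first success, retry up to N times".
import java.util.concurrent.{Executors, TimeUnit, Future => JFuture}
import java.util.concurrent.atomic.AtomicBoolean

object RegistrationRetryToy {
  private val registered = new AtomicBoolean(false)
  private val masters = Seq("master-a:7077", "master-b:7077")   // hypothetical addresses
  private val registerPool = Executors.newFixedThreadPool(masters.size)
  private val retryTimer = Executors.newSingleThreadScheduledExecutor()
  private val RegistrationTimeoutSeconds = 20L
  private val RegistrationRetries = 3

  // One asynchronous attempt per master; the real code sends RegisterApplication here.
  private def tryRegisterAllMasters(): Seq[JFuture[_]] = masters.map { m =>
    registerPool.submit(new Runnable {
      override def run(): Unit = {
        if (registered.get) return
        println(s"sending RegisterApplication to $m")
      }
    })
  }

  def registerWithMaster(nthRetry: Int): Unit = {
    val futures = tryRegisterAllMasters()
    retryTimer.schedule(new Runnable {
      override def run(): Unit = {
        if (registered.get) {
          futures.foreach(_.cancel(true))   // someone answered: stop all other attempts
          registerPool.shutdownNow()
          retryTimer.shutdown()
        } else if (nthRetry >= RegistrationRetries) {
          println("All masters are unresponsive! Giving up.")
          registerPool.shutdownNow()
          retryTimer.shutdown()
        } else {
          futures.foreach(_.cancel(true))   // cancel this round and try again
          registerWithMaster(nthRetry + 1)
        }
      }
    }, RegistrationTimeoutSeconds, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = registerWithMaster(1)
}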
Now let's look at the actual message exchange. At AppClient line 109, a RegisterApplication message is sent to the Master.
// DeployMessages.scala line 108
case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
  extends DeployMessage
The receiver is the Master. Recall from the RpcEndpoint discussion that incoming messages are handled in the receive* methods.
// Master.scala line 244
case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    // line 251
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // line 253
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
}
So the flow on the Master side is: register the application -> persist it for recovery -> send a RegisteredApplication message back to the Driver -> run resource scheduling (schedule()).
// Master.scala line 795
private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
registerApplication is just bookkeeping in the Master's in-memory data structures. How the Master persists applications and drivers will be analyzed in detail in a later post.
Once the application is persisted, the Master sends a RegisteredApplication message back to the Driver - effectively saying "the thing you asked me to do is done."
// DeployMessages.scala line 121
case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
The Driver receives the message:
// AppClient.scala line 160
case RegisteredApplication(appId_, masterRef) =>
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)
On receiving it, the Driver does a little bookkeeping: it records the appId, flips registered to true, remembers the Master's RpcEndpointRef, and calls listener.connected - effectively replying "got it."
At this point, the Driver has successfully registered the application with the Master.
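Note that listener.connected is what ultimately unblocks the waitForRegistration() call we saw at the end of SparkDeploySchedulerBackend.start. The snippet below is a simplified, self-contained sketch of that handshake (a registration barrier built on a Semaphore); it is a paraphrase for intuition, not the verbatim Spark source.

// Toy registration barrier: start() blocks until connected() releases the permit.
import java.util.concurrent.Semaphore

class RegistrationBarrierSketch {
  private val barrier = new Semaphore(0)

  // Called at the end of start(): blocks until the Master has acknowledged the app.
  def waitForRegistration(): Unit = barrier.acquire()

  // Called from the connected(appId) listener callback once RegisteredApplication arrives.
  def notifyContext(): Unit = barrier.release()
}

object RegistrationBarrierDemo {
  def main(args: Array[String]): Unit = {
    val backend = new RegistrationBarrierSketch
    // Simulate the AppClient thread delivering RegisteredApplication a bit later.
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(500); backend.notifyContext() }
    }).start()
    backend.waitForRegistration()
    println("registered; start() can return and the app moves to RUNNING")
  }
}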
Another key takeaway from this post: all deploy-related messages live in org.apache.spark.deploy.DeployMessages.scala. By picking a message type and locating its sender and receiver, you can string the whole flow together.
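To make that concrete, here is a tiny self-contained toy (not the real Spark protocol, and with heavily simplified fields) showing the style used throughout DeployMessages: each protocol step is a case class, and the receiving side pattern-matches on it.

// Toy protocol in the DeployMessages style: one case class per step, pattern matching on receive.
sealed trait ToyDeployMessage
case class ToyRegisterApplication(appName: String) extends ToyDeployMessage
case class ToyRegisteredApplication(appId: String) extends ToyDeployMessage

object DeployMessageTraceToy {
  // What the Master's receive would do for this message type (simplified).
  def masterReceive(msg: ToyDeployMessage): Option[ToyDeployMessage] = msg match {
    case ToyRegisterApplication(name) =>
      println(s"Master: registering app '$name'")
      Some(ToyRegisteredApplication("app-20160101120000-0000"))   // reply sent back to the Driver
    case _ => None
  }

  // What the Driver-side ClientEndpoint's receive would do (simplified).
  def driverReceive(msg: ToyDeployMessage): Unit = msg match {
    case ToyRegisteredApplication(appId) => println(s"Driver: registered with ID $appId")
    case _ =>
  }

  def main(args: Array[String]): Unit =
    masterReceive(ToyRegisterApplication("my-app")).foreach(driverReceive)
}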
If I have misunderstood anything, please leave a comment so I can correct it. Thanks.