本文基于spark2.1进行解析
前言
Spark作为分布式的计算框架可支持多种运行模式:
- 本地运行模式 (单机)
- 本地伪集群运行模式(单机模拟集群)
- Standalone Client模式(集群)
- Standalone Cluster模式(集群)
- YARN Client模式(集群)
- YARN Cluster模式(集群)
而Standalone 作为spark自带cluster manager,需要启动Master和Worker守护进程,本文将从源码角度解析两者的启动流程。Master和Worker之间的通信使用的是基于netty的RPC,Spark的Rpc推荐看深入解析Spark中的RPC。
Master 启动
启动Master是通过脚本start-master.sh启动的,里面实际调用的类是:
org.apache.spark.deploy.master.Master
看看其main方法:
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
// 创建RpcEnv,启动Rpc服务
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
//阻塞等待
rpcEnv.awaitTermination()
}
main方法先获取配置参数创建SparkConf,通过startRpcEnvAndEndpoint启动一个RPCEnv并创建一个Endpoint,调用awaitTermination来阻塞服务端监听请求并且处理。下面细看startRpcEnvAndEndpoint方法:
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
// 创建RpcEnv
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
//通过rpcEnv 创建一个Endpoint
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
首先创建了RpcEnv,RpcEnv是整个Spark RPC的核心所在,RPCEndpoint定义了处理消息的逻辑,被创建后就被RpcEnv所管理,整个生命周期顺序为onStart,receive,onStop,其中receive可以被同时调用,ThreadSafeRpcEndpoint中的receive是线程安全的,同一时刻只能被一个线程访问。
该方法中向rpcEnv 注册的Endpoint是Master(继承了ThreadSafeRpcEndpoint),Master的构造器中创建了保存各种信息的变量。
...
//一个HashSet用于保存WorkerInfo
val workers = new HashSet[WorkerInfo]
//一个HashSet用于保存客户端(SparkSubmit)提交的任务
val apps = new HashSet[ApplicationInfo]
//等待调度的App
val waitingApps = new ArrayBuffer[ApplicationInfo]
//保存DriverInfo
val drivers = new HashSet[DriverInfo]
...
由于Master是一个Endpoint并被RpcEnv管理,需要先执行生命周期的onStart方法:
override def onStart(): Unit = {
...
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
...
}
向线程池中加入了一个线程,每隔WORKER_TIMEOUT_MS(默认60秒)时间去检测是否有Worker超时,其实就是向自己发送了一个CheckForWorkerTimeOut事件,稍后再细讲。
Worker启动
多个节点上的Worker是通过脚本start-slaves.sh启动,底层调用的类是:
org.apache.spark.deploy.worker.Worker
看看其main方法:
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
rpcEnv.awaitTermination()
}
和Master类似,也是先获取配置参数创建SparkConf,接着调用startRpcEnvAndEndpoint启动一个RPCEnv并创建一个Endpoint,调用awaitTermination来阻塞服务端监听请求并且处理。
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
这里是通过new了一个Worker实例来作为Endpoint并注册到RpcEnv中,Worker的构造器中初始化了心跳超时时间为Master端的1/4及其他变量
Worker向Master注册
Worker需要根据生命周期执行onStart()方法:
override def onStart() {
...
registerWithMaster()
...
}
在onStart()方法中调用了registerWithMaster来向Master来注册自己:
private def registerWithMaster() {
// onDisconnected may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheduled.
registrationRetryTimer match {
case None =>
// 是否已注册
registered = false
// 尝试向所有Master注册自己
registerMasterFutures = tryRegisterAllMasters()
// 尝试连接次数
connectionAttemptCount = 0
// 网络或者Master故障的时候就需要重新注册自己
// 注册重试次数超过阈值则直接退出
registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheduled already.")
}
}
registrationRetryTimer第一次调用肯定为None,通过tryRegisterAllMasters向Master注册自己,后面还启动了一个线程在有限次数内去尝试重新注册(网络或者Master出现故障是需要重新注册)。这里先看tryRegisterAllMasters方法是如何向Master注册的:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
masterRpcAddresses.map { masterAddress =>
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
registerWithMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
}
这里调用了rpcEnv.setupEndpointRef,RpcEndpointRef 是 RpcEnv 中的 RpcEndpoint 的引用,是一个序列化的实体以便于通过网络传送或保存以供之后使用。一个 RpcEndpointRef 有一个地址和名字。可以调用 RpcEndpointRef 的 send 方法发送异步的单向的消息给对应的 RpcEndpoint 。
这里整段代码意思即是:遍历所有masterRpcAddresses,调用registerWithMaster方法,并传入master端的RpcEndpoint引用RpcEndpointRef ,继续看看registerWithMaster方法:
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
}
通过RpcEndpointRef 和Master建立通信向Master发送RegisterWorker消息,并带入workerid,host,Port,cores,内存等参数信息,并有成功或者失败的回调函数稍后讲解。
Master 接收Worker注册
在Master中通过receiveAndReply方法处理各种需要回应的事件(单向消息通过receive),对于Worker注册消息RegisterWorker处理逻辑:
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
// 当前Master处于STANDBY
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
// Worker已经注册过了
} else if (idToWorker.contains(id)) {
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
// 根据Worker注册信息为Worker创建WorkerInfo
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
// 持久化记录Worker信息
persistenceEngine.addWorker(worker)
// 向Worker回复注册成功消息
context.reply(RegisteredWorker(self, masterWebUiUrl))
// 有了新的Worker,资源新增,为等待的app进行调度
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
// 向Worker回复注册失败消息
context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
- 若当前Master处于STANDBY状态,直接返回MasterInStandby消息
- 若Worker已经注册过了,直接返回RegisterWorkerFailed消息
- 根据Worker注册信息为Worker创建WorkerInfo,调用registerWorker方法进行注册:
- 若注册成功则持久化这个Worker信息,并向Worker回复注册成功消息,另外,多了一个Worker意味着资源的增加会通过schedule()去调度等待调度的apps。
- 若注册失败,则直接向Worker回复注册失败消息。
那是怎么判断是否注册成功呢?跟进registerWorker方法:
private def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
// 获取新worker的workerAddress
val workerAddress = worker.endpoint.address
if (addressToWorker.contains(workerAddress)) {
// 根据workerAddress 获取以前注册的老Worker
val oldWorker = addressToWorker(workerAddress)
// 若为UNKNOWN则说明是Master 处于recovery,Worker处于恢复中
if (oldWorker.state == WorkerState.UNKNOWN) {
// 移除老Worker,接受新注册的Worker
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
// 跟新变量
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
true
}
遍历所有管理的Worker,若有与新注册的Worker相同的host,port且处于Dead(超时)状态的Worker则直接从workers中移除。若管理的addressToWorker已经存在新注册的Worker一样的workerAddress,则获取老Worker,若状态是UNKNOWN说明Master 处于recovery,Worker正处于恢复中,则将老Worker移除,将新Worker直接加入并成功返回,若老Worker是其他状态则说明已经重复注册了,返回失败。
Worker接收Master注册反馈消息
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
}
在Worker向Master注册的时候就是调用的这个registerWithMaster方法,后随有回调方法处理结果,通过handleRegisterResponse来处理各种类型的反馈消息:
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
// 成功注册
case RegisteredWorker(masterRef, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
// 标记成功注册
registered = true
// 跟新映射,删除其他的registeration retry
changeMaster(masterRef, masterWebUiUrl)
// 向Master发送心跳
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
...
// 注册失败,直接退出进程
case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
System.exit(1)
}
// Master不是处于Active的Master,忽略
case MasterInStandby =>
// Ignore. Master not yet ready.
}
}
- 当注册Worker失败收到RegisterWorkerFailed消息,则退出。
- 当注册的Master处于Standby状态,直接忽略。
- 注册Worker成功返回RegisteredWorker消息时,先标记注册成功,然后通过changeMaster更改一些变量(如activeMasterUrl,master,connected等),并删除当前其他正在重试的注册。然后新建了一个task到线程池执行,该线程每隔HEARTBEAT_MILLIS时间向自己发送一个SendHeartbeat消息,在消息处理方法receive里面可看到消息处理方法,即向Master发送心跳:
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
Master 接收心跳
case Heartbeat(workerId, worker) =>
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
master端获取对应的workerInfo,若有则跟新上次获取心跳时间lastHeartbeat,若没有则向Worker发送需要重新建立连接的消息。
Master 检测Worker心跳超时
另外,由上文可知在Master的生命周期onStart里专门启动了一个线程检查worker是否超时,看看Master是如何处理的:
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
private def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT_MS / 1000))
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
}
遍历所有管理的Worker,若上次心跳时间离现在已经超过超时时间则判断为超时,将从worker列表里移除。