We know that when Spark starts, sbin/start-all.sh is invoked. This script actually runs spark-config.sh, start-master.sh, and start-slaves.sh. There is not much to see in spark-config.sh, which only sets some Spark environment variables, so we focus on the latter two. Note that the Master is started before the Workers.
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# Load the Spark configuration
. "${SPARK_HOME}/sbin/spark-config.sh"
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh
# Start Workers
"${SPARK_HOME}/sbin"/start-slaves.sh
start-master.sh mainly does the following: it sets the default SPARK_MASTER_PORT to 7077 and the default web UI port to 8080, and then goes through the start path of spark-daemon.sh,
nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null
Here command=$1 is org.apache.spark.deploy.master.Master, and $@ are all of the arguments start-master.sh passed to spark-daemon.sh. The call chain is sbin/spark-daemon.sh -> bin/spark-class.
So what ultimately gets executed is
spark-daemon.sh start org.apache.spark.deploy.master.Master
1 Master and Worker startup
First, look at Master's main method. Its core is the startRpcEnvAndEndpoint method, which starts the Master-side message communication framework:
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  // Here the Master endpoint ref sends a message to the corresponding [[RpcEndpoint]];
  // that RpcEndpoint is the Master itself.
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
The code above creates the RpcEnv used by the messaging framework and the Master endpoint (masterEndpoint).
The Master then replies to this request in its receiveAndReply method:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case BoundPortsRequest =>
    context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
}
When new Master is invoked (i.e. the constructor runs), a series of RpcEndpoint methods will be executed as well. Master itself is an RpcEndpoint, and the order in which an RpcEndpoint's methods run is documented in the Spark code:
/**
 * An end point for the RPC that defines what functions to trigger given a message.
 *
 * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
 *
 * The life-cycle of an endpoint is:
 *
 * constructor -> onStart -> receive* -> onStop
 *
 * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
 * [[ThreadSafeRpcEndpoint]]
 *
 * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
 * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
 */
That is, the constructor runs first, then onStart, then receive, and finally onStop.
This means that by the time the Worker starts, it can already send messages to the Master.
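To make that order concrete, here is a minimal, self-contained sketch (plain Scala, not Spark's private Rpc classes; ToyEndpoint and ToyRpcEnv are hypothetical names) that mimics constructor -> onStart -> receive* -> onStop:

// A simplified model of the RpcEndpoint lifecycle; not Spark code.
trait ToyEndpoint {
  def onStart(): Unit = {}
  def receive: PartialFunction[Any, Unit] = PartialFunction.empty
  def onStop(): Unit = {}
}

class ToyRpcEnv {
  // Mirrors rpcEnv.setupEndpoint: the endpoint is constructed first, then onStart is called.
  def setup(endpoint: ToyEndpoint): ToyEndpoint = { endpoint.onStart(); endpoint }
  def send(endpoint: ToyEndpoint, msg: Any): Unit =
    if (endpoint.receive.isDefinedAt(msg)) endpoint.receive(msg)
  def stop(endpoint: ToyEndpoint): Unit = endpoint.onStop()
}

object LifecycleDemo extends App {
  val env = new ToyRpcEnv
  val ep = env.setup(new ToyEndpoint {           // constructor runs here
    override def onStart(): Unit = println("onStart")
    override def receive = { case m => println(s"receive: $m") }
    override def onStop(): Unit = println("onStop")
  })
  env.send(ep, "RegisterWorker")                 // receive*
  env.stop(ep)                                   // onStop
}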
The Worker's startup is similar: its main method also calls a startRpcEnvAndEndpoint method. The Worker is itself an RpcEndpoint too, so it can both send and receive messages.
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {
  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}
This creates the communication environment rpcEnv and the Worker endpoint via rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr)).
2 Master-Worker startup message communication
The overall flow is illustrated in the figure below.
At startup, the main communication is between the Master and the Worker nodes. After the Master and Worker finish their main methods, each has created its messaging environment RpcEnv and its endpoint RpcEndpoint. The Worker then sends a registration message to the Master; once the Master has received and processed the request, it replies with a registration-success or registration-failure message. On success, the Worker starts sending periodic heartbeats to the Master so that the Master can tell whether the Worker is still healthy.
(1) After the Worker's constructor runs, onStart is called next; it contains the method that registers the Worker, registerWithMaster:
override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  createWorkDir()
  shuffleService.startIfEnabled()
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  registerWithMaster() // register this Worker with the Master
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
private def registerWithMaster() {
  // onDisconnected may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters() // the method that actually registers
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
You can see that registration is delegated to tryRegisterAllMasters. It tries all masters because in Spark Standalone mode there may be more than one: ZooKeeper can be used to provide Master HA and avoid the Master single point of failure, with one master in Active state and the others in Standby.
tryRegisterAllMasters submits the work to the thread pool registerMasterThreadPool, because registering with a master is a blocking action; the pool must be able to create masterRpcAddresses.size threads at the same time so that the Worker can register with all masters concurrently. The pool is named worker-register-master-threadpool and its size is masterRpcAddresses.length.
In short, the registration flow is: obtain a reference to the Master endpoint, then call registerWithMaster(masterEndpoint), which uses that endpoint reference's ask method to send a RegisterWorker message for the Master's receiveAndReply to handle.
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
  "worker-register-master-threadpool",
  masterRpcAddresses.length // Make sure we can register with all masters at the same time
)

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
Note the ask call here; let's see what ask does.
ask is a method of RpcEndpointRef; masterEndpoint here is a reference to the Master's endpoint, obtained above via rpcEnv.setupEndpointRef.
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
* receive the reply within a default timeout.
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
So ask sends a message to the target RpcEndpoint (here, the Master endpoint); the Master receives and processes it in receiveAndReply, and ask returns a Future through which the Worker receives the Master's reply.
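To look at just the Future handling, here is a plain Scala sketch of the same Success/Failure pattern (askMaster is a hypothetical stand-in for masterEndpoint.ask, and the same-thread ExecutionContext plays the role of ThreadUtils.sameThread):

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AskDemo extends App {
  // Run callbacks on the calling thread, like ThreadUtils.sameThread in the Worker code.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Hypothetical stand-in for masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(...))
  def askMaster(msg: Any): Future[String] = Future.successful(s"ack: $msg")

  askMaster("RegisterWorker").onComplete {
    case Success(reply) => println(s"registered, reply = $reply") // handleRegisterResponse in Spark
    case Failure(e)     => println(s"registration failed: $e")    // logError + System.exit(1) in Spark
  }
}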
(2) After the Worker has sent the registration message, let us look at the Master's receiveAndReply method:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
}
Here the Master handles the worker's id, host and port, the worker's endpoint reference workerRef, its cores and memory, and its workerWebUiUrl.
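From the pattern match above we can also read off exactly what the registration message carries; a sketch of the RegisterWorker case class as it presumably appears in DeployMessages (field names and types inferred from the match, not copied from the source):

// Sketch of the registration message; fields inferred from Master.receiveAndReply.
case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef, // the Worker's own endpoint ref (`self`)
    cores: Int,
    memory: Int,            // in MB
    workerWebUiUrl: String)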
The logic is roughly: first check whether this Master is in STANDBY state, and if so reply MasterInStandby; then check idToWorker to see whether this worker has already registered, since duplicate registration is not allowed. If neither case applies, the Master registers the worker: it wraps the worker's details in a WorkerInfo and calls registerWorker(worker):
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  true
}
Roughly: first remove any DEAD workers registered at the same host and port; then, if addressToWorker already contains this workerAddress, check whether the old worker's state is UNKNOWN. If it is, the old worker was restarted during recovery and must be dead, so it is removed and the new worker accepted; otherwise the registration fails. Finally the new worker is added to workers, idToWorker, and addressToWorker.
After this the Master replies with context.reply(RegisteredWorker(self, masterWebUiUrl)). The Worker handles that reply in the onComplete callback of the Future returned by ask:
.onComplete {
  // This is a very fast action so we can use "ThreadUtils.sameThread"
  case Success(msg) =>
    Utils.tryLogNonFatalError {
      handleRegisterResponse(msg)
    }
  case Failure(e) =>
    logError(s"Cannot register with master: ${masterEndpoint.address}", e)
    System.exit(1)
(3) Once registration succeeds, the Worker periodically sends a Heartbeat message to the Master so that the Master can keep track of the Worker's state; the interval is derived from spark.worker.timeout.
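As a sketch of that derivation (reproduced from memory of Worker.scala, so treat the exact expression as an assumption and check your Spark version), the period is a quarter of the timeout, which keeps several heartbeats inside one timeout window:

// Heartbeat period: a quarter of spark.worker.timeout (default 60s), i.e. 15s by default.
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4

The reply handling and the heartbeat scheduling happen in handleRegisterResponse: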
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      registered = true
      changeMaster(masterRef, masterWebUiUrl)
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }

      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

    case RegisterWorkerFailed(message) =>
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }

    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}
On success, the Worker first sets registered = true, then updates its Master information via changeMaster so it knows which Master it is now registered with, and starts the scheduled heartbeat task. It also reports the latest state of its executors to the Master: for each executor, the appId it serves, the executor's id, the cores it uses, and its state, plus the drivers it runs:
val execs = executors.values.map { e =>
  new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
}
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
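ExecutorDescription is just a compact summary of each executor. Inferred from the constructor call above (the field names and types are assumptions, not copied from the source), it looks roughly like:

// Sketch inferred from `new ExecutorDescription(e.appId, e.execId, e.cores, e.state)`.
private[deploy] class ExecutorDescription(
    val appId: String,
    val execId: Int,
    val cores: Int,
    val state: ExecutorState.Value)
  extends Serializable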
You may wonder what information describes a Worker on the Master side; it is exactly what WorkerInfo holds:
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)
  extends Serializable {

  Utils.checkHost(host, "Expected hostname")
  assert (port > 0)

  @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
  @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
  @transient var state: WorkerState.Value = _
  @transient var coresUsed: Int = _
  @transient var memoryUsed: Int = _

  @transient var lastHeartbeat: Long = _

  init()

  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed

  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    init()
  }

  private def init() {
    executors = new mutable.HashMap
    drivers = new mutable.HashMap
    state = WorkerState.ALIVE
    coresUsed = 0
    memoryUsed = 0
    lastHeartbeat = System.currentTimeMillis()
  }

  def hostPort: String = {
    assert (port > 0)
    host + ":" + port
  }

  def addExecutor(exec: ExecutorDesc) {
    executors(exec.fullId) = exec
    coresUsed += exec.cores
    memoryUsed += exec.memory
  }

  def removeExecutor(exec: ExecutorDesc) {
    if (executors.contains(exec.fullId)) {
      executors -= exec.fullId
      coresUsed -= exec.cores
      memoryUsed -= exec.memory
    }
  }

  def hasExecutor(app: ApplicationInfo): Boolean = {
    executors.values.exists(_.application == app)
  }

  def addDriver(driver: DriverInfo) {
    drivers(driver.id) = driver
    memoryUsed += driver.desc.mem
    coresUsed += driver.desc.cores
  }

  def removeDriver(driver: DriverInfo) {
    drivers -= driver.id
    memoryUsed -= driver.desc.mem
    coresUsed -= driver.desc.cores
  }

  def setState(state: WorkerState.Value): Unit = {
    this.state = state
  }

  def isAlive(): Boolean = this.state == WorkerState.ALIVE
}
How the Master handles the heartbeat:
case Heartbeat(workerId, worker) =>
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    case None =>
      if (workers.map(_.id).contains(workerId)) {
        logWarning(s"Got heartbeat from unregistered worker $workerId." +
          " Asking it to re-register.")
        worker.send(ReconnectWorker(masterUrl))
      } else {
        logWarning(s"Got heartbeat from unregistered worker $workerId." +
          " This worker was never registered, so ignoring the heartbeat.")
      }
  }
And how the Master handles the executor information reported by the Worker:
case WorkerLatestState(workerId, executors, driverIds) =>
  idToWorker.get(workerId) match {
    case Some(worker) =>
      for (exec <- executors) {
        val executorMatches = worker.executors.exists {
          case (_, e) => e.application.id == exec.appId && e.id == exec.execId
        }
        if (!executorMatches) {
          // master doesn't recognize this executor. So just tell worker to kill it.
          worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
        }
      }

      for (driverId <- driverIds) {
        val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
        if (!driverMatches) {
          // master doesn't recognize this driver. So just tell worker to kill it.
          worker.endpoint.send(KillDriver(driverId))
        }
      }
    case None =>
      logWarning("Worker state from unknown worker: " + workerId)
  }
(4) After the Master receives the Worker's registration request and replies that registration succeeded, it performs one more step:
schedule()
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
The idea: when a new Worker node joins, the Master collects all ALIVE workers and checks whether any waiting drivers (that is, apps in waiting state) have not yet been given resources. If so, it walks over waitingDrivers in round-robin fashion across the shuffled alive workers and launches a driver on a worker whenever if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) holds; it then calls startExecutorsOnWorkers() to start the executors on the workers, as shown in the accompanying figure.
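A tiny standalone simulation of that round-robin placement (FreeWorker and DriverReq are hypothetical toy types, not Spark code) shows how curPos walks over the shuffled alive workers:

import scala.util.Random

// Toy model of the driver-placement loop in Master.schedule(); names are hypothetical.
case class FreeWorker(name: String, var coresFree: Int, var memoryFree: Int)
case class DriverReq(name: String, cores: Int, mem: Int)

object ScheduleDemo extends App {
  val alive = Random.shuffle(Seq(
    FreeWorker("w1", 4, 4096), FreeWorker("w2", 2, 2048), FreeWorker("w3", 8, 8192)))
  val waiting = Seq(DriverReq("d1", 2, 2048), DriverReq("d2", 4, 4096))

  var curPos = 0
  for (driver <- waiting) {
    var launched = false
    var visited = 0
    while (visited < alive.size && !launched) {
      val w = alive(curPos)
      visited += 1
      if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
        w.coresFree -= driver.cores      // stands in for launchDriver(worker, driver)
        w.memoryFree -= driver.mem
        println(s"launch ${driver.name} on ${w.name}")
        launched = true
      }
      curPos = (curPos + 1) % alive.size
    }
    if (!launched) println(s"${driver.name} stays in waitingDrivers")
  }
}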
That covers Master-Worker communication at startup; next up is message communication at runtime.