基于spark1.6
创建完SparkContext,然后执行Action算子
当RDD执行Action算子时(形成一个job),会将代码提交到Master上运行,
例如wordcount的action 算子 collect方法 def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
sc是SparkContext对象,上面 runJob 如下
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
........................
}
//该方法调用多次重载的方法后,最终会调用dagScheduler的runJob,形成和切分stage
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
。。。。。。。
//dagScheduler出现了,可以切分stage
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
dagScheduler的runJob 是我们比较关心的
def runJob[T, U: ClassTag](
。。。。。
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
}
这里面的我们主要看的是submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)提交任务,括号里面的是任务信息
def submitJob[T, U](。。。): JobWaiter[U] = {
//在这儿才封装任务提交事件,把该事件对象加入到任务队列里面 eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
}
JobSubmitted:// 封装job事件对象,放入DAGScheduler阻塞的事件队列,例如:任务id,数据RDD,fun,jobId(可见一个action就是一个job)
从队列中取出事件对象,调用 onReceive方法,即调用子类 DAGSchedulerEventProcessLoop 的onReceive方法,该方法的匹配模式如下:
(1)先生成finalStage。
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
//调用dagScheduler来出来提交任务
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)
调用了handleJobSubmitted方法,接下来查看该方法
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: CallSite, listener: JobListener,
properties: Properties) {
var finalStage: Stage = null
//最终的stage,从后往前划分
finalStage = newResultStage(finalRDD, partitions.size, None, jobId, callSite)
。。。。
submitStage(finalStage)
// 提交其他正在等待的stage
submitWaitingStages() }
}
/**
* 创建一个 ResultStage ,形成有向无环图
*/
private def newResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
//下面这个函数会生成我们的DAG,需重点关注
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
val stage = new ResultStage(id,rdd, func, partitions,parentStages, jobId, callSite)
stageIdToStage(id) = stage //将Stage的id放入stageIdToStage结构中。
updateJobIdStageIdMaps(jobId, stage) //更新JobIdStageIdMaps
stage
}
}
上面的代码中,调用了newResultStage方法,该方法是划分任务的核心方法,任务划分是根据最后一个依赖关系作为开始,通过递归,将每个宽依赖做为切分Stage的依据,切分Stage的过程是流程中的一环(详见 day29_spark-源码-Stage划分算法,并最终得到了DAG图中的Result Stage(final Stage)),但在这里不详细阐述,当任务切分完毕后,代码继续执行来到submitStage(finalStage)这里开始进行任务提交
(2)提交resultStage
//提交Stage,如果有未提交的ParentStage,则会递归提交这些ParentStage,只有所有ParentStage都计算完了,才能提交当前Stage
private def submitStage(stage: Stage) { // 此stage是 result stage
// 根据stage获取jobId
val jobId = activeJobForStage(stage) //查找该Stage的所有激活的job
if (jobId.isDefined) { // jobId 存在就执行,如果不存在就停止
// 记录Debug日志信息:submitStage(stage)
logDebug("submitStage(" + stage + ")")
//如果当前Stage没有在等待parent Stage的返回,也不是正在运行的Stage,并且也没有提 示提交失败,说明未处理,那么我们就尝试提交Stage
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//得到还未执行的父 stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) { //如果没有父 Stage
//当前stage 拆分成task,形成taskSet 并提交
submitMissingTasks(stage, jobId.get) // 注意这个stage会是两种类型 1、shufflerMapStage 2、resultStage
} else {
//有父Stage没进行计算,就递归提交这些父Stage
for (parent <- missing) { // 该stage的所有父stage
submitStage(parent)// 递归调用本身
}
waitingStages += stage
}
}
} else {//无效作业,停止它。
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
********************getMissingParentStages方法如下****************
针对 stage的执行要记住2个判断点 1、getmissingParentStages()方法为核心方法。这里我们要懂得这样一个逻辑:我们都知道,Stage是通过shuffle划分的,所以,每一Stage都是以shuffle开始的,若一个RDD是宽依赖,则必然说明该RDD的父RDD在另一个Stage中,若一个RDD是窄依赖,则该RDD所依赖的父RDD还在同一个Stage中,我们可以根据这个逻辑,找到该Stage的父Stage。
// DAGScheduler.scala
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage] //用于存放父Stage
val visited = new HashSet[RDD[_]] //用于存放已访问过的RDD
val waitingForVisit = new Stack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) { //如果RDD没有被访问过,则进行访问
visited += rdd //添加到已访问RDD的HashSet中
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) { //获取该RDD的依赖
dep match {
case shufDep: ShuffleDependency[_, _, _] =>//若为宽依赖,则该RDD依赖的RDD所在的stage必为父stage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)//生成父Stage
if (!mapStage.isAvailable) {//若父Stage task没有完全执行,则添加到父Stage的HashSET中
missing += mapStage // 如果是宽依赖,那么就表示找到了,不存在宽依赖前还有宽依赖
}
case narrowDep: NarrowDependency[_] =>//若为窄依赖,则需要再判断,其父有无宽依赖
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {//循环遍历所有RDD
visit(waitingForVisit.pop())
}
missing.toList
}
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
针对 stage的执行要记住2个判断点 2、每当执行完一个Task会对变量_numAvailableOutputs加1,直至所有Task执行完,_numAvailableOutputs等于分区数。
(3)提交MissingTask
stage根据 parition 拆分成task(决定每个Task的最佳位置)生成TaskSet,并提交到TaskScheduler
private def submitMissingTasks(stage: Stage, jobId: Int) {
//首先根据stage所依赖的RDD的partition的分布,会产生出与partition数量相等的task
var tasks = ArrayBuffer[Task[_]]()
//对于resultStage或是shufflerMapStage会产生不同的task。
//检查该stage时是否ShuffleMapStage,如果是则生成ShuffleMapTask
if (stage.isShuffleMapStage) { //生成ShuffleMapStage
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
//task根据partition的locality进行分布
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
} else { //resultStage:该类型stage直接输出结果生成ResultTask
val job = resultStageToJob(stage)
for (id <- 0 until job.numPartitions if !job.finished(id)) {
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
//由于是ResultTask,因此需要传入定义的func,也就是处理结果返回
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
//向TaskSchuduler提交任务,以stage为单位,一个stage对应一个TaskSet
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
}
Task任务调度
taskScheduler.submitTasks方法比较重要,主要将任务加入调度池(taskschduler 创建时初始一个调度池),最后调用了CoarseGrainedSchedulerBackend.reviveOffers()
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
this.synchronized {
//将TaskSet 封装成TaskSetManger val manager = createTaskSetManager(taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
//用 schedulableBuilder去添加TaskSetManager到队列中
//schedulableBuilder有两种形态:FIFOSchedulableBuilder: 单一pool ,FairSchedulableBuilder: 多个pool schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
。。。。。。。。。。。
//在TaskSchedulerImpl在submitTasks添加TaskSetManager到pool后,调用了 backend.reviveOffers()
//fifo 直接将可调度对象TaskSetManager加入SchedulerQueue的尾端。
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}
override def reviveOffers() {
//自己给自己发消息(告诉它我要提交task)
driverActor ! ReviveOffers }
这里用了内部的DriverActor对象发送了一个内部消息给自己,接下来查看receiver方法接受的消息
收到消息后调用了makeOffers()方法
case ReviveOffers =>
makeOffers()
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorDataMap.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
}
makeOffers方法中,将Executor的信息集合与调度池中的Tasks封装成WokerOffers,调用
launchTasks
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
。。。。。。
//把task序列化
val serializedTask = ser.serialize(task)
。。。。。
//向executor进程 发送创建TaskRunner(extends Runnable)
val executorData = executorDataMap(task.executorId)(这是之前注册过了的)
executorData.freeCores -= scheduler.CPUS_PER_TASK
//把序列化好的task发送给Executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
会由CoarseGrainedSchedulerBackend来接受执行指令,内部封装DriverActor
launchTasks方法将遍历Tasks集合,每个Task任务序列化,发送启动Task执行消息的给Executor