1. 从word count程序开始
代码如下:
代码1
def main(args:Array[String]){
val sparkConf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("README.md",1)
val words = lines.flatMap(line => line.split(" "))
val wordOne = words.map(word => (word,1))
val wordCount = wordOne.reduceByKey(_ + _)
wordCount.foreach(println)
1.1 创建SparkConf
SparkConf持有spark的运行时的配置信息,不给SparkConf构造方法提供参数表示采用默认参数,会调用下面代码加载默认配置(加载所有以spark.开始java环境变量)。
if (loadDefaults) {
loadFromSystemProperties(false)
}
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
SparkConf 内部使用private val settings = new ConcurrentHashMap[String, String]()
保存了配置name到value的映射。
1.2 创建SparkContext
SparkContext是基于RDD的编程模型的核心,也是开发spark 应用的入口:从数据源创建RDD,job运行都会使用到SparkContext。在SparkStreaming和SparkSQL编程中,会先创建StreamingContext(SparkStreaming)和SQLContext,实际上它们内部都持有SparkContext,毕竟streaming和sql最终都会转换成RDD上的操作。由于SparkContext实例化时会涉及到多个组件的创建,这里只会大概讲一下SparkContext,后面会细讲每一个组件的工作。
SparkContext提供的基本方法
上面代码块1中val lines = sc.textFile("README.md",1)
,从文件README.md
创建只有一个partition的RDD,这是SparkContext提供的基本功能,从数据源创建RDD。
SparkContext提供了一系列从各种数据源创建RDD的方法,比如 hadoop,text,sequenceFile以及集合等等。
此外SparkContext还提供了一些runJob方法提交作业,代码1中wordCount.foreach(println)
这种action操作,触发job的提交,就是通过调用SparkContext#runJob来提交的,当然最终是通过DAGScheduler组件来实现job的提交(参考Spark job提交)
SparkContext初始化流程
- 创建JobProgressListener
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
JobProgressListener注册到listenerBus上,spark集群中job状态变化,stage变化,executor变化等信息都会被注册的XXXListener处理,这个JobProgreeListener顾名思义是用来追踪job执行过程中的信息,这些信息给Spark UI用的。ListenerBus可以注册多个listener,投送到listenerBus上的事件(Event)会被每一个listener处理。
- 创建SparkEnv
SparkEnv是一个很重要的类,它持有一个Spark job运行期间存储块管理(BlockManager),任务状态管理(MapOutputTracker),数据块传输服务(BlockTransferService),各Task输出中间文件管理等等服务。当创建一个Spark job并提交到各个executor上运行时,SparkEnv会在driver端和executor端创建,但是driver端和executor端具体实例化过程不一样,简单的说,上面提到的SparkEnv持有的各种服务,基本都是一个server-client的模型,driver端运行rpc server,executor运行rpc client,比如说executor想要汇报任务的状态,会通过rpc client发起调用汇报到driver端。关于spark的rpc可以参考Spark rpc实现。
SparkEnv会在下一节讲到。
- 创建TaskScheduler和SchedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
SchedulerBackend持有一些executor的信息(executorId,所在host,分配的cpu数量等),TaskScheduler和SchedulerBackend合作根据一定规则将task发送到executor上运行,关于TaskScheduler和SchedulerBackend具体参考.
- 创建DAGScheduler
_dagScheduler = new DAGScheduler(this)
一个job对应一个RDD转换形成的DAG,当job提交时,会首先由DAGScheduler讲DAG转换成Stage,再由stage生成task,task经TaskScheduler提交,DAGScheduler参考spark job提交
- 创建并启动ExecutorAllocationManager
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
它会在job运行期间监控task的运行进度判断需不需要动态扩展executor,由配置项spark.dynamicAllocation.enabled
决定,为true且当前spark部署模式不是local情况下起作用。
1.2.1 SparkEnv
???????????????????