Spark源码阅读 (一) - Spark 初始化

1. 从word count程序开始

代码如下:

代码1
    def main(args:Array[String]){
        val sparkConf = new SparkConf().setAppName("WordCount")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("README.md",1)
        val words = lines.flatMap(line => line.split(" "))
        val wordOne = words.map(word => (word,1))
        val wordCount = wordOne.reduceByKey(_ + _)
        wordCount.foreach(println)

1.1 创建SparkConf

SparkConf持有spark的运行时的配置信息,不给SparkConf构造方法提供参数表示采用默认参数,会调用下面代码加载默认配置(加载所有以spark.开始java环境变量)。

 if (loadDefaults) {
     loadFromSystemProperties(false)
 }
    
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
           // Load any spark.* system properties
           for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
             set(key, value, silent)
           }
           this
         }

SparkConf 内部使用private val settings = new ConcurrentHashMap[String, String]()保存了配置name到value的映射。

1.2 创建SparkContext

SparkContext是基于RDD的编程模型的核心,也是开发spark 应用的入口:从数据源创建RDD,job运行都会使用到SparkContext。在SparkStreaming和SparkSQL编程中,会先创建StreamingContext(SparkStreaming)和SQLContext,实际上它们内部都持有SparkContext,毕竟streaming和sql最终都会转换成RDD上的操作。由于SparkContext实例化时会涉及到多个组件的创建,这里只会大概讲一下SparkContext,后面会细讲每一个组件的工作。

SparkContext提供的基本方法
上面代码块1中val lines = sc.textFile("README.md",1),从文件README.md创建只有一个partition的RDD,这是SparkContext提供的基本功能,从数据源创建RDD。

SparkContext提供了一系列从各种数据源创建RDD的方法,比如 hadoop,text,sequenceFile以及集合等等。

此外SparkContext还提供了一些runJob方法提交作业,代码1中wordCount.foreach(println)这种action操作,触发job的提交,就是通过调用SparkContext#runJob来提交的,当然最终是通过DAGScheduler组件来实现job的提交(参考Spark job提交

SparkContext初始化流程

  1. 创建JobProgressListener
 _jobProgressListener = new JobProgressListener(_conf)
 listenerBus.addListener(jobProgressListener)

JobProgressListener注册到listenerBus上,spark集群中job状态变化,stage变化,executor变化等信息都会被注册的XXXListener处理,这个JobProgreeListener顾名思义是用来追踪job执行过程中的信息,这些信息给Spark UI用的。ListenerBus可以注册多个listener,投送到listenerBus上的事件(Event)会被每一个listener处理。

  1. 创建SparkEnv
    SparkEnv是一个很重要的类,它持有一个Spark job运行期间存储块管理(BlockManager),任务状态管理(MapOutputTracker),数据块传输服务(BlockTransferService),各Task输出中间文件管理等等服务。当创建一个Spark job并提交到各个executor上运行时,SparkEnv会在driver端和executor端创建,但是driver端和executor端具体实例化过程不一样,简单的说,上面提到的SparkEnv持有的各种服务,基本都是一个server-client的模型,driver端运行rpc server,executor运行rpc client,比如说executor想要汇报任务的状态,会通过rpc client发起调用汇报到driver端。关于spark的rpc可以参考Spark rpc实现

SparkEnv会在下一节讲到。

  1. 创建TaskScheduler和SchedulerBackend
 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts

SchedulerBackend持有一些executor的信息(executorId,所在host,分配的cpu数量等),TaskScheduler和SchedulerBackend合作根据一定规则将task发送到executor上运行,关于TaskScheduler和SchedulerBackend具体参考.

  1. 创建DAGScheduler
    _dagScheduler = new DAGScheduler(this)

一个job对应一个RDD转换形成的DAG,当job提交时,会首先由DAGScheduler讲DAG转换成Stage,再由stage生成task,task经TaskScheduler提交,DAGScheduler参考spark job提交

  1. 创建并启动ExecutorAllocationManager
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

它会在job运行期间监控task的运行进度判断需不需要动态扩展executor,由配置项spark.dynamicAllocation.enabled决定,为true且当前spark部署模式不是local情况下起作用。

1.2.1 SparkEnv

???????????????????

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容