Spark Streaming 初始化过程分析

—————☼—————☼—————☼—————☼—————☼—————
Spark Streaming概述
Spark Streaming 初始化过程
Spark Streaming Receiver启动过程分析
Spark Streaming 数据准备阶段分析(Receiver方式)
Spark Streaming 数据计算阶段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 机制分析

—————☼—————☼—————☼—————☼—————☼—————

Spark Streaming是一种构建在Spark上的实时计算框架。Spark Streaming应用以Spark应用的方式提交到Spark平台,其组件以长期批处理任务的形式在Spark平台运行。这些任务主要负责接收实时数据流及定期产生批作业并提交至Spark集群,本文要说明的是以下几个功能模块运行前的准备工作。

  • 数据接收
  • Job 生成
  • 流量控制
  • 动态资源伸缩

下面我们以WordCount程序为例分析Spark Streaming运行环境的初始化过程。

val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]") 
val sc = new SparkContext(conf) 
val ssc = new StreamingContext(sc, Seconds(10)) 
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY) 
val words = lines.flatMap(_.split(" ")).map(w => (w,1)) 
val wordCount = words.reduceByKey(_+_) 
wordCount.print 
ssc.start()
ssc.awaitTermination()

以下流程,皆以上述WordCount源码为例。

1、StreamingContext的初始化过程

StreamingContext是Spark Streaming应用的执行环境,其定义很多Streaming功能的入口,如:它提供从多种数据源创建DStream的方法等。
在创建Streaming应用时,首先应创建StreamingContext(WordCount应用可知),伴随StreamingContext的创建将会创建以下主要组件:

1.1 DStreamGraph

DStreamGraph的主要功能是记录InputDStream及OutputStream及从InputDStream中抽取出ReceiverInputStreams。因为DStream之间的依赖关系类似于RDD,并在任务执行时转换成RDD,因此,可以认为DStream Graph与RDD Graph存在对应关系. 即:DStreamGraph以批处理间隔为周期转换成RDDGraph.

  • ReceiverInputStreams: 包含用于接收数据的Receiver信息,并在启动Receiver时提供相关信息
  • OutputStream:每个OutputStream会在批作业生成时,生成一个Job.

1.2 JobScheduler

JobScheduler是Spark Streaming中最核心的组件,其负载Streaming各功作组件的启动。

  • 数据接收
  • Job 生成
  • 流量控制
  • 动态资源伸缩
    以及负责生成的批Job的调度及状态管理工作。

2、 DStream的创建与转换

StreamingContext初始化完毕后,通过调用其提供的创建InputDStream的方法创建SocketInputDStream.

SocketInputDStream的继承关系为:
SocketInputDStream->ReceiverInputDStream->InputDStream->DStream.
在InputDStream中 提供如下功

 ssc.graph.addInputStream(this)

JAVA中初始化子类时,会先初始化其父类。所以在创建SocketInputDStream时,会先初始化InputDStream,在InputDStream中实现将自身加入DStreamGraph中,以标识其为输入数据源。
DStream中算子的转换,类似于RDD中的转换,都是延迟计算,仅形成pipeline链。当上述应用遇到print(Output算子)时,会将DStream转换为ForEachDStream,并调register方法作为OutputStream注册到DStreamGraph的outputStreams列表,以待生成Job。
print算子实现方法如下:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
 def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * @param foreachFunc foreachRDD function
   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
   *                           only the scopes and callsites of `foreachRDD` will override those
   *                           of the RDDs on the display.
   */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

ForEachDStream 不同于其它DStream的地方为其重写了generateJob方法,以使DStream Graph操作转换成RDD Graph操作,并生成Job.

3、SparkContext启动

/**
   * Start the execution of the streams.
   *
   * @throws IllegalStateException if the StreamingContext is already stopped.
   */
  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        ......
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

在此方法中,最核心的代码是以线程的方式启动JobScheduler,从而开启各功能组件。

3.1 JobScheduler的启动

JobScheduler主要负责以下几种任务:

  • 数据接收相关组件的初始化及启动
    ReceiverTracker的初始化及启动。ReceiverTracker负责管理Receiver,包括Receiver的启停,状态维护 等。
  • Job生成相关组件的启动
    JobGenerator的启动。JobGenerator负责以BatchInterval为周期生成Job.
  • Streaming监听的注册与启动
  • 作业监听
  • 反压机制
    BackPressure机制,通过RateController控制数据摄取速率。
  • Executor DynamicAllocation 的启动
    Executor 动态伸缩管理, 动态增加或减少Executor,来达到使用系统稳定运行 或减少资源开销的目的。
  • Job的调度及状态维护。

JobScheduler的start方法的代码如下所示:

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

代码中存在的 eventLoop: EventLoop[JobSchedulerEvent]对象,用以接收和处理事件。调用者通过调用其post方法向事件队列注册事件。EventLoop开始执行时,会开启一deamon线程用于处理队列中的事件。EventLoop是一个抽象类,JobScheduler中初始化EventLoop时实现了其OnReceive方法。该方法中指定接收的事件由processEvent(event)方法处理。

小结

JobScheduler是Spark Streaming中核心的组件,在其开始执行时,会开启数据接收相关组件及Job生成相关组件,从而使数据准备和数据计算两个流程开始工作。
另外,其还负责BackPressure, Executor DynamicAllocation 等优化机制的启动工作。
下面的章节,将对数据准备和数据计算阶段的流程进行分析,以及BackPressure, Executor DynamicAllocation 机制进行分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容