8 Spark Streaming Source Code Analysis: A Thorough Study of the Full Lifecycle of RDD Generation

This article uses the NetworkWordCount program as its running example. The code is as follows:

object NetworkWordCount {

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. Starting from ssc.socketTextStream, we trace the DStream dependency chain step by step. The code of ssc.socketTextStream is as follows:
def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

Step one: it calls the socketStream method, which creates a SocketInputDStream. The code is as follows:

def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

So val lines = ssc.socketTextStream(...) returns lines, a DStream.
Step two: the flatMap operation creates a FlatMappedDStream:

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

This returns the words DStream.
Step three: the map operation returns a MappedDStream:

def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

Next comes the reduceByKey operation:

def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
    reduceByKey(reduceFunc, defaultPartitioner())
}

which in turn calls the two-argument reduceByKey overload:

def reduceByKey(
  reduceFunc: (V, V) => V,
  partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
}

which then calls combineByKey:

def combineByKey[C: ClassTag](
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiner: (C, C) => C,
  partitioner: Partitioner,
  mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
val cleanedCreateCombiner = sparkContext.clean(createCombiner)
val cleanedMergeValue = sparkContext.clean(mergeValue)
val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
new ShuffledDStream[K, V, C](
  self,
  cleanedCreateCombiner,
  cleanedMergeValue,
  cleanedMergeCombiner,
  partitioner,
  mapSideCombine)
}
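
To make the three combiner functions concrete, here is a hedged, non-Spark illustration of how reduceByKey-style aggregation can be expressed with a createCombiner / mergeValue pair on an ordinary Scala collection (CombineByKeySketch and its helper are invented for this example; mergeCombiner, which merges partial results across partitions, is omitted in this single-collection sketch):

// Plain-Scala illustration (not Spark) of the combineByKey contract used above:
// createCombiner turns the first value seen for a key into a combiner, mergeValue
// folds further values in; for reduceByKey both are just the reduce function.
object CombineByKeySketch {
  def combineByKey[K, V, C](pairs: Seq[(K, V)])(createCombiner: V => C,
                                                mergeValue: (C, V) => C): Map[K, C] =
    pairs.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k) match {
        case Some(c) => mergeValue(c, v)   // key seen before: merge the new value in
        case None    => createCombiner(v)  // first value for this key
      })
    }

  def main(args: Array[String]): Unit = {
    val words = Seq("a" -> 1, "b" -> 1, "a" -> 1)
    // reduceByKey(_ + _) expressed via combineByKey, as in the DStream code above
    println(combineByKey(words)((v: Int) => v, (c: Int, v: Int) => c + v)) // Map(a -> 2, b -> 1)
  }
}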

The combineByKey call above returns a ShuffledDStream. All of the operations so far form a DStream dependency chain, and that chain is in fact the template for the RDD dependency chain; it stays highly consistent with the corresponding RDD operations in Spark Core. At runtime the dependency chain is SocketInputDStream -> FlatMappedDStream -> MappedDStream -> ShuffledDStream.
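
The chain exists simply because each derived DStream keeps a reference to its parent. A minimal sketch of that pattern (the Sketch* names are invented for this example and are not Spark classes):

// Simplified sketch of the parent-chain pattern described above. Each derived
// stream records its parent, so a lineage such as
// SocketInputDStream -> FlatMappedDStream -> MappedDStream -> ShuffledDStream
// can be walked backwards when a job is generated for a batch.
abstract class SketchStream[T] {
  def dependencies: List[SketchStream[_]]
}

class SketchInputStream[T] extends SketchStream[T] {
  override def dependencies: List[SketchStream[_]] = Nil            // an input stream has no parent
}

class SketchMappedStream[T, U](parent: SketchStream[T]) extends SketchStream[U] {
  override def dependencies: List[SketchStream[_]] = List(parent)   // a derived stream remembers its parent
}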


  2. At this point the dependency chain is clear, but no RDD has appeared yet. Next, look at wordCounts.print(). Stepping into the print method, we eventually reach new ForEachDStream. ForEachDStream overrides the generateJob method, and generateJob is invoked from graph.generateJobs(time) in JobGenerator; for the details of that call path, see Part 6 on dynamic job generation. The generateJob code is as follows:
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
}

Let's see how the parent.getOrCompute(time) method returns an RDD:

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
  // Compute the RDD if time is valid (e.g. correct time in a sliding window)
  // of RDD generation, else generate nothing.
  if (isTimeValid(time)) {

    val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details. We need to have this call here because
      // compute() might cause Spark jobs to be launched.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        compute(time)
      }
    }

    rddOption.foreach { case newRDD =>
      // Register the generated RDD for caching and checkpointing
      if (storageLevel != StorageLevel.NONE) {
        newRDD.persist(storageLevel)
        logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
      }
      if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
        newRDD.checkpoint()
        logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
      }
      generatedRDDs.put(time, newRDD)
    }
    rddOption
  } else {
    None
  }
}
}

It first tries to fetch the RDD from generatedRDDs, which is declared as follows:

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

generatedRDDs is simply a HashMap with Time as the key and an RDD as the value. The Time keys are aligned with the application's batchDuration, and the RDD stored is the last RDD of each job; because RDDs carry their dependencies, keeping only that last RDD is enough to trace back to all of the RDDs it was derived from. On a cache miss, getOrCompute calls compute(time), which is implemented by the subclasses.
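
As a rough sketch of this caching pattern (not the actual Spark code; Long stands in for Spark's Time type and String for the generated RDD, and the object name is invented), getOrCompute boils down to a time-keyed lookup that computes and memoizes on a miss:

import scala.collection.mutable

// Hedged sketch of the getOrCompute caching pattern described above (not Spark code).
object GeneratedRDDsSketch {
  private val generatedRDDs = mutable.HashMap[Long, String]()

  def getOrCompute(time: Long)(compute: => Option[String]): Option[String] =
    generatedRDDs.get(time).orElse {       // reuse the RDD if this batch was already computed
      val rddOption = compute              // otherwise compute it for this batch time
      rddOption.foreach(rdd => generatedRDDs.put(time, rdd))
      rddOption
    }

  def main(args: Array[String]): Unit = {
    println(getOrCompute(1000L)(Some("blockRDD@1000")))   // computed and cached
    println(getOrCompute(1000L)(None))                    // served from the cache
  }
}
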
Going back to the original ssc.socketTextStream call, a SocketInputDStream was instantiated there. SocketInputDStream extends ReceiverInputDStream, so look at ReceiverInputDStream's compute method:

override def compute(validTime: Time): Option[RDD[T]] = {
val blockRDD = {

  if (validTime < graph.startTime) {
    // If this is called for any time before the start time of the context,
    // then this returns an empty RDD. This may happen when recovering from a
    // driver failure without any write ahead log to recover pre-failure data.
    new BlockRDD[T](ssc.sc, Array.empty)
  } else {
    // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
    // for this batch
    val receiverTracker = ssc.scheduler.receiverTracker
    val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

    // Register the input blocks information into InputInfoTracker
    val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    // Create the BlockRDD
    createBlockRDD(validTime, blockInfos)
  }
}
Some(blockRDD)
}

The code inside the if branch is there for fault tolerance; look at the else branch instead. The key line is createBlockRDD(validTime, blockInfos), which returns a BlockRDD. blockInfos is the metadata about the data received by the receiver, obtained through the ReceiverTracker. Let's look at the createBlockRDD code:

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

    if (blockInfos.nonEmpty) {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Are WAL record handles present with all the blocks
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
        // others then that is unexpected and log a warning accordingly.
        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }
        if (validBlockIds.size != blockIds.size) {
          logWarning("Some blocks could not be recovered as they were not found in memory. " +
            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
            "for more details.")
        }
        new BlockRDD[T](ssc.sc, validBlockIds)
      }
    } else {
      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
      // according to the configuration
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
}

It first checks whether blockInfos is empty. If there is no block metadata, the BlockRDD it creates is given an empty Array, meaning the blockIds (which act like pointers to the data blocks) are empty.
If there is data, it first checks for WAL record handles; we will cover the WAL case later. For now go straight to new BlockRDD[T](ssc.sc, validBlockIds). The BlockRDD code is as follows:

private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
      val index = idx
    }
    
    private[spark]
    class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
      extends RDD[T](sc, Nil) {
    
      @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
      @volatile private var _isValid = true
    
      override def getPartitions: Array[Partition] = {
        assertValid()
        (0 until blockIds.length).map(i => {
          new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
        }).toArray
      }
    
      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        assertValid()
        val blockManager = SparkEnv.get.blockManager
        val blockId = split.asInstanceOf[BlockRDDPartition].blockId
        blockManager.get(blockId) match {
          case Some(block) => block.data.asInstanceOf[Iterator[T]]
          case None =>
            throw new Exception("Could not compute split, block " + blockId + " not found")
        }
      }
    
      override def getPreferredLocations(split: Partition): Seq[String] = {
        assertValid()
        _locations(split.asInstanceOf[BlockRDDPartition].blockId)
      }
    
      /**
       * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
       * irreversible operation, as the data in the blocks cannot be recovered back
       * once removed. Use it with caution.
       */
      private[spark] def removeBlocks() {
        blockIds.foreach { blockId =>
          sparkContext.env.blockManager.master.removeBlock(blockId)
        }
        _isValid = false
      }
    
      /**
       * Whether this BlockRDD is actually usable. This will be false if the data blocks have been
       * removed using `this.removeBlocks`.
       */
      private[spark] def isValid: Boolean = {
        _isValid
      }
    
      /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
      private[spark] def assertValid() {
        if (!isValid) {
          throw new SparkException(
            "Attempted to use %s after its blocks have been removed!".format(toString))
        }
      }
    
      protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
        _locations
      }
}

Here compute returns an iterator, just as when reading data from HDFS: the block index is wrapped in an iterator, and the data is only pulled when an action is triggered.
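
A minimal, self-contained sketch of this lazy-iterator behaviour (not Spark code; the block id and record contents below are made up):

// Hedged sketch of the lazy-iterator idea above: compute() only builds an iterator
// over a block; nothing is fetched until the iterator is consumed, which in Spark
// happens when an action finally runs the task.
object LazyBlockIteratorSketch {
  def compute(blockId: String): Iterator[String] =
    Iterator.tabulate(3) { i =>
      println(s"fetching record $i of $blockId")   // runs lazily, per element
      s"$blockId-record-$i"
    }

  def main(args: Array[String]): Unit = {
    val it = compute("input-0-0001")               // hypothetical block id
    println("iterator created, nothing fetched yet")
    it.foreach(println)                            // fetching happens here, at "action" time
  }
}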

  3. Now look again at what these two lines do:
 val words = lines.flatMap(_.split(" "))
 val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

First, look at FlatMappedDStream's compute method:

override def compute(validTime: Time): Option[RDD[U]] = {  
     parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}

It obtains the RDD from the parent DStream (the BlockRDD) and then applies flatMap to it, returning an Option[RDD[U]].
Then look at MappedDStream's compute method:

override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}

It likewise obtains the RDD from the parent DStream, applies map to it, and returns an Option[RDD[U]].
ShuffledDStream produces its ShuffledRDD in the same way.
Seen through the DStream dependency chain, this is exactly the RDD dependency chain, which is why a DStream can be called a template for RDDs. Here is the printed RDD lineage:

(2) ShuffledRDD[4] at reduceByKey at NetworkWordCount.scala:25 []
 +-(0) MapPartitionsRDD[3] at map at NetworkWordCount.scala:25 []
    |  MapPartitionsRDD[2] at flatMap at NetworkWordCount.scala:24 []
    |  BlockRDD[1] at socketTextStream at NetworkWordCount.scala:21 []

Job submission itself is handled by the scheduler, so the action on the RDD is ultimately triggered when the Job's def run() { _result = Try(func()) }, discussed in the article on dynamic job generation, is executed.
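
As a hedged sketch of that last point (simplified, not Spark's actual Job class; SketchJob and its succeeded helper are invented here), the output operation on the final RDD is just a closure that the scheduler runs later:

import scala.util.Try

// Simplified sketch (not Spark's Job class): the output operation on the final RDD
// is captured as the closure `func`, and nothing touches the RDD lineage until the
// scheduler calls run(), at which point the action fires and the whole chain computes.
class SketchJob(func: () => Unit) {
  private var result: Option[Try[Unit]] = None
  def run(): Unit = { result = Some(Try(func())) }
  def succeeded: Boolean = result.exists(_.isSuccess)
}

object SketchJobDemo {
  def main(args: Array[String]): Unit = {
    val job = new SketchJob(() => println("action on the batch's final RDD runs here"))
    job.run()                       // the scheduler would call this once per batch
    println(s"job succeeded: ${job.succeeded}")
  }
}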
