This article uses the NetworkWordCount example as its running case. The code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    // One batch per second
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // Split each line into words on spaces
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
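To make the per-batch logic concrete, here is a minimal sketch of the same flatMap / map / reduceByKey pipeline applied to a single batch using plain Scala collections instead of Spark. The object name WordCountSketch and the function countWords are hypothetical, and groupBy plus sum is only a local stand-in for reduceByKey(_ + _), not how Spark implements it.

```scala
object WordCountSketch {
  // `batch` stands in for the lines received during one batch interval.
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))        // same role as lines.flatMap(_.split(" "))
      .map(word => (word, 1))       // same role as words.map(x => (x, 1))
      .groupBy(_._1)                // local stand-in for the shuffle by key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(countWords(Seq("a b a", "b c")))
}
```

In the streaming job, this computation is repeated once per batch interval on whatever data arrived in that interval.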
- Starting from ssc.socketTextStream, we trace the DStream dependencies step by step. The code of ssc.socketTextStream is as follows:
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
Step one: socketTextStream calls socketStream, which creates a SocketInputDStream. The code is as follows:
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
So val lines = ssc.socketTextStream(...) returns a DStream named lines.
Step two: the flatMap operation creates a FlatMappedDStream:
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
This returns the words DStream.
Step three: the map operation returns a MappedDStream:
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
Next comes the reduceByKey operation:
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
  reduceByKey(reduceFunc, defaultPartitioner())
}
This in turn calls another reduceByKey overload:
def reduceByKey(
    reduceFunc: (V, V) => V,
    partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
  combineByKey((v: V) => v, reduceFunc, reduceFunc, partitioner)
}
That overload then calls combineByKey:
def combineByKey[C: ClassTag](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
  val cleanedCreateCombiner = sparkContext.clean(createCombiner)
  val cleanedMergeValue = sparkContext.clean(mergeValue)
  val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
  new ShuffledDStream[K, V, C](
    self,
    cleanedCreateCombiner,
    cleanedMergeValue,
    cleanedMergeCombiner,
    partitioner,
    mapSideCombine)
}
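The result is a ShuffledDStream. Together, these operations form a chain of DStream dependencies. The DStream dependency chain is in effect a template for the RDD dependency chain, and it stays closely consistent with the RDD operations in Spark core. Below is a diagram of the dependencies at runtime.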
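- The dependencies are now clear, but no RDD has appeared yet. Next, look at wordCounts.print(). Stepping into the print method and following the calls leads to new ForEachDStream. ForEachDStream overrides the generateJob method, and generateJob is invoked from graph.generateJobs(time) in JobGenerator; for the details of that call, see part 6 of this series, on dynamic job generation. The code of generateJob is as follows: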
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
Now look at how parent.getOrCompute(time) returns the RDD:
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
It first tries to fetch the RDD from generatedRDDs, which is declared as follows:
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
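generatedRDDs is simply a HashMap with the time as key and the RDD as value. The Time here is aligned with the application's batchDuration, and the RDD is the last RDD of each job. Because RDDs carry their dependencies, keeping only the last RDD is enough to trace back to all the others. If no RDD is cached for the given time, getOrCompute calls compute(time), which is implemented by the subclasses.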
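The caching behaviour of getOrCompute can be sketched without Spark. In the toy model below, ToyDStream, ToyRDD, computeFn and computeCalls are hypothetical names, a Seq[String] stands in for an RDD, and the validity check assumes a zero time of 0; persistence and checkpointing are left out.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  type Time = Long
  type ToyRDD = Seq[String] // hypothetical stand-in for RDD[T]

  class ToyDStream(slideMs: Long, computeFn: Time => Option[ToyRDD]) {
    // One generated "RDD" per batch time, like generatedRDDs in DStream
    private val generatedRDDs = mutable.HashMap.empty[Time, ToyRDD]
    // Counts how often the subclass-style compute function really ran
    var computeCalls = 0

    // Simplified check: assumes zeroTime is 0
    private def isTimeValid(time: Time): Boolean =
      time > 0 && time % slideMs == 0

    def getOrCompute(time: Time): Option[ToyRDD] =
      generatedRDDs.get(time).orElse {
        if (isTimeValid(time)) {
          computeCalls += 1
          val rddOption = computeFn(time)
          // Remember the result so later calls for this time reuse it
          rddOption.foreach(rdd => generatedRDDs.put(time, rdd))
          rddOption
        } else {
          None
        }
      }
  }
}
```

Calling getOrCompute twice for the same valid time runs computeFn only once, and a time that is not a multiple of the slide interval yields None.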
Go back to ssc.socketTextStream at the very beginning, where a SocketInputDStream is instantiated. SocketInputDStream inherits from ReceiverInputDStream, so look at the compute method of ReceiverInputDStream:
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
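The code in the if branch is there for fault tolerance, so look at the else branch. The key line is createBlockRDD(validTime, blockInfos), which returns a BlockRDD. blockInfos is the metadata about the data received by the receiver, obtained through the ReceiverTracker. Here is the code of createBlockRDD: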
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
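It first checks whether blockInfos is empty. If there is no metadata, the BlockRDD (or the WriteAheadLogBackedBlockRDD, when the receiver WAL is enabled) is created with an empty Array, meaning that blockIds (roughly, pointers to the data blocks) is empty.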
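The branching in createBlockRDD can be summarized with a small Spark-free sketch. BlockInfo, WalBackedRDD, PlainBlockRDD and the isStored predicate are hypothetical simplifications of ReceivedBlockInfo, WriteAheadLogBackedBlockRDD, BlockRDD and the block manager lookup; logging is omitted.

```scala
object CreateBlockRddSketch {
  // Hypothetical, simplified stand-in for ReceivedBlockInfo
  final case class BlockInfo(blockId: String, walHandle: Option[String])

  sealed trait ToyRDD
  final case class WalBackedRDD(blockIds: Seq[String], handles: Seq[String]) extends ToyRDD
  final case class PlainBlockRDD(blockIds: Seq[String]) extends ToyRDD

  def createBlockRDD(
      infos: Seq[BlockInfo],
      walEnabled: Boolean,
      isStored: String => Boolean): ToyRDD =
    if (infos.nonEmpty) {
      if (infos.forall(_.walHandle.nonEmpty)) {
        // Every block has a WAL handle: build the WAL-backed variant
        WalBackedRDD(infos.map(_.blockId), infos.flatMap(_.walHandle))
      } else {
        // Otherwise keep only the blocks the block manager still knows about
        PlainBlockRDD(infos.map(_.blockId).filter(isStored))
      }
    } else if (walEnabled) {
      // No blocks for this batch: empty RDD, type chosen by configuration
      WalBackedRDD(Seq.empty, Seq.empty)
    } else {
      PlainBlockRDD(Seq.empty)
    }
}
```

Note that in every branch an RDD is returned, even when the batch received no data, so the downstream transformations always have a parent RDD to work on.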
If there is data, it first checks for the WAL. We will leave the WAL for later and go straight to new BlockRDD[T](ssc.sc, validBlockIds). The code of BlockRDD is as follows:
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
  val index = idx
}

private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) {

  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
  @volatile private var _isValid = true

  override def getPartitions: Array[Partition] = {
    assertValid()
    (0 until blockIds.length).map(i => {
      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
    }).toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val blockManager = SparkEnv.get.blockManager
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
    blockManager.get(blockId) match {
      case Some(block) => block.data.asInstanceOf[Iterator[T]]
      case None =>
        throw new Exception("Could not compute split, block " + blockId + " not found")
    }
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    assertValid()
    _locations(split.asInstanceOf[BlockRDDPartition].blockId)
  }

  /**
   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
   * irreversible operation, as the data in the blocks cannot be recovered back
   * once removed. Use it with caution.
   */
  private[spark] def removeBlocks() {
    blockIds.foreach { blockId =>
      sparkContext.env.blockManager.master.removeBlock(blockId)
    }
    _isValid = false
  }

  /**
   * Whether this BlockRDD is actually usable. This will be false if the data blocks have been
   * removed using `this.removeBlocks`.
   */
  private[spark] def isValid: Boolean = {
    _isValid
  }

  /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
  private[spark] def assertValid() {
    if (!isValid) {
      throw new SparkException(
        "Attempted to use %s after its blocks have been removed!".format(toString))
    }
  }

  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
    _locations
  }
}
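The compute method here returns an iterator, just like reading data from HDFS: in both cases the references to the data blocks are wrapped in an iterator, which is only consumed when an action is triggered.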
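The point that a BlockRDD only holds block ids and touches the data when an action runs can be illustrated with a toy model. ToyBlockStore, ToyPartition and ToyBlockRDD are hypothetical stand-ins for BlockManager, BlockRDDPartition and BlockRDD, and the reads counter exists only to make the laziness observable.

```scala
object BlockRddSketch {
  // Hypothetical in-memory stand-in for the BlockManager: block id -> records
  final class ToyBlockStore(blocks: Map[String, Seq[String]]) {
    var reads = 0
    def get(blockId: String): Option[Iterator[String]] = {
      reads += 1
      blocks.get(blockId).map(_.iterator)
    }
  }

  final case class ToyPartition(blockId: String, index: Int)

  final class ToyBlockRDD(store: ToyBlockStore, blockIds: Array[String]) {
    // One partition per block id; no data is read here
    def getPartitions: Array[ToyPartition] =
      blockIds.zipWithIndex.map { case (id, i) => ToyPartition(id, i) }

    // Fetch the block for one partition and expose it as an iterator
    def compute(split: ToyPartition): Iterator[String] =
      store.get(split.blockId).getOrElse(
        throw new NoSuchElementException(
          s"Could not compute split, block ${split.blockId} not found"))

    // A toy "action": only now are the blocks actually fetched
    def collect(): Seq[String] =
      getPartitions.toSeq.flatMap(p => compute(p))
  }
}
```

Creating the ToyBlockRDD and listing its partitions performs no reads; the store is only consulted once collect() is called.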
- Look again at what these two lines do:
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
First, the compute method of FlatMappedDStream:
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
It obtains the RDD from the parent DStream (that is, the BlockRDD), applies flatMap to it, and returns Option[RDD[U]].
Next, the compute method of MappedDStream:
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
Likewise, it obtains the RDD from the parent DStream, applies map, and returns Option[RDD[U]].
ShuffledDStream works the same way.
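The delegation pattern shared by these compute methods can be sketched as a toy class hierarchy. All names here (ToyDStream, Input, FlatMapped, Mapped) are hypothetical, a Seq stands in for an RDD, and the caching done by the real getOrCompute is omitted for brevity.

```scala
object DStreamTemplateSketch {
  type Time = Long

  abstract class ToyDStream[T] {
    // Produce the "RDD" (here a Seq) for one batch time, if any
    def compute(time: Time): Option[Seq[T]]
    // Caching omitted in this sketch; just delegate to compute
    def getOrCompute(time: Time): Option[Seq[T]] = compute(time)

    def flatMap[U](f: T => Iterable[U]): ToyDStream[U] = new FlatMapped(this, f)
    def map[U](f: T => U): ToyDStream[U] = new Mapped(this, f)
  }

  // Source stream: data per batch time is given up front
  class Input[T](batches: Map[Time, Seq[T]]) extends ToyDStream[T] {
    def compute(time: Time): Option[Seq[T]] = batches.get(time)
  }

  // Each derived stream asks its parent for the batch, then transforms it
  class FlatMapped[T, U](parent: ToyDStream[T], f: T => Iterable[U]) extends ToyDStream[U] {
    def compute(time: Time): Option[Seq[U]] = parent.getOrCompute(time).map(_.flatMap(f))
  }

  class Mapped[T, U](parent: ToyDStream[T], f: T => U) extends ToyDStream[U] {
    def compute(time: Time): Option[Seq[U]] = parent.getOrCompute(time).map(_.map(f))
  }
}
```

The chain of stream objects is built once, and every call to compute for a new batch time replays the same chain of transformations on that batch's data, which is the sense in which the stream graph acts as a template.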
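Seen this way, the DStream dependencies are exactly the RDD dependencies, which is why a DStream is said to be a template for RDDs. Here is the printed RDD dependency chain: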
(2) ShuffledRDD[4] at reduceByKey at NetworkWordCount.scala:25 []
 +-(0) MapPartitionsRDD[3] at map at NetworkWordCount.scala:25 []
    |  MapPartitionsRDD[2] at flatMap at NetworkWordCount.scala:24 []
    |  BlockRDD[1] at socketTextStream at NetworkWordCount.scala:21 []
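As for job submission, that is done by the scheduler. Triggering the RDD action therefore amounts to executing def run() { _result = Try(func()) } in the Job class, as described in the part on dynamic job generation.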
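The role of run() can be shown with a small stand-alone sketch. ToyJob is a hypothetical simplification of the Job class: it only keeps the batch time and the deferred function, and the result accessor is an assumption added here to make the outcome observable.

```scala
import scala.util.Try

object JobRunSketch {
  // `func` wraps the deferred work, e.g. the action applied to the batch's RDD
  final class ToyJob(val time: Long, func: () => Any) {
    private var _result: Try[Any] = null

    // Nothing happens until the scheduler calls run(); failures are captured in the Try
    def run(): Unit = {
      _result = Try(func())
    }

    def result: Try[Any] = {
      if (_result == null) {
        throw new IllegalStateException("Cannot access result before job finishes")
      }
      _result
    }
  }
}
```

Constructing a ToyJob does not execute func; only run() does, and an exception thrown by func ends up as a Failure rather than propagating to the caller.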