Spark Streaming源码解读之Driver容错安全性

从数据层面,ReceivedBlockTracker为整个SparkStreaming应用程序记录元数据信息。

从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关。

ReceivedBlockTracker在接收到元数据信息后调用addBlock方法,先写入磁盘中,然后在写入内存中。

ReceivedBlockTracker:

/** Add received block. This event will get written to the write ahead log (if enabled). */

defaddBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

try {

val writeResult =writeToLog(BlockAdditionEvent(receivedBlockInfo))

if (writeResult) {

synchronized {

getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo

}

logDebug(s"Stream ${receivedBlockInfo.streamId} received " +

s"block ${receivedBlockInfo.blockStoreResult.blockId}")

} else {

logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +

s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")

}

writeResult

} catch {

case NonFatal(e) =>

logError(s"Error adding block $receivedBlockInfo", e)

false

}

}

ReceivedBlockTracker:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

// 为分配的ReceivedBlock

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

// 已分配的ReceivedBlock

private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]

private val writeAheadLogOption = createWriteAheadLog()

根据batchTime分配属于当前BatchDuration要处理的数据到timToAllocatedBlocks数据结构中。

ReceivedBlockTracker:

/**

* Allocate all unallocated blocks to the given batch.

* This event will get written to the write ahead log (if enabled).

*/

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {

if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {

val streamIdToBlocks = streamIds.map { streamId =>

(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))

}.toMap

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

lastAllocatedBatchTime = batchTime

} else {

...

Time类的是一个case Class,记录时间,重载了操作符,隐式转换。

case class Time(private val millis: Long) {

def milliseconds: Long = millis

def < (that: Time): Boolean = (this.millis < that.millis)

def <= (that: Time): Boolean = (this.millis <= that.millis)

def > (that: Time): Boolean = (this.millis > that.millis)

def >= (that: Time): Boolean = (this.millis >= that.millis)

def + (that: Duration): Time = new Time(millis + that.milliseconds)

def - (that: Time): Duration = new Duration(millis - that.millis)

def - (that: Duration): Time = new Time(millis - that.milliseconds)

// Java-friendlier versions of the above.

def less(that: Time): Boolean = this < that

def lessEq(that: Time): Boolean = this <= that

def greater(that: Time): Boolean = this > that

def greaterEq(that: Time): Boolean = this >= that

def plus(that: Duration): Time = this + that

def minus(that: Time): Duration = this - that

def minus(that: Duration): Time = this - that

def floor(that: Duration): Time = {

val t = that.milliseconds

new Time((this.millis / t) * t)

}

def floor(that: Duration, zeroTime: Time): Time = {

val t = that.milliseconds

new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)

}

def isMultipleOf(that: Duration): Boolean =

(this.millis % that.milliseconds == 0)

def min(that: Time): Time = if (this < that) this else that

def max(that: Time): Time = if (this > that) this else that

def until(that: Time, interval: Duration): Seq[Time] = {

(this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))

}

def to(that: Time, interval: Duration): Seq[Time] = {

(this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))

}

override def toString: String = (millis.toString + " ms")

}

object Time {

implicit val ordering = Ordering.by((time: Time) => time.millis)

}

跟踪Time对象,ReceiverTracker的allocateBlocksToBatch方法中的入参batchTime是被JobGenerator的generateJobs方法调用的。

ReceiverTracker:

/** Allocate all unallocated blocks to the given batch. */

def allocateBlocksToBatch(batchTime: Time): Unit = {

if (receiverInputStreams.nonEmpty) {

receivedBlockTracker.allocateBlocksToBatch(batchTime)

}

}

JobGenerator的generateJobs方法是被定时器发送GenerateJobs消息调用的。

JobGenerator:

/** Generate jobs and perform checkpoint for the given `time`.  */

private defgenerateJobs(time: Time) {

// Set the SparkEnv in this thread, so that job generation code can access the environment

// Example: BlockRDDs are created in this thread, and it needs to access BlockManager

// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.

SparkEnv.set(ssc.env)

Try {

jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

graph.generateJobs(time) // generate jobs using allocated block

} match {

case Success(jobs) =>

val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

case Failure(e) =>

jobScheduler.reportError("Error generating jobs for time " + time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

JobGenerator:

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) {

logDebug("Got event " + event)

event match {

caseGenerateJobs(time) =>generateJobs(time)

case ClearMetadata(time) => clearMetadata(time)

case DoCheckpoint(time, clearCheckpointDataLater) =>

doCheckpoint(time, clearCheckpointDataLater)

case ClearCheckpointData(time) => clearCheckpointData(time)

}

}

JobGenerator:

private val timer = newRecurringTimer(clock, ssc.graph.batchDuration.milliseconds,

longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

GenerateJobs中的时间参数就是nextTime,而nextTime+=period,这个period就是ssc.graph.batchDuration.milliseconds。

RecurringTimer:

private def triggerActionForNextInterval(): Unit = {

clock.waitTillTime(nextTime)

callback(nextTime)

prevTime = nextTime

nextTime += period

logDebug("Callback for " + name + " called at time " + prevTime)

}

nextTime的初始值是在start方法中传入的startTime赋值的,即RecurringTimer的getStartTime方法的返回值,是当前时间period的(整数倍+1)。

RecurringTimer:

/**

* Start at the given start time.

*/

def start(startTime: Long): Long = synchronized {

nextTime = startTime

thread.start()

logInfo("Started timer for " + name + " at time " + nextTime)

nextTime

}

JobGenerator:

/** Starts the generator for the first time */

private def startFirstTime() {

val startTime = newTime(timer.getStartTime())

graph.start(startTime - graph.batchDuration)

timer.start(startTime.milliseconds)

logInfo("Started JobGenerator at " + startTime)

}

RecurringTimer:

/**

* Get the time when this timer will fire if it is started right now.

* The time will be a multiple of this timer's period and more than

* current system time.

*/

def getStartTime(): Long = {

(math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period

}

Period这个值是我们调用new StreamingContext来构造StreamingContext时传入的Duration值。

DStreamGraph:

def setBatchDuration(duration: Duration) {

this.synchronized {

require(batchDuration == null,

s"Batch duration already set as $batchDuration. Cannot set it again.")

batchDuration = duration

}

}

StreamingContext:

private[streaming] val graph: DStreamGraph = {

if (isCheckpointPresent) {

cp_.graph.setContext(this)

cp_.graph.restoreCheckpointData()

cp_.graph

} else {

require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")

val newGraph = new DStreamGraph()

newGraph.setBatchDuration(batchDur_)

newGraph

}

}

ReceivedBlockTracker会清除过期的元数据信息,从HashMap中移除,也是先写入磁盘,然后在写入内存。

StreamingContext:

class StreamingContext private[streaming] (

sc_ : SparkContext,

cp_ : Checkpoint,

batchDur_ : Duration

) extends Logging {

/**

* Create a StreamingContext using an existing SparkContext.

* @param sparkContext existing SparkContext

* @param batchDuration the time interval at which streaming data will be divided into batches

*/

def this(sparkContext: SparkContext, batchDuration: Duration) = {

this(sparkContext, null, batchDuration)

}

元数据的生成,消费和销毁都有WAL,所以失败时就可以从日志中恢复。从源码分析中得出只有设置了checkpoint目录,才进行WAL机制。

ReceiverTracker:

class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()

private val receiverInputStreamIds = receiverInputStreams.map { _.id }

private val receivedBlockTracker = newReceivedBlockTracker(

ssc.sparkContext.conf,

ssc.sparkContext.hadoopConfiguration,

receiverInputStreamIds,

ssc.scheduler.clock,

ssc.isCheckpointPresent,

Option(ssc.checkpointDir)

)

private val listenerBus = ssc.scheduler.listenerBus

对传入的checkpoint目录来创建日志目录进行WAL。

ReceivedBlockTracker:

/** Optionally create the write ahead log manager only if the feature is enabled */

private defcreateWriteAheadLog(): Option[WriteAheadLog] = {

checkpointDirOption.map { checkpointDir =>

val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)

WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)

}

}

这里是在checkpoint目录下创建文件夹名为receivedBlockMetadata的文件夹来保存WAL记录的数据。

ReceivedBlockTracker:

private[streaming] object ReceivedBlockTracker {

def checkpointDirToLogDir(checkpointDir: String): String = {

new Path(checkpointDir, "receivedBlockMetadata").toString

}

}

ReceivedBlockTracker:

/** Write an update to the tracker to the write ahead log */

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {

if (isWriteAheadLogEnabled) {

logTrace(s"Writing record: $record")

try {

writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),

clock.getTimeMillis())

true

} catch {

case NonFatal(e) =>

logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)

false

}

} else {

true

}

}

把当前的DStream和JobGenerator的状态进行checkpoint,该方法是在generateJobs方法最后通过发送DoCheckpoint消息,来调用的。

JobGenerator:

/** Perform checkpoint for the give `time`. */

private defdoCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

logInfo("Checkpointing graph for time " + time)

ssc.graph.updateCheckpointData(time)

checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

}

}

JobGenerator:

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) {

logDebug("Got event " + event)

event match {

case GenerateJobs(time) => generateJobs(time)

case ClearMetadata(time) => clearMetadata(time)

case DoCheckpoint(time, clearCheckpointDataLater) =>

doCheckpoint(time, clearCheckpointDataLater)

case ClearCheckpointData(time) => clearCheckpointData(time)

}

}

JobGenerator:

/** Generate jobs and perform checkpoint for the given `time`.  */

private defgenerateJobs(time: Time) {

// Set the SparkEnv in this thread, so that job generation code can access the environment

// Example: BlockRDDs are created in this thread, and it needs to access BlockManager

// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.

SparkEnv.set(ssc.env)

Try {

jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

graph.generateJobs(time) // generate jobs using allocated block

} match {

case Success(jobs) =>

val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

case Failure(e) =>

jobScheduler.reportError("Error generating jobs for time " + time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

总结:

ReceivedBlockTracker是通过WAL方式来进行数据容错的。

DStreamGraph和JobGenerator是通过checkpoint方式来进行数据容错的。


备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容