Spark Streaming一致性、容错机制分析

Spark Streaming容错机制保障

参考https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html这篇文章,Spark Streaming主要有三处做了数据容错机制:

  • Reciever测:
    • WriteAheadLogBasedStoreResult通过storeBlock()方法保存到blockManager和WAL中;
  • Driver测:
    • ReceivedBlockTracker: 处理收到reciever和driver scheduler的调度信息时,会将触发的时间信息保存至wal中(此处类似mysql的redo日志);
    • Checkpoint机制: 在driver shechuler触发time时间下的generateJob()之后保存这个时间的checkpoint信息,以保障任务突然失败后的恢复逻辑;

Reciever测

WriteAheadLogBasedStoreResult容错逻辑,并行地保存block至blockManager和WAL中,分两步介绍。

Reciever将block保存至blockManager

如果不配置使用wal,保存至blockManager的storeageLevel是用户手动指定的,在kafka中默认的level为:StorageLevel.MEMORY_AND_DISK_SER_2;

如果配置使用wal,则会忽略用户使用的storageLevel,使用如下的storageLevel等级,默认可以使用memory和disk,同时1个备份:

  private val blockStoreTimeout = conf.getInt(
    "spark.streaming.receiver.blockStoreTimeout", 30).seconds

  private val effectiveStorageLevel = {
    if (storageLevel.deserialized) {
      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
        s" write ahead log is enabled, change to serialization false")
    }
    if (storageLevel.replication > 1) {
      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
        s"write ahead log is enabled, change to replication 1")
    }

    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
  }

  if (storageLevel != effectiveStorageLevel) {
    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
      s"$effectiveStorageLevel when write ahead log is enabled")
  }

写入WAL

该次write,会调用flush()强制落盘,所以一旦返回,一定保障数据写入、备份成功。

问题1: 该wal并不会用于recover,因为在reciver测并没有找到recover的接口,那该wal有什么用途呢?

当然保障数据的安全性了,在driver测会保存blockInfo信息,一定要保障blockInfo信息对应的block存在;

问题2:该wal因为保存真实的数据,会占用不少空间,它的清理逻辑是怎样的?

当该batch完成之后,会触发一个ClearMetadata()事件,程序判定是否开启wal,如果开启则会清理该batch对应的wal;

  def onBatchCompletion(time: Time) {
    eventLoop.post(ClearMetadata(time))
  }

Driver测

ReceivedBlockTracker

ReceivedBlockTracker测的wal是跟配置没有关系的,具体参考该issue:https://issues.apache.org/jira/browse/SPARK-7139,它的作用是将接收到的各个事件(保存的信息很少),输出至wal中(该名字虽然叫wal,跟上述的wal概念还是不一样的);

其保存的具体信息有,在ReceivedBlockTracker类中搜索writeToLog方法即可,可以发现有如下三处:

writeToLog(BlockAdditionEvent(receivedBlockInfo)
writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks)
writeToLog(BatchCleanupEvent(timesToCleanup)

// 对应的事件类
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent
  
  
ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) 

private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}  

private[streaming] case class WriteAheadLogBasedStoreResult(
    blockId: StreamBlockId,
    numRecords: Option[Long],
    walRecordHandle: WriteAheadLogRecordHandle
  ) 
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle  
  
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
  def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
    streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
  }
}

可以看出其保存的核心信息为ReceivedBlockInfo,其具体包含有:

  • streamId: 每个stream的唯一标示;
  • numRecords: 该batch包含的记录数量;
  • metaDataOption: 可选metaData信息;
  • blockStoreResult: ReceivedBlockStoreResult是一个trait,根据该字段可以判定其在reciever测是否使用wal,同时会保存blockId -> (path, offset, length)的映射;

该实现默认是在初始化时开启恢复逻辑的,其逻辑类似于许多存储引擎的回放,具体实现如下:

  // Recover block information from write ahead logs
  if (recoverFromWriteAheadLog) {
    recoverPastEvents()
  }
  
  llocated block info) prior to failure.
   */
  private def recoverPastEvents(): Unit = synchronized {
    // Insert the recovered block information
    def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
      logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
      receivedBlockInfo.setBlockIdInvalid()
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    }

    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
      logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
        s"${allocatedBlocks.streamIdToAllocatedBlocks}")
      streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    }

    // Cleanup the batch allocations
    def cleanupBatches(batchTimes: Seq[Time]) {
      logTrace(s"Recovery: Cleaning up batches $batchTimes")
      timeToAllocatedBlocks --= batchTimes
    }

    writeAheadLogOption.foreach { writeAheadLog =>
      logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
      writeAheadLog.readAll().asScala.foreach { byteBuffer =>
        logInfo("Recovering record " + byteBuffer)
        Utils.deserialize[ReceivedBlockTrackerLogEvent](
          JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
          case BlockAdditionEvent(receivedBlockInfo) =>
            insertAddedBlock(receivedBlockInfo)
          case BatchAllocationEvent(time, allocatedBlocks) =>
            insertAllocatedBatch(time, allocatedBlocks)
          case BatchCleanupEvent(batchTimes) =>
            cleanupBatches(batchTimes)
        }
      }
    }
  }

Checkpoint

class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val sparkConfPairs = ssc.conf.getAll

  def createSparkConf(): SparkConf = {
  }
}

通过Checkpoint类可以看出,其保存至hdfs的信息有:

  • master: Spark运行master;
  • framework: Spark启动名字;
  • jars: Spark运行依赖jars;
  • graph: Streaming运行依赖graph图(我理解是所依赖的rdd信息);
  • checkpointDir: checkpoint路径;
  • checkpointDuration: checkpoint周期;
  • pendingTimes: 调度pending时间;
  • sparkConfPairs: sparkConf;

其保存和恢复逻辑较为简单:

保存:每个batch时间都会保存该checkpoit(当然checkpoint周期也可以设置);
恢复:启动driver时,会首先尝试从checkpoint中恢复;

参考:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容