Spark Core源码精读计划#30:终于讲到的BlockManager及其管理的块读取流程

目录

前言

如题,在前方做了很多铺垫之后,本文终于可以来看BlockManager了,可谓是千呼万唤始出来。

块管理器BlockManager会运行在Spark集群中的所有节点上。每个节点上的BlockManager通过MemoryManager、MemoryStore、DiskBlockManager、DiskStore来管理其内存、磁盘中的块,并与其他节点进行块的交互,是一个规模庞大的组件。为了避免写太多出不来,本文先聚焦在两个最基础的方面,即BlockManager的初始化与块的读取流程。写入流程和其他逻辑(比如BlockTransferService)会另开坑来讲解。

BlockManager的初始化

构造方法与属性成员

代码#30.1 - o.a.s.storage.BlockManager类的构造方法与属性成员

private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  private[spark] val externalShuffleServiceEnabled =
    conf.getBoolean("spark.shuffle.service.enabled", false)

  val diskBlockManager = {
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }

  private[storage] val blockInfoManager = new BlockInfoManager

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
  memoryManager.setMemoryStore(memoryStore)

  private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
  private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  var blockManagerId: BlockManagerId = _
  private[spark] var shuffleServerId: BlockManagerId = _

  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager,
      securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
  } else {
    blockTransferService
  }

  private val maxFailuresBeforeLocationRefresh =
    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  private var blockReplicationPolicy: BlockReplicationPolicy = _

  private[storage] val remoteBlockTempFileManager =
    new BlockManager.RemoteBlockDownloadFileManager(this)
  private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
  // ......
}

BlockManager接受很多构造方法参数,之前已经讲过的类型就不再多说,其中还另外包含三个没有详细讲过的组件:MapOutputTracker,用于跟踪任务执行时Map任务的输出(即Reduce任务的输入),属于调度模块的一部分;ShuffleManager,用于管理Shuffle策略,在本专题之外的文章里详细分析过;BlockTransferService,顾名思义用来在各个节点之间远程传输块,这个在后面的文章中马上就会讲到。

BlockManager实现了BlockDataManager和BlockEvictionHandler两个特征,分别表示BlockManager可以管理块数据,以及从内存中淘汰块。截止目前,BlockManager是这两个特征的唯一的实现类。

下面来看看BlockManager类中的属性成员。看官已经很熟悉的组件(如MemoryStore、DiskStore等)也就不再赘述,只说几个主要的新面孔。

  • externalShuffleServiceEnabled:是否启用外部Shuffle服务,由配置项spark.shuffle.service.enabled来指定,默认不启用。什么叫外部Shuffle服务?我们都知道,传统的Shuffle服务是完全靠Executor来执行的,因此CPU和I/O都非常密集。如果Spark集群是on YARN的话,那么开启外部Shuffle就会在YARN NodeManager上跑一个常驻的YarnShuffleService,用来收取和分配Shuffle数据,降低Executor的压力。
  • futureExecutionContext:用于异步执行BlockManager中某些操作的守护线程池,大小为128。
  • blockManagerId:该BlockManager的ID,结构在上一篇文章中已经说过了。
  • shuffleServerId:用来保存Shuffle中间文件的实体ID。如果不用外部Shuffle服务的话,就与本BlockManagerId相同,否则就新建一个。
  • shuffleClient:用于获取其他Executor上的Shuffle文件的客户端。如果不启用外部Shuffle服务,就是前面提到过的BlockTransferService,否则就是ExternalShuffleClient实例。现在我们暂时不深究。
  • slaveEndpoint:BlockManager的从RPC端点的引用,使用RpcEnv.setupEndpoint()方法来生成。
  • blockReplicationPolicy:Spark中块复制的策略。

初始化方法

SparkEnv中调用了BlockManager的initialize()方法来初始化它,代码如下。

代码#30.2 - o.a.s.storage.BlockManager.initialize()方法

  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
    val idFromMaster = master.registerBlockManager(
      id,
      maxOnHeapMemory,
      maxOffHeapMemory,
      slaveEndpoint)

    blockManagerId = if (idFromMaster != null) idFromMaster else id
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

BlockManager初始化的流程如下:

  1. 初始化BlockTransferService和ShuffleClient。
  2. 根据配置项spark.storage.replication.policy确定块复制策略并通过反射创建。默认值为RandomBlockReplicationPolicy,说明是将块的副本随机放到不同的节点上。
  3. 根据Executor ID生成BlockManagerId,并调用BlockManagerMaster.registerBlockManager()方法注册此ID与从RPC端点。注册成功后,BlockManagerMaster会返回另一个正式的ID。
  4. 生成Shuffle服务的ID。如果当前节点是Executor并启用了外部Shuffle服务的话,就调用registerWithExternalShuffleServer()方法注册外部Shuffle服务,代码略去。

前面写了这么多,可能看官还是没有实感(其实窝自己也是)。那么接下来看块读取流程,这是BlockManager的主要任务之一,并且没那么虚。

块读写的入口

在BlockManager中提供了多种对块进行读写的方法,其中一个将读写进行统一的入口是getOrElseUpdate()方法。因为块可以由RDD物化而来,因此我们可以方便地在RDD类中(具体来说是RDD.getOrCompute()方法)找到对它的调用。为了方便分析,本文就由它来入手。先顺便看一下源码吧。

代码#30.3 - o.a.s.storage.BlockManager.getOrElseUpdate()方法

  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
    }
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        val blockResult = getLocalValues(blockId).getOrElse {
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
       Right(iter)
    }
  }

该方法会首先根据块ID尝试读取数据(先从本地,后从远端)。如果获取不到,就调用传入的makeIterator函数将数据转化为迭代器并写入之。最终将读取或写入的数据封装在BlockResult结构中返回。

块读取流程

以下就是代码#30.3中调用的get()方法。

代码#30.4 - o.a.s.storage.BlockManager.get()方法

  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val local = getLocalValues(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    val remote = getRemoteValues[T](blockId)
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

该方法先调用getLocalValues()方法从本地(注意是本地Executor)读取数据,如果读取不到,就继续调用getRemoteValues()方法从远端获取数据。下面分别来看。

从本地读取数据

代码#30.5 - o.a.s.storage.BlockManager.getLocalValues()方法

  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    blockInfoManager.lockForReading(blockId) match {
      case None =>
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
        if (level.useMemory && memoryStore.contains(blockId)) {
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }

          val ci = CompletionIterator[Any, Iterator[Any]](iter, {
            releaseLock(blockId, taskAttemptId)
          })
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } else if (level.useDisk && diskStore.contains(blockId)) {
          val diskData = diskStore.getBytes(blockId)
          val iterToReturn: Iterator[Any] = {
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskData.toInputStream())(info.classTag)
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
                .map { _.toInputStream(dispose = false) }
                .getOrElse { diskData.toInputStream() }
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
            releaseLockAndDispose(blockId, diskData, taskAttemptId)
          })
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          handleLocalReadFailure(blockId)
        }
    }
  }

有点长,但是思路比较清晰,以下简述这个方法的执行流程:

  1. 调用BlockInfoManager.lockForReading()方法,为这个块加读锁,并试图返回对应的块元数据BlockInfo。
  2. 如果没有BlockInfo,说明该块在本地不存在。反之,检查它的StorageLevel,按优先内存、其次磁盘的顺序考虑。
  3. 若该块的StorageLevel显示会利用内存,并且数据在MemoryStore中,就根据该数据是否序列化的情况,调用MemoryStore.getValues()或getBytes()方法,最终获得块数据的迭代器表示。
  4. 若该块的StorageLevel显示会利用磁盘,并且数据在DiskStore中,就先用DiskStore.getBytes()方法获得磁盘中块数据的字节流,然后根据是否序列化做不同的处理。其中还会用到maybeCacheDiskValuesInMemory()/maybeCacheDiskBytesInMemory()试图将读取到的磁盘数据cache到内存,以加快速度。
  5. 调用releaseLock()或releaseLockAndDispose()方法,释放块的读锁。
  6. 将块数据的迭代器、读取方法和块的字节数封装在BlockResult结构中返回。如果从内存读取和从磁盘读取都失败,就调用handleLocalReadFailure()方法处理本地读取的错误。

希望说的还算明白哈。继续看从远端读取块数据的方法。

从远端读取数据

代码#30.6 - o.a.s.storage.BlockManager.getRemoteValues()方法

  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val ct = implicitly[ClassTag[T]]
    getRemoteBytes(blockId).map { data =>
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

这个方法很短,是因为主要逻辑都在getRemoteBytes()方法中实现了。这是很显然的,因为远端的块数据必须要序列化之后才能传输,来到本地之后再反序列化为对象,所以实际上获取的是字节流。以下则是getRemoteBytes()方法的源码。

代码#30.7 - o.a.s.storage.BlockManager.getRemoteBytes()方法

  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
    var totalFailureCount = 0

    val locationsAndStatus = master.getLocationsAndStatus(blockId)
    val blockSize = locationsAndStatus.map { b =>
      b.status.diskSize.max(b.status.memSize)
    }.getOrElse(0L)
    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

    val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
      remoteBlockTempFileManager
    } else {
      null
    }

    val locations = sortLocations(blockLocations)
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1

          if (totalFailureCount >= maxFetchFailures) {
            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
              s"Most recent failure cause:", e)
            return None
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = sortLocations(master.getLocations(blockId)).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          null
      }

      if (data != null) {
        return Some(new ChunkedByteBuffer(data))
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }

该方法的执行流程如下:

  1. 调用BlockManagerMaster.getLocationsAndStatus()方法,获取所有持有该块数据的远端BlockManager位置。
  2. 调用sortLocations()方法,根据BlockManagerId中的拓扑信息对BlockManager的位置进行排序。处于同一台服务器上的BlockManager排在最前,然后是同一机架上的节点的BlockManager(前提是能够感知到机架),最后才是不同机架的节点上的BlockManager。
  3. 对于每个远端BlockManager,调用BlockTransferService.fetchBlockSync()方法,同步地获取块数据,并以ChunkedByteBuffer形式返回。
  4. 如果从某个远端BlockManager获取不到块数据,就继续尝试下一个。当失败的尝试次数达到spark.block.failures.beforeLocationRefresh参数规定的阈值(默认值5)时,就主动刷新一次远端BlockManager的位置,防止过期。
  5. 若已经尝试了所有的远端BlockManager仍然未获取到,就认为此次读取失败。

总结

本文详细叙述了BlockManager的初始化过程,以及从本地、远端读取块数据的过程。下两篇文章会将写入块与BlockTransferService的相关细节补齐,这样我们就可以整理出BlockManager读写流程的全貌了。

晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容