Spark Core源码精读计划#21:Spark存储中块(Block)的基本实现

目录

前言

前面我们用3篇文章的时间讲解了RDD的基础知识,包括其五要素、算子、依赖、分区以及检查点。实际上,与RDD相关的细节还有很多,渗透在之后的研究过程中。在时机合适时,会再拨出专门的时间更深入地讲解RDD。从本篇开始,进入Spark Core存储子系统。

提起“存储”这个词,自然就包括内部存储(内存)与外部存储(磁盘等)。Spark的存储子系统会同时对内存和外存进行管理,这些管理组件的名称本身就很容易理解,如MemoryManager、DiskBlockManager、MemoryStore、DiskStore等,我们会逐渐接触到它们。

前文已经多次提到过,Spark存储子系统的“司令官”是BlockManager,即块管理器,用主从架构实现。由此可见,“块”(Block)是Spark存储的基本单位,看官如果学过操作系统理论,对这个词应该已经非常熟悉了。不过这里的块与操作系统和JVM都无关,只是Spark体系内的概念而已。

本文先来探索与块相关的基本实现,包括块的ID、实际数据与元信息的封装。

块ID:BlockId

与RDD类似,块也需要一个ID来表明它的身份。不过RDD的ID只是一个整形值而已,块ID包含的东西稍微多点。BlockId抽象类的定义如下。

代码#21.1 - o.a.s.storage.BlockId抽象类

sealed abstract class BlockId {
  def name: String

  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
}

这个类本身已经很清楚了,name方法返回该BlockId的唯一名称。三个以is为前缀的布尔方法分别判断当前BlockId是否为RDDBlockId、ShuffleBlockId和BroadcastBlockId。这三个实现类(当然还有一些其他的实现)都是BlockId的子类,以下是类图。

图#21.1 - BlockId的子类

代码示例如下。

代码#21.2 - o.a.s.storage.RDDBlockId/ShuffleBlockId/BroadcastBlockId类

@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

可见,它们都是简单的样例类,覆写了name方法,并且命名都符合一定的规则。比如RDD数据块ID的命名为rdd_[RDD ID]_[分区号],Shuffle数据块ID的命名为shuffle_[Shuffle过程ID]_[Map任务ID]_[Reduce任务ID]等。在BlockId类的伴生对象中,也有所有命名的正则表示。

代码#21.3 - BlockId命名的正则表示

  val RDD = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
  val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
  val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT = "taskresult_([0-9]+)".r
  val STREAM = "input-([0-9]+)-([0-9]+)".r
  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
  val TEST = "test_(.*)".r

块的ID已经有了,接下来看具体的数据如何封装。

块数据:BlockData

BlockData特征

BlockData是一个松散的Scala特征,其源码如下。

代码#21.4 - o.a.s.storage.BlockData特征

private[spark] trait BlockData {
  def toInputStream(): InputStream
  def toNetty(): Object
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
  def toByteBuffer(): ByteBuffer

  def size: Long
  def dispose(): Unit
}

其中定义的都是虚方法,它们的含义分别是:

  • toInputStream():将块数据转化为java.io.InputStream。
  • toNetty():将块数据转化为适合Netty上传输的对象格式。
  • toChunkedByteBuffer():将块数据转化为o.a.s.util.io.ChunkedByteBuffer。ChunkedByteBuffer在文章#11讲解广播变量时已出现过,是对多个java.nio.ByteBuffer的封装,表示多个不连续的内存缓冲区中的数据。虽然Chunk这个词在中文中一般也翻译作“块”,但它与上面的Block相比,更是一个逻辑概念而非物理概念。
  • toByteBuffer():将块数据转化为单个java.nio.ByteBuffer。
  • size():返回这个BlockData的长度。
  • dispose():销毁BlockData。

可见,BlockData只是定义了数据转化的规范,并没有涉及具体的存储格式和读写流程,实现起来比较自由,所以前面说它是个松散的特征。BlockData目前有3个实现类:基于内存和ChunkedByteBuffer的ByteBufferBlockData、基于磁盘和File的DiskBlockData,以及加密的EncryptedBlockData。下面来看看最简单的ByteBufferBlockData实现。

ByteBufferBlockData

以下是ByteBufferBlockData类的源码,可见它是直接代理了ChunkedByteBuffer的各种方法。

代码#21.5 - o.a.s.storage.ByteBufferBlockData类

private[spark] class ByteBufferBlockData(
    val buffer: ChunkedByteBuffer,
    val shouldDispose: Boolean) extends BlockData {
  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)

  override def toNetty(): Object = buffer.toNetty

  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    buffer.copy(allocator)
  }

  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer

  override def size: Long = buffer.size

  override def dispose(): Unit = {
    if (shouldDispose) {
      buffer.dispose()
    }
  }
}

ChunkedByteBuffer实际上就是定义了对Array[ByteBuffer]类型的各种操作,它在Spark存储中是个很常用的类,下面来看一下。

ChunkedByteBuffer简介

ChunkedByteBuffer的构造方法参数是一个名为chunks的Array[ByteBuffer]类型对象,也就是说一个ByteBuffer就是一个Chunk。该类的成员属性如下。

代码#21.6 - o.a.s.util.io.ChunkedByteBuffer类的属性成员

  private val bufferWriteChunkSize =
    Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt

  private[this] var disposed: Boolean = false

  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
  • bufferWriteChunkSize:将缓存数据写出时的Chunk大小,由spark.buffer.write.chunkSize配置项来确定,默认值64MB。
  • disposed:该ChunkedByteBuffer是否已销毁。
  • size:该ChunkedByteBuffer的大小,通过调用ByteBuffer.limit()方法获取每个Chunk的大小并累加而来。

它提供了一个writeFully()方法,用来将缓存块数据以bufferWriteChunkSize的大小写入NIO Channel。

代码#21.7 - o.a.s.util.io.ChunkedByteBuffer.writeFully()方法

  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val curChunkLimit = bytes.limit()
      while (bytes.hasRemaining) {
        try {
          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
          bytes.limit(bytes.position() + ioSize)
          channel.write(bytes)
        } finally {
          bytes.limit(curChunkLimit)
        }
      }
    }
  }

关于它的其他方法,我们会在今后的讲解过程中逐渐接触到,不难。

块元信息:BlockInfo

为了方便跟踪块的一些基本数据,需要用一个专门的数据结构BlockInfo来维护。其完整代码如下所示。

代码#21.8 - o.a.s.storage.BlockInfo类

private[storage] class BlockInfo(
    val level: StorageLevel,
    val classTag: ClassTag[_],
    val tellMaster: Boolean) {
  def size: Long = _size
  def size_=(s: Long): Unit = {
    _size = s
    checkInvariants()
  }
  private[this] var _size: Long = 0

  def readerCount: Int = _readerCount
  def readerCount_=(c: Int): Unit = {
    _readerCount = c
    checkInvariants()
  }
  private[this] var _readerCount: Int = 0

  def writerTask: Long = _writerTask
  def writerTask_=(t: Long): Unit = {
    _writerTask = t
    checkInvariants()
  }
  private[this] var _writerTask: Long = BlockInfo.NO_WRITER

  private def checkInvariants(): Unit = {
    assert(_readerCount >= 0)
    assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
  }

  checkInvariants()
}

该类有三个构造方法参数:

  • level:块的期望存储等级,不代表实际的存储情况。例如,如果设定为StorageLevel.MEMORY_AND_DISK,那么这个块有可能只在内存而不在磁盘中,反之同理。
  • classTag:块的类标签。
  • tellMaster:是否要将该块的元信息告知Master。

BlockInfo内定义了3对Getter/Setter:

  • size:块的大小,以字节为单位。
  • readerCount:该块被读取的次数。因为读取块时需要上锁,因此也就相当于加读锁的次数。
  • writerTask:当前持有该块写锁的Task ID。

虽然上面提到了读锁和写锁,但BlockInfo本身并没有提供任何锁机制,而是藉由BlockInfo的管理器BlockInfoManager来实现。关于BlockInfoManager的细节将在下一篇文章讨论。

总结

本文研究了与块相关的三大基本组件:BlockId、BlockData与BlockInfo,它们三者合起来就可以基本完整地描述Spark中的一个块了。理解了它们,我们就可以继续研究块在内存与外存中是分别如何管理的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容