目录
前言
前面我们用3篇文章的时间讲解了RDD的基础知识,包括其五要素、算子、依赖、分区以及检查点。实际上,与RDD相关的细节还有很多,渗透在之后的研究过程中。在时机合适时,会再拨出专门的时间更深入地讲解RDD。从本篇开始,进入Spark Core存储子系统。
提起“存储”这个词,自然就包括内部存储(内存)与外部存储(磁盘等)。Spark的存储子系统会同时对内存和外存进行管理,这些管理组件的名称本身就很容易理解,如MemoryManager、DiskBlockManager、MemoryStore、DiskStore等,我们会逐渐接触到它们。
前文已经多次提到过,Spark存储子系统的“司令官”是BlockManager,即块管理器,用主从架构实现。由此可见,“块”(Block)是Spark存储的基本单位,看官如果学过操作系统理论,对这个词应该已经非常熟悉了。不过这里的块与操作系统和JVM都无关,只是Spark体系内的概念而已。
本文先来探索与块相关的基本实现,包括块的ID、实际数据与元信息的封装。
块ID:BlockId
与RDD类似,块也需要一个ID来表明它的身份。不过RDD的ID只是一个整形值而已,块ID包含的东西稍微多点。BlockId抽象类的定义如下。
代码#21.1 - o.a.s.storage.BlockId抽象类
sealed abstract class BlockId {
def name: String
def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD: Boolean = isInstanceOf[RDDBlockId]
def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
override def toString: String = name
}
这个类本身已经很清楚了,name方法返回该BlockId的唯一名称。三个以is为前缀的布尔方法分别判断当前BlockId是否为RDDBlockId、ShuffleBlockId和BroadcastBlockId。这三个实现类(当然还有一些其他的实现)都是BlockId的子类,以下是类图。
代码示例如下。
代码#21.2 - o.a.s.storage.RDDBlockId/ShuffleBlockId/BroadcastBlockId类
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
}
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
可见,它们都是简单的样例类,覆写了name方法,并且命名都符合一定的规则。比如RDD数据块ID的命名为rdd_[RDD ID]_[分区号]
,Shuffle数据块ID的命名为shuffle_[Shuffle过程ID]_[Map任务ID]_[Reduce任务ID]
等。在BlockId类的伴生对象中,也有所有命名的正则表示。
代码#21.3 - BlockId命名的正则表示
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r
块的ID已经有了,接下来看具体的数据如何封装。
块数据:BlockData
BlockData特征
BlockData是一个松散的Scala特征,其源码如下。
代码#21.4 - o.a.s.storage.BlockData特征
private[spark] trait BlockData {
def toInputStream(): InputStream
def toNetty(): Object
def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
def toByteBuffer(): ByteBuffer
def size: Long
def dispose(): Unit
}
其中定义的都是虚方法,它们的含义分别是:
- toInputStream():将块数据转化为java.io.InputStream。
- toNetty():将块数据转化为适合Netty上传输的对象格式。
- toChunkedByteBuffer():将块数据转化为o.a.s.util.io.ChunkedByteBuffer。ChunkedByteBuffer在文章#11讲解广播变量时已出现过,是对多个java.nio.ByteBuffer的封装,表示多个不连续的内存缓冲区中的数据。虽然Chunk这个词在中文中一般也翻译作“块”,但它与上面的Block相比,更是一个逻辑概念而非物理概念。
- toByteBuffer():将块数据转化为单个java.nio.ByteBuffer。
- size():返回这个BlockData的长度。
- dispose():销毁BlockData。
可见,BlockData只是定义了数据转化的规范,并没有涉及具体的存储格式和读写流程,实现起来比较自由,所以前面说它是个松散的特征。BlockData目前有3个实现类:基于内存和ChunkedByteBuffer的ByteBufferBlockData、基于磁盘和File的DiskBlockData,以及加密的EncryptedBlockData。下面来看看最简单的ByteBufferBlockData实现。
ByteBufferBlockData
以下是ByteBufferBlockData类的源码,可见它是直接代理了ChunkedByteBuffer的各种方法。
代码#21.5 - o.a.s.storage.ByteBufferBlockData类
private[spark] class ByteBufferBlockData(
val buffer: ChunkedByteBuffer,
val shouldDispose: Boolean) extends BlockData {
override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
override def toNetty(): Object = buffer.toNetty
override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
buffer.copy(allocator)
}
override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
override def size: Long = buffer.size
override def dispose(): Unit = {
if (shouldDispose) {
buffer.dispose()
}
}
}
ChunkedByteBuffer实际上就是定义了对Array[ByteBuffer]类型的各种操作,它在Spark存储中是个很常用的类,下面来看一下。
ChunkedByteBuffer简介
ChunkedByteBuffer的构造方法参数是一个名为chunks的Array[ByteBuffer]类型对象,也就是说一个ByteBuffer就是一个Chunk。该类的成员属性如下。
代码#21.6 - o.a.s.util.io.ChunkedByteBuffer类的属性成员
private val bufferWriteChunkSize =
Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
.getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
private[this] var disposed: Boolean = false
val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
- bufferWriteChunkSize:将缓存数据写出时的Chunk大小,由spark.buffer.write.chunkSize配置项来确定,默认值64MB。
- disposed:该ChunkedByteBuffer是否已销毁。
- size:该ChunkedByteBuffer的大小,通过调用ByteBuffer.limit()方法获取每个Chunk的大小并累加而来。
它提供了一个writeFully()方法,用来将缓存块数据以bufferWriteChunkSize的大小写入NIO Channel。
代码#21.7 - o.a.s.util.io.ChunkedByteBuffer.writeFully()方法
def writeFully(channel: WritableByteChannel): Unit = {
for (bytes <- getChunks()) {
val curChunkLimit = bytes.limit()
while (bytes.hasRemaining) {
try {
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
} finally {
bytes.limit(curChunkLimit)
}
}
}
}
关于它的其他方法,我们会在今后的讲解过程中逐渐接触到,不难。
块元信息:BlockInfo
为了方便跟踪块的一些基本数据,需要用一个专门的数据结构BlockInfo来维护。其完整代码如下所示。
代码#21.8 - o.a.s.storage.BlockInfo类
private[storage] class BlockInfo(
val level: StorageLevel,
val classTag: ClassTag[_],
val tellMaster: Boolean) {
def size: Long = _size
def size_=(s: Long): Unit = {
_size = s
checkInvariants()
}
private[this] var _size: Long = 0
def readerCount: Int = _readerCount
def readerCount_=(c: Int): Unit = {
_readerCount = c
checkInvariants()
}
private[this] var _readerCount: Int = 0
def writerTask: Long = _writerTask
def writerTask_=(t: Long): Unit = {
_writerTask = t
checkInvariants()
}
private[this] var _writerTask: Long = BlockInfo.NO_WRITER
private def checkInvariants(): Unit = {
assert(_readerCount >= 0)
assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
}
checkInvariants()
}
该类有三个构造方法参数:
- level:块的期望存储等级,不代表实际的存储情况。例如,如果设定为StorageLevel.MEMORY_AND_DISK,那么这个块有可能只在内存而不在磁盘中,反之同理。
- classTag:块的类标签。
- tellMaster:是否要将该块的元信息告知Master。
BlockInfo内定义了3对Getter/Setter:
- size:块的大小,以字节为单位。
- readerCount:该块被读取的次数。因为读取块时需要上锁,因此也就相当于加读锁的次数。
- writerTask:当前持有该块写锁的Task ID。
虽然上面提到了读锁和写锁,但BlockInfo本身并没有提供任何锁机制,而是藉由BlockInfo的管理器BlockInfoManager来实现。关于BlockInfoManager的细节将在下一篇文章讨论。
总结
本文研究了与块相关的三大基本组件:BlockId、BlockData与BlockInfo,它们三者合起来就可以基本完整地描述Spark中的一个块了。理解了它们,我们就可以继续研究块在内存与外存中是分别如何管理的。