1.ShuffleManager
When Spark initializes SparkEnv, the create() method also initializes the ShuffleManager:
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
Two short names are accepted, sort and tungsten-sort, and both map to the same class, SortShuffleManager. The ShuffleManager is created through reflection. ShuffleManager is a trait whose core methods are:
private[spark] trait ShuffleManager {

  /**
   * Register a shuffle and return a handle for it.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions. Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

  ...
}
2.SortShuffleManager
SortShuffleManager is the only implementation of ShuffleManager. It implements the three methods above as follows:
2.1 registerShuffle
/**
 * Obtains a [[ShuffleHandle]] to pass to tasks.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // 1. First check whether bypass merge-sort applies
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // 2. Otherwise check whether the serialized path can be used
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
1. First, check whether bypass merge-sort applies. Two conditions must hold: the shuffle dependency has no map-side aggregation, and the number of partitions is not greater than spark.shuffle.sort.bypassMergeThreshold (default 200). If both hold, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle path is used.
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // Default value is 200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
2. If the conditions above are not met, canUseSerializedShuffle() is checked. If all three conditions in that method hold, a SerializedShuffleHandle is returned and the tungsten-sort shuffle path is used.
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // The serializer must support relocation of serialized objects
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  // There must be no map-side aggregation
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
  // The number of partitions must not exceed 16777215 + 1
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
3. If neither of the two checks passes, a BaseShuffleHandle is returned and the basic sort shuffle path is used.
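The three-way decision above can be condensed into a few lines. The following is a minimal Java sketch, not Spark code: HandleChoice, Kind, and choose() are hypothetical names, and the dependency is reduced to three plain parameters.

```java
// Illustrative sketch only: HandleChoice and Kind are hypothetical, not Spark classes.
final class HandleChoice {
    enum Kind { BYPASS_MERGE_SORT, SERIALIZED, BASE }

    // 16777215 + 1, the serialized-mode partition limit quoted in the text.
    static final int MAX_PARTITIONS_SERIALIZED = 1 << 24;

    static Kind choose(boolean mapSideCombine, int numPartitions,
                       boolean serializerSupportsRelocation, int bypassThreshold) {
        // Check 1: no map-side aggregation and few enough partitions.
        if (!mapSideCombine && numPartitions <= bypassThreshold) {
            return Kind.BYPASS_MERGE_SORT;
        }
        // Check 2: relocatable serializer, no map-side aggregation, partition count in range.
        if (serializerSupportsRelocation && !mapSideCombine
                && numPartitions <= MAX_PARTITIONS_SERIALIZED) {
            return Kind.SERIALIZED;
        }
        // Fallback: the basic sort shuffle.
        return Kind.BASE;
    }
}
```

For example, a dependency with map-side aggregation always ends up with Kind.BASE, whatever the partition count or serializer.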
2.2 getReader
/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
This method returns a BlockStoreShuffleReader.
2.3 getWriter
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // Choose the ShuffleWriter according to the handle type
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}
The ShuffleWriter is chosen by handle type: a SerializedShuffleHandle gets an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle gets a BypassMergeSortShuffleWriter, and anything else gets a SortShuffleWriter.
3.Three Writer implementations
As described above, BypassMergeSortShuffleWriter is used when the bypass conditions hold. Otherwise, UnsafeShuffleWriter is used if all three of the following hold: the serializer supports relocation, there is no map-side aggregation, and the number of partitions is not greater than 16777215+1. In every other case SortShuffleWriter is used.
3.1 BypassMergeSortShuffleWriter
BypassMergeSortShuffleWriter extends ShuffleWriter and is implemented in Java. It writes one temporary file per partition, merges these map-side output files into a single file, and generates an index file that records the start offset of each partition. The write() method is:
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // Create a new ShuffleMapOutputWriter
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // If there are no records
    if (!records.hasNext()) {
      // Get the written length of every partition
      partitionLengths = mapOutputWriter.commitAllPartitions();
      // Update mapStatus
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // Create one DiskBlockObjectWriter slot and one FileSegment slot per partition
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    // For each partition
    for (int i = 0; i < numPartitions; i++) {
      // Create a temporary block
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
      // Get the file and id of the temp block
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // Create a DiskBlockObjectWriter for this partition
      partitionWriters[i] =
          blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    // If there are records
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      // Write each record, by key, to the file of the partition it belongs to
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // Commit
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }

    // Merge all partition files into a single file
    partitionLengths = writePartitionedData(mapOutputWriter);
    // Update mapStatus
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}
The merge method writePartitionedData() is shown below. By default it merges the files using zero-copy transfers:
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    // Start time
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // Get the file of each partition
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          // Use zero-copy
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            // This calls Utils.copyFileStreamNIO, which ends up calling
            // FileChannel.transferTo to copy the file
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // Otherwise copy through streams
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  return mapOutputWriter.commitAllPartitions();
}
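The zero-copy merge can be tried outside Spark. The sketch below, under stated assumptions, concatenates per-partition files into one output file with FileChannel.transferTo and returns the per-partition lengths. ConcatSketch, concat(), and demo() are hypothetical names; error handling, metrics, and Spark's ShufflePartitionWriter abstraction are left out.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: not Spark's implementation.
final class ConcatSketch {
    // Appends each partition file to `out` in order and returns the per-partition lengths.
    static long[] concat(File[] parts, File out) throws IOException {
        long[] lengths = new long[parts.length];
        try (FileChannel dst = FileChannel.open(out.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            for (int i = 0; i < parts.length; i++) {
                try (FileChannel src = FileChannel.open(parts[i].toPath(),
                        StandardOpenOption.READ)) {
                    long size = src.size();
                    long pos = 0;
                    // transferTo may move fewer bytes than requested, so loop until done.
                    while (pos < size) {
                        pos += src.transferTo(pos, size - pos, dst);
                    }
                    lengths[i] = size;
                }
            }
        }
        return lengths;
    }

    // Small self-check: returns {length of part 0, length of part 1, merged file size}.
    static long[] demo() {
        try {
            File a = File.createTempFile("part0", ".tmp");
            File b = File.createTempFile("part1", ".tmp");
            File out = File.createTempFile("merged", ".tmp");
            Files.write(a.toPath(), new byte[] {1, 2, 3});
            Files.write(b.toPath(), new byte[] {4, 5});
            long[] lengths = concat(new File[] {a, b}, out);
            long total = out.length();
            a.delete();
            b.delete();
            out.delete();
            return new long[] {lengths[0], lengths[1], total};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The returned lengths play the same role as partitionLengths above: they are what an index file needs to locate each partition inside the merged file.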
3.2 UnsafeShuffleWriter
UnsafeShuffleWriter also extends ShuffleWriter and is implemented in Java. Its write() method is:
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      // Insert the record into the ShuffleExternalSorter for external sorting
      insertRecordIntoSorter(records.next());
    }
    // Merge and write the output file
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
                       "cleanup.", e);
        }
      }
    }
  }
}
Two methods do the main work here:
3.2.1 insertRecordIntoSorter()
@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  // Get the key and its partition
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // Reset the buffer
  serBuffer.reset();
  // Write the key and value into the buffer
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  // Get the size of the serialized record
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  // Hand the serialized bytes to the ShuffleExternalSorter
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
This method serializes the record and inserts the serialized bytes into the external sorter through insertRecord(), shown below:
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

  // for tests
  assert(inMemSorter != null);
  // If the number of records reaches the spill threshold, spill to disk right away
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  // Checks whether there is enough space to insert an additional record in to the sort pointer
  // array and grows the array if additional space is required. If the required space cannot be
  // obtained, then the in-memory data will be spilled to disk.
  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 bytes to store the record length.
  final int required = length + uaoSize;
  // If more memory is needed, request a new page from the TaskMemoryManager
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  // Given a memory page and offset within that page, encode this address into a 64-bit long.
  // This address will remain valid as long as the corresponding page has not been freed.
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // Write the length
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  // Advance the cursor
  pageCursor += uaoSize;
  // Write the record bytes
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  // Advance the cursor
  pageCursor += length;
  // Pass the encoded logical address and the partition id to ShuffleInMemorySorter for sorting
  inMemSorter.insertRecord(recordAddress, partitionId);
}
Here records are cached and spilled without relying on any higher-level data structure; the code operates on memory directly.
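To make the length-prefixed page layout concrete, here is a minimal Java sketch. It assumes a heap ByteBuffer stands in for one Tungsten memory page and uses a fixed 4-byte length prefix, whereas the real code uses 4 or 8 bytes through UnsafeAlignedOffset. PageSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: a ByteBuffer plays the role of one memory page.
final class PageSketch {
    private final ByteBuffer page;

    PageSketch(int pageSize) {
        page = ByteBuffer.allocate(pageSize);
    }

    // Writes [length][bytes] at the cursor and returns the record's start offset,
    // or -1 when the page is full (the real sorter would acquire a new page or spill).
    int insert(byte[] record) {
        int required = Integer.BYTES + record.length;
        if (page.remaining() < required) {
            return -1;
        }
        int offset = page.position();
        page.putInt(record.length); // write the length, cursor advances by 4
        page.put(record);           // write the data, cursor advances by record.length
        return offset;
    }

    // Reads a record back from its start offset: length first, then the payload.
    byte[] read(int offset) {
        int length = page.getInt(offset);
        byte[] out = new byte[length];
        for (int i = 0; i < length; i++) {
            out[i] = page.get(offset + Integer.BYTES + i);
        }
        return out;
    }
}
```

The offset returned by insert() is what the sorter later packs, together with the page number and partition id, into a single long.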
The growPointerArrayIfNecessary() method is as follows:
/**
 * Checks whether there is enough space to insert an additional record in to the sort pointer
 * array and grows the array if additional space is required. If the required space cannot be
 * obtained, then the in-memory data will be spilled to disk.
 */
private void growPointerArrayIfNecessary() throws IOException {
  assert(inMemSorter != null);
  // If there is no room for another record
  if (!inMemSorter.hasSpaceForAnotherRecord()) {
    // Get the current memory usage
    long used = inMemSorter.getMemoryUsage();
    LongArray array;
    try {
      // could trigger spilling
      // Allocate an array with twice the previous capacity
      array = allocateArray(used / 8 * 2);
    } catch (TooLargePageException e) {
      // The pointer array is too big to fix in a single page, spill.
      // The spill method is shown later. For reference, PackedRecordPointer limits
      // record pages to 128 MB:
      // static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes
      spill();
      return;
    } catch (SparkOutOfMemoryError e) {
      // should have trigger spilling
      if (!inMemSorter.hasSpaceForAnotherRecord()) {
        logger.error("Unable to grow the pointer array");
        throw e;
      }
      return;
    }
    // check if spilling is triggered or not
    if (inMemSorter.hasSpaceForAnotherRecord()) {
      // Space became available, so growing is unnecessary; free the new array
      freeArray(array);
    } else {
      // Otherwise copy the old array into the new one
      inMemSorter.expandPointerArray(array);
    }
  }
}
The spill() method is as follows:
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }

  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");

  // Sorts the in-memory records and writes the sorted records to an on-disk file.
  // This method does not free the sort data structures.
  writeSortedFile(false);
  final long spillSize = freeMemory();
  // Reset the ShuffleInMemorySorter
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}
The writeSortedFile() method:
private void writeSortedFile(boolean isLastFile) {

  // This call performs the actual sort.
  // Returns an iterator over the sorted records
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();

  // If there are no sorted records, so we don't need to create an empty spill file.
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;

  // true means this is the final output file, false means a spill file
  if (isLastFile) {
    // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
    writeMetricsToUse = writeMetrics;
  } else {
    // We're spilling, so bytes written should be counted towards spill rather than write.
    // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
    // them towards shuffle bytes written.
    writeMetricsToUse = new ShuffleWriteMetrics();
  }

  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
  // data through a byte array. This array does not need to be large enough to hold a single
  // record;
  // Create a byte buffer array, 1 MB by default
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  // Create a temporary shuffle block
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  // Get the file and the block id
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  // Our write path doesn't actually use this serializer (since we end up calling the `write()`
  // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
  // around this, we pass a dummy no-op serializer.
  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Iterate over the sorted records
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        // Commit the current partition and record its size
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        // Then move on to the next partition
        currentPartition = partition;
      }

      // Get the pointer, then resolve the page and the offset from it
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      // Number of bytes left to copy for this record
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        // Copy the data into the buffer array
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        // Pass it from the buffer array to the DiskBlockObjectWriter
        writer.write(writeBuffer, 0, toTransfer);
        // Update the read position
        recordReadPosition += toTransfer;
        // Update the remaining byte count
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    // Commit
    committedSegment = writer.commitAndGet();
  }
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  // Add this file to the list of spill files
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  // For a spill file, update the spill metrics
  if (!isLastFile) {
    writeMetrics.incRecordsWritten(
      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
  }
}
The encodePageNumberAndOffset() method is as follows:
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
    // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
    // encode. Due to our page size limitation, though, we can convert this into an offset that's
    // relative to the page's base offset; this relative offset will fit in 51 bits.
    offsetInPage -= page.getBaseOffset();
  }
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
  // The upper 13 bits hold the page number, the lower 51 bits hold the offset.
  // Shift the page number left by 51 bits, then OR it with the offset masked by
  // 0x7FFFFFFFFFFFFL, a mask whose lower 51 bits are all 1.
  return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
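The 13-bit page number plus 51-bit offset layout can be checked with a short round trip. This is a minimal Java sketch of the bit arithmetic only; PageAddress and its method names are hypothetical, and the decode helpers are added here for illustration.

```java
// Illustrative sketch only: the bit layout described in the comments above.
final class PageAddress {
    static final int OFFSET_BITS = 51;
    // Lower 51 bits all set, i.e. 0x7FFFFFFFFFFFFL.
    static final long MASK_LOWER_51 = (1L << OFFSET_BITS) - 1;

    // Page number goes into the upper 13 bits, offset into the lower 51 bits.
    static long encode(int pageNumber, long offsetInPage) {
        return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LOWER_51);
    }

    // Unsigned shift so that page numbers using the top bit decode correctly.
    static int pageNumber(long address) {
        return (int) (address >>> OFFSET_BITS);
    }

    static long offset(long address) {
        return address & MASK_LOWER_51;
    }
}
```

With 13 bits the largest page number is 8191, which still round-trips because the decode uses an unsigned shift.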
The insertRecord() method of ShuffleInMemorySorter is as follows:
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
  pos++;
}
The PackedRecordPointer.packPointer() method:
public static long packPointer(long recordPointer, int partitionId) {
  assert (partitionId <= MAXIMUM_PARTITION_ID);
  // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
  // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
  // Shift the page number right by 24 bits and combine it with the lower 27 bits,
  // which compresses the logical address into 40 bits
  final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
  final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
  // Put the partition id into the upper 24 bits
  return (((long) partitionId) << 40) | compressedAddress;
}
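The packing of a 24-bit partition id, a 13-bit page number, and a 27-bit in-page offset into one long can be verified with a round trip. The Java sketch below mirrors the shifts shown above; PackedPointerSketch and its unpacking helpers are hypothetical names written for this illustration.

```java
// Illustrative sketch only: layout is [24-bit partition][13-bit page][27-bit offset].
final class PackedPointerSketch {
    static final long MASK_UPPER_13 = ~((1L << 51) - 1);
    static final long MASK_LOWER_27 = (1L << 27) - 1;

    static long pack(long recordPointer, int partitionId) {
        // Page number moves from bits 51..63 down to bits 27..39.
        long pageNumber = (recordPointer & MASK_UPPER_13) >>> 24;
        long compressed = pageNumber | (recordPointer & MASK_LOWER_27);
        // Partition id occupies bits 40..63.
        return (((long) partitionId) << 40) | compressed;
    }

    static int partitionId(long packed) {
        return (int) (packed >>> 40);
    }

    // Rebuilds the 64-bit address: page number back to bits 51..63, offset in bits 0..26.
    static long recordPointer(long packed) {
        long pageNumber = (packed << 24) & MASK_UPPER_13;
        long offset = packed & MASK_LOWER_27;
        return pageNumber | offset;
    }
}
```

Because only 27 bits remain for the offset, a record must start within the first 128 MB of its page, which is why pages are capped at that size in this sorter.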
The getSortedIterator() method:
public ShuffleSorterIterator getSortedIterator() {
  int offset = 0;
  // Sort by partition id with radix sort. Radix sort is much faster, but requires extra
  // memory to be reserved in the pointer array
  if (useRadixSort) {
    offset = RadixSort.sort(
      array, pos,
      PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
      PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
  // Otherwise sort with TimSort
  } else {
    MemoryBlock unused = new MemoryBlock(
      array.getBaseObject(),
      array.getBaseOffset() + pos * 8L,
      (array.size() - pos) * 8L);
    LongArray buffer = new LongArray(unused);
    Sorter<PackedRecordPointer, LongArray> sorter =
      new Sorter<>(new ShuffleSortDataFormat(buffer));

    sorter.sort(array, 0, pos, SORT_COMPARATOR);
  }
  return new ShuffleSorterIterator(pos, array, offset);
}
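Both branches order the packed pointers by partition id only, that is, by the upper 24 bits of each long. A minimal Java sketch of that ordering follows. It deliberately swaps in the JDK's stream sort on boxed values instead of Spark's RadixSort or TimSort-based Sorter, so it illustrates the sort key rather than the algorithm. PartitionSortSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Comparator;

// Illustrative sketch only: orders packed pointers by their 24-bit partition id.
final class PartitionSortSketch {
    static long[] sortByPartition(long[] packed) {
        return Arrays.stream(packed)
            .boxed()
            // The key is the upper 24 bits; the unsigned shift avoids sign problems.
            .sorted(Comparator.comparingInt((Long p) -> (int) (p >>> 40)))
            .mapToLong(Long::longValue)
            .toArray();
    }
}
```

Records inside the same partition are not ordered by key here, which matches the serialized shuffle path: it only needs the records grouped by partition.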
3.2.2 closeAndWriteOutput()
@VisibleForTesting
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // Get the spill files
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  try {
    // Merge the spill files
    partitionLengths = mergeSpills(spills);
  } finally {
    // Delete the spill files
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  // Update mapStatus
  mapStatus = MapStatus$.MODULE$.apply(
    blockManager.shuffleServerId(), partitionLengths, mapId);
}
The mergeSpills() method:
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
  long[] partitionLengths;
  // If there are no spill files, create an empty output
  if (spills.length == 0) {
    final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
    return mapWriter.commitAllPartitions();
  // If there is exactly one spill file, use it as the output
  } else if (spills.length == 1) {
    Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
    if (maybeSingleFileWriter.isPresent()) {
      // Here, we don't need to perform any metrics updates because the bytes written to this
      // output file would have already been counted as shuffle bytes written.
      partitionLengths = spills[0].partitionLengths;
      maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
    } else {
      partitionLengths = mergeSpillsUsingStandardWriter(spills);
    }
  // If there are several, merge them into the output; the merge can use either NIO or BIO
  } else {
    partitionLengths = mergeSpillsUsingStandardWriter(spills);
  }
  return partitionLengths;
}
3.3 SortShuffleWriter
SortShuffleWriter sorts in memory using either a PartitionedAppendOnlyMap or a PartitionedPairBuffer. When the memory limit is exceeded, it spills to files. When the final sorted output file is produced, all earlier spill files and the data still in memory are merge-sorted together, and elements with the same key are aggregated with the user-defined function. The entry point is the write() method:
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Create an external sorter. With map-side pre-aggregation, pass the aggregator and
  // keyOrdering; otherwise neither is needed
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Put the records into the ExternalSorter for sorting
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // Create an output writer
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Write the externally sorted data to the writer
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  // Update mapStatus
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
The insertAll() method:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  // Whether map-side aggregation is needed
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    // mergeValue() merges a new value into the current aggregate
    val mergeValue = aggregator.get.mergeValue
    // createCombiner() creates the initial aggregate value
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // If the key already has an aggregate, merge into it; otherwise create the initial value
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      // Increment the number of records read
      addElementsRead()
      kv = records.next()
      // map is a PartitionedAppendOnlyMap: (partition, key) is the key, the aggregate is the value
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Check whether a spill to disk is needed
      maybeSpillCollection(usingMap = true)
    }
  // No map-side aggregation
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // buffer is a PartitionedPairBuffer: add the partition, key and value
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // Check whether a spill to disk is needed
      maybeSpillCollection(usingMap = false)
    }
  }
}
This method decides, while inserting records, whether map-side pre-aggregation is needed, and stores the data in one of two data structures accordingly.
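The pre-aggregation branch follows a create-or-merge pattern for each key. A minimal Java sketch of that pattern is shown here, assuming a plain HashMap keyed by the record key instead of Spark's PartitionedAppendOnlyMap keyed by (partition, key). CombineSketch and combine() are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative sketch only: a HashMap stands in for the append-only map.
final class CombineSketch {
    // Mirrors the update closure: create a combiner for a new key,
    // otherwise merge the new value into the existing combiner.
    static <K, V, C> Map<K, C> combine(Iterable<Map.Entry<K, V>> records,
                                       Function<V, C> createCombiner,
                                       BiFunction<C, V, C> mergeValue) {
        Map<K, C> map = new HashMap<>();
        for (Map.Entry<K, V> kv : records) {
            map.compute(kv.getKey(), (k, old) ->
                old == null
                    ? createCombiner.apply(kv.getValue())
                    : mergeValue.apply(old, kv.getValue()));
        }
        return map;
    }
}
```

For a sum, createCombiner is the identity and mergeValue is addition, so records (a,1), (b,2), (a,3) combine to a=4 and b=2 before anything is written out.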
maybeSpillCollection() calls maybeSpill() to check whether a spill is needed. If a spill happens, a new map or buffer is created and caching starts over from scratch:
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    // Check whether a spill is needed
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    // Check whether a spill is needed
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // If the number of records read is a multiple of 32 and the estimated memory used by the
  // map or buffer has reached the threshold (5 MB initially)
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    // Try to acquire 2 * currentMemory - myMemoryThreshold
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    // Update the threshold
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // Even if shouldSpill is false, spill when the number of records read exceeds
  // numElementsForceSpillThreshold (Integer.MAX_VALUE by default)
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    // Increment the spill count
    _spillCount += 1
    logSpillage(currentMemory)
    // Spill the cached collection
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    // Release the memory
    releaseMemory()
  }
  shouldSpill
}
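The threshold logic in maybeSpill() is easy to get wrong from reading alone, so here is a minimal Java simulation. It assumes a single long counter (poolFree) stands in for the shared memory pool and that releasing memory resets the threshold to its initial value. SpillSketch and its members are hypothetical names, and the forced-spill condition on the element count is left out.

```java
// Illustrative sketch only: simulates the grow-or-spill decision with plain counters.
final class SpillSketch {
    private final long initialThreshold;
    private long threshold; // plays the role of myMemoryThreshold
    private long poolFree;  // hypothetical free memory left in the shared pool
    int spillCount = 0;

    SpillSketch(long initialThreshold, long poolFree) {
        this.initialThreshold = initialThreshold;
        this.threshold = initialThreshold;
        this.poolFree = poolFree;
    }

    boolean maybeSpill(long elementsRead, long currentMemory) {
        boolean shouldSpill = false;
        // Only check every 32 records, and only once the threshold is reached.
        if (elementsRead % 32 == 0 && currentMemory >= threshold) {
            // Ask for enough to double the current usage.
            long request = 2 * currentMemory - threshold;
            long granted = Math.min(request, poolFree);
            poolFree -= granted;
            threshold += granted;
            // Spill only if the grant was too small to cover the current usage.
            shouldSpill = currentMemory >= threshold;
        }
        if (shouldSpill) {
            spillCount++;
            // Give back everything above the initial threshold.
            poolFree += threshold - initialThreshold;
            threshold = initialThreshold;
        }
        return shouldSpill;
    }

    long threshold() {
        return threshold;
    }
}
```

With an initial threshold of 5 and a pool of 10 (think of the units as MB), a collection of size 6 is granted 7 more and keeps growing, while a later collection of size 20 can only obtain the remaining 3 and therefore spills.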
maybeSpill() calls spill() to perform the actual spill:
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  // Sort with the given comparator and return an iterator over the sorted result
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  // Spill the data from the iterator to a file on disk
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  // An ArrayBuffer records all spill files
  spills += spillFile
}
The spillMemoryIteratorToDisk() method is as follows:
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // Create a temporary block
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // Create the DiskBlockObjectWriter for the spill file
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // Iterate over the records in the map or buffer
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      // Write the record and update the counters
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      // Once serializerBatchSize records (10000 by default) are written, flush this batch to disk
      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    // After the loop, flush whatever is left
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  // Return the spill file
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
Next comes the sort-and-merge step, which calls ExternalSorter.writePartitionedMapOutput():
def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  // If no spill happened
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    // Sort with the given comparator
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext()) {
      val partitionId = it.nextPartition()
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
        val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        // Take the records of this partition out one by one
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(partitionPairsWriter)
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = partitionId + 1
    }
  // If spills happened, merge-sort the spill files with the cached data, then write the
  // result partition by partition to a ShufflePartitionPairsWriter
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
      val blockId = ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(id)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        if (elements.hasNext) {
          for (elem <- elements) {
            partitionPairsWriter.write(elem._1, elem._2)
          }
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = id + 1
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}
The partitionedIterator() method:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    // No spill and no ordering: sort by partition id only
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    // No spill but an ordering is defined: sort by partition id first, then by key
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
The merge method is as follows:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // Open readers on the spill files
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  // Iterate over the partitions
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // Combine the spill files and the in-memory data for this partition
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    // With an aggregator, aggregate per partition, with keys ordered by keyComparator
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    // No aggregator but an ordering is defined: merge-sort by that ordering
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    // Neither: simply concatenate
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
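The ordering-only branch is a k-way merge of already sorted runs (the spill files plus the in-memory data). Below is a minimal Java sketch of such a merge using a priority queue of run heads. It assumes the runs are in-memory lists rather than spill readers, and MergeSketch, mergeSorted(), and Head are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative sketch only: k-way merge of sorted runs with a min-heap.
final class MergeSketch {
    // One heap entry: the current head of a run plus the rest of that run.
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    static <T> List<T> mergeSorted(List<List<T>> sortedRuns, Comparator<T> cmp) {
        PriorityQueue<Head<T>> heap =
            new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
        // Seed the heap with the first element of every non-empty run.
        for (List<T> run : sortedRuns) {
            Iterator<T> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it));
            }
        }
        List<T> out = new ArrayList<>();
        // Repeatedly take the smallest head and refill from the same run.
        while (!heap.isEmpty()) {
            Head<T> head = heap.poll();
            out.add(head.value);
            if (head.rest.hasNext()) {
                heap.add(new Head<>(head.rest.next(), head.rest));
            }
        }
        return out;
    }
}
```

Only one element per run sits in the heap at a time, so memory use depends on the number of runs rather than on their total size, which is the property that makes merging spill files practical.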
The write() method calls commitAllPartitions() to output the data, which in turn calls writeIndexFileAndCommit() to write out the data file and the index file:
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Long,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  // Create the index file and a temporary index file
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    // Get the shuffle data file
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    // the following check and rename are atomic.
    synchronized {
      // Check whether the index file already matches the data file
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        // A match means the shuffle write already completed, so delete the temporary data file
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        // Open a buffered output stream on the temporary index file
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block, need to convert it to offsets.
          // Accumulate the partition sizes into offsets and write them to the temporary index file
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        // Delete any other index file that may exist
        if (indexFile.exists()) {
          indexFile.delete()
        }
        // Delete any other data file that may exist
        if (dataFile.exists()) {
          dataFile.delete()
        }
        // Rename the temporary files to the final files
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}
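The index file is simply the running sum of the partition lengths, starting at 0. A minimal Java sketch of that conversion, and of how a reader could look up one partition's byte range, follows. IndexSketch, toOffsets(), and range() are hypothetical names, and the offsets are kept in an array instead of being written to a file.

```java
// Illustrative sketch only: lengths to cumulative offsets, as stored in the index file.
final class IndexSketch {
    // Produces numPartitions + 1 offsets: offsets[0] is 0 and offsets[i + 1] is the
    // end of partition i.
    static long[] toOffsets(long[] lengths) {
        long[] offsets = new long[lengths.length + 1];
        for (int i = 0; i < lengths.length; i++) {
            offsets[i + 1] = offsets[i] + lengths[i];
        }
        return offsets;
    }

    // Byte range [start, end) of one reduce partition inside the data file.
    static long[] range(long[] offsets, int partition) {
        return new long[] {offsets[partition], offsets[partition + 1]};
    }
}
```

For lengths 10, 0, and 25 the offsets are 0, 10, 10, 35, so partition 1 is empty and partition 2 occupies bytes 10 to 35 of the data file.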