Spark Sort Shuffle Read

Shuffle read 是等到Mapper stage结束后才开始读取数据。边读取数据边处理,数据先放在内存,最后落盘。下面先介绍Shuffle read 的详细过程,然后分析这一个过程的内存使用。

Shuffle read过程

  1. ShuffledRDD的compute()方法
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {  
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]  
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)  
    .read()  
    .asInstanceOf[Iterator[(K, C)]]  
} 

getReader()方法得到HashShuffleReader对象,然后调用HashShuffleReader的read()方法读取前一Stage中ShuffleMapTask生成的数据。

  1. HashShuffleReader的read()方法
override def read(): Iterator[Product2[K, C]] = {  
    val ser = Serializer.getSerializer(dep.serializer)  
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)  
  
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {  
      if (dep.mapSideCombine) {  
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))  
      } else {  
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))  
      }  
    } else {  
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")  
  
      // Convert the Product2s to pairs since this is what downstream RDDs currently expect  
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))  
    }  
  
    // Sort the output if there is a sort ordering defined.  
    dep.keyOrdering match {  
      case Some(keyOrd: Ordering[K]) =>  
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,  
        // the ExternalSorter won't spill to disk.  
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))  
        sorter.insertAll(aggregatedIter)  
        context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)  
        context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)  
        sorter.iterator  
      case None =>  
        aggregatedIter  
    }  
  }  

读取数据的主要流程:1. 获取待拉取数据的iterator;2. 使用AppendOnlyMap/ExternalAppendOnlyMap 做combine,这个过程和shuffle write一样;3. 如果需要对key排序,则使用ExternalSorter。groupByKey, reduceByKey, aggregateByKey不需要对key排序,sortByKey需要对key排序。下面讲述主要如何得到iterator。

  1. BlockStoreShuffleFetcher的fetch()方法
def fetch[T](  
      shuffleId: Int,  
      reduceId: Int,  
      context: TaskContext,  
      serializer: Serializer)  
    : Iterator[T] =  
  {  
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))  
    val blockManager = SparkEnv.get.blockManager  
  
    val startTime = System.currentTimeMillis  
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)  
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(  
      shuffleId, reduceId, System.currentTimeMillis - startTime))  
  
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]  
    for (((address, size), index) <- statuses.zipWithIndex) {  
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))  
    }  
  
    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {  
      case (address, splits) =>  
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))  
    }  
  
    val blockFetcherItr = new ShuffleBlockFetcherIterator(  
      context,  
      SparkEnv.get.blockManager.shuffleClient,  
      blockManager,  
      blocksByAddress,  
      serializer,  
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility  
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)  
    val itr = blockFetcherItr.flatMap(unpackBlock)  
  
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {  
      context.taskMetrics.updateShuffleReadMetrics()  
    })  
  }  

fetch()方法会得到ShuffleBlockFetcherIterator对象,该对象的initialize()方法将分别从本地和远程读取数据。

  1. ShuffleBlockFetcherIterator的initialize()方法
private[this] def initialize(): Unit = {  
    // Add a task completion callback (called in both success case and failure case) to cleanup.  
    context.addTaskCompletionListener(_ => cleanup())  
  
    // Split local and remote blocks.  
    val remoteRequests = splitLocalRemoteBlocks()  
    // Add the remote requests into our queue in a random order  
    fetchRequests ++= Utils.randomize(remoteRequests)  
  
    // Send out initial requests for blocks, up to our maxBytesInFlight  
    while (fetchRequests.nonEmpty &&  
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {  
      sendRequest(fetchRequests.dequeue())  
    }  
  
    val numFetches = remoteRequests.size - fetchRequests.size  
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))  
  
    // Get Local Blocks  
    fetchLocalBlocks()  
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))  
  }  

首先区分本地和远程数据block(block代表一个partition),sendRequest()会读取远程数据,fetchLocalBlocks()读取本地数据。出于带宽和内存的考虑,每次sendRequest会从最多5个node去获取数据,获取的数据量由spark.reducer.maxMbInFlight控制。下面着重讲sendRequest()的流程。

  1. 读取远程数据
private[this] def sendRequest(req: FetchRequest) {  
    logDebug("Sending request for %d blocks (%s) from %s".format(  
      req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))  
    bytesInFlight += req.size  
  
    // so we can look up the size of each blockID  
    val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap  
    val blockIds = req.blocks.map(_._1.toString)  
  
    val address = req.address  
    shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,  
      new BlockFetchingListener {  
        override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {  
          // Only add the buffer to results queue if the iterator is not zombie,  
          // i.e. cleanup() has not been called yet.  
          if (!isZombie) {  
            // Increment the ref count because we need to pass this to a different thread.  
            // This needs to be released after use.  
            buf.retain()  
            results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))  
            shuffleMetrics.incRemoteBytesRead(buf.size)  
            shuffleMetrics.incRemoteBlocksFetched(1)  
          }  
          logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))  
        }  
  
        override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {  
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)  
          results.put(new FailureFetchResult(BlockId(blockId), e))  
        }  
      }  
    )  
  }  

shuffleClient的fetchBlocks()方法读取远程数据。ShuffleClient有两个子类,分别是ExternalShuffleClient和BlockTransferService,BlockTransferService也有两个子类,分别是NettyBlockTransferService和NioBlockTransferService。Spark 1.5.2中已经将NioBlockTransferService方式设置为deprecated,在后续版本中将被移除。

  1. ExternalShuffleClient的fetchBlocks()方法
public void fetchBlocks(  
    final String host,  
    final int port,  
    final String execId,  
    String[] blockIds,  
    BlockFetchingListener listener) {  
  assert appId != null : "Called before init()";  
  logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);  
  try {  
    RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =  
      new RetryingBlockFetcher.BlockFetchStarter() {  
        @Override  
        public void createAndStart(String[] blockIds, BlockFetchingListener listener)  
            throws IOException {  
          TransportClient client = clientFactory.createClient(host, port);  
          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();  
        }  
      };  
  
    int maxRetries = conf.maxIORetries();  
    if (maxRetries > 0) {  
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's  
      // a bug in this code. We should remove the if statement once we're sure of the stability.  
      new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();  
    } else {  
      blockFetchStarter.createAndStart(blockIds, listener);  
    }  
  } catch (Exception e) {  
    logger.error("Exception while beginning fetchBlocks", e);  
    for (String blockId : blockIds) {  
      listener.onBlockFetchFailure(blockId, e);  
    }  
  }  
}  

调用OneForOneBlockFetcher的start()方法,一个chunk一个chunk的读取数据。

  1. OneForOneBlockFetcher的start()方法
public void start() {  
    if (blockIds.length == 0) {  
      throw new IllegalArgumentException("Zero-sized blockIds array");  
    }  
  
    client.sendRpc(openMessage.toByteArray(), new RpcResponseCallback() {  
      @Override  
      public void onSuccess(byte[] response) {  
        try {  
          streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteArray(response);  
          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);  
  
          // Immediately request all chunks -- we expect that the total size of the request is  
          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].  
          for (int i = 0; i < streamHandle.numChunks; i++) {  
            client.fetchChunk(streamHandle.streamId, i, chunkCallback);  
          }  
        } catch (Exception e) {  
          logger.error("Failed while starting block fetches after success", e);  
          failRemainingBlocks(blockIds, e);  
        }  
      }  
  
      @Override  
      public void onFailure(Throwable e) {  
        logger.error("Failed while starting block fetches", e);  
        failRemainingBlocks(blockIds, e);  
      }  
    });  
  }  

Spark sort shuffle read的内存分析:

ExternalAppendOnlyMap 中作为内存缓冲数据的对象如下:

private var currentMap = new SizeTrackingAppendOnlyMap[K, C]

ExternalSorter中作为内存缓冲数据的对象如下:

private var map = new PartitionedAppendOnlyMap[K, C]

所以在一个worker node 上,一个reducer task占据的主要内存是SizeTrackingAppendOnlyMap + PartitionedAppendOnlyMap。如果一个worker node上有多个reducer task运行(最多为core num个),shuffle read最多可使用的内存大小是

ExecutorHeapMemeory * spark.shuffle.safetyFraction*spark.shuffle.memoryFraction
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容