【源码剖析】- Spark 新旧内存管理方案(下)

上一篇文章【源码剖析】- Spark 新旧内存管理方案(上)介绍了旧的内存管理方案以及其实现类 StaticMemoryManager 是如何工作的,本文将通过介绍 UnifiedMemoryManager 来介绍新内存管理方案(以下统称为新方案)。

内存总体分布

系统预留

在新方案中,内存依然分为三块,分别是系统预留、用于 storage、用于 execution。其中系统预留大小如下:

val reservedMemory = conf.getLong("spark.testing.reservedMemory",
  if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)

生产环境中使用一般不会设置 spark.testing.reservedMemoryspark.testing,所以我们认为系统预留空间大小置为 RESERVED_SYSTEM_MEMORY_BYTES,即 300M。

execution 和 storage 部分总大小

上一小节这段代码是 UnifiedMemoryManager#getMaxMemory 的一个片段,该方法返回 execution 和 storage 可以共用的总空间,让我们来看看这个方法的具体实现:

  private def getMaxMemory(conf: SparkConf): Long = {
    //< 生产环境中一般不会设置 spark.testing.memory,所以这里认为 systemMemory 大小为 Jvm 最大可用内存
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    //< 系统预留 300M
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = reservedMemory * 1.5
    //< 如果 systemMemory 小于450M,则抛异常
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please use a larger heap size.")
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
    //< 最终 execution 和 storage 的可用内存之和为 (JVM最大可用内存 - 系统预留内存) * spark.memory.fraction
    (usableMemory * memoryFraction).toLong
  }

从以上代码及注释我们可以看出,最终 execution 和 storage 的可用内存之和为 (JVM最大可用内存 - 系统预留内存) * spark.memory.fraction,默认为(JVM 最大可用内存 - 300M)* 0.75。举个例子,如果你为 execution 设置了2G 内存,那么 execution 和 storage 可用的总内存为 (2048-300)*0.75=1311

execution 和 storage 部分默认大小

上一小节搞清了用于 execution 和 storage 的内存之和 maxMemory,那么用于 execution 和 storage 的内存分别为多少呢?看下面三段代码:

object UnifiedMemoryManager 的 apply 方法用来构造类 UnifiedMemoryManager 的实例

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxMemory = maxMemory,
      storageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

这段代码确定在构造 UnifiedMemoryManager 时:

  • maxMemory 即 execution 和 storage 能共用的内存总和为 getMaxMemory(conf),即 (JVM最大可用内存 - 系统预留内存) * spark.memory.fraction
  • storageRegionSize 为 maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5),在没有设置 spark.memory.storageFraction 的情况下为一半的 maxMemory

那么 storageRegionSize 是干嘛用的呢?继续看 UnifiedMemoryManagerMemoryManager 构造函数:

private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxMemory: Long,
    storageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    storageRegionSize,
    maxMemory - storageRegionSize)

private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    storageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging

我们不难发现:

  • storageRegionSize 就是 storageMemory,大小为 maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5),默认为 maxMemory * 0.5
  • execution 的大小为 maxMemory - storageRegionSize,默认为 maxMemory * 0.5,即默认情况下 storageMemory 和 execution 能用的内存相同,各占一半

互相借用内存

新方案与旧方案最大的不同是:旧方案中 execution 和 storage 可用的内存是固定死的,即使一方内存不够用而另一方有大把空闲内存,空闲一方也无法将结存借给不足一方,这样降造成严重的内存浪费。而新方案解决了这一点,execution 和 storage 之间的内存可以互相借用,大大提供内存利用率,也更好的满足了不同资源侧重的计算的需求

下面便来介绍新方案中内存是如何互相借用的

acquireStorageMemory

先来看看 storage 从 execution 借用内存是如何在分配 storage 内存中发挥作用的

这一过程对应的实现是 UnifiedMemoryManager#acquireStorageMemory,上面的流程图应该说明了是如何 storage 内存及在 storage 内存不足时是如何向 execution 借用内存的

acquireExecutionMemory

该方法是给 execution 给指定 task 分配内存的实现,当 execution pool 内存不足时,会从 storage pool 中借。该方法在某些情况下可能会阻塞直到有足够空闲内存。

在该方法内部定义了两个函数:

  • maybeGrowExecutionPool:会释放storage中保存的数据,减小storage部分内存大小,从而增大Execution部分
  • computeMaxExecutionPoolSize:计算在 storage 释放内存借给 execution 后,execution 部分的内存大小

在定义了这两个方法后,直接调用 ExecutionMemoryPool#acquireMemory 方法,acquireMemory方法会一直处理该 task 的请求,直到分配到足够内存或系统判断无法满足该请求为止。acquireMemory 方法内部有一个死循环,循环内部逻辑如下:

从上面的流程图中,我们可以知道当 execution pool 要为某个 task 分配内存并且内存不足时,会从 storage pool 中借用内存,能借用的最大 size 为 storage 的空闲内存+之前 storage 从 execution 借走的内存。这与 storage 从 execution 借用内存不同,storage 只能从 execution 借走空闲的内存,不能借走 execution 中已在使用的从 storage 借来的内存,源码中的解释是如果要这么做实现太过复杂,暂时不支持。

以上过程分析的是memoryMode 为 ON_HEAP 的情况,如果是 OFF_HEAP,则直接从 offHeapExecution 内存池中分配,本文重点不在此,故不展开分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容