Spark中RangePartitioner的实现机制分析

一.分区器的区别

  • HashPartitioner分区可能HashPartitioner导致每个分区中数据量的不均匀。
  • RangePartitioner分区尽量保证每个分区中数据量的均匀,将一定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。

二.RangePartitioner分区执行原理概述

1.计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1e6的样本的数据量。

2.根据sampleSize和分区数量计算每个分区的数据抽样样本数量最大值sampleSizePrePartition。

3.根据以上两个值进行水塘抽样,返回RDD的总数据量,分区中总元素的个数和每个分区的采样数据。

4.计算出数据量较大的分区通过RDD.sample进行重新抽样。

5.通过抽样数组 candidates: ArrayBuffer[(K, wiegth)]计算出分区边界的数组BoundsArray

6.在取数据时,如果分区数小于128则直接获取,如果大于128则通过二分法,获取当前Key属于那个区间,返回对应的BoundsArray下标即为partitionsID。

源码分析可参考以下几篇博客

下面只对RanagePartitioner的核心机制进行分析总结。

三.RangePartitioner的实现机制

1.在总数不知道的情况下如何等概率地从中抽取N行?

类比水塘抽样法,该方法可以解决在总数不知道的情况下如何等概率地从中抽取一行数据
定义取出的行号为choice,第一次直接以第一行作为取出行 choice ,而后第二次以二分之一概率决定是否用第二行替换 choice ,第三次以三分之一的概率决定是否以第三行替换 choice ……,以此类推。

由上面的分析我们可以得出结论,在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数,算法结束。

解决方案:在RangePartition中如何实现在总数不知道的情况下如何等概率地从中抽取N行数据:

采样算法是RangePartitioner分区的核心,其内部使用的就是水塘抽样,而这个抽样特别适合那种总数很大而且未知,并无法将所有的数据全部存放到主内存中的情况。也就是我们不需要事先知道RDD中元素的个数。

def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // 把前K个元素放入到数组reservoir中,k为设置的每个分区的样本数,及sampleSizePerPartition
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // 如果分区记录数少于设置的分区样本数,则直接返回
    // 否则使用迭代器,每次迭代出的数据,为其生成一个0至 l 的随机数,如果随机数小于K,则把reservoir数组中的对应记录替换
    if (i < k) {
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
       // l 的值不断迭代的
        l += 1
        val replacementIndex = (rand.nextDouble() * l).toLong
         //如果随机数小于K,则把reservoir数组中的对应记录替换
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }

分析
RangePartition中水塘抽样的理解,首先从池塘中先从前往后取出足够的样本数据,暂且称这批数据为旧数据,之后对未遍历的数据进行遍历,此次i值为该分区元素的第 i 条记录,i = k。

之后 i 值不断的迭代,然后取出(0,i)的随机数,如果该随机数小于k则,进行替换,保留第 i 个数,随着不断的迭代,i 值是一定是逐渐变大的,k 值是不变的,所以取出的新数据替换旧数据的几率就越来越小,例如,一开始,i = k,随机值小于 k 的概率为 (k-1)/k,迭代 n 次之后,该几率为 (k-1)/k+n。

用伪代码表示如下所示

从S中抽取首k项放入「水塘」中
对于每一个S[j]项(j ≥ k):
   随机产生一个范围0到j的整数r
   若 r < k 则把水塘中的第r项换成S[j]项
2.计算样本权重,对数据多的分区再进行抽样

父RDD各分区中的数据量可能不均匀,在极端情况下,有些分区内的数据量会占有整个RDD的绝大多数的数据,如果按照水塘抽样进行采样,会导致该分区所采样的数据量不足,因此需要对取样数不足的分区还需要重新进行采样。

通过(采样因子*分区记录数)得到每个分区应采样本数。

如果fraction * 分区内记录数 > sampleSizePerPartition,则该分区会再进行一次抽样,否则计算权重weight为 1 /(总样本数/总记录总数),因为sample中的比例就是(总样本数/总记录总数)。

如果fraction * 分区内记录数 < sampleSizePerPartition,权重weight 为(分区总数 / 采样总数),为该分区的取出的样本的真实权重,可能会比平均权重大,因为有可能在上面的reservoirSampleAndCount水塘抽样中采样总数已经达到了该分区记录数的最大值。

// 计算总样本数量和总记录数的占比,占比最大为1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存样本数据的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 计算抽取出来的样本数据
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽象数据大小,
            // 那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        // 对于数据分布不均衡的RDD分区,重新进行数据抽样
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 获取数据分布不均衡的RDD分区,并构成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 随机种子
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽样函数API进行数据抽样
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

        // 将最终的抽样数据计算出rangeBounds出来
        RangePartitioner.determineBounds(candidates, partitions)
3.根据样本权重解决分区边界问题

先将candidate(Array[(key, weight)])按照key排序,计算总权重sumWeights,除以分区数,得到每个分区的平均权重step,接下来while循环遍历已排序的candidate,累加其权重cumWeight,每当累加权重达到一个分区的平均权重step,就获取一个key作为分区间隔符,最后返回所有获取到的分隔符,determineBounds执行完毕,也就返回了变量rangeBounds作为每个分区边界的key的集合。

def determineBounds[K: Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // 按照数据进行数据排序,默认升序排列
    val ordered = candidates.sortBy(_._1)
    // 获取总的样本数量大小
    val numCandidates = ordered.size
    // 计算总的权重大小
    val sumWeights = ordered.map(_._2.toDouble).sum
    // 计算步长
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      // 获取排序后的第i个数据及权重
      val (key, weight) = ordered(i)
      // 累计权重
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        // 权重已经达到一个步长的范围,计算出一个分区id的值
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          // 上一个边界值为空,或者当前边界key数据大于上一个边界的值,那么当前key有效,进行计算
          // 添加当前key到边界集合中
          bounds += key
          // 累计target步长界限
          target += step
          // 分区数量加1
          j += 1
          // 上一个边界的值重置为当前边界的值
          previousBound = Some(key)
        }
      }
      i += 1
    }
    // 返回结果
    bounds.toArray
  }
4.由rangeBounds计算分区数和key属于哪个分区

rangeBounds少于128,直接遍历比较key和分隔符,得到PartitionId,否则使用二分查找,并做了边界条件的判断,最后,根据升序还是降序返回PartitionId。

5.父RDD中每个分区采样样本数的确定
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

父RDD每个分区需要采样的数据量是正常数的3倍。

因为父RDD各分区中的数据量可能会出现倾斜的情况,乘于3的目的就是保证数据量小的分区能够采样到足够的数据,而对于数据量大的分区会进行第二次采样。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,393评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,790评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,391评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,703评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,613评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,003评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,507评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,158评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,300评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,256评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,274评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,984评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,569评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,662评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,899评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,268评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,840评论 2 339

推荐阅读更多精彩内容