Spark_day04

RDD的 Shuffle 和 分区

Shuffle
  1. RDD的分区操作
    2.Shuffle 的原理
分区的作用

RDD 使用分区来分布式并行处理数据,并且要做到尽量少的在不同的 Executor 之间使用网络交换数据,所以当使用 RDD 读取数据的时候,会尽量在物理上更靠近数据源,比如说在读取 Cassandra 或者 HDFS 中数据的时候,会尽量的保存 RDD 的分区和数据源的分区数,分区模式等一一对应。

分区 和 Shuffle 的关系

分区的主要作用是用来实现并行计算,本质上和 Shuffle 没什么关系,但是往往在进行数据处理的时候, 例如 reduceByKey,groupByKey 等聚合操作,需要把 Key 相同的 Value 拉取到一起进行计算,这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区,于是理解分区才能理解 Shuffle 的根本原理。

Spark 中的 Shuffle 操作的特点

只有 Key-Value 型的 RDD 才会有 Shuffle 操作,但是有一个特例,就是 repartition 算子可以对任何数据类型 Shuffle。

RDD 的分区操作

查看分区数

scala> sc.parallelize(1 to 100).count
res0: Long = 100
8个Task

之所以会有 8 个 Tasks, 是因为在启动的时候指定的命令是 spark-shell --master local[8], 这样会生成 1 个 Executors, 这个 Executors 有 8 个 Cores, 所以默认会有 8 个 Tasks, 每个 Cores 对应一个分区, 每个分区对应一个 Tasks, 可以通过 rdd.partitions.size 来查看分区数量。


同时也可以通过 spark-shell 的 WebUI 来查看 Executors 的情况。



默认的分区数量适合 Cores 的数量有关,也可以通过如下三种方式修改或者重新指定分区数量。

创建RDD 时,指定分区数量
scala> val rdd1 = sc.parallelize(1 to 100, 6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd1.partitions.size
res1: Int = 6

scala> val rdd2 = sc.textFile("hdfs:///dataset/wordcount.txt", 6)
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:///dataset/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd2.partitions.size
res2: Int = 7

rdd1 是通过本地集合创建的,传概念的时候通过第二个参数制定了分区的数量,rdd2 是通过读取HDFS 中文件创建的,同样通过第二个参数制定了分区数,因为是从 HDFS 中读取文件,所以最终的分区数是由 Hadoop 的 InputFormat 来指定的,所以比指定的分区数大了一个。

通过 coalesce 算子指定
coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

numPartitions 新生成的 RDD 的分区数
shuffle 是否 Shulle

scala> val source = sc.parallelize(1 to 100, 6)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> source.partitions.size
res0: Int = 6

scala> val noShuffleRdd = source.coalesce(numPartitions=8, shuffle=false)
noShuffleRdd: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:26

scala> noShuffleRdd.toDebugString 
res1: String =
(6) CoalescedRDD[1] at coalesce at <console>:26 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

 scala> val noShuffleRdd = source.coalesce(numPartitions=8, shuffle=false)
 noShuffleRdd: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:26

scala> shuffleRdd.toDebugString 
res3: String =
(8) MapPartitionsRDD[5] at coalesce at <console>:26 []
 |  CoalescedRDD[4] at coalesce at <console>:26 []
 |  ShuffledRDD[3] at coalesce at <console>:26 []
 +-(6) MapPartitionsRDD[2] at coalesce at <console>:26 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

scala> noShuffleRdd.partitions.size     
res4: Int = 6

scala> shuffleRdd.partitions.size
res5: Int = 8
RDD 的 Shuffle 是什么?
val sourceRdd = sc.textFile("hdfs://node01:9020/dataset/wordcount.txt")
val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_, 1))
val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)
val result = aggCountRdd.collect

reduceByKey 这个算子本质上就是先按照 Key 分组,后对每一组数据进行 reduce ,所面临的挑战就是 Key 相同的所有数据可能分布在不同的 Partition 分区中,甚至可能在不同的节点中,但是它们必须被共同计算。
为了让来自相同 Key 的所有数据都有 reduceByKey 的同一个 reduce 中处理,需要执行一个 all-to-all 的操作,需要在不同的节点(不同的分区)之间拷贝数据,必须跨分区聚合相同 Key 的所有数据,这个过程叫做 Shuffle。

RDD 的 Shuffle 原理

Spark 的Shuffle 发展大致有两个阶段: Hash base shuffle 和 Sort base shuffle

Hash base shuffle

大致的原理是分桶,假设 Reducer 的个数为 R ,那么每个 Mapper 有 R 个桶,按照 Key 的 Hash 将数据映射到不同的桶中,Reduce 找打每一个 Mapper 中对应自己的桶拉取数据。

假设 Mapper 的个数为 M ,整个集群的文件数量是 M * R ,如果有1000 个 Mapper 和 Reducer,则生成 1000000 个文件,这个量实在是太大了。

过多的文件会导致文件系统打开过多的文件描述符,占用系统资源,所以这种方式并不适合大规模数据的处理,值蛇和中等规模和小规模的数据处理,在Spark 1.2 版本中废弃了这种方式。

Sort base shuffle

对于 Sort base shuffle 来说,,每个 Map 侧的分区只有一个文件,Reduce 侧的 Task 来拉取,大致流程如下

  1. Map 侧将数据全部放入一个叫做 AppendOnlyMap 的组件中,同时可以在这个特殊的数据结构中做聚合操作.
  2. 然后通过一个类似于 MergeSort 的排序算法 TimSort 对 AppendOnlyMap 底层的 Array 排序.
    先安装 Partition ID 排序,后按照 Key 的HashCode 排序.
  3. 最终每个 Map Task 生成一个 输出文件,Reduce Task 来拉取自己对应的数据
    从上面可以得到结论,Sort base shuffle 确实可以大幅减少所产生的中间文件,从而能够更好的应对大吞吐量的场景.
    但是需要大家知道的是,Spark 的 Shuffle 算法并不只是这一种,即使是在最新版,也有三种 Shuffle 算法,这三种算法对每个 Map 都只产生一个临时文件,但是产生文件的方式不同,一种是类似于 Hash 的方式,一种是刚才 所说的 Sort ,最后一种是对 Sort 的一种优化 (使用 Unsafe API 直接申请堆外内存)

4 缓存

缓存的意义
缓存的API
缓存级别以及最佳实践

4.1 缓存的意义

使用缓存的原因----多次使用 RDD
需求: 在日志文件中找到访问次数最少的 Ip 和访问次数最多的 IP

######Job 是什么
![image.png](https://upload-images.jianshu.io/upload_images/3838887-2d298ec47193e250.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

###### Job 和 Stage 的关系
![image.png](https://upload-images.jianshu.io/upload_images/3838887-a11d3dfc7b195274.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)



©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容