RDD的 Shuffle 和 分区
- RDD的分区操作
2.Shuffle 的原理
分区的作用
RDD 使用分区来分布式并行处理数据,并且要做到尽量少的在不同的 Executor 之间使用网络交换数据,所以当使用 RDD 读取数据的时候,会尽量在物理上更靠近数据源,比如说在读取 Cassandra 或者 HDFS 中数据的时候,会尽量的保存 RDD 的分区和数据源的分区数,分区模式等一一对应。
分区 和 Shuffle 的关系
分区的主要作用是用来实现并行计算,本质上和 Shuffle 没什么关系,但是往往在进行数据处理的时候, 例如 reduceByKey,groupByKey 等聚合操作,需要把 Key 相同的 Value 拉取到一起进行计算,这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区,于是理解分区才能理解 Shuffle 的根本原理。
Spark 中的 Shuffle 操作的特点
只有 Key-Value 型的 RDD 才会有 Shuffle 操作,但是有一个特例,就是 repartition 算子可以对任何数据类型 Shuffle。
RDD 的分区操作
查看分区数
scala> sc.parallelize(1 to 100).count
res0: Long = 100
之所以会有 8 个 Tasks, 是因为在启动的时候指定的命令是 spark-shell --master local[8], 这样会生成 1 个 Executors, 这个 Executors 有 8 个 Cores, 所以默认会有 8 个 Tasks, 每个 Cores 对应一个分区, 每个分区对应一个 Tasks, 可以通过 rdd.partitions.size 来查看分区数量。
同时也可以通过 spark-shell 的 WebUI 来查看 Executors 的情况。
默认的分区数量适合 Cores 的数量有关,也可以通过如下三种方式修改或者重新指定分区数量。
创建RDD 时,指定分区数量
scala> val rdd1 = sc.parallelize(1 to 100, 6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.partitions.size
res1: Int = 6
scala> val rdd2 = sc.textFile("hdfs:///dataset/wordcount.txt", 6)
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:///dataset/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd2.partitions.size
res2: Int = 7
rdd1 是通过本地集合创建的,传概念的时候通过第二个参数制定了分区的数量,rdd2 是通过读取HDFS 中文件创建的,同样通过第二个参数制定了分区数,因为是从 HDFS 中读取文件,所以最终的分区数是由 Hadoop 的 InputFormat 来指定的,所以比指定的分区数大了一个。
通过 coalesce 算子指定
coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
numPartitions 新生成的 RDD 的分区数
shuffle 是否 Shulle
scala> val source = sc.parallelize(1 to 100, 6)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> source.partitions.size
res0: Int = 6
scala> val noShuffleRdd = source.coalesce(numPartitions=8, shuffle=false)
noShuffleRdd: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:26
scala> noShuffleRdd.toDebugString
res1: String =
(6) CoalescedRDD[1] at coalesce at <console>:26 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
scala> val noShuffleRdd = source.coalesce(numPartitions=8, shuffle=false)
noShuffleRdd: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:26
scala> shuffleRdd.toDebugString
res3: String =
(8) MapPartitionsRDD[5] at coalesce at <console>:26 []
| CoalescedRDD[4] at coalesce at <console>:26 []
| ShuffledRDD[3] at coalesce at <console>:26 []
+-(6) MapPartitionsRDD[2] at coalesce at <console>:26 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
scala> noShuffleRdd.partitions.size
res4: Int = 6
scala> shuffleRdd.partitions.size
res5: Int = 8
RDD 的 Shuffle 是什么?
val sourceRdd = sc.textFile("hdfs://node01:9020/dataset/wordcount.txt")
val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_, 1))
val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)
val result = aggCountRdd.collect
reduceByKey 这个算子本质上就是先按照 Key 分组,后对每一组数据进行 reduce ,所面临的挑战就是 Key 相同的所有数据可能分布在不同的 Partition 分区中,甚至可能在不同的节点中,但是它们必须被共同计算。
为了让来自相同 Key 的所有数据都有 reduceByKey 的同一个 reduce 中处理,需要执行一个 all-to-all 的操作,需要在不同的节点(不同的分区)之间拷贝数据,必须跨分区聚合相同 Key 的所有数据,这个过程叫做 Shuffle。
RDD 的 Shuffle 原理
Spark 的Shuffle 发展大致有两个阶段: Hash base shuffle 和 Sort base shuffle
Hash base shuffle
大致的原理是分桶,假设 Reducer 的个数为 R ,那么每个 Mapper 有 R 个桶,按照 Key 的 Hash 将数据映射到不同的桶中,Reduce 找打每一个 Mapper 中对应自己的桶拉取数据。
假设 Mapper 的个数为 M ,整个集群的文件数量是 M * R ,如果有1000 个 Mapper 和 Reducer,则生成 1000000 个文件,这个量实在是太大了。
过多的文件会导致文件系统打开过多的文件描述符,占用系统资源,所以这种方式并不适合大规模数据的处理,值蛇和中等规模和小规模的数据处理,在Spark 1.2 版本中废弃了这种方式。
Sort base shuffle
对于 Sort base shuffle 来说,,每个 Map 侧的分区只有一个文件,Reduce 侧的 Task 来拉取,大致流程如下
- Map 侧将数据全部放入一个叫做 AppendOnlyMap 的组件中,同时可以在这个特殊的数据结构中做聚合操作.
- 然后通过一个类似于 MergeSort 的排序算法 TimSort 对 AppendOnlyMap 底层的 Array 排序.
先安装 Partition ID 排序,后按照 Key 的HashCode 排序. - 最终每个 Map Task 生成一个 输出文件,Reduce Task 来拉取自己对应的数据
从上面可以得到结论,Sort base shuffle 确实可以大幅减少所产生的中间文件,从而能够更好的应对大吞吐量的场景.
但是需要大家知道的是,Spark 的 Shuffle 算法并不只是这一种,即使是在最新版,也有三种 Shuffle 算法,这三种算法对每个 Map 都只产生一个临时文件,但是产生文件的方式不同,一种是类似于 Hash 的方式,一种是刚才 所说的 Sort ,最后一种是对 Sort 的一种优化 (使用 Unsafe API 直接申请堆外内存)
4 缓存
缓存的意义
缓存的API
缓存级别以及最佳实践
4.1 缓存的意义
使用缓存的原因----多次使用 RDD
需求: 在日志文件中找到访问次数最少的 Ip 和访问次数最多的 IP
######Job 是什么
![image.png](https://upload-images.jianshu.io/upload_images/3838887-2d298ec47193e250.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
###### Job 和 Stage 的关系
![image.png](https://upload-images.jianshu.io/upload_images/3838887-a11d3dfc7b195274.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)