Spark core Insight
1.深入理解 RDD 的内在逻辑
- 能够使用 RDD 的 算子
- 理解 RDD 算子的 Shuffle 和 缓存
- 理解 RDD 整体的使用流程
- 理解 RDD 的调度原理
- 理解 Spark 中常见的分布式变量共享方式
1. 深入 RDD
- 深入理解RDD 的内在逻辑,以及 RDD 的内部属性( RDD 由什么组成)
1.1 案例
需求
- 给定一个网站的访问记录,俗称 Access log ( 需要这份数据的可以私聊我发给你.)
计算其中出现的独立 IP ,以及其访问的次数
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
val result = sc.textFile("dataset/access_log_sample.txt")
.map(item => (item.split(" ")(0), 1))
.filter(item => StringUtils.isNotBlank(item._1))
.reduceByKey((curr, agg) => curr + agg)
.sortBy(item => item._2, false)
.take(10)
result.foreach(item => println(item))
- 针对这个小案例,我们问出了五个问题?
1. 假设要针对整个网站的历史数据进行处理,数据量有1T,如何处理?
- 放在集群中,利用集群多台计算机来并行处理数据.
2. 如何放在集群中运行?
简单来讲,并行计算就是同时使用多个计算资源解决同一个问题, 有四个要点:
- 要解决的问题必须可以分解为多个可以并发计算的部分
- 每个部分要可以在不同处理器上被同时执行
- 需要一个共享内存的机制
- 需要一个总体上的协作机制来进行资源调度
3. 如果放在集群中的话,可能要对整个计算任务进行分解, 那么应该如何分解任务呢?
概述:
- 对于 HDFS 中的文件,是分为不同的 Block 的
- 在进行计算的时候,就可以按照 Block 来划分,每一个 Block 对应着一个不同的计算单元
扩展
- RDD 并没有真实的存放数据,数据是从 HDFS 中读取的,在计算的过程中读取即可。
- RDD 至少是需要可以分片的,因为 HDFS 中的文件就是分片的, RDD 分片的意义在于表示对源数据集每个分片的计算, RDD 可以分片,也就意味着 可以并行计算。
移动数据不如移动计算,这是一个基础的优化,如何做到?
- 小标题的意思是说,早大数据的情况下,移动数据到另外一台机器上会涉及到带宽等问题,所以基本上应该减少数据量的传输过程,也就是数据尽量就在本台机器上面进行计算。
- 每一个计算单元需要记录其存储单元的位置,尽量调度过去
5. 在集群中运行,需要很多节点之间配合,出错的概率也更高,出错了怎么办?
假设 RDD1 RDD2 RDD3 在转换的过程中,RDD2 出错了,有两种办法可以解决:
- 缓存 RDD2 的数据,直接回复 RDD2 ,类似 HDFS 上的备份机制。
- 记录 RDD2 的依赖关系,通过其父类的 RDD 来回复 RDD2 ,这种方式会减少很多的数据交互和保存的磁盘空间。
如何通过父级 RDD 来 恢复?
- 记录 RDD2 的父亲是 RDD1
- 记录 RDD2 的计算函数, 例如 记录(下面就是一个计算函数):
RDD2 = RDD1.map(…), map(…)
- 当 RDD2 计算出错的时候, 可以通过 父级 RDD 和计算函数来恢复 RDD2
6. 假如任务特别的复杂,流程特别的长,有很多的 RDD 之间的依赖关系,如何优化呢?
- 上面提到了可以使用依赖关系来进行容错,但是如果依赖关系特别长的时候,这种方式其实也比较低效,这个时候就应该使用另外一种方式,就是记录数据集的状态。
在 RDD 中有两个手段可以做到
- 缓存
- CheckPoint
1.2 再谈 RDD
- 理解 RDD 为什么会出现
- 理解 RDD 的主要特点
- 理解 RDD 的五大属性
1.2.1 RDD 为什么会出现?
在 RDD 出现之前,当时 MapReduce 是比较主流的,而 MapReduce 如何执行迭代计算的任务呢?
- 多个 MapReduce 任务之间没有基于 内存的数据共享方式,只能通过磁盘来进行共享
- 因为涉及到了磁盘的I/O读写共享,这种方式效率明显低下。
RDD 如何解决迭代计算非常低效的问题的呢?
- 在 Spark 中,其实最终 Job3 从逻辑上的计算过程是:
Job3 = (Job1.map).filter
- 整个过程是共享内存的,而不需要将中间结果存放在可靠的分布式文件系统中。
- 这种方式可以在保证容错的前提下,提供更多的灵活的,更快的执行速度, RDD 在执行迭代型任务的时候的表现可以通过下面这个代码体现。
// 线性回归
val points = sc.textFile(...)
.map(...)
.persist(...)
val w = randomValue
for (i <- 1 to 10000) {
val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
.reduce(_ + _)
w -= gradient
}
- 在这个例子中,进行了大致 10000 次数的迭代,如果在 MapReduce 中实现的话,可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果,那么速度明显会比 RDD 慢很多很多。
1.2.3 RDD 的特点
RDD 不仅是数据集,也是编程模型
-
RDD 既是一种数据结构,同时也提供了上层的 API , 同时 RDD 的 API 和 Scala 中对集合运算 API 非常的相似了,同时也有各种 算子。
RDD 的算子大致分为两类:
- Transformation(转换操作) : 例如 map , flatMap , filter 等
- Action( 动作操作)例如: reduce ,collect , show 等
- 执行 RDD 的时候,在执行到Transformation(转换)操作的时候,并不会立刻执行,知道遇见了 Action (动作)操作的时候,才是真正的触发执行,这个点特叫做 : 惰性求值
RDD 可以分区
- RDD 是一个分布式计算框架,所以,一定要能过进行分区计算的,只有能分区了,才能利用集群的并行计算能力。
- 同时,RDD 不需要始终被 具体化,也就是说: RDD 中可以没有数据 ,只要有足够的信息知道自己是从谁计算得来的就可以了,这是一种非常高效的容错方式
RDD 是只读的
- RDD 是 只读的,不允许任何形式的修改,虽说不能因为 RDD 和 HDFS 都是只读的,就认为分布式存储系统必须设计为 只读的,但是设计为 只读的,会显著降低问题的复杂度,因为 RDD 需要可以容错, 可以惰性求值, 可以移动计算, 所以很难支持修改.
- RDD2 中可能没有数据, 只是保留了依赖关系和计算函数, 那修改啥?
- 如果因为支持修改, 而必须保存数据的话, 怎么容错?
- 如果允许修改, 如何定位要修改的那一行? RDD 的转换是粗粒度的, 也就是说, RDD 并不感知具体每一行在哪.
RDD 是可以容错的
RDD 的容错有两种方式
- 保存 RDD 之间的依赖关系, 以及计算函数, 出现错误重新计算
- 直接将 RDD 的数据存放在外部存储系统, 出现错误直接读取, Checkpoint
1.2.3 什么叫做弹性分布式数据集
分布式
- RDD 支持分区, 可以运行在集群中
弹性
- RDD 支持高效的容错
- RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中
数据集
- RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数
- RDD 也可以缓存起来, 相当于存储具体数据
总结: RDD 的 五大属性
首先整理一下上面所提到的 RDD 所要实现的功能:
- RDD 有分区
- RDD 要可以通过依赖关系和计算函数进行容错
- RDD 要针对数据本地性进行优化
- RDD 支持 MapReduce 形式的计算, 所以要能够对数据进行 Shuffled
对于 RDD 来说, 其中应该有什么内容呢? 如果站在 RDD 设计者的角度上, 这个类中, 至少需要什么属性?
- Partition List 分片列表, 记录 RDD 的分片, 可以在创建 RDD 的时候指定分区数目, 也可以通过算子来生成新的 RDD 从而改变分区数目
- Compute Function 为了实现容错, 需要记录 RDD 之间转换所执行的计算函数
- RDD Dependencies RDD 之间的依赖关系, 要在 RDD 中记录其上级 RDD 是谁, 从而实现容错和计算
- Partitioner 为了执行 Shuffled 操作, 必须要有一个函数用来计算数据应该发往哪个分区
- Preferred Location 优先位置, 为了实现数据本地性操作, 从而移动计算而不是移动存储, 需要记录每个 RDD 分区最好应该放置在什么位置
2. RDD 的 算子
1.理解 RDD 的算子分类, 以及其特性
2.理解常见算子的使用
分类
RDD 中的算子从功能上分为两大类
- Transformation(转换) 它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD
- Action(动作) 执行各个分区的计算任务, 将的到的结果返回到 Driver 中
RDD 中可以存放各种类型的数据, 那么对于不同类型的数据, RDD 又可以分为三类
- 针对基础类型(例如 String)处理的普通算子
- 针对 Key-Value 数据处理的 byKey 算子
- 针对数字类型数据处理的计算算子
特点
- Spark 中所有的 Transformations 是 Lazy(惰性) 的, 它们不会立即执行获得结果. 相反, 它们只会记录在数据集上要应用的操作. 只有当需要返回结果给 Driver 时, 才会执行这些操作, 通过 DAGScheduler ,TaskScheduler 分发到集群中运行, 这个特性叫做 惰性求值
- 默认情况下, 每一个 Action 运行的时候, 其所关联的所有 Transformation RDD 都会重新计算, 但是也可以使用 presist 方法将 RDD 持久化到磁盘或者内存中. 这个时候为了下次可以更快的访问, 会把数据保存到集群上.
2.1 Transformations 算子
map(T ⇒ U)
.map( num => num * 10 )
.collect()
作用
- 把 RDD 中的数据 一对一 的转为另一种形式
签名
def map[U: ClassTag](f: T ⇒ U): RDD[U]
参数
f → Map 算子是 原RDD → 新RDD 的过程, 传入函数的参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据
注意点
- Map 是一对一, 如果函数是 String → Array[String] 则新的 RDD 中每条数据就是一个数组
flatMap(T ⇒ List[U])
sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
.flatMap( line => line.split(" ") )
.collect()
作用
- FlatMap 算子和 Map 算子类似, 但是 FlatMap 是一对多
- 就是说,我们的一个“hello lily" 字符串变成了 Hello ,lily 两个了。和map 不同的是,map 只是在原先的上面进行修改。
调用
def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U]
参数
f → 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD
注意点
- flatMap 其实是两个操作, 是 map + flatten, 也就是先转换, 后把转换而来的 List 展开
- Spark 中并没有直接展平 RDD 中数组的算子, 可以使用 flatMap 做这件事
filter(T ⇒ Boolean)
sc.parallelize(Seq(1, 2, 3))
.filter( value => value >= 3 )
.collect()
作用
- Filter 算子的主要作用是过滤掉不需要的内容,也就是说保留符合条件的内容,为true ,则保留。
mapPartitions(List[T] ⇒ List[U])
- RDD[T] ⇒ RDD[U] 和 map 类似, 但是针对整个分区的数据转换
mapPartitionsWithIndex
- 和 mapPartitions 类似, 只是在函数中增加了分区的 Index
mapValues
sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
.mapValues( value => value * 10 )
.collect()
作用
- MapValues 只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value
sample(withReplacement, fraction, seed)
sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.sample(withReplacement = true, 0.6, 2)
.collect()
作用
- Sample 算子可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失
参数
- Sample 接受第一个参数为 withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说, 如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复
- Sample 接受第二个参数为 fraction, 意为抽样的比例
- Sample 接受第三个参数为 seed, 随机数种子, 用于 Sample 内部随机生成下标, 一般不指定, 使用默认值
union(other)并集
val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(4, 5, 6))
rdd1.union(rdd2)
.collect()
intersection(other)
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))
rdd1.intersection(rdd2)
.collect()
作用
- Intersection 算子是一个集合操作, 用于求得 左侧集合 和 右侧集合 的交集, 换句话说, 就是左侧集合和右侧集合都有的元素, 并生成一个新的 RDD
subtract(other, numPartitions)(差集)
- 可以设置分区数
distinct(numPartitions)(去重)
sc.parallelize(Seq(1, 1, 2, 2, 3))
.distinct()
.collect()
作用
- Distinct 算子用于去重
注意点
- Distinct 是一个需要 Shuffled 的操作
- 本质上 Distinct 就是一个 reductByKey, 把重复的合并为一个
reduceByKey((V, V) ⇒ V, numPartition)
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.reduceByKey( (curr, agg) => curr + agg )
.collect()
作用
首先按照 Key 分组生成一个 Tuple, 然后针对每个组执行 reduce 算子
调用
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
参数
func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果
注意点
ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple2
ReduceByKey 是一个需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少
groupByKey()
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.groupByKey()
.collect()
作用
- GroupByKey 算子的主要作用是按照 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value
注意点
- GroupByKey 是一个 Shuffled
- GroupByKey 和 ReduceByKey 不同, 因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好
combineByKey()
val rdd = sc.parallelize(Seq(
("zhangsan", 99.0),
("zhangsan", 96.0),
("lisi", 97.0),
("lisi", 98.0),
("zhangsan", 97.0))
)
val combineRdd = rdd.combineByKey(
score => (score, 1),
(scoreCount: (Double, Int),newScore) => (scoreCount._1 + newScore, scoreCount._2 + 1),
(scoreCount1: (Double, Int), scoreCount2: (Double, Int)) =>
(scoreCount1._1 + scoreCount2._1, scoreCount1._2 + scoreCount2._2)
)
val meanRdd = combineRdd.map(score => (score._1, score._2._1 / score._2._2))
meanRdd.collect()
作用
- 对数据集按照 Key 进行聚合
调用
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner],[serializer])
参数
- createCombiner 将 Value 进行初步转换
- mergeValue 在每个分区把上一步转换的结果聚合
- mergeCombiners 在所有分区上把每个分区的聚合结果聚合
- partitioner 可选, 分区函数
- mapSideCombiner 可选, 是否在 Map 端 Combine
- serializer 序列化器
注意点
- combineByKey 的要点就是三个函数的意义要理解
- groupByKey, reduceByKey 的底层都是 combineByKey
aggregateByKey()
val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
val result = rdd.aggregateByKey(0.8)(
seqOp = (zero, price) => price * zero,
combOp = (curr, agg) => curr + agg
).collect()
println(result)
作用
聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value
调用
rdd.aggregateByKey(zeroValue)(seqOp, combOp)
参数
zeroValue 初始值
seqOp 转换每一个值的函数
comboOp 将转换过的值聚合的函数
注意点 为什么需要两个函数?
aggregateByKey 运行将一个 RDD[(K, V)] 聚合为 RDD[(K, U)], 如果要做到这件事的话, 就需要先对数据做一次转换, 将每条数据从 V 转为 U, seqOp 就是干这件事的 当 seqOp 的事情结束以后, comboOp 把其结果聚合
和 reduceByKey 的区别
- aggregateByKey 最终聚合结果的类型和传入的初始值类型保持一致
- reduceByKey 在集合中选取第一个值作为初始值, 并且聚合过的数据类型不能改变
foldByKey(zeroValue)((V, V) ⇒ V)
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.foldByKey(zeroValue = 10)( (curr, agg) => curr + agg )
.collect()
作用
和 ReduceByKey 是一样的, 都是按照 Key 做分组去求聚合, 但是 FoldByKey 的不同点在于可以指定初始值
调用
foldByKey(zeroValue)(func)
参数
zeroValue 初始值
func seqOp 和 combOp 相同, 都是这个参数
注意点
FoldByKey 是 AggregateByKey 的简化版本, seqOp 和 combOp 是同一个函数
FoldByKey 指定的初始值作用于每一个 Value
join(other, numPartitions)
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))
rdd1.join(rdd2).collect()
作用
将两个 RDD 按照相同的 Key 进行连接
调用
join(other, [partitioner or numPartitions])
参数
other 其它 RDD
partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数量来改变分区
注意点
- Join 有点类似于 SQL 中的内连接, 只会再结果中包含能够连接到的 Key
- Join 的结果是一个笛卡尔积形式, 例如 "a", 1), ("a", 2 和 "a", 10), ("a", 11 的 Join 结果集是 "a", 1, 10), ("a", 1, 11), ("a", 2, 10), ("a", 2, 11
cogroup(other, numPartitions)
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 5), ("b", 2), ("b", 6), ("c", 3), ("d", 2)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("b", 1), ("d", 3)))
val rdd3 = sc.parallelize(Seq(("b", 10), ("a", 1)))
val result1 = rdd1.cogroup(rdd2).collect()
val result2 = rdd1.cogroup(rdd2, rdd3).collect()
/*
执行结果:
Array(
(d,(CompactBuffer(2),CompactBuffer(3))),
(a,(CompactBuffer(1, 2, 5),CompactBuffer(10))),
(b,(CompactBuffer(2, 6),CompactBuffer(1))),
(c,(CompactBuffer(3),CompactBuffer()))
)
*/
println(result1)
/*
执行结果:
Array(
(d,(CompactBuffer(2),CompactBuffer(3),CompactBuffer())),
(a,(CompactBuffer(1, 2, 5),CompactBuffer(10),CompactBuffer(1))),
(b,(CompactBuffer(2, 6),CompactBuffer(1),Co...
*/
println(result2)
作用
多个 RDD 协同分组, 将多个 RDD 中 Key 相同的 Value 分组
调用
cogroup(rdd1, rdd2, rdd3, [partitioner or numPartitions])
参数
rdd… 最多可以传三个 RDD 进去, 加上调用者, 可以为四个 RDD 协同分组
partitioner or numPartitions 可选, 可以通过传递分区函数或者分区数来改变分区
注意点
对 RDD1, RDD2, RDD3 进行 cogroup, 结果中就一定会有三个 List, 如果没有 Value 则是空 List, 这一点类似于 SQL 的全连接, 返回所有结果, 即使没有关联上
CoGroup 是一个需要 Shuffled 的操作
cartesian(other)
(RDD[T], RDD[U]) ⇒ RDD[(T, U)]
- 生成两个 RDD 的笛卡尔积
sortBy(ascending, numPartitions)
val rdd1 = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val sortByResult = rdd1.sortBy( item => item._2 ).collect()
val sortByKeyResult = rdd1.sortByKey().collect()
println(sortByResult)
println(sortByKeyResult)
作用
排序相关相关的算子有两个, 一个是 sortBy, 另外一个是 sortByKey
调用
sortBy(func, ascending, numPartitions)
参数
func 通过这个函数返回要排序的字段
ascending 是否升序
numPartitions 分区数
注意点
普通的 RDD 没有 sortByKey, 只有 Key-Value 的 RDD 才有
sortBy 可以指定按照哪个字段来排序, sortByKey 直接按照 Key 来排序
partitionBy(partitioner)
- 使用用传入的 partitioner 重新分区, 如果和当前分区函数相同, 则忽略操作
coalesce(numPartitions)
- 减少分区数
val rdd = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val oldNum = rdd.partitions.length
val coalesceRdd = rdd.coalesce(4, shuffle = true)
val coalesceNum = coalesceRdd.partitions.length
val repartitionRdd = rdd.repartition(4)
val repartitionNum = repartitionRdd.partitions.length
print(oldNum, coalesceNum, repartitionNum)
作用
一般涉及到分区操作的算子常见的有两个, repartitioin 和 coalesce, 两个算子都可以调大或者调小分区数量
调用
repartitioin(numPartitions)
coalesce(numPartitions, shuffle)
参数
numPartitions 新的分区数
shuffle 是否 shuffle, 如果新的分区数量比原分区数大, 必须 Shuffled, 否则重分区无效
注意点
repartition 和 coalesce 的不同就在于 coalesce 可以控制是否 Shuffle
repartition 是一个 Shuffled 操作
repartition(numPartitions)
- 重新分区
repartitionAndSortWithinPartitions
- 重新分区的同时升序排序, 在 partitioner 中排序, 比先重分区再排序要效率高, 建议使用在需要分区后再排序的场景使用
常见的 Transformation 类型的 RDD
map
flatMap
filter
groupBy
reduceByKey
常见的 Action 类型的 RDD
collect
countByKey
reduce