为什么需要调优
在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。
Spark的性能调优由开发调优、资源调优、数据倾斜调优、shuffle调优几个部分组成。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;数据倾斜调优,主要用一套完整的用来解决Spark作业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握的开发者。
性能优化学习
学习Spark开发调优和资源调优比较好的方式是参考美团点评技术团队的技术博客Spark性能调优-基础篇,这里已经写得非常全面了,学习完就可以掌握Spark性能调优的基础部分了。总体可以分为两个方面:
- 开发调优
Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。- 避免创建重复的RDD
- 尽可能复用同一个RDD
- 对多次使用的RDD进行持久化
- 尽量避免使用shuffle类算子
- 使用map-side预聚合的shuffle操作
- 使用高性能的算子
- 广播大变量
- 使用Kryo优化序列化性能
- 优化数据结构
- 资源调优
在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。调整的主要是一系列的资源相关参数。
所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。- num-executors
- executor-memory
- executor-cores
- driver-memory
- spark.default.parallelism
- spark.storage.memoryFraction
- spark.shuffle.memoryFraction
性能优化实践
以MovieLens数据集为基础,完成Spark的Map-Side Join和Reduce Side Join例子(过滤出评分高于4.0分的电影,要求显示电影ID 电影名称 电影分数),并比较性能优劣。应该如何调整不同的spark-submit参数获得最佳效果(运行时间),并给出基于目前的运行环境最优参数设置方案。
查看数据
简单查看一下所有表的结构才能完成目标任务。
所有评级都包含在“ratings.dat”文件中,并且位于格式如下:
用户名 | MovieID | 评级 | 时间戳 |
---|
- UserID的范围在1到6040之间
- MovieID的范围在1到3952之间
- 评级为5星级(仅限全星评级)
- 时间戳以秒为单位表示
- 每个用户至少有20个评级
用户信息位于“users.dat”文件中,如下所示
用户名 | 性别 | 年龄 | 职业 | 邮政编码 |
---|
性别用男性表示“M”,女性表示“F”表示
-
年龄选自以下范围:
- 1:“18岁以下”
- 18:“18-24”
- 25:“25-34”
- 35:“35-44”
- 45:“45-49”
- 50:“50-55”
- 56:“56+”
职业选自0-20的数字,分别代表不同意义(具体意义可查看官网)
电影信息位于文件“movies.dat”中,如下所示
MovieID | 标题 | 流派 |
---|
- 流派是安装分隔符(|)分开的,关键字符的拼接,如(Animation|Children's|Comedy)
- 由于意外重复,某些MovieID与电影不对应
条目和/或测试条目 - 电影大多是手动输入的,因此可能存在错误和不一致
所以任务就是把3个表连接起来并按条件过滤,但是不同的连接方式在性能上会出现极大的差距。
Reduce Side Join
- 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
- reduce-side-join 的缺陷在于会将key相同的数据发送到同一个partition中进行运算,大数据集的传输需要长时间的IO,同时任务并发度收到限制,还可能造成数据倾斜。
- Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE]
简单的讲就是先把集群上key相同的数据拉取到一个节点(shuffle操作),为每个key的数据创建一个task进行join连接操作,然后再把每个key连接的结果进行汇总。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object ReduceJoin {
def main(args: Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ratingRDD = sc.textFile(args(0)) //rating.dat表
val moviesRDD = sc.textFile(args(1)) //movies.dat表
val startTime = System.currentTimeMillis()//开始时间
val ratingPair = ratingRDD.map { x => //将数据转化为(K,V),K为movieID,V为平均分,直接join
val temp = x.split("::") //按照原始数据格式拆分为RDD格式
(temp(1),(temp(2).toFloat,1))
}.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
x._1/x._2 //通过每部电影总分和评论的人数计算出平均分
).filter(x => x._2.toFloat >= 4.0)//过滤出分数高于4分的电影
val moviePair = moviesRDD.map { x => //将它的类型转化为(K,V),K为movieID,方便join操作
val temp = x.split("::")
(temp(0),temp(1))
}
//println(ratingPair.count()) //查看rating表是否成功过滤掉4.0以下的电影
//根据key(movieID)进行连接,并将数据从KV形式格式化为原始格式
val result = moviePair.join(ratingPair).map(x => (x._1,x._2._1,x._2._2))
result.saveAsTextFile(args(2))
val endTime = System.currentTimeMillis()//结束时间
println("运行时间(秒)"+(endTime-startTime)*0.001)
}
}
然后编写相应的运行脚本,这里submit的时间随便使用最简单的几个参数,因为目的是对比map-side join和reduce-side join性能上的差异。
#!/bin/bash
hdfs dfs -rm -r /tmp/result
spark-submit --class ReduceJoin --master yarn-cluster /usr/tmp/untitled.jar /tmp/input/ratings.dat /tmp/input/movies.dat /tmp/result
提交任务后就可以去master的8088端口查看spark任务的执行情况了,18088端口查看执行记录和详细过程,看到Reduce-side join任务执行情况如下:
[图片上传中...(image.png-94aeb0-1535712775641-0)]
这里首先需要明白job,stage,task的概念。简单的讲,我们提交一个作业到spark,spark首先根据提交作业中的action算子将作业分为若干个job。
之后对于每个job而言,Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。
每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。
这里只有一个job是因为只有一个action算子(savaAsTextFile),3个stage是因为reduceByKey属于shuffle算子,还有未经协同划分的join也属于shuffle算子,一起将job分成了3个stage,每个stage的2个task是因为RDD数据被存在了两台机器上。通过时间统计可以看到stage1是最消耗时间的,因为它要执行reduceByKey的shuffle操作,会把key相同的数据集中到一个节点,在这个时候数据是整个评论数据集。而后面求平均后过滤再join的时候数据已经变得不是那么多了,所以这里的shuffle相对消耗时间较少(网络,IO少)。
Map-Side Join
- Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
- 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
- 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。
简单的讲就是把需要join的数据集中较小的那个数据集进行广播(因为在分布式系统应用中,存储数据都是用RDD对象,每个RDD对象中的数据都被划分为多个分区,每个节点都只持有部分分区,也就是数据集的一部分,而广播就是让每个节点都持有被广播数据的完整信息),然后在每个节点上(map端操作)将自己节点上持有的部分数据和被广播的表进行连接即可。但是需要注意,因为那个小的数据集要被广播,所以要求每个节点的内存必须足够存储被广播的那个数据集,不然就不能进行map-side-join。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object MapJoin {
def main(args: Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ratingRDD = sc.textFile(args(0)) //rating.dat表
val moviesRDD = sc.textFile(args(1)) //movies.dat表
val startTime = System.currentTimeMillis()//开始时间
val ratingPair = ratingRDD.map { x => //将数据转化为(K,V),K为movieID,V为平均分
val temp = x.split("::") //按照原始数据格式拆分为RDD格式
(temp(1),(temp(2).toFloat,1))
}.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
x._1/x._2 //通过每部电影总分和评论的人数计算出平均分
).filter(x => x._2 >= 4.0)//过滤出分数高于4分的电影
val moviePair = moviesRDD.map { x => //将数据转化为(K,V),K为movieID,V为电影name
val temp = x.split("::")
(temp(0),temp(1))
}.collectAsMap//保存为map 进行广播
var moviesBroadcast = sc.broadcast(moviePair) //将电影数据集广播,使每个节点都有一份完整的,就不需要shuffle
var result = ratingPair.map({ x =>
var movies = moviesBroadcast.value //取出广播变量内容值
var name = movies.getOrElse(x._1,"No") //取出当前movieID的电影名字
(x._1,(name,x._2)) //
})
result.map(x => (x._1,x._2._1,x._2._2)).saveAsTextFile(args(2))//重新定义输出格式并输出
val endTime = System.currentTimeMillis()//结束时间
println("运行时间(秒)"+(endTime-startTime)*0.001)
}
}
这里出现了两个Job的原因是有两个action算子(saveAsTextFile,collectAsMap)。在Job0中,只进行了一个工作collectAsMap,是为了后面广播方便。在Job1中,因为我们避免了耗时的join的Shuffle操作,自然就只有两个stage了。
-
这里还有一个地方可以改进,在广播的时候我有两个选择,广播内容为(电影ID,电影名字)的RDD和(电影ID,电影平均分),虽然两个数据集的行数一样,但是,电影名字的字节数远远大于平均分的字节数,所以广播(电影ID,电影平均分),最后再把分数低于4.0的过滤掉,可以传输更小的字节数,节约IO和网络传输时耗,时间缩短了3s。
运行结果分析
- 从整体运行时间来看,Reduce-side Join和Map-side Join分别为27s和24s,其最大的原因就是Reduce-side Join有更多的Shuffle操作,增加网络和IO时耗。
- Map-side Join的时耗主要是collectAsMap操作耗时,而Reduce-side Join的时耗主要是两个shuffle操作。在这里主要是为了计算电影评分的平均分,使得Map-side Join不得不用了一次shuffle操作,如果只是单纯的连接表,不需要求平均值的话,那么Map-side Join就不需要shuffle操作,会变得非常快了,这样Map-side Join和Reduce-side Join的差异会更明显了。
- 所以在开发过程中尽肯能的要去避免shuffle操作,用高性能的算子去代替。
Spark参数优化
前面说了,除了可以在开发过程中进行开发调优,还可以灵活的分配资源,使在现有资源上运行达到最优。以Map-side Join为例,使用3台centos6.5虚拟机,内存分别为6 2 2,cup为4核心i5。
详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。
在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。
以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。
num-executors
- 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
- 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
之前没有手动设置这个参数,可以看到spark启动了2个executor进程。我这里只有3台虚拟机,尝试就设置为3了。发现性能降低了很多,估计是因为我的数据太小了,节点多了就加大了shuffle的消耗,所以设置为1,发现更快了,所以这真的是因为数据太小,数据传输的时间代价大于数据处理的时间代价。平常大数据情况下这个参数应该根据经验设置为50-100。
executor-memory
- 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
- 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3 ~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。
executor-cores
- 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
- 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。
由于我的集群是运行在虚拟机上的,所以所有节点共享windows的cpu,即每个节点相当于有4个cpu,所以设置为4,发现报错了,应该是资源不足,设置为2也报错,应该是虚拟机的cpu限制机制吧,所以只能设置为1或者默认了。
driver-memory
- 参数说明:该参数用于设置Driver进程的内存。
- 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
这个参数只要保证进行collect算子的时候,所有数据全部集中到Driver进程不会oom就行了,我这里数据相当小就不用设置了。
spark.default.parallelism
- 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
- 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
首先随便查看一个stage的信息,发现每个executor的task为1(因为文件很小只有一个hadoop block),只有一个线程完全没有并发,效率很低。根据num-executors * executor-cores的2~3倍,我这里就设置为2,和默认相比进步了1s。
spark.storage.memoryFraction
- 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
- 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
spark.shuffle.memoryFraction
- 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
- 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
序列化算法
在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作,序列化的作用主要是利用时间换空间:
- 分发给Executor上的Task
- 需要缓存的RDD(前提是使用序列化方式缓存)
- 广播变量
- Shuffle过程中的数据缓存
- 使用receiver方式接收的流数据缓存
- 算子函数中使用的外部变量
上面的六种数据,通过Java序列化(默认的序列化方式)形成一个二进制字节数组,大大减少了数据在内存、硬盘中占用的空间,减少了网络数据传输的开销,并且可以精确的推测内存使用情况,降低GC频率。
但是在序列化和反序列化的过程中,会消耗大量的时间,所以选择一个好的序列化算法很重要。目前Spark使用Kryo比Java默认的序列化快10倍。具体原理可见Kryo参考,这里只需要添加配置使用Kryo即可。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化库
运行时间变成了17s,再去查看序列化和反序列化的时耗:
总结
- 首先要从根源进行优化,也就是编写程序的时候,比如注意避免创建重复RDD、持久化常使用的RDD等编码方式。
- 编码过程中尽量少的出现shuffle操作,用其它操作代替。
- 序列化和反序列化使用得非常多,所以使用Kryo比默认快10倍是非常重要的。
- 对于资源而言,没有绝对的配置方法,首先要理解每个资源参数的意义和使用经验,再根据自己的集群状态来做调整。