1.分配更多的资源 -- 性能调优的王道
真实项目里的脚本:
bin/spark-submit \
--class com.xx.xx \
--num-executors 80 \
--driver-memory 6g \
--executor-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.waite.timeout=300 \
/usr/xx/xx.jar \
args
分配资源之前,要先看机器能够给我们提供多少资源,并且最大化的利用这些资源才是最好的;
1.standalone模式:
根据实际情况分配spark作业资源,比如每台机器4G,2个cpu,20台机器
2.spark-on-yarn模式:
要看spark作业要提交到的资源队列,大概有多少资源?
SparkContext、DAGScheduler、taskScheduler,会将我们的算子切割成大量的task提交到Application的executor上去执行,比如分配给其中一个的executor100个task,他的cpu只有2个,那么并行只能执行2个task,如果cpu为5个,那么久先执行5个再执行5个
a.增加executor的数量:
如果executor的数量比较少,那么意味着可以并行执行的task的数量就比较少,
就意味着Application的并行执行能力比较弱;
比如:
有3个executor,每个executor有2个cup core,
那么能够并行执行的task就是6个,6个执行完换下一批6个
增加executor的个数后,并行执行的task就会变多,性能得到提升
b.增加每个executor的cpu core
比如:
之前:3个exexutor,每个executor的cpu core为2个,那么并执行的task是6个
把cpu core增加到5个,那么并行执行的task就是15个,提高了性能
c.增加每个executor的内存量:
1.如果需要对RDD进行cache,那么更多的内存就能缓存更多的数据,
将更少的数据写入磁盘,甚至不写入磁盘,减少了磁盘IO。
2.对于shuffle操作,reduce端会需要内存来存储拉取过来的数据进行聚合,如果内存不够,
也会写入磁盘,增加executor内存,就会有更少的数据写入磁盘,较少磁盘IO,提高性能。
3.对于task的执行,需要创建很多对象,如果内存比较小,可能导致JVM堆内存满了,
然后频繁的GC,垃圾回收,minorGC和fullGC,速度会很慢,如果加大内存,
带来更少的GC,速度提升。
2.调节并行度
并行度:spark作业中,各个stage的task个数,也就代表了saprk作业中各个阶段[stage]的并行度。
[spark作业,Application,jobs,action会触发一个job,每个job会拆成多个stage,发生shuffle的时候,会拆出一个stage]
如果不调节并行度,导致并行度过低,会怎么样???
比如:
1.我们通过上面的分配好资源后,有50个executor,每个executor10G内存,每个executor有3个cpu core
基本已经达到了集群或者yarn的资源上限
2. 50个executor * 3个cpu = 150个task,即可以并行执行150个task;
而我们设置的时候只设置了100个并行度的task,这时候每个executor分配到2个task,同时运行task的数量只有100个,导致每个executor剩下的1个cpu core在那空转,浪费资源。
资源虽然够了,但是并行度没有和资源相匹配,导致分配下去的资源浪费掉了!!!
**合理的并行度设置,应该要设置的足够大,大到完全合理的利用集群资源!而且减少了每个task要处理的数据量[比如150g的数据分别分发给100个task处理就是每个task处理1.5G,但是如果是150个task的话,每个task就处理1G]**
总结:
a. task数量,至少设置成与Spark application的总cpu数相同(理想状态下,比如150个cpu core,分配150个task,差不多同时运行完毕)
b.官方推荐做法:task的数量设置成 Spark application的cpu core的个数的3~5倍!!
比如总共150个cpu core,设置成300~500个task
为什么这么设置呢???
实际情况下和理想状态下是不一样的,因为有些task运行的快,有些运行的慢,
比如有些运行需要50s,有些需要几分钟运行完毕,如果刚好设置task数量和cpu core的数量相同,可能会导致资源的浪费;
比如150个task,10个运行完了,还有140个在运行,那么势必会导致10个cpu core的闲置,
所以如果设置task的数量为cpu数量的2~3倍,一旦有task运行完,另一个task就会立刻补上来,
尽量让cpu core不要空闲,提升了spark作业运行效率,提升性能。
c.如何设置一个 Spark application的并行度???
SparkConf sc = new SparkConf()
.set("spark.default.parallelism","500");
3.重构RDD架构及RDD持久化:
默认情况下 RDD出现的问题:
RDD4
/
hdfs --> RDD1 --> RDD2 -->RDD3
\
RDD5
以上情况: 执行RDD4和RDD5的时候都会从RDD1到RDD2然后到RDD3,执行期间的算子操作,
而不会说到RDD3的算子操作后的结果给缓存起来,所以这样很麻烦,
出现了RDD重复计算的情况,导致性能急剧下降!
结论:
对于要多次计算和使用的公共RDD,一定要进行持久化!
持久化也就是:BlockManager将RDD的数据缓存到内存或者磁盘上,后续无论对这个RDD进行多少次计算,都只需要到缓存中取就ok了。
持久化策略:
rdd.persist(StorageLevel.xx()) 或者 cache
1.优先会把数据缓存到内存中 -- StorageLevel.MEMORY_ONLY
2.如果纯内存空间无法支撑公共RDD的数据时,就会优先考虑使用序列化的机制在纯内存中存储,
将RDD中的每个partition中的数据序列化成一个大的字节数组,也就是一个对象,
序列化后,大大减少了内存空间的占用。-- StorageLevel.MEMORY_ONLY_SER
序列化唯一的缺点:在获取数据的时候需要反序列化
3.如果序列化纯内存的方式还是导致OOM,内存溢出的话,那就要考虑磁盘的方式。
内存+磁盘的普通方式(无序列化) -- StorageLevel.MEMORY_AND_DISK
4.如果上面的还是无法存下的话,就用 内存+磁盘+序列化 -- StorageLevel.MEMORY_AND_DISK_SER
另:在机器内存**极度充足**的情况下,可以使用双副本机制,来持久化,保证数据的高可靠性
如果机器宕机了,那么还有一份副本数据,就不用再次进行算子计算了。[锦上添花--一般不要这么做] -- StorageLevel.MEMORY_ONLY_SER_2
4.广播大变量 [sc.broadcast(rdd.collect)]
问题情景:
当我们在写程序用到外部的维度表的数据进行使用的时候,程序默认会给每个task都发送相同的这个数据,
如果这个数据为100M,如果我们有1000个task,100G的数据,通过网络传输到task,
集群瞬间因为这个原因消耗掉100G的内存,对spark作业运行速度造成极大的影响,性能想想都很可怕!!!
解决方案:
sc.broadcast(rdd.collect)
分析原理:
[BlockManager:负责管理某个executor上的内存和磁盘上的数据]
广播变量会在Driver上有一份副本,当一个task使用到广播变量的数据时,会在自己本地的executor的BlockManager去取数据,
发现没有,BlockManager就会到Driver上远程去取数据,并保存在本地,
然后第二个task需要的时候来找BlockManager直接就可以找到该数据,
executor的Blockmanager除了可以到Driver远程的取数据,
还可能到邻近节点的BlockManager上去拉取数据,越近越好!
举例说明:
50个executor,1000个task,外部数据10M,
默认情况下,1000个task,1000个副本,10G数据,网络传输,集群中10G的内存消耗
如果使用广播,50个executor,500M的数据,网络传输速率大大增加,
10G=10000M 和 500M的对比 20倍啊。。。
**之前的一个测试[真实]:
没有经过任何调优的spark作业,运行下来16个小时,
合理分配资源+合理分配并行度+RDD持久化,作业下来5个小时,
非常重要的一个调优Shuffle优化之后,2~3个小时,
应用了其他的性能调优之后,JVM调参+广播等等,30分钟
16个小时 和 30分钟对比,太可怕了!!!性能调优真的真的很重要!!!
5.Kryo序列化机制:
默认情况下,Spark内部使用java的序列化机制
ObjectOutPutStream/ObjectInputStream,对象输入输出流机制来进行序列化
这种序列化机制的好处:
处理方便,只是需要在算子里使用的变量是实现Serializable接口即可
缺点在于:
默认序列化机制效率不高,序列化的速度比较慢,序列化后的数据占用内存空间还比较大
解决:手动进行序列化格式的优化:Kryo [spark支持的]
Kryo序列化机制比默认的java序列化机制速度快,
序列化后的数据更小,是java序列化后数据的 1/10 。
所以Kryo序列化后,会让网络传输的数据更少,在集群中耗费的资源大大减少。
Kryo序列化机制:[一旦启用,会生效的几个地方]
a.算子函数中使用到的外部变量[比如广播的外部维度表数据]
优化网络传输性能,较少集群的内存占用和消耗
b.持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
将每个RDD partition序列化成一个大的字节数组时,就会使用Kryo进一步优化序列化的效率和性能。
持久化RDD占用的内存越少,task执行的时候,创建的对象,不至于频繁的占满内存,频繁的GC
c.shuffle
在stage间的task的shuffle操作时,节点与节点之间的task会通过网络拉取和传输数据,
此时这些数据也是可能需要序列化的,就会使用Kryo
实现Kryo:
step1. 在SparkConf里设置 new SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KyroSerializer")
.registerKryoClasses(new Class[]{MyCategory.class})
[Kryo之所以没有没有作为默认的序列化类库,就是因为Kryo要求,如果要达到它的最佳效果的话]
[一定要注册我们自定义的类,不如:算子函数中使用到了外部自定义的对象变量,这时要求必须注册这个类,否则Kyro就达不到最佳性能]
step2. 注册使用到的,需要Kryo序列化的一些自定义类
6.使用FastUtil优化数据格式:
FastUtil是什么??
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
fastutil还提供了一些JDK标准类库中没有的额外功能(比如双向迭代器)。
fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。
fastutil尽量提供了在任何场景下都是速度最快的集合类库。
Spark中FastUtil运用的场景??
1.如果算子函数使用了外部变量,
第一可以使用broadcast广播变量优化;
第二可以使用Kryo序列化类库,提升序列化性能和效率;
第三如果外部变量是某种比较大的集合(Map、List等),可以考虑fastutil来改写外部变量,
首先从源头上就减少了内存的占用,通过广播变量进一步减少内存占用,
再通过Kryo类库进一步减少内存占用
避免了executor内存频繁占满,频繁唤起GC,导致性能下降的现象
使用步骤:
step1:导入pom依赖
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
step2:
List<Integer> => IntList
基本都是类似于IntList的格式,前缀就是集合的元素类型,
特殊的就是Map,Int2IntMap,代表了Key-Value映射的元素类型
7.调节数据本地化等待时长:
问题发生的场景:
spark在Driver上,对Application的每一个stage的task分配之前,
都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;
spark的分配算法:
a.优先把每一个task正好分配到他要计算的数据所在的节点,这样的话不用在网络间传输数据
b.但是,task没有机会分配到数据所在的节点上,为什么呢???
因为那个节点上的计算资源和计算能力都满了,这个时候 spark会等待一段时间,
默认情况下是3s钟,到最后,实在等不了了,就会选择一个较差的本地化级别,
比如说会把task分配到靠他要计算的数据的节点最近的节点,然后进行计算
c.对于b来说肯定要发生网络传输,task会通过其所在节点的executor的BlockManager来获取数据,
BlockManager发现自己本地没有,就会用getRemote()的方法,通过TransferService(网络数据传输组件)
从数据所在节点的BlockManager中获取数据,通过网络传输给task所在的节点
总结:
我们肯定是希望看到 task和数据都在同一个节点上,直接从本地的executor中的BlockManager中去获取数据,
纯内存或者带点IO,如果通过网络传输,那么大量的网络传输和磁盘IO都是性能的杀手
本地化的级别类型:
1.PROCESS_LOCAL: 进程本地化,代码和数据都在同一个进程中,也就是在同一个executor进程中,
task由executor来执行,数据在executor的BlockManager中,性能最好
2.NODE_LOCAL: 节点本地化,比如说一个节点上有两个executor,其中一个task需要第一个executor的数据,
但是他被分配给了第二个executor,他会找第二个executor的BlockManager去取数据,但是没有,
BlockManager会去第一个的executor的BlockManager去取数据,这是发生在进程中的
3.NOPREF: 数据从哪里获取都一样,没有好坏之分
4.RACK_LOCAL: 数据在同一个机架上的不同节点上,需要进行网络间的数据传输
5.ANY: 数据可能在集群中的任何地方,而且不在同一个机架,这种性能最差!!
开发时的流程:
观察spark作业时的日志,先测试,先用client模式,在本地就可以看到比较全的日志。
日志里面会显示:starting task...,PROCESS_LOCAL,或者是NODE_LOCAL,观察大部分数据本地化的级别
如果发现大部分都是PROCESS_LOCAL的级别,那就不用调了,如果大部分都是NODE_LOCAL或者ANY,那就要调节一下等待时长了
要反复调节,反复观察本地化级别是否提升,查看spark作业运行的时间有没有缩短
不要本末倒置,如果是 本地化级别提升了,但是因为大量的等待时间,spark作业的运行时常变大了,这就不要调节了
spark.locality.waite
spark.locality.waite.process
spark.locality.waite.node
spark.locality.waite.rack
默认等待时长都是3s
设置方法:
new SparkConf().set("spark.locality.waite","10")//不要带s
8.JVM调优:1个executor对应1个JVM进程
A. 降低cache操作的内存占比
JVM模块:
每一次存放对象的时候都会放入eden区域,其中有一个survivor区域,另一个survivor区域是空闲的[新生代],
当eden区域和一个survivor区域放满了以后(spark运行产生的对象太多了),
就会触发minor gc,小型垃圾回收,把不再使用的对象从内存中清空,给后面新创建的对象腾出空间
清理掉了不在使用的对象后,还有一部分存活的对象(还要继续使用的对象),
将存活的对象放入空闲的那个survivor区域里,这里默认eden:survivor1: survivor2 = 8:1:1,
假如对象占了1.5放不下survivor区域了,那么就会放到[老年代]里;
假如JVM的内存不够大的话,可能导致频繁的新生代内存满溢,频繁的进行minor gc,
频繁的minor gc会导致短时间内,有些存活的对象,多次垃圾回收都没有回收掉,
会导致这种短生命周期的对象(其实是不一定要长期使用的对象)年龄过大,
垃圾回收次数太多,还没有回收到,就已经跑到了老年代;
老年代中可能会因为内存不足,囤积一大堆短生命周期的对象(本来应该在年轻代中的),
可能马上就要回收掉的对象,此时可能造成老年代内存满溢,造成频繁的full gc(全局/全面垃圾回收),full gc就会去老年代中回收对象;
由于full gc算法的设计,是针对老年代中的对象,数量很少,满溢进行full gc的频率应该很少,
因此采取了不太复杂的但是耗费性能和时间的垃圾回收算法。full gc 很慢很慢;
full gc 和 minor gc,无论是快还是慢,都会导致JVM的工作线程停止工作,即 stop the world,
简言之:gc的时候,spark停止工作,等待垃圾回收结束;
在spark中,堆内存被分为了两块:
一块是专门用来给RDD cache和persist操作进行RDD数据缓存用的;
一块是给spark算子函数的运行使用的,存放函数中自己创建的对象;
默认情况下,给RDD cache的内存占比是60%,但是在某些情况下,比如RDD cache不那么紧张,
而task算子函数中创建的对象过多,内存不太大,导致频繁的minor gc,甚至频繁的full gc,
导致spark频繁的暂停工作,性能影响会非常大,
解决办法:
集群是spark-onyarn的话就可以通过spark ui来查看,spark的作业情况,
可以看到每个stage的运行情况,包括每个task的运行时间,gc时间等等,
如果发现gc太频繁,时间太长,此时可以适当调节这个比例;
总结:
降低cache的内存占比,大不了用persist操作,选择将一部分的RDD数据存入磁盘,
或者序列化方式Kryo,来减少RDD缓存的内存占比;
对应的RDD算子函数的内存占比就增多了,就可以减少minor gc的频率,同时减少full gc的频率,提高性能
具体实现:0.6->0.5->0.4->0.2
new SparkConf().set("spark.storage.memoryFraction","0.5")
B. executor堆外内存与连接时常
1. executor堆外内存[off-heap memory]:
场景:
比如两个stage,第二个stage的executor的task需要第一个executor的数据,
虽然可以通过Driver的MapOutputTracker可以找到自己数据的地址[也就是第一个executor的BlockManager],
但是第一个executor已经挂掉了,关联的BlockManager也没了,就没办法获取到数据;
有时候,如果你的spark作业处理的数据量特别大,几亿数据量;
spark作业一运行,是不是报错诸如:shuffle file cannot find,executor task lost,out of memory,
这时候可能是executor的堆外内存不够用了,导致executor在运行的时候出现了内存溢出;
导致后续的stage的task在运行的时候,可能从一些executor中拉取shuffle map output 文件,
但是executor已经挂掉了,关联的BlockManager也没有了,所以可能会报shuffle output file not found,resubmitting task,executor lost,spark作业彻底失败;
这个时候就可以考虑调节executor的堆外内存,堆外内存调节的比较大的话,也会提升性能;
怎么调价堆外内存的大小??
在spark-submit 的脚本中添加
--conf spark.yarn.executor.memoryOverhead=2048
注意:这个设置是在spark-submit脚本中,不是在 new SparkConf()里设置的!!!
这个是在spark-onyarn的集群中设置的,企业也是这么设置的!
默认情况下,堆外内存是300多M,我们在项目中通常都会出现问题,导致spark作业反复崩溃,
我们就可以调节这个参数 ,一般来说至少1G(1024M),有时候也会2G、4G,
来避免JVM oom的异常问题,提高整体spark作业的性能
2. 连接时常的等待:
知识回顾:如果JVM处于垃圾回收过程,所有的工作线程将会停止,相当于一旦进行垃圾回收,
spark/executor就会停止工作,无法提供响应
场景:
通常executor优先会从自己关联的BlockManager去取数据,如果本地没有,
会通过TransferService,去远程连接其他节点上的executor的BlockManager去取;
如果这个远程的executor正好创建的对象特别大,特别多,频繁的让JVM的内存满溢,进行垃圾回收,
此时就没有反应,无法建立网络连接,会有卡住的现象。spark默认的网络连接超时时间是60s,
如果卡住60秒都无法建立网络连接的话,就宣布失败;
出现的现象:偶尔会出现,一串fileId诸如:hg3y4h5g4j5h5g5h3 not found,file lost,
报错几次,几次都拉取不到数据的话,可能导致spark作业的崩溃!
也可能会导致DAGScheduler多次提交stage,TaskScheduler反复提交多次task,
大大延长了spark作业的运行时间
解决办法:[注意是在shell脚本上不是在SparkConf上set!!]
spark-submit
--conf spark.core.connection.ack.waite.timeout=300
9.shuffle调优
shuffle的概念以及场景
什么情况下会发生shuffle??
在spark中,主要是这几个算子:groupByKey、reduceByKey、countByKey、join等
什么是shuffle?
a) groupByKey:把分布在集群中各个节点上的数据中同一个key,对应的values都集中到一块,
集中到集群中的同一个节点上,更严密的说就是集中到一个节点上的一个executor的task中。
集中一个key对应的values后才能交给我们处理,<key,iterable<value>>
b) reduceByKey:算子函数对values集中进行reduce操作,最后变成一个value
c) join RDD<key,value> RDD<key,value>,只要两个RDD中key相同的value都会到一个节点的executor的task中,供我们处理
以reduceByKey为例:
9.1. shuffle调优之 map端合并输出文件
默认的shuffle对性能有什么影响??
实际生产环境的条件:
100个节点,每个节点一个executor:100个executor,每个executor2个cpu core,
总格1000个task,平均到每个executor是10个task;按照第二个stage的task个数和第一个stage的相同,
那么每个节点map端输出的文件个数就是:10 * 1000 = 10000 个
总共100个节点,总共map端输出的文件数:10000 * 100 = 100W 个
100万个。。。太吓人了!!!
shuffle中的写磁盘操作,基本上是shuffle中性能消耗最严重的部分,
通过上面的分析可知,一个普通的生产环境的spark job的shuffle环节,会写入磁盘100万个文件,
磁盘IO性能和对spark作业执行速度的影响,是极其惊人的!!
基本上,spark作业的性能,都消耗在了shuffle中了,虽然不只是shuffle的map端输出文件这一部分,但是这也是非常大的一个性能消耗点。
怎么解决?
开启map端输出文件合并机制:
new SparkConf().set('spark.shuffle.consolidateFiles','true')
实际开发中,开启了map端输出文件合并机制后,有什么变化?
100个节点,100个executor,
每个节点2个cpu core,
总共1000个task,每个executor10个task,
每个节点的输出文件个数:
2*1000 = 2000 个文件
总共输出文件个数:
100 * 2000 = 20万 个文件
相比开启合并之前的100万个,相差了5倍!!
合并map端输出文件,对spark的性能有哪些影响呢?
1. map task写入磁盘文件的IO,减少:100万 -> 20万个文件
2. 第二个stage,原本要拉取第一个stage的task数量文件,1000个task,第二个stage的每个task都会拉取1000份文件,走网络传输;合并以后,100个节点,每个节点2个cpu,第二个stage的每个task只需要拉取 100 * 2 = 200 个文件,网络传输的性能大大增强
实际生产中,使用了spark.shuffle.consolidateFiles后,实际的调优效果:
对于上述的生产环境的配置,性能的提升还是相当可观的,从之前的5个小时 降到了 2~3个小时
总结:
不要小看这个map端输出文件合并机制,实际上在数据量比较大的情况下,本身做了前面的优化,
executor上去了 -> cpu core 上去了 -> 并行度(task的数量)上去了,但是shuffle没调优,
这时候就很糟糕了,大量的map端输出文件的产生,会对性能有比较恶劣的影响
9.2. map端内存缓冲与reduce端内存占比
spark.shuffle.file.buffer,默认32k
spark.shuffle.memoryFraction,占比默认0.2
调优的分量:
map端内存缓冲和reduce端内存占比,网上对他俩说的是shuffle调优的不二之选,其实这是不对的,
因为以实际的生产经验来说,这两个参数没那么重要,但是还是有一点效果的,
就像是很多小的细节综合起来效果就很明显了,
原理:
map:
默认情况下,shuffle的map task输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区中,
这个缓冲区默认大小是32k,每一次,当内存缓冲区满溢后,才会进行spill操作,溢写到磁盘文件中
reduce:
reduce端task,在拉取数据之后,会用hashmap的数据格式来对每个key对应的values进行汇聚,
针对每个key对应的value,执行我们自定义的聚合函数的代码,比如_+_,(把所有values相加)
reduce task,在进行汇聚、聚合等操作的时候,实际上,使用的就是自己对应的executor的内存,
executor(jvm进程,堆),默认executor内存中划分给reduce task进行聚合的比例是20%。
问题来了,内存占比是20%,所以很有可能会出现,拉取过来的数据很多,那么在内存中,
放不下,这个时候就会发生spill(溢写)到磁盘文件中取.
如果不调优会出现什么问题??
默认map端内存缓冲是32k,
默认reduce端聚合内存占比是20%
如果map端处理的数据比较大,而内存缓冲是固定的,会出现什么问题呢?
每个task处理320k,32k的内存缓冲,总共向磁盘溢写10次,
每个task处理32000k,32k的内存缓冲,总共向磁盘溢写1000次,
这样就造成了多次的map端往磁盘文件的spill溢写操作,发生大量的磁盘IO,降低性能
map数据量比较大,reduce端拉取过来的数据很多,就会频繁的发生reduce端聚合内存不够用,
频繁发生spill操作,溢写到磁盘上去,这样一来,磁盘上溢写的数据量越大,
后面进行聚合操作的时候,很可能会多次读取磁盘中的数据进行聚合
默认情况下,在数据量比较大的时候,可能频繁的发生reduce端磁盘文件的读写;
这两点是很像的,而且有关联的,数据量变大,map端肯定出现问题,reduce也出现问题,
出的问题都是一样的,都是磁盘IO频繁,变多,影响性能
调优解决:
我们要看spark UI,
1. 如果公司用的是standalone模式,那么很简单,把spark跑起来,会显示sparkUI的地址,
4040端口号,进去看,依次点击可以看到,每个stage的详情,有哪些executor,有哪些task,
每个task的shuffle write 和 shuffle read的量,shuffle的磁盘和内存,读写的数据量
2. 如果是yarn模式提交,从yarn的界面进去,点击对应的application,进入spark ui,查看详情
如果发现磁盘的read和write很大,就意味着要调节一下shuffle的参数,进行调优,
首先当然要考虑map端输出文件合并机制
调节上面两个的参数,原则是:
spark.shuffle.buffer,每次扩大一倍,然后看看效果,64k,128k
spark.shuffle.memoryFraction,每次提高0.1,看看效果
不能调节的过大,因为你这边调节的很大,相对应的其他的就会变得很小,其他环节就会出问题
调节后的效果:
map task内存缓冲变大了,减少了spill到磁盘文件的次数;
reduce端聚合内存变大了,减少了spill到磁盘的次数,而且减少了后面聚合时读取磁盘的数量
new SparkConf()
.set("spark.shuffle.file.buffer","64")
.set("spark.shuffle.file.memoryFraction","0.3")
10.算子调优
1.算子调优之MapPartitons提升map的操作性能
在spark中最近本的原则:每个task处理RDD中的每一个partition
优缺点对比:
普通Map:
优点:比如处理了一千条数据,内存不够了,那么就可以将已经处理的一千条数据从内存里面垃圾回收掉,
或者用其他办法腾出空间;通常普通的map操作不会导致内存OOM异常;
缺点:比如一个partition中有10000条数据,那么function会执行和计算一万次
MapPartitions:
优点:一个task仅仅会执行一次function,一次function接收partition中的所有数据
只要执行一次就可以了,性能比较高
缺点:对于大数据量来说,比如一个partition100万条数据,一次传入一个function后,
可能一下子内存就不够了,但是又没办法腾出空间来,可能就OOM,内存溢出
那么什么时候使用MapPartitions呢?
当数据量不太大的时候,都可以使用MapPartitions来操作,性能还是很不错的,
不过也有经验表明用了MapPartitions后,内存直接溢出,
所以在项目中自己先估算一下RDD的数据量,以每个partition的量,还有分配给executor的内存大小,
可以试一下,如果直接OOM了,那就放弃吧,如果能够跑通,那就可以使用。
2.算子调优之filter之后 filter 之后 用 coalesce来减少partition的数量
默认情况下,RDD经过filter之后,RDD中每个partition的数据量会不太一样,(原本partition里的数据量可能是差不多的)
问题:
1.每一个partition的数据量变少了,但是在后面进行处理的时候,
还是要和partition的数量一样的task数量去处理,有点浪费task计算资源
2.每个partition的数据量不一样,后面会导致每个处理partition的task要处理的数据量不一样,
这时候很容易出现**数据倾斜**
比如说,有一个partition的数据量是100,而另一个partition的数据量是900,
在task处理逻辑一样的情况下,不同task要处理的数据量可能差别就到了9倍,甚至10倍以上,
同样导致速度差别在9倍或者10倍以上
这样就是导致了有的task运行的速度很快,有的运行的很慢,这就是数据倾斜。
解决:
针对以上问题,我们希望把partition压缩,因为数据量变小了,partition完全可以对应的变少,
比如原来4个partition,现在可以变成2个partition,那么就只要用后面的2个task来处理,
不会造成task资源的浪费(不必要针对只有一点点数据的partition来启动一个task进行计算)
避免了数据倾斜的问题
3.算子调优之使用foreachPartition优化写入数据库性能
默认的foreach有哪些缺点?
首先和map一样,对于每条数据都要去调一次function,task为每个数据,都要去执行一次task;
如果一个partition有100万条数据,就要调用100万次,性能极差!
如果每条数据都要创建一个数据库连接,那么就要创建100万个数据库连接,
但是数据库连接的创建和销毁都是非常耗性能的,虽然我们用了数据库连接池,只要创建固定数量的连接,
还是得多次通过数据库连接,往数据库里(mysql)发送一条sql语句,mysql需要去执行这条sql语句,
有100万条数据,那么就是要发送100万次sql语句;
用了foreachPartition以后,有哪些好处?
1.对于我们写的函数就调用一次就行了,一次传入一个partition的所有数据
2.主要创建或者获取一个数据库连接就可以了
3.只要向数据里发送一条sql语句和一组参数就可以了
在实际开发中,我们都是清一色使用foreachPartition算子操作,
但是有个问题,跟mapPartitions操作一样,如果partition的数据量非常大,
比如真的是100万条,那几本就不行了!一下子进来可能会发生OOM,内存溢出的问题
一组数据的对比:
生产环境中:
一个partition中有1000条数据,用foreach,跟用foreachPartition,
性能提高了2~3分;
数据库里是:
for循环里preparestatement.addBatch
外面是preparestatement.executeBatch
4.算子调优之repartition解决SparkSQL低并行度的问题
并行度: 我们是可以自己设置的
1.spark.default.parallelism
2.sc.textFile(),第二个参数传入指定的数量(这个方法用的非常少)
在生产环境中,我们是要自己手动设置一下并行度的,官网推荐就是在spark-submit脚本中,
指定你的application总共要启动多少个executor,100个,每个executor多少个cpu core,
2~3个,假设application的总cpu core有200个;
官方推荐设置并行度要是总共cpu core个数的2~3倍,一般最大值,所以是 600;
设置的这个并行度,在哪些情况下生效?哪些情况下不生效?
1.如果没有使用SparkSQL(DataFrame)的话,那么整个spark应用的并行度就是我们设置的那个并行度
2.如果第一个stage使用了SparkSQL从Hive表中查询了一些数据,然后做了一些transformatin的操作,
接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle之后,做了一些transformation的操作
如果Hive表对应了20个block,而我们自己设置的并行度是100,
那么第一个stage的并行度是不受我们控制的,就只有20个task,第二个stage的才是我们设置的并行度100个
问题出在哪里了?
SparkSQL 默认情况下,我们是没办法手动设置并行度的,所以可能造成问题,也可能不造成问题,
SparkSQL后面的transformation算子操作,可能是很复杂的业务逻辑,甚至是很复杂的算法,
如果SparkSQL默认的并行度设置的很少,20个,然后每个task要处理为数不少的数据量,
还要执行很复杂的算法,这就导致第一个stage特别慢,第二个stage 1000个task,特别快!
解决办法:
repartition:
使用SparkSQL这一步的并行度和task的数量肯定是没办法改变了,但是可以将SparkSQL查出来的RDD,
使用repartition算子进行重新分区,比如分多个partition,20 -> 100个;
然后从repartition以后的RDD,并行度和task数量,就会按照我们预期的来了,
就可以避免在跟SparkSQL绑定在一起的stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑
5.算子操作reduceByKey:
reduceByKey相较于普通的shuffle操作(不如groupByKey),他的一个特点就是会进行map端的本地聚合;
对map端给下个stage每个task创建的输出文件中,写数据之前,就会进行本地的combiner操作,也就是多每个key的value,都会执行算子函数(_+_),减少了磁盘IO,较少了磁盘空间的占用,在reduce端的缓存也变少了
11.troubleshooting之控制reduce端缓冲大小以避免内存溢出(OOM)
new SparkConf().set("spark.reducer.maxSizeInFlight","24") //默认是48M
Map端的task是不断地输出数据的,数据量可能是很大的,
但是其reduce端的task,并不是等到Map端task将属于自己的那个分数据全部写入磁盘后,再去拉取的
Map端写一点数据,reduce端task就会去拉取一小部分数据,立刻进行后面的聚合,算子函数的应用;
每次reduce能够拉取多少数据,是由reduce端buffer来定,因为拉取过来的数据都是放入buffer中的,
然后采用后面的executor分配的堆内存占比(0.2),去进行后续的聚合,函数操作
reduce端buffer 可能会出现什么问题?
reduce端buffer默认是48M,也许大多时候,还没有拉取满48M,也许是10M,就计算掉了,
但是有时候,Map端的数据量特别大,写出的速度特别快,reduce端拉取的时候,全部到达了自己缓冲的最大极限48M,全部填满,
这个时候,再加上reduce端执行的聚合函数代码,可能会创建大量的对象,也许一下子内存就撑不住了,就会造成OOM,reduce端的内存就会造成内存泄漏
如何解决?
这个时候,我们应该减少reduce端task缓冲的大小,我们宁愿多拉取几次,但是每次同时能拉取到reduce端每个task的数据量比较少,就不容易发生OOM,比如调成12M;
在实际生产中,这种问题是很常见的,这是典型的以性能换执行的原理,
reduce的缓冲小了,不容易造成OOM了,但是性能一定是有所下降的,你要拉取的次数多了,
就会走更多的网络IO流,这时候只能走牺牲性能的方式了;
曾经一个经验:
曾经写了一个特别复杂的spark作业,写完代码后,半个月就是跑步起来,里面各种各样的问题,
需要进行troubleshooting,调节了十几个参数,其中里面就有reduce端缓冲的大小,最后,
总算跑起来了!
12. troubleshooting之解决JVM GC导致的shuffle拉取文件失败:
过程:
第一个stage的task输出文件的同时 ,会像Driver上记录这些数据信息,然后下一个stage的task想要得到上个stage的数据,
就得像Driver所要元数据信息,然后去像上一个的stage的task生成的文件中拉取数据。
问题场景:
在spark作业中,有时候经常出现一种情况,就是log日志报出:shuffle file not found..,
有时候他会偶尔出现一次,有的时候出现一次后重新提交stage、task,重新执行一遍 就好了。
分析问题:
executor在JVM进程中,可能内存不太够用,那么此时就很可能执行GC,minor gc 或者 full gc,
总之一旦发生gc后,就会导致所有工作线程全部停止,比如BlockManager,基于netty的网络通信。
第二个stage的task去拉取数据的时候,上一个executor正好在进行gc,就导致拉取了半天也没拉取到数据,
那为什么第二次提交stage的时候,就又可以了呢?
因为第二次提交的时候,上一个executor已经完成了gc。
解决:
spark.shuffle.io.maxRetries 3[默认3次]
shuffle 文件拉取时,如果没有拉取到,最多或者重试几次,默认3次
spark.shuffle.io.retryWait 5s [默认5s]
每一次重新拉取文件的时间间隔,默认5s
默认情况下,第一个stage的executor正在漫长的full gc,第二个stage的executor尝试去拉取数据,
结果没拉取到,这样会反复重试拉取3次,中间间隔时间5s,也就是总共15s,拉取不成功,就报 shuffle file not found
我们可以增大上面两个参数的值:
spark.shuffle.io.maxRetries 60次
spark.shuffle.io.retryWait 60s
最多可以忍受一个小时没有拉取到shuffle file,这只是一个设置最大的可能值,
full gc 也不可能一个小时都没结束把,
这样就解决了因为gc 而无法拉取到数据的问题
13. troubleshooting之解决yarn-cluster模式的JVM栈内存溢出问题
yarn-cluster运行流程:
1.本地机器执行spark-submit脚本[yarn-cluster模式],提交spark application给resourceManager
2. resourceManager找到一个节点[nodeManager]启动applicationMaster[Driver进程]
3. applicationMaster找resourceManager申请executor
4. resourceManager分配container(内存+cpu)
5. applicationMaster找到对应nodeManager申请启动executor
6. nodeManager启动executor
7. executor找applicationMaster进行反向注册
到这里为止,applicationMaster(Driver)就知道自己有哪些资源可以用(executor),
然后就会去执行job,拆分stage,提交stage的task,进行task调度,
分配到各个executor上面去执行。
yarn-client 和 yarn-cluster的区别:
yarn-client模式Driver运行在本地机器上;yarn-cluster模式Driver是运行在yarn集群上的某个nodeManager节点上的;
yarn-client模式会导致本地机器负责spark作业的调用,所以网卡流量会激增,yarn-cluster没有这个问题;
yarnclient的Driver运行在本地,通常来说本地机器和yarn集群都不会在一个机房,所以性能不是特别好;
yarn-cluster模式下,Driver是跟yarn集群运行在一个机房内,性能上也会好很好;
实践经验碰到的yarn-cluster的问题:
有时候运行了包含spark sql的spark作业,可能会遇到 在yarn-client上运行好好地,在yarn-cluster模式下,
可能无法提交运行,会报出JVM的PermGen(永久代)的内存溢出-OOM;
Yarn-client模式下,Driver是运行在本地机器的,spark使用的JVM的PerGen的配置,是本地的spark-class文件,
(spark客户端是默认有配置的),JVM的永久代大小默认是128M,这个是没问题的;
但是在Yarn-cluster模式下,Driver是运行在yarn集群的某个节点上的,使用的是没有经过配置的默认设置82M(PerGen永久代大小)
spark sql内部会进行很负责的sl语义解析、语法树的转换,特别复杂,在这种情况下,如果sql特别复杂,
很可能会导致性能的消耗,内存的消耗,可能对PermGen永久代的内存占比就很大
所以此时,如果对PermGen的内存占比需求多与82M,但是又小于128M,就会出现类似上面的情况,
yarn-client可以正常运行因为他的默认permgen大小是128M,但是yarn-cluster的默认是82M,就会出现PermGen OOM -- PermGen out of memory
解决:
spark-submit脚本中加入参数:
--conf spark.driver.extraJavaOptions='-XX:PermSize=128M -XX:MxPermSize=256M'
这样就设置了永久代的大小默认128M,最大256M,那么这样的话,就可以保证spark作业不会出现上面的PermG