Spark性能优化分为四个方面:
1、开发调优
2、资源调优
3、数据倾斜调优
4、shuffle调优
1. 开发调优
1.1 避免创建重复的RDD
对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。
错误示例:
val rdd1 = sc.textFile("../hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("../hello.txt")
rdd2.reduce(...)
正确示例:
val rdd1 = sc.textFile("../hello.txt")
rdd1.map(...)
rdd1.reduce(...)
1.2 尽可能复用同一个RDD
在对不同的数据执行算子操作时还要尽可能地复用一个RDD。
错误示例:
JavaPairRDD</long><long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)
// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
正确示例:
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
1.3 对多次使用的RDD进行持久化
对多次使用的RDD进行持久化,以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据。
正确示例:
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,
//内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition
//都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,
//从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
对于persist(),可根据业务场景选择持久化级别。
1.4 尽可能避免使用shuffle类算子
shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。所以要尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。
Broadcast与map进行join代码示例:
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,
//那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,
//拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
1.5 使用map-side预聚合的shuffle操作
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。
所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。
在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
1.6 使用高性能的算子
1)使用reduceByKey/aggregateByKey替代groupByKey
2)使用mapPartitions替代普通map
3)使用foreachPartitions替代foreach
4)使用filter之后进行coalesce操作
5)使用repartitionAndSortWithinPartitions替代repartition与sort类操作
1.7 广播大变量
有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
错误示例:
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)
正确示例:
// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
1.8 使用Kryo优化序列化性能
在Spark中,主要有三个地方涉及到了序列化:
1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
2、将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
3、使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
1.9 优化数据结构
Java中,有三种类型比较耗费内存:
1、对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
2、字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
3、集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。