1.将数据缓存到内存
性能调优主要是将数据放入内存中,以加快处理数据的速度。通过spark.cacheTable(“tableName”)或者dataFrame.cache()可以将表缓存到内存;使用spark.uncacheTable(“tableName”)可以从内存中删除表。
案例:将从MySQL中读取到的数据缓存到内存以加速查询。
spark-shell启动时连接MySQL:
# spark-shell --master spark://localhost:7077
--jars /root/trainings/apache-hive-1.2.2-bin/lib/mysql-connector-java-5.1.46-bin.jar
--driver-class-path /root/trainings/apache-hive-1.2.2-bin/lib/mysql-connector-java-5.1.46-bin.jar
(1)从MySQL数据库中读取数据生成DataFrame
scala> val usersDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/spark")
.option("dbtable","tblUsers")
.option("user","root")
.option("password","123456")
.load
usersDF: org.apache.spark.sql.DataFrame = [uID: int, uName: string ... 1 more field]
(2)将usersDF注册成表tblUsers
scala> usersDF.registerTempTable("tblUsers")
warning: there was one deprecation warning; re-run with -deprecation for details
(3)执行第一次查询
scala> spark.sql("select * from tblUsers").show
(4)将表进行缓存(标记),并执行第二次查询(触发缓存)
scala> spark.sqlContext.cacheTable("tblUsers")
scala> spark.sql("select * from tblUsers").show
(5)执行第三次查询
scala> spark.sql("select * from tblUsers").show
(6)通过Web Console查看三次查询的执行时间
在浏览器地址栏输入Spark Master的地址:http://192.168.126.110:8080,在Running Applications中点击Spark shell应用,打开应用详情页面:
可以看到没有加入缓存时查询的时间远大于加入缓存后的查询时间,即前两次的时间的时间远大于第三次的时间。
(7)清空缓存
scala> spark.sqlContext.cacheTable("tblUsers")
18/07/10 21:33:46 WARN execution.CacheManager: Asked to cache already cached data.
scala> spark.sqlContext.clearCache
2.性能优化相关参数
2.1缓存相关的参数
- spark.sql.inMemoryColumnarStorage.compressed:默认为true,Spark SQL将会基于统计信息自动为每一列选择一种压缩编码方式;
- spark.sql.inMemoryColumnarStorage.batchSize:默认值10000,缓存批处理大小。缓存数据时,较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来内存OOM风险;
2.2其他性能相关的配置
- spark.sql.files.maxPartitionBytes:默认值128MB,读取文件时单个分区可容纳的最大字节数;
- spark.sql.files.openCostInBytes:默认值4MB,打开文件的估算成本,按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件时会使用。高估更好,这样的话小文件将会比大文件分区更快(先被调用);
- spark.sql.autoBroadcastJoinThreshold:默认值10MB,用于配置一个表在执行join操作时能够广播给所有worker节点的最大字节大小。通过将这个值设置为-1可以禁用广播。注意,当前数据统计仅支持已经运行了ANALYZE TABLE <tablename> COMPUTE STATISTICS noscan命令的Hive Metastore表;
- spark.sql.shuffle.partitons:默认值200,用于配置join或者聚合操作的shuffle操作使用的分区数。
调优参数除了默认值之外,没有什么固定值可供参考,全凭在实际生产环境中经验来设置,调优是一项长期的经验积累过程。本节介绍仅供参考。