如何正确地开启SparkStreaming的动态分配?
在邮件组中这个问题被讨论过,想要开启针对SparkStreaming的动态分配策略需要如下配置,其中必须设置spark.dynamicAllocation.enabled=false
和spark.executor.instances=0
,然后再开启spark.streaming.dynamicAllocation.enabled=true
:
spark-submit run-example \
--conf spark.streaming.dynamicAllocation.enabled=true \
--conf spark.executor.instances=0 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.master=yarn \
--conf spark.submit.deployMode=client \
org.apache.spark.examples.streaming.HdfsWordCount /foo
SparkStreaming动态分配跟原动态分配策略的区别
动态分配现在有两种策略:
-
spark.dynamicAllocation.enabled=false
所对应的org.apache.spark.ExecutorAllocationManager
; -
spark.streaming.dynamicAllocation.enabled=true
所对应的org.apache.spark.streaming.scheduler.ExecutorAllocationManager
;
策略一主要针对的是普通ETL任务或者SparkSQL任务的动态分配策略;策略二主要是针对SparkStraming任务类型对原策略做了改进;
策略一
策略一是根据SparkListener
实现的,原理是通过onStageSubmitted()/onStageCompleted()/onTaskStart()/onTaskEnd()方法来关联executor -> 是否空闲的关系;
如果在该executor上没有任何task运行,则加入removeExecutors列表,并观察spark.dynamicAllocation.executorIdleTimeout(默认60s)
内是否有使用(当然对于有cached的rdd是根据spark.dynamicAllocation.cachedExecutorIdleTimeout
这个参数),如果空闲这么长时间则调用removeExecutor()接口,移除该executor;
策略二:SPARK-12133
在SPARK-12133中提出了一中动态分配策略,可以了解到策略一存在如下弊端:
- 针对SparkStreaming任务,executor用户不会处于idle的状态因为每个batch每个一段时间都会运行;
- 策略一方法没有考虑batch队列的状况;
- 针对运行reciever的executors应该做不一样的处理;
所以在该PR种提出了一种针对SparkStreaming的动态分配算法,分三部分进行介绍:
- 基础功能;
- Recieiver Executor独立调度;
- Receiver的变化数量;
进行调度的基础是 $R = BatchProcessTime/BatchDuration$,具体策略如下:
- 如果 $R_{avg} > 0.9 $,增加executors, 每次增加$minx(1, round(R_{avg}))$个executor;
- 如果 $R_{avg} < 0.5 $,减少executors,每次减少一个executor