一.概述
Spark数据本地化即计算向数据移动,但数据块所在的Executor不一定有足够的的计算资源提供,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动。
二.本地化级别(Locality Levels)
Spark目前支持以下几种本地化级别:
- 1.PROCESS_LOCAL:进程本地化,表示 task 要计算的数据在同一个 Executor 中。
- 2.NODE_LOCAL: 节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFS 上,那么最好的本地化级别就是 NODE_LOCAL。
- 3.NO_PREF: 没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。
- 4.RACK_LOCAL:机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。
- 5.ANY:跨机架,数据在非同一机架的网络上,速度最慢。
三.Spark 的数据本地化由谁来负责
DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 Tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations方法得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个Task ,该 Task 优先位置与其对应的 partition 对应的优先位置一致。
TaskScheduler 接收到了 TaskSet 后,TaskScheduler会为每个TaskSet创建一个TaskSetMagager来对其Task进行管理,TaskSetMagager中包含TaskSet 所有 Task,并管理这些 Task 的执行,在初始化TaskSetMagager的时候就会通过computeValidLocalityLevels计算该TaskSet包含的Locality Levels,以便在调度和延迟调度 tasks 时发挥作用。
总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。
四.Spark是如何进行调度
Locality Levels表示了计算节点与输入数据位置的关系,下面以一个图来展开 Spark 是如何进行调度的。这一个过程会涉及 RDD, DAGScheduler , TaskScheduler。
1.PROCESS_LOCAL
TaskScheduler 根据数据的位置向数据节点发送 Task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。
2.NODE_LOCAL
TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 Task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。
3.RACK_LOCAL
TaskScheduler重新发送 Task 到 worker2 中的 Executor1 中执行。
4.获取数据执行
当 Task 分配完成之后,Task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemoteValues 方法,通过 BlockManagerSlaveEndpoint与Driver所在节点的BlockManagerMaster中的BlockManagerMasterEndpoint先建立连接,获取数据所在的BlockManager的地址,然后通过BlockTransferService(网络传输组件)获取数据,通过网络传输回Task所在节点(这时候性能大幅下降,大量的网络IO占用资源),之后就开始计算流程。
四.优化建议
TaskScheduler在发送 Task 的时候,会根据数据所在的节点发送 Task ,这时候的数据本地化的级别是最高的,如果这个 Task 在这个Executor中等待了3秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低 1 级数据本地化的级别,重新发送 Task 到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别...
如果想让每一个 Task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 Task 都拿到了最好的数据本地化级别,但是我们 Job 执行的时间也会随之延长。
属性名称 | 默认值 | 含义 |
---|---|---|
spark.locality.wait | 3000 | 以下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,如果超出就启动下一本地优先级别的task。该设置同样可以应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),当然,也可以通过spark.locality.wait.node等参数设置不同优先级别的本地性 |
spark.locality.wait.process | spark.locality.wait | 本地进程级别的本地等待时间 |
spark.locality.wait.node | spark.locality.wait | 本地节点级别的本地等待时间 |
spark.locality.wait.rack | spark.locality.wait | 本地机架级别的本地等待时间 |
可以在代码里面这样设置:
new SparkConf.set("spark.locality.wait","1000")