Apache Spark是开源的、分布式的、集成计算引擎,支持在计算机集群上的并行数据计算。Spark支持多种被广泛使用的编程语言(Python,Java,Scala,R),包含一系列适用于不同任务处理的工具,从SQL数据分析,到流数据处理,到机器学习等,并且在单个PC上和数千台服务器上都能够友好运行,非常适合大数据分析处理。
spark应用的结构
spark应用包含driver进程和executor进程。driver进程负责三件事情:保存维护spark应用的状态信息,返回程序执行结果,分配任务给executor进程。executor进程负责执行driver进程分配的任务,并且返回任务状态和结果给driver。
除了driver进程和executor进程,spark应用还必须包含一个集群管理器,负责管理集群中的机器和分配计算资源。spark支持三种集群管理器:spark本身自带的集群管理器,Apache Mesos,Hadoop YARN。集群管理器也有类似于driver和executor的driver节点(有时候也叫master节点)和worker节点,其区别在于,集群管理器的driver节点和worker节点都是物理机器,spark应用的driver进程和executor进程都是执行程序的进程。当启动spark应用时,程序会首先向集群管理器申请计算资源,然后在申请到资源执行任务的过程中,集群管理器负责管理运行任务的机器。
执行模式Execution Modes
执行模式的不同决定了在运行spark应用时资源是如何进行分配的。spark支持三种执行模式可供选择:cluster mode集群模式,client mode客户端模式,local mode单机模式。
cluster mode集群模式
集群模式是spark最常见的执行模式,使用集群中的多台机器更能发挥spark分布式计算的优势。在集群模式下spark应用启动的过程是这样的:
1)首先,用户提交应用程序给集群管理器;
2)然后,集群管理器的driver节点在集群中的一台worker节点上启动driver进程,然后在其他worker节点上启动executor进程;
- driver进程和executor进程在不同的worker节点上;
- 同一个worker节点上可能会启动多个executor进程;
client mode客户端模式
客户端模式与集群模式下,spark应用的启动过程几乎是相同的,唯一不同的一点是,在客户端模式下,driver进程是在用户提交应用程序的客户端机器上启动的,而不是在集群中的任何一台worker节点上。
local mode单机模式
单机模式与前两种执行模式完全不同,在单机模式下,driver进程和executor进程全部运行在本地的一台机器上,并行计算是通过单机上的多线程实现的。单机模式不适合用于实际生产应用,通常用于在本地测试spark程序。
Spark应用的生命周期(spark程序外)
用户请求
- 首先,由用户提交spark应用程序;
1)在本机执行应用程序,准备向集群管理器的driver节点提交申请,申请的内容只有driver进程所需的资源;
- 用户的应用程序中必须包含SparkSession,因为SparkSession是spark程序的入口,用于与集群传递信息(比如executor的数量等);
2)集群管理器的driver节点接收到用户请求,并且在集群中的一台worker节点上启动driver进程;
3)集群管理器返回状态信息给本机,本机上提交程序的用户进程退出,此时,应用与本机断开,并且开始在启动的driver进程中运行。
启动
1)driver进程中的SparkSession向集群管理器中的driver节点发出请求,请求的内容是启动executor进程;
2)driver节点接收到请求,并且对资源进行评估,如果资源满足请求,driver节点在集群中的worker节点上启动executor进程;
- executor的数量和每个executor的资源大小,用户可以在使用spark-submit命令提交任务的时候通过参数进行配置;
3)集群的driver节点把启动的executor地址等信息发送给driver进程;
执行
driver进程和executor进程全部启动之后,接下来的执行过程就是在spark内部的生命周期流程:driver负责向executor分配任务,executor执行任务并且向driver返回任务的运行状态(成功或失败)。详细的过程在下一部分spark内部的生命周期中介绍。
执行结束
在spark应用运行结束之后,首先driver进程关闭,并且把运行结果返回给集群管理器,然后集群管理器关闭executor进程。
Spark应用的生命周期(spark程序内)
在spark内部实际执行的是用户提交的应用程序代码,理解用户程序在spark内部的生命周期流程有助于任务的跟踪和优化。
SparkSession
spark应用的第一步是创建一个SparkSession,因为它是spark程序的入口,在很多交互模式下,系统会为用户自动创建好一个SparkSession,但是在用户编写应用的时候必须手动创建SparkSession。
SparkSession创建之后,就开始执行用户提交的程序。在spark中,用户提交的所有程序都会被编译成底层RDD。
逻辑指令
Spark程序本质上由transformation操作和action操作组成,可以通过SQL API或者RDD API创建。为了理解spark程序是如何在集群上运行的,最重要的是清楚用户编写的程序是如何转化成物理执行计划的。为此,我们以以下代码为例,描述spark详细描述:
df1 = spark.range(2,10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)") # 2500000000000
Spark Job
一般情况下,一个action操作对应一个Spark Job。每个Job又会分成多个stages,stages的数量取决于程序中有多少个shuffle类型的操作。
stages
Stages是能够在多台机器上并行计算以完成想同类型的操作的一组tasks。一般情况下,Spark会把尽量多连续的transformation操作包含在同一个stage中,直到遇到shuffle类型的操作,才重新开始一个新的stage。shuffle,中文直译为洗牌,表示数据在不同executor之间的重新分配,比如对Dataframe按照某个字段进行排序,按照某个字段group之后聚合操作,都会引起数据的重新分配,属于shuffle类型的操作。Spark在遇到shuffle操作之后重新开始一个stage,并且记录下stages的顺序。
Tasks
每个stages中包含了很多tasks,每个task对应于单个executor上面对一组数据进行的一组transformation操作。tasks的数量取决于数据的partition的数量,如果你有一个包含非常多数据的很大的partition,就会有一个大的task,但是如果把一个大的数据repartition成很多个小的partition,就会有很多个小任务,所以,把数据分成很多个partition意味着更多的数据可以并行计算。
另外,stages和tasks有两个重要的特性:1)pipelining,spark会自动把连续的transformation操作放在一个stage中执行;2)shuffle persistence,spark在执行shuffle操作的时候,shuffle的前一个stage会首先把数据写入到stage当前节点的磁盘上,然后,执行shuffle的stage才启动,并且从每个节点的磁盘文件中读取对应的记录(比如对应某些特定的key的记录)执行计算任务。这样的机制的非常大的好处是,由于shuffle任务需要的数据已经在前一个stage写入到磁盘上,当shuffle任务执行失败的时候,spark只需要基于磁盘数据重新启动shuffle任务,而不需要重新启动所有的任务。