目录
前言
思来想去,还是决定从头开始写起,从最基础的东西入手,研读Spark Core的源码。相对于之前东一榔头西一棒槌地抓重点,这样应该更能够由浅入深地建立对Spark核心的认知,写起来也更加有条理。
除非特殊说明,本系列文章的源码全部基于当前(2019年3月)最新的Spark 2.3.3版本。
星星之火,可以燎原。作为系列的第0篇,自然要请出大数据领域的Hello World程序——WordCount,以它为起点做一些准备,然后展开对Spark内部设计的探究。
Spark WordCount
Spark源码自带示例里只有Java版WordCount。考虑到Spark主要用Scala语言开发,下面给出一份Scala版本的,以它为准。
代码#0.1 - Scala版WordCount
package me.lmagics.spark.example;
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object ScalaWordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile(args(0))
val words: RDD[String] = lines.flatMap(line => line.split(" "))
val ones: RDD[(String, Int)] = words.map(word => (word, 1))
val counts: RDD[(String, Int)] = ones.reduceByKey((i1, i2) => i1 + i2)
val result: Array[(String, Int)] = counts.collect()
for ((word, count) <- result) {
println(word + "\t" + count)
}
sc.stop()
}
}
为了方便理解,代码中还显式标出了每个变量的类型。实际编码时可以不用这样做,因为Scala支持类型推断。
下面,通过WordCount程序中出现的要点,先来预热(或者说复习?)一下Spark中最基础的那些东西。
SparkConf
SparkConf类负责管理Spark的所有配置参数。用户可以通过SparkConf类的实例来自行设置或获取配置参数。
SparkContext
SparkContext类是一切Spark程序的总入口,它接受SparkConf中定义的配置参数,并完成大量的初始化工作。只有SparkContext初始化完成之后,Spark程序才能真正准备好运行。
RDD
RDD的正式名称为弹性分布式数据集(Resilient distributed dataset),是Spark中最重要的概念之一。Spark官方文档中对它的定义是:可以并行操作的、容错的元素集合。
除了可并行操作、容错两点之外,RDD还具有一些其他特点,如:不可变性(只能生成或转换,不能直接修改),分区性(内部数据会划分为Partition),以及名称中的弹性(可以灵活利用内存和外存)。
要生成RDD,有两种方法。一是对内存中存在的数据执行并行化(Parallelize)操作,二是从外部存储(例如HDFS、HBase、Cassandra等等)中的数据源读取并生成。代码#0.1中的RDD lines就是通过调用SparkContext.textFile()方法,从外部存储中的文本文件生成的。
RDD操作(算子)
官方文档中将类似flatMap()、map()、reduceByKey()、collect()这样在RDD上定义的,对数据操作的方法统称为Operation。但在中文环境中,可能是为了适应Scala函数式编程的特点,经常称作“算子”。为了统一,之后再提及RDD上的这类方法,就全部叫做“算子”。
算子有两类,即转换(Transformation)算子与动作(Action)算子。
转换算子用于对一个RDD施加一系列逻辑,使之变成另一个RDD。flatMap()、map()、reduceByKey()都是转换算子;动作算子用于真正执行RDD转换逻辑的计算,并返回其处理结果。collect()就是动作算子。
换句话说,转换算子是懒执行(延迟执行)的,需要动作算子来触发。如果没有动作算子,那么RDD的转换逻辑只会被“记忆”,而不会生效。
Spark Web UI
Spark Web UI是Spark中的两个主要监控组件之一,另外一个是度量系统(Metrics System,下图#0.2能看到)。通过Web UI可以直观地观察到Spark程序的运行状况、资源状态,以及度量系统收集来的各项监控指标,是调试Spark程序的基础工具。
以代码#0.1的WordCount程序为例,为方便讲解,它采用最简单的本地(Local)部署模式运行,而没有使用集群。在它运行的同时,打开本机浏览器并访问localhost:4040,就可以看到Web UI。4040是Web UI默认的端口。
下图示出Web UI中的Jobs与Stages页面。
这两张图中也有不少需要注意的点,来梳理一下。
Application
Application(应用)就是用户使用Spark API编写,并提交到Spark部署、执行的程序。一个应用即对应一个JVM,图中的“ScalaWordCount”就是应用的名称。
Job
Job(作业)是Spark调度体系中的高层。在Spark应用代码中,一个动作算子就标志着一个作业的提交,其中包含该动作算子本身,以及它负责的前面所有转换逻辑。示例的WordCount程序中只有一个动作算子,因此它只有一个Job 0。
Stage
Stage(阶段)是Spark调度体系中的中间层,作业可以包含多个阶段。阶段的划分是由Shuffle与Shuffle依赖决定的,下面也会简要介绍Shuffle。图#0.1显示Job 0划分为两个Stage 0和1。
Spark中有两种Stage,即ShuffleMapStage与ResultStage。
Task
Task(任务)是Spark调度体系中的低层,阶段通常也都会包含多个任务,任务的数量则与RDD的分区数量有关。图#0.1显示每个Stage下又包含2个Task。图#0.2显示Stage 0下Task的详情,以及对应的监控指标。
Spark中也有两种Task,与Stage对应,分别为ShuffleMapTask与ResultTask。这种设计与Hadoop MapReduce中的Map任务和Reduce任务有点相似,但根本上还是不同的。
Shuffle
Shuffle的中文意译是“洗牌”。它是所有采用类MapReduce思想的大数据框架计算过程的必经阶段,处于Map与Reduce中间,非常重要。
它可以分为Shuffle write与Shuffle read两个子阶段。Shuffle write即Map阶段的最后,输出上游计算产生的中间数据。Shuffle read即Reduce阶段的开头,拉取中间数据用于下游计算。Shuffle一般会涉及大量的内存、磁盘读写及网络传输,因此也是各个大数据框架的优化重点。
图#0.1的右下角显示出Stage 0的Shuffle write及Stage 1的Shuffle read数据量。
DAG与RDD依赖
DAG即有向无环图(Directed acyclic graph),是个图论概念。如果一个有向图从任意顶点出发,均无法经过若干条边回到该点,则这个图是一个有向无环图。关于它的细节可以参考其他资料,如英文维基。
在Spark中,DAG被用来存储各个RDD之间的依赖关系,或者叫做“血缘”(Lineage),更能表达“一个RDD如何从其他RDD转换过来”的含义。它是RDD容错性的基础,一旦数据丢失或Task出错,可以根据DAG中的Lineage重算。图#0.1显示的DAG非常简单,蓝色方块表示RDD,箭头则表示Lineage。
RDD的依赖关系在Spark内部用Dependency来表示,分为窄依赖(NarrowDependency)和Shuffle依赖(又称宽依赖,ShuffleDependency)两种。窄依赖表示子RDD依赖于父RDD中固定的分区,Shuffle依赖则表示子RDD依赖于父RDD中不定数目的分区。
Executor与Driver
Executor(执行器)与Driver(驱动器)都是JVM进程。Executor负责执行Task的具体计算逻辑,并将计算结果返回给Driver。Driver负责执行Application中的main()方法,SparkContext及其他必需组件的初始化也是在Driver进行。另外,它也向Executor调度Task,以及跟踪它们的运行状况。
在图#0.2的Tasks表格中可以看到Executor ID一列。由于这个WordCount程序是纯本地执行的,因此Executor与Driver是同一个节点。
Spark集群
虽然上面的例子是用本地模式演示的,但在生产环境中,我们总会使用多台服务器组成Spark集群,以最大化Spark的计算性能。官方文档中提供的Spark集群架构如下图所示。
Cluster Manager
Cluster Manager即集群管理器,它负责为集群中所有节点分配并管理资源。它根据部署模式的不同,也会有不同的角色。如果采用Standalone模式,即Spark自己管理,那么集群管理器就是Spark集群的Master。如果采用Spark on YARN模式,集群管理器就是YARN中的ResourceManager了。除了Standalone与YARN之外,还可以使用Mesos、Kubernetes作为集群管理器。
Worker
Worker是Spark集群中的工作节点。它负责使用分配的资源创建并持有Executor,以及与Cluster Manager保持通信,通知自己的资源状况和Executor状况。在Standalone模式下,Worker称作Slave,即我们常说的Master-Slaves架构。在Spark on YARN模式下,Worker节点的角色实际会由YARN的NodeManager来替代。
总结
本文以最简单的WordCount程序为切入点,介绍了代码中涉及到的基础知识。然后,通过引入程序执行时的Web UI界面和Spark集群的基础架构,串联并总结了其中展示的多个Spark基本概念。
通过对这些知识点的总结和整理,我们可以对Spark Core内部的组成有一个大致的先验了解。除了SparkConf、SparkContext及它们背后的支撑体系之外,Spark Core还可以分为存储、计算、调度、部署、监控五大子系统,这些理念渗透在字里行间,如果仔细阅读,相信就能够理解了。
下一篇计划来写SparkConf。加油。