1.1 Spark 是什么
Apache Spark 是一个快速的,多用途的集群计算系统,相当于Hadoop MapReduce ,Spark 使用了内存来保存中间结果,能在数据还未写入磁盘的时候在内存中进行运算.
Spark 只是一个计算框架,不像Hadoop 一样包含分布式文件系统和完备的调度系统,如果要使用 Spark ,就需要搭载其他的文件系统和其他的调度系统.
为什么会有Spark ?
Spark 产生之前,已经有非常成熟的计算系统的存在了,比如我们的 MapReduce ,这些计算系统提供了高层次的 API ,把计算运行在集群中并提供容错能力,从而实现分布式计算.
虽然这些框架提供了大量的对访问利用计算资源的抽象,但是他们缺少了利用分布式内存的抽象,这些框架的缺点在于,多个计算之间的数据服用就是将中间的结果数据写入到一个稳定的文件系统中(比如: HDFS ),所以会产生数据的复制和备份,磁盘的 I/O 以及数据的 序列化, 所以这些框架在遇到需要在多个计算之间复用中间结果的操作的时会非常的不高效.
而这些复用中间结果的操作是非常的常见的,例如: 迭代式计算,交互式的数据挖掘, 图形计算等等
认识到这个问题之后,学学术界提出了一个新的模型 : RDDS
RDDS 是一个可以容错且并行的数据结构,它可以让用户显式的将中间结果数据集保存在内存中,并且通过控制数据集的分区来达到数据存放处理的最优化.
同时 RDDS 也提供了丰富的 API 来操作数据集
后来 RDDS 被 AMPLab 在 一个叫做 Spark 的框架中进行了开源
总结:
- Spark 是 Apache 的一个开源框架
- Spark 的母公司叫 Databticks
- Spark 是为了解决 MapReduce 等过去的计算系统无法在内存中保存中间结果的问题
- Spark 的核心是 RDDS ,RDDS 不仅谁一个计算框架,也是一种数据结构.
- RDDS即Resilient distributed datasets(弹性分布式数据集)。
1.2 Spark 的特点(优点)
理解 Spark 的特点,从而去理解 为什么要使用 Spark
速度快
- Spark 在内存中运行时,速度是 Hadoop MapReduce 的 100 倍
- Spark 基于硬盘的运算速度 大概是 Hadoop MapReduce 的 10 倍
- Spark 实现了一种叫做 RDDS 的 DAG(Directed Acyclic Graph)叫做有向无环图 执行引擎,其数据缓存在内存中 可以迭代处理.
易用
- Spark 支持 JAVA ,Scala , Python ,R , SQL 等多种语言的 API
- Spark 支持超过 80 个 高级运算符使得 用户非常容易的构建并行计算程序
- Spark 可以使用基于 Scala ,Python ,R , SQL 的 shell 交互式查询
通用
- Spark 提供一个完整的技术栈,包括 SQL 执行, Dataset 命令式 API, 机器学习库 MLlib ,图计算框架 GraphX ,流计算 SparkStreaming.
- 用户可以在同一个应用同时使用这些工具 , 这一点是划时代的.
兼容
- Spark 可以运行在 Hadoop Yarn ,Apache Mesos, Kubernets, Spark Standalone 等集群中
- Spark 可以访问 HBase , HDFS , Hive , Cassandra 在内的多种数据库
总结
- 支持 多种语言的 API
- 可以扩展至 超过 8K 个节点
- 能够在内存中缓存数据集,以实现交互式数据分析
- 提供命令行窗口, 减少探索式的数据分析的反应时间
1.3 Spark 组件
理解 Spark 能做什么
理解 Spark 的学习路线
- Spark 最核心的功能是 RDDS , RDDS 存在于 Spark-Core 这个包中,这个包也是 Spark 最核心的包
同时 Spark 在 Spark-Core 的上层提供了很多的工具,以便适用于不同类型的计算
Spark-Core 和 RDDs(弹性分布式数据集 Resilient distributed datasets)
- Spark SQL 在 Spark-core 基础之上带出了一个名为 DataSet 和 DataFrame 的数据抽象化的概念
- Spark SQL 提供了在 DataSet 和 DataFrame 之上执行 SQL 的能力
- Spark SQL 提供了DSL ,可以通过 Scala , Java , Python 等语言来操作 DataSet 和 DataFrame
- 它还支持 使用 JDBC / ODBC 服务器操作 SQL 语言
Spark Streaming
- Spark Streaming 充分利用 Spark-Core 的快速调度能力来运行 流分析
- 它截取小批量的数据并可以对之运行 RDD Transformation
- 它提供了在同一个程序中同时使用 流分析 和批量分析的能力
MLlib
- MLlib 是 Spark 上分布式机器学习的框架, Spark 分布式内存的架构比 Hadoop 吸盘式的 Apache Mahout 快上 10 倍, 有很大的扩展性
- MLlib 可以使用许多常见的 机器学习 和 统计算法 ,简化大规模 机器学习
- 汇总统计,相关性,分层抽样,假设检定,随机数据生成
- 支持向量机,回归,线性回归,逻辑回归,决策树,朴素贝叶斯
- 协同过滤, ALS
- k-means
- SVD 奇异值分解, PCA 主要成分分析
- TF-IDF, Word2Vec , StandardScaler
- SGD随机梯度下降,L-BFGS
Graphx
- Graphx 是分布式图计算框架,提供了一组可以表达图计算的 API ,Graphx 还对这种抽象化提供了优化运行
总结
- Spark 提供了批处理(RDDS) , 结构化查询(DataFrame), 流计算 (SparkStreaming),机器学习(MLlib),图计算(GraphX) 组件
- 这些组件都是依托于 通用的 计算引擎 RDDS 而构建出来的,所以 Spark-core 的 RDDS 是整个 Spark 的基础
1.4 Spark 和 Hadoop 的差别
Spark 集群搭建
从Spark 的集群架构开始,理解分布式环境,以及 Spark 的运行原理
理解Spark 的集群搭建,包括高可用的搭建方式
2.1 Spark 集群的结构
通过应用运行流程,理解分布式调度的基础概念
Sprak 如果将程序运行在一个集群中?
Spark 自身是没有集群管理工具的,但是如果想要管理数以千计的机器集群,就必须拥有一个集群的管理工具,所以 Spark集群 可以借助外部的集群工具来进行管理
-
整个的流程使用 Spark 的 Client 提交任务, 找到集群管理工具去申请资源, 然后将计算任务分发到集群中运行。
名词解释
Driver
- 该进程调用 Spark 程序的 Main 方法,并且启动 SparkContext
Cluster Manager
- 该进程负责和外部集群工具打交道,申请和释放集群资源
worker
- 该进程是一个守护进程,负责启动和管理 Executor
Executor
-
该进程是一个 JVM 虚拟机,负责运行 Spark Task
运行一个 Spark 程序大致经历如下几个步骤
1.启动 Driver ,创建 SparkContext
- Client 提交程序给 Driver ,Driver 向Cluster Manager 申请集群资源
- 资源申请完毕,在 Worker 中 启动 Executor
- Driver 将程序转化为 Tasks ,分发给 Executor 执行
问题一: Spark 程序可以运行在什么地方?
- 集群:一组协同工作的计算机,任务在调度软件的控制下,表现的好像在使用一台计算机一样
- 集群管理工具: 调度任务到集群的软件
- 常见的机权管理工具: Hadoop Yarn , Apache Mesos, Kubernetes
Spark 可以将任务运行在两种模式下:
- 单机: 使用线程模拟并行来运行程序
- 集群: 使用集群故管理器 来和不同类型的集群交互,将任务运行在集群中
Spark 可以使用的集群管理工具有:
- Spark Standalone (自带的)
- Hadoop Yarn
- Apache Mesos
- Kubernetes
问题二: Driver 和 Worker 什么时候启动?
在Standalone 集群中,分为两个角色, Master 和 Slave , 而 Slave 就是Worker ,所以在 Standalone 集群中,启动之初就会创建固定数量的 Worker
Driver 的启动分为两种模式: Client 和 Cluster
在 Client 模式下, Driver 运行在 Client 端,在Client 启动的时候被启动。
-
在 Cluster 模式下,Driver 运行在某个 Worker 中,随着应用的提交而启动。
在Yarn 集群模式下,也依然分为 Client 模式和 Cluster 模式,较新的版本中已经开始废弃 Client 模式了,所以上图所示的就是 Cluster 模式
如果要在 Yarn 中运行 Spark 程序,首先要和 ResourceManager 交互,开启 ApplicationMaster ,其中运行了 Driver ,Driver 创建基础环境后,会由 ResourceManager 提供到对应的容器中,运行 Executor , Executot 会反向向 Driver 注册自己,并申请 Tasks 执行。
总结
- Master 负责总控,调度,管理和协调 Worker ,保留资源状况等。
- Slave 对应的 Worker 节点,用于启动 Executor 执行 Task ,定期向 Master 汇报
- Driver 运行在 Client 或者 Slave(Worker) 中,默认运行在 Slave(Worker ) 中
RDD 是什么?
定义:
RDD,全称为 Resilient Distributed Datasets ,是一个容错的,并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。
同时,RDD, 还提供了一组丰富的操作来操作这些数据,在这些操作中,包括 map ,flatMap ,filter 等转换操作实现了 MONAD 模式,很好的契合了 Scala 的集合操作,除此之外,RDD 还提供了 join , groupBy ,reduceBykey 等更方便的操作,以支持创建的数据运算,通常来讲,针对数据处理有几种常见的模型: 包括: Iterative Algorithms, Relational Queries, MapReduce, Stream Processing.例如 Hadoop MapReduce 采用了 MapReduce 模型, Storm 则采用了 Stream Processing 模型.
而RDD 则混合了这四种模型,使得 Spark 可以应用于各种大数据的处理场景。
RDD 作为数据结构,本质上是一个只读的分区记录集合,一个RDD 可以包含多个分区,每个分区就是一个DataSet 片段。RDD 之间可以相互的依赖,如果 RDD 的每个分区最多只能被一个子 RDD 的一个分区使用,则称之为: “窄依赖” ,若被多个子 RDD 的分区依赖,称之为“宽依赖” ,不同的操作,依据其特性,可能会产生不同的依赖 ,列入 Map 操作则会插产生窄依赖,而 Join 操作,就会产生宽依赖。
特点
- RDD 是一个编程模型
- RDD允许用户显式的指定数据存放在内存或者磁盘
- RDD 是分布式的,用户可以控制 RDD 的分区
- RDD 是一个编程模型
- RDD提供了丰富的操作API
- RDD 提供了reduceByKey ,groupByKey 等操作符,用以操作 Key-Value 型数据
- RDD提供了 max ,min , mean 等操作符,用以操作数字型数据
- RDD 提供了 map ,flatMap ,filter 等操作符,用以实现 Monad 模式(简单说,Monad就是一种设计模式,表示将一个运算过程,通过函数拆解成互相连接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。)
3.RDD 是混合型编程模型,可以支持迭代计算,关系查询,MapReduce ,流计算
4.RDD 是只读的。
5.RDD 之间有依赖关系,更具执行操作的操作符的不同,依赖关系可以分为 宽依赖 和 窄依赖。
(wordcount.txt)单词统计的 案例程序从结构上可以用上图表示,分为两大部分
存储
- 文件如果存放在 HDFS 上 ,就是分块的,类似上图所示,这个 wordcount.txt 分了三块
计算
Spark 不仅可以读HDFS ,Spark 还可以读取很多其他的数据集,Spark 可以从数据集中创建出 RDD
比如: 上图中,使用了一个RDD 表示 HDFS 上的 某一个文件,这个文件在 HDFS 中是分为 3块的,那么 RDD 在读取的时候也会有 3 个分区, 每个 RDD 的分区对应了一个 HDFS 的分块。
后续RDD 在计算的时候,可以更改分区,也可以继续保持三个分区,每个分区之间有依赖关系,列入说 RDD2 的分区1 依赖了 RDD1 的分区1
RDD之所以要设计为有分区,是因为要进行分布式的计算,每个不同的分区在不同的线程,或者进程,甚至节点中,从而做到了并行计算。
总结
- RDD 是弹性分布式数据集
- RDD 一个非常重要的基础是RDD 可以分区,因为RDD 是运行在分布式的环境下的。
4.1 创建RDD
SparkContext 程序的入口
val conf = new SparkConf().setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
SparkContext 是 Spark-core 的入口组件,是一个 Spark 的程序的入口,是一个元老级别的API 了.
如果把 一个 Spark 程序分为前后端,那么服务端就是可以运行在 Spark 程序的集群,而 Driver 就是 Spark 的前端,在 Driver 中 SparkContext 是最主要的组件,也是 Driver 在运行时首先会创建的组件,是 Driver 的核心组件。
SparkContext 从提供的 API 来看,主要的作用是来连接集群,创建 RDD ,累加器, 广播变量等 功能。
简单的说,RDD 有三种创建方式
RDD 可以通过本地集合之间创建
RDD 也可以通过读取外部数据集来创建
RDD 也可以通过其他的 RDD 衍生而来
通过本地集合直接创建 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val list = List(1, 2, 3, 4, 5, 6)
val rddParallelize = sc.parallelize(list, 2)
val rddMake = sc.makeRDD(list, 2)
通过 parallelize 和 makeRDD 这两个 API 可以通过本地集合创建 RDD
这两个 API 本质上是一样的, 在 makeRDD 这个方法的内部, 最终也是调用了 parallelize
因为不是从外部直接读取数据集的, 所以没有外部的分区可以借鉴, 于是在这两个方法都都有两个参数, 第一个参数是本地集合, 第二个参数是分区数
通过读取外部文件创建 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
访问方式:
支持访问文件夹
sc.textFile("hdfs:///dataset")
支持访问压缩文件
sc.textFile("hdfs:///dataset/words.gz")
支持通过通配符访问
sc.textFile("hdfs:///dataset/*.txt")
注意
如果把 Spark 应用泡在集群上,则 worker 有可能在任何一个节点上运行
所以如果使用 如下形式访问本地文件的话,要确保所有的 worker 中的对应的路径上有这个文件,否则可能会报错,无法找到这个文件。
file:///…
分区
默认情况下读取 hdfs 中的文件的时候,每个 hdfs 的 block 对应一个 RDD 的 partition ,block 的默认是 128M
通过第二个参数,可以指定分区数量
sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 20)
如果通过第二个参数指定了分区,这个分区的数量一定不能小于 “block” 的数量
通过其他的 RDD 衍生 新的 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 20)
val words = source.flatMap { line => line.split(" ") }
source 是通过读取 HDFS 中的文件所创建的
words 是通过 source 调用算子 map 生成的新 RDD
通常每个 CPU core 对应 2-4 个分区是合理的值
支持的平台
支持Hadoop几乎所有的数据格式,支持HDFS 的访问
通过第三方的这次hi,可以访问Aws 和 阿里云中的文件,可以到对应的平台查看API
总结:
RDD 可以通过三种方式来创建,本地集合创建,外部数据集创建,其他的RDD 衍生。
RDD 算子
理解各个算子的作用
通过理解算子的作用,反向理解 WordCount 程序,以及 Spark的要点
Map算子
sc.parallelize(Seq(1, 2, 3))
.map( num => num * 10 )
.collect()
作用
把RDD 中的数据一对一的转为 另外的一种形式
调用
def map[U: ClassTag](f: T ⇒ U): RDD[U]
参数
f → Map 算子是 原RDD → 新RDD 的过程, 这个函数的参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据
注意点
Map 是一对一,如果函数是 String → Array [String ] 则新的 RDD 中每条数据就是一个数组
FlatMap 算子
sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
.flatMap( line => line.split(" ") )
.collect()
作用
FlatMap 算子和 Map 算子雷士,但是FlatMap 是一对多
调用
def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U]
参数
f → 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD
注意点
flatMap 其实是两个操作,是 map + flatten ,也是先转换,后把转换而来的List 展开。
ReduceByKey
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.reduceByKey( (curr, agg) => curr + agg )
.collect()
作用
首先按照 Key 分组,接下来把整组的 Value 计算出一个聚合值,这个操作非常类似于 MapReduce 中的 Reduce
调用
def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
参数
func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果
注意点
ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple2
ReduceByKey 是一个需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少
总结
map 和 flatMap 算是都是转换,只是 flatMap 在转换过后会在执行展开,所以 map 是一对一,flatMap 是一对多
reduceByKey 类似于 MapReduce 中的reduce