下载 Spark
1.官网地址 http://spark.apache.org/
2.文档查看地址 https://spark.apache.org/docs/2.1.1/
3.下载地址 https://archive.apache.org/dist/spark/
目前最新版本为 2.4.0, 考虑到国内企业使用情况我们仍然选择 2.1.1 来学习. 不过2.x.x 的版本差别都不大.
集群角色
Master 和 Worker
1. Master
Spark 特有资源调度系统的 Leader。掌管着整个集群的资源信息,类似于 Yarn 框架中的 ResourceManager,主要功能:
监听 Worker,看 Worker 是否正常工作;
Master 对 Worker、Application 等的管理(接收 Worker 的注册并管理所有的Worker,接收 Client 提交的 Application,调度等待的 Application 并向Worker 提交)。
2. Worker
Spark 特有资源调度系统的 Slave,有多个。每个 Slave 掌管着所在节点的资源信息,类似于 Yarn 框架中的 NodeManager,主要功能:
通过 RegisterWorker 注册到 Master;
定时发送心跳给 Master;
根据 Master 发送的 Application 配置进程环境,并启动 ExecutorBackend(执行 Task 所需的临时进程)
Driver和Executor
1. Driver(驱动器)
Spark 的驱动器是执行开发程序中的 main 方法的线程。
它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行。如果你是用Spark Shell,那么当你启动Spark shell的时候,系统后台自启了一个Spark驱动器程序,就是在Spark shell中预加载的一个叫作 sc 的SparkContext对象。如果驱动器程序终止,那么Spark应用也就结束了。主要负责:
将用户程序转化为作业(Job);
在Executor之间调度任务(Task);
跟踪Executor的执行情况;
通过UI展示查询运行情况。
2. Executor(执行器)
Spark Executor是一个工作节点,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。主要负责:
负责运行组成 Spark 应用的任务,并将状态信息返回给驱动器程序;
通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor内的,因此任务可以在运行时充分利用缓存数据加速运算。
总结:Master 和 Worker 是 Spark 的守护进程,即 Spark 在特定模式下正常运行所必须的进程。Driver 和 Executor 是临时程序,当有具体任务提交到 Spark 集群才会开启的程序。
2.1 Local 模式
Local 模式就是指的只在一台计算机上来运行 Spark.
通常用于测试的目的来使用 Local 模式, 实际的生产环境中不会使用 Local 模式.
2.1.1 解压 Spark 安装包
把安装包上传到/opt/software/下, 并解压到/opt/module/目录下
tar-zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module
然后复制刚刚解压得到的目录, 并命名为spark-local:
cp -r spark-2.1.1-bin-hadoop2.7 spark-local
2.1.2 运行官方求PI的案例
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
/usr/hdp/3.0.1.0-187/spark2/examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar 100
使用spark-submit来发布应用程序.
语法:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options<application-jar> \
[application-arguments]
--master 指定 master 的地址,默认为local. 表示在本机运行.
--class 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
--deploy-mode 是否发布你的驱动到 worker节点(cluster 模式) 或者作为一个本地客户端 (client 模式) (default: client)
--conf: 任意的 Spark 配置属性, 格式key=value. 如果值包含空格,可以加引号"key=value"
application-jar: 打包好的应用 jar,包含依赖. 这个 URL 在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar
application-arguments: 传给main()方法的参数
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 6 指定所有executor使用的cpu核数为6个
--executor-cores 表示每个executor使用的 cpu 的核数
结果展示
该算法是利用蒙特·卡罗算法求PI
备注: 也可以使用run-examples来运行
/usr/hdp/3.0.1.0-187/spark2/bin/run-example SparkPi 100
2.1.3 使用 Spark-shell
Spark-shell 是 Spark 给我们提供的交互式命令窗口(类似于 Scala 的 REPL)
本案例在 Spark-shell 中使用 Spark 来统计文件中各个单词的数量.
步骤1: 创建 2 个文本文件
mkdir input
cd input
touch 1.txt
touch 2.txt
分别在 1.txt 和 2.txt 内输入一些单词.
步骤2: 打开 Spark-shell
spark-shell --master local
步骤3: 查看进程和通过 web 查看应用程序运行情况
地址: http://slaver03.sugon.com:4040/jobs/
步骤4: 运行 wordcount 程序
sc.textFile("file:///opt/zhangxw/input/").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
步骤5: 登录slaver03.sugon.com:4040/查看程序运行
2.1.4 提交流程
Spark 通用运行简易流程
2.1.5 wordcount 数据流程分析:
1. textFile("input"):读取本地文件input文件夹数据;
2. flatMap(_.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;
3. map((_,1)):对每一个元素操作,将单词映射为元组;
4. reduceByKey(_+_):按照key将值进行聚合,相加;
5. collect:将数据收集到Driver端展示。
2.2 Spark 核心概念介绍
driver program(驱动程序)
每个 Spark 应用程序都包含一个驱动程序, 驱动程序负责把并行操作发布到集群上.
驱动程序包含 Spark 应用程序中的主函数, 定义了分布式数据集以应用在集群中.
在前面的wordcount案例集中, spark-shell 就是我们的驱动程序, 所以我们可以在其中键入我们任何想要的操作, 然后由他负责发布.
驱动程序通过SparkContext对象来访问 Spark, SparkContext对象相当于一个到 Spark 集群的连接.
在 spark-shell 中, 会自动创建一个SparkContext对象, 并把这个对象命名为sc.
RDDs(Resilient Distributed Dataset) 弹性分布式数据集
一旦拥有了SparkContext对象, 就可以使用它来创建 RDD 了. 在前面的例子中, 我们调用sc.textFile(...)来创建了一个 RDD, 表示文件中的每一行文本. 我们可以对这些文本行运行各种各样的操作.
在第二部分的SparkCore中, 我们重点就是学习 RDD.
cluster managers(集群管理器)
为了在一个 Spark 集群上运行计算, SparkContext对象可以连接到几种集群管理器(Spark’s own standalone cluster manager, Mesos or YARN).
集群管理器负责跨应用程序分配资源.
executor(执行器)
SparkContext对象一旦成功连接到集群管理器, 就可以获取到集群中每个节点上的执行器(executor).
执行器是一个进程(进程名: ExecutorBackend, 运行在 Worker 上), 用来执行计算和为应用程序存储数据.
然后, Spark 会发送应用程序代码(比如:jar包)到每个执行器. 最后, SparkContext对象发送任务到执行器开始执行程序.
专业术语
2.3 Standalone 模式
2.4 Yarn 模式
2.4.1 Yarn 模式概述
Spark 客户端可以直接连接 Yarn,不需要额外构建Spark集群。
有 yarn-client 和 yarn-cluster 两种模式,主要区别在于:Driver 程序的运行节点不同。
yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出
yarn-cluster:Driver程序运行在由 RM(ResourceManager)启动的 AM(AplicationMaster)上, 适用于生产环境。
工作模式介绍:
2.4.2 Yarn 模式配置
步骤1: 修改 hadoop 配置文件 yarn-site.xml, 添加如下内容:
由于咱们的测试环境的虚拟机内存太少, 防止将来任务被意外杀死, 配置所以做如下配置.
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
修改后分发配置文件.
步骤2: 复制 spark, 并命名为spark-yarn
cp -r spark-standalone spark-yarn
步骤3: 修改spark-evn.sh文件
去掉 master 的 HA 配置, 日志服务的配置保留着.
并添加如下配置: 告诉 spark 客户端 yarn 相关配置
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
步骤4: 执行一段程序
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /usr/hdp/3.0.1.0-187/spark2/examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar 100
2.4.3 日志服务
在前面的页面中点击 history 无法直接连接到 spark 的日志.
可以在spark-default.conf中添加如下配置达到上述目的
spark.yarn.historyServer.address=hadoop201:18080 spark.history.ui.port=18080
如果在 yarn 日志端无法查看到具体的日志, 则在yarn-site.xml中添加如下配置
<property> <name>yarn.log.server.url</name> <value>http://hadoop201:19888/jobhistory/logs</value> </property>
3.1 编写 WordCount 程序
步骤1:创建 maven 项目, 导入依赖
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.1</version> </dependency> </dependencies> <build> <plugins> <!-- 打包插件, 否则 scala 类不会编译并打包进去 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
步骤2: 创建WordCount.scala
package day01 object WordCount { def main(args: Array[String]): Unit = { // 1. 创建 SparkConf对象, 并设置 App名字 val conf: SparkConf = new SparkConf().setAppName("WordCount") // 2. 创建SparkContext对象 val sc = new SparkContext(conf) // 3. 使用sc创建RDD并执行相应的transformation和action sc.textFile("/input") .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .saveAsTextFile("/result") // 4. 关闭连接 sc.stop() } }