翻译:Hadoop权威指南之Spark-1

本文翻译自O'Reilly出版Tom White所著《Hadoop: The Definitive Guide》第4版第19章,向作者致敬。该书英文第4版已于2015年4月出版,至今已近15个月,而市面上中文第3版还在大行其道。Spark一章是第4版新增的内容,笔者在学习过程中顺便翻译记录,由于笔者也在学习,并无实战经验,难免翻译不妥或出错,欢迎方家来信斧正。翻译纯属兴趣,不做商业用途。秦铭,Email地址qinm08@gmail.com

本文原始地址


Apache Spark 是一个大规模数据处理的集群计算框架。和本书中讨论的大多数其他处理框架不同,Spark不使用MapReduce作为执行引擎,作为替代,Spark使用自己的分布式运行环境(distributed runtime)来执行集群上的工作。然而,Spark与MapReduce在API和runtime方面有许多相似,本章中我们将会看到。Spark和Hadoop紧密集成:它可以运行在YARN上,处理Hadoop的文件格式,后端存储采用HDFS。

Spark最著名的是它拥有把大量的工作数据集保持在内存中的能力。这种能力使得Spark胜过对应的MapReduce工作流(某些情况下差别显著),在MapReduce中数据集总是要从磁盘加载。两种类型的应用从Spark这种处理模型中受益巨大:1)迭代算法,一个函数在某数据集上反复执行直到满足退出条件。2)交互式分析,用户在某数据集上执行一系列的特定查询。

即使你不需要内存缓存,Spark依然有充满魅力的理由:它的DAG引擎和用户体验。与MapReduce不同,Spark的DAG引擎能够处理任意的多个操作组成的管道(pipelines of operators)并翻译为单个Job。

Spark的用户体验也是首屈一指的(second to none),它有丰富的API用来执行很多常见的数据处理任务,比如join。行文之时,Spark提供三种语言的API:Scala,Java和Python。本章中的大多数例子将采用Scala API,但翻译为别的语言也是容易的。Spark还带有一个基于Scala或Python的REPL(read-eval-print loop)环境,可以快速简便的查看数据集。

Spark是个构建分析工具的好平台,为达此目的,Apache Spark项目包含了众多的模块:机器学习(MLlib),图形处理(GraphX),流式处理(Spark Streaming),还有SQL(Spark SQL)。本章内容不涉及这些模块,感兴趣的读者可以访问 Apache Spark 网站

Installing Spark

下载页面 下载Spark二进制分发包的稳定版本(选择和你正在使用的Hadoop版本相匹配的)。在合适的地方解压这个tar包。

% tar xzf spark-x.y.z-bin-distro.tgz

把Spark加入到PATH环境变量中

% export SPARK_HOME=~/sw/spark-x.y.z-bin-distro
% export PATH=$PATH:$SPARK_HOME/bin

我们现在可以运行Spark的例子了。

An Example

为了介绍Spark,我们使用spark-shell来运行一个交互式会话,这是带有Spark附加组件的Scala REPL,用下面的命令启动shell:

% spark-shell
Spark context available as sc.
scala>

从控制台的输出,我们可以看到shell创建了一个Scala变量,sc,用来存储SparkContext实例。这是Spark的入口,我们可以这样加载一个文本文件:

scala> val lines = sc.textFile("input/ncdc/micro-tab/sample.txt")
lines: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

lines变量是对一个弹性数据集(RDD)的引用,RDD是Spark的核心抽象:分区在集群中多台机器上的只读的对象集合。在典型的Spark程序中,一个或多个RDD被加载进来作为输入,经过一系列的转变(transformation),成为一组目标RDD,可以对其执行action(比如计算结果或者写入持久化存储) 。“弹性数据集”中的“弹性”是指,Spark会通过从源RDD中重新计算的方式,来自动重建一个丢失的分区。

加载RDD和执行transformation不会触发数据处理,仅仅是创建一个执行计算的计划。当action(比如 foreach())执行的时候,才会触发计算。

我们要做的第一个transformation,是把lines拆分为fields:

scala> val records = lines.map(_.split("\t"))
records: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

这里使用了RDD的map()方法,对RDD中的每一个元素,执行一个函数。本例中,我们把每一行(字符串String)拆分为 Scala 的字符串数组(Array of Strings)。

接下来,我们使用过滤器(filter)来去掉可能存在的坏记录:

scala> val filtered = records.filter(rec => (rec(1) != "9999" && rec(2).matches("[01459]")))
filtered: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[3] at filter at <console>:16

RDD的filter方法接收一个返回布尔值的函数作为参数。这个函数过滤掉那些温度缺失(由9999表示)或者质量不好的记录。

为了找到每一年的最高气温,我们需要在year字段上执行分组操作,这样才能处理每一年的所有温度值。Spark提供reduceByKey()方法来做这件事情,但它需要一个键值对RDD,因此我们需要通过另一个map来把现有的RDD转变为正确的形式:

scala> val tuples = filtered.map(rec => (rec(0).toInt, rec(1).toInt))
tuples: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[4] at map at <console>:18

现在可以执行聚合了。reduceByKey()方法的参数是一个函数,这个函数接受两个数值并联合为一个单独的数值。这里我们使用Java的Math.max函数:

scala> val maxTemps = tuples.reduceByKey((a, b) => Math.max(a, b))
maxTemps: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:21

现在可以展示maxTemps的内容了,调用foreach()方法并传入println(),把每个元素打印到控制台:

scala> maxTemps.foreach(println(_))
(1950,22)
(1949,111)

这个foreach()方法,与标准Scala集合(比如List)中的等价物相同,对RDD中的每个元素应用一个函数(此函数具有副作用)。正是这个操作,促使Spark运行一个job来计算RDD中的数据,使之能够跑步通过println()方法:-)

或者,也可以把RDD保存到文件系统:

scala> maxTemps.saveAsTextFile("output")

这样会创建一个output目录,包含分区文件:

% cat output/part-*
(1950,22)
(1949,111)

这个saveAsTextFile()方法也会触发一个Spark job。主要的区别是没有返回值,而是把RDD的计算结果及其分区文件写入output目录中。

Spark Applications, Jobs, Stages, Tasks

示例中我们看到,和MapReduce一样,Spark也有job的概念。然而,Spark的job比MapReduce的job更通用,因为它是由任意的stage的有向无环图(DAG)组成。每个stage大致等同于MapReduce中的map或者reduce阶段。

Stages被Spark 运行时拆分为tasks,并行地运行在RDD的分区之上,就像MapReduce的task一样。

一个Job总是运行于一个application的上下文中,由SparkContext实例表示,application的作用是分组RDD和共享变量。一个application可以运行多个job,串行或者并行,并且提供一种机制,使得一个job可以访问同一application中的前一个job缓存的RDD。一个交互式的Spark会话,比如spark-shell会话,就是一个application的实例。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容