翻译:Hadoop权威指南之Spark-3

本文原始地址

Resilient Distributed Datasets

RDD是每个spark程序的核心,本节我们来看看更多细节。

Creation

创建RDD有三种方式:从一个内存中的对象集合,被称为并行化(parallelizing) 一个集合;使用一个外部存储(比如HDFS)的数据集;转变(transform)已存在的RDD。在对少量的输入数据并行地进行CPU密集型运算时,第一种方式非常有用。例如,下面执行从1到10的独立运算:

val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputation)

函数performExpensiveComputation并行处理输入数据。并行性的级别由属性spark.default.parallelism决定,该属性的默认值取决于Spark的运行方式。本地运行时,是本地机器的核心数量,集群运行时,是集群中所有执行(executor)节点的核心总数量。

可以为某特定运算设置并行性级别,指定parallelize()方法的第二个参数即可:

sc.parallelize(1 to 10, 10)

创建RDD的第二种方式,是创建一个指向外部数据集的引用。我们已经见过怎样为一个文本文件创建String对象的RDD:

val text:RDD[String] = sc.textFile(inputPath)

路径inputPath可以是任意的Hadoop文件系统路径,比如本地文件系统或HDFS上的一个文件。内部来看,Spark使用旧的MapReduce API中的TextInputFormat来读取这个文件。这就意味着文件切分行为与Hadoop是一样的,因此在HDFS的情况下,一个Spark分区对应一个HDFS块(block)。这个默认行为可以改变,传入第二个参数来请求一个特殊的切分数量:

sc.textFile(inputPath, 10)

另外一个方法允许把多个文本文件作为一个整体来处理,返回的RDD中,是成对的string,第一个string是文件的路径,第二个string是文件的内容。因为每个文件都会加载进内存,所以这种方式仅仅适合于小文件:

val files:RDD[(String, String)] = sc.wholeTextFiles(inputPath)

Spark能够处理文本文件以外的其他文件格式,比如,序列文件可以这样读入:

sc.sequenceFile[IntWritable, Text](inputPath)

注意这里指定序列文件的键和值的Writable类型的方式。对于常用的Writable类型,Spark能够映射到Java中的等价物,因此我们可以使用等价的方式:

sc.sequenceFile[Int, String](inputPath)

从任意的Hadoop InputFormat来创建RDD,有两种方式:基于文件的格式,使用hadoopFile(),接收一个路径;其他格式,比如HBase的TableInputFormat,使用hadoopRDD()。这些方法使用旧的MapReduce API。如果要用新的MapReduce API,使用newAPIHadoopFile()和newAPIHadoopRDD()。下面是读取Avro数据文件的示例,使用特定的API和一个WeatherRecord类:

val job = new Job()
AvroJob.setInputKeySchema(job, WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(inputPath,
    classOf[AvroKeyInputFormat[WeatherRecord]],
    classOf[AvroKey[WeatherRecord]], classOf[NullWritable],
    job.getConfiguration)

除了路径之外,newAPIHadoopFile()方法还需要InputFormat的类型、键的类型、值的类型,再加上Hadoop配置,该配置中带有Avro模式,在第二行我们使用AvroJob帮助类做的设置。

创建RDD的第三种方式,是转变(transform)已存在的RDD。

Transformations and Actions

Spark提供两种类型的操作:transformationsactions。transformations从已存在的RDD生成新的RDD,而actions会触发运算并输出结果——返回给用户,或者保存到外部存储。

Actions会立刻产生影响,而transformations不会——它们是懒惰的,它们不做任何工作,直到action被触发。下面的例子,把文本文件中的每一行转为小写:

val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))

map()方法是个transformation,Spark内部这样处理:稍晚的时候,一个函数(这里是toLowerCase())会被调用,来处理RDD中的每一个元素。这个函数实际上没有执行,直到foreach()方法(这是个action)被调用,然后Spark会运行一个job,读取输入的文件,对文件中的每一行调用toLowerCase(),然后把结果写到控制台。

怎样分辨一个操作究竟是transformation还是action呢?一个方法是看它的返回类型:如果返回类型是RDD,这是个transformation;否则就是action。当你查阅RDD的文档时,这种方法是很有用的。对RDD执行的大多数操作,可以在RDD的文档(org.apache.spark.rdd包)中找到,更多的操作在PairRDDFunctions里,这里包含了处理键值对RDD的transformations和actions。

Spark的库中包含了丰富的操作,有transformations者诸如映射(mapping)、分组(grouping)、聚合(aggregating)、再分配(repartitioning)、取样(sampling)、连接(joining)多个RDD、把RDDs作为集合(sets)对待。还有actions者诸如把RDDs物化(materializing)为集合(collections)、对RDD进行计算统计、从RDD中取样出固定数目的元素,把RDD保存到外部存储。细节内容,查看文档。

MapReduce in Spark

尽管名字很有暗示性,Spark中的map()和reduce()操作,与Hadoop MapReduce中相同名字的函数,不是直接对应的。Hadoop MapReduce中的map和reduce的通常形式是:

map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

从list标记可以看出,这两个函数都可以返回多个输出对。这种操作在Spark(Scala)中被实现为flatMap(),与map()很像,但是移除了一层嵌套:

scala> val l = List(1, 2, 3)
l: List[Int] = List(1, 2, 3)

scala> l.map(a => List(a))
res0: List[List[Int]] = List(List(1), List(2), List(3))

scala> l.flatMap(a => List(a))
res1: List[Int] = List(1, 2, 3)

有一种朴素的方式,可以在Spark中模拟Hadoop MapReduce。用两个flatMap()操作,中间用groupByKey()和sortByKey()来执行MapReduce的混洗(shuffle)和排序:

val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)

这里key的类型K2要继承自Scala的Ordering类型,以满足sortByKey()。

这个例子可以帮助我们理解MapReduce和Spark的关系,但是不能盲目应用。首先,这里的语义和Hadoop的MapReduce有微小的差别,sortByKey()执行的是全量排序。使用repartitionAndSortWithinPartitions()方法来执行部分排序,可以避免这个问题。然而,这样还是无效的,因为Spark有两次混洗的过程(一次groupByKey(),一次sort)。

与其重造MapReduce,不如仅仅使用那些你实际需要的操作。比如,如果不需要按key排序,你可以省略sortByKey(),这在Hadoop MapReduce中是不可能的。

同样的,大多数情况下groupByKey()太普遍了。通常只在聚合数据时需要混洗,因此应该使用reduceByKey(),foldByKey(),或者aggregateByKey(),这些函数比groupByKey()更有效率,因为它们可以在map任务中作为combiner运行。最后,flatMap()可能总是不需要的,如果总有一个返回值,map()是首选,如果有0或1个返回值,使用filter()。

Aggregation transformations

根据key来聚合键值对RDD的三个主要的transformations是reduceByKey(),foldByKey(),和aggregateByKey()。它们的工作方式稍有不同,但它们都是根据键来聚合值的,为每一个键生成一个单独的值。对应的actions是reduce(),fold()和aggregate(),它们以类似的方式运行,为整个RDD输出一个单独的值。

最简单的是reduceByKey(),它对成对儿的值反复执行一个函数,直到生成一个单独的值。例如:

val pairs: RDD[(String, Int)] =
    sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

键 a 对应的值,使用相加函数(+)聚合起来,(3 + 1)+ 5 = 9,而键 b 对应的值只有一个,因此不需要聚合。一般来说,这些操作是分布式的,在RDD的不同分区对应的任务中分别执行,因此这些函数要具有互换性和连接性。换句话说,操作的顺序和分组是不重要的。这种情况下,聚合函数可以这样执行 5 +(3 + 1),或者 3 + (1 + 5),都会返回相同的结果。

在assert语句中使用的三联相等操作符(===),来自ScalaTest,比通常的 == 操作符提供更多有用的失败信息。

下面是用foldByKey()来执行相同的操作:

val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

注意到这次我们需要提供一个零值,整数相加时是0,但如果是别的类型和操作,零值将是其他不同的东西。这一次,键 a 对应的值聚合的方式是((0 + 3)+ 1)+ 5)= 9(也可能是其他的顺序,不过加 0 总是第一个操作)。对于 b 是0 + 7 = 7。

使用foldByKey(),并不比reduceByKey()更强或更弱。特别地,也不能改变聚合结果的值类型。为此我们需要aggregateByKey(),例如,我们可以把那些整数值聚合到一个集合里:

val sets: RDD[(String, HashSet[Int])] =
    pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect.toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))

集合相加时,零值是空集合,因此我们用new HashSet[Int]来创建一个新的可变集合。我们需要向aggregateByKey()提供两个函数作为参数。第一个函数用来控制怎样把一个Int和一个HashSet[Int]相加,本例中我们用加等函数 += 把整数加到集合里面(+ 会返回一个新集合,旧集合不会改变)。

第二个函数用来控制怎样把两个HashSet[Int]相加(这种情况发生在map任务的combiner执行之后,reduce任务把两个分区聚合之时),这里我们使用 ++= 把第二个集合的所有元素加到第一个集合里。

对于键 a,操作的顺序可能是:
(( ∅ + 3) + 1) + 5) = (1, 3, 5)
或者:
( ∅ + 3) + 1) ++ ( ∅ + 5) = (1, 3) ++ (5) = (1, 3, 5)
如果Spark使用了组合器(combiner)的话。

转变后的RDD可以持久化到内存中,因此后续的操作效率很高。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容