Spark 3. RDD 操作一 基础 ,放入方法,闭包,输出元素, 使用 K-V 工作

RDD 操作一 基础 ,放入方法,闭包,输出元素,使用 Key-Value 工作

原文地址: http://spark.apache.org/docs/latest/programming-guide.html
仅限交流使用,转载请注明出处。如有错误,欢迎指出!

Henvealf/译

RDD 提供了两种类型的操作:

  • transformations :从已经存在的 RDD 中创建出一个新的 RDD。
  • actions: 在集群上运行了一个计算后,最终返回一个值给设备中的程序。

transformation 的一个例子就是map,对 RDD 中的每个元素进行相同的操作,返回一个新的 RDD。
action 的一个例子就是 reduce,使用相同的函数来聚合 RDD 中的元素。

在 Spark 中,所有的 transformation 都是懒惰的(lazy),以至于他不会立刻计算出他们结果。代替的是,他们仅仅记住这个 transformation 应用在哪些基础的数据集上(比如一个文件)。transformation 计算仅仅是在程序中的一个动作需要一个返回值的时候才开始。这个设计让能够让 Spark 更加高效。举个例子,我们能够意识到一个 map 生成的数据集只会用在一个 reduce 上,并且仅仅返回 reduce 的结果给设备,而不会是一个 map 后的很大的数据集给设备。

默认情况下,在你每次重新运行一个通过转换(transforme)得到的RDD的action 的时候,转换每次都可能重新再运行一次。然而,你也可以使用 persist(或者 cache)方法将一个 RDD 持久化在内存中,这样就能让 Spark 把这些元素维持在集群中,让下一次的存取速度变得飞快。这里也同样支持持久话 RDD 在磁盘中,或者备份在多个节点中。

基础

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
lineLengths.persist()
val totalLength = lineLengths.reduce((a, b) => a + b)

往 Spark 中放入方法

scala

Spark 的 API 最信任的就是在集群上运行的方法上放方法。下面有两个建议:

  • 使用匿名方法语法,能够减少代码量。
  • 静态化在全局的单例对象上的函数,就是定义一个object,可以把它理解为直接创建了一个对象,不需要 new 就可以使用。也可以把他理解为一个类,而其中的函数都默认为静态的。里面有你用到所有函数/方法。比如,你可以定义 object MyFunctions ,之后通过 MyFunxtions 来使用方法:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意你也可能传入一个引用给一个类(class) 中的函数(与单例 object 的做法是相反的),他需要向 Spark 中传入包含了要使用的方法的类的对象。比如下面:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

着这里,我们要 new 一个 MyClass 的对象才能使用 doSuff 方法。我们要将这一整个对象传送入集群中才可以,然后书写方式和 rdd.map(x => this.func1(x)) 很像。

用很相似的方式,外部的对象存取字段就会引用到整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

可以发现这样其实就是 rdd.map(x => this.field + x), 这样在外部就得到了他的 this 引用,这样很不安全,容易出错,为了解决这个问题,下面有一个简单方式, 就是先将字段赋给一个局部变量中:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解闭包(closures)

在 Spark 中比较难理解的就是当在集群上运行代码的时候,变量和方法的作用域与生命周期。在其作用域之外修改变量的 RDD 操作可以是一个混淆的常用资源( can be a frequent source of confusion)。下面我们使用 foreach 来递增一个计数器,相同的问题也能同样出现在其他的操作上。

例子

考虑原生的 RDD 元素的加和操作,这个操作在不同的虚拟机上执行可能会呈现出不同的行为。这个普通的例子是将 Spark 运行在 local 模式下(--master = local[n])的情况与运行在集群上的情况做比较(通过 park-submit 给 YARN)。

Scala

var counter = 0;
var rdd = sc.parallelize(data)

// Wrong: 不会执行他
rdd.foreach(x => counter + x)

println("Counter: " + counter)

本地模式 Vs. 集群模式

在这之前先明确一下一些概念:

Driver: 驱动器,一个 job 只有一个,主要负责 job 的解析,与 task 的调度等。

Executor:执行器,实际运行 task 的地方,一个 job 有多个。

上面这段代码的行为是不确定的,可能不像预想中那样工作。为了执行 job ,Spark 会将处理 RDD 的操作拆分到许多 task 中,且每一个 task 被一个执行器执行。在执行之前, Spark 会计算 task 的闭包。闭包是一些必须让执行器可见的变量和方法,这样执行器才能执行他们在 RDD 上的操作(这里就是 foreach)。这个闭包是被序列化并传送到了每个执行器。

在集群上的变量会立刻被送到每个执行器中,事实上,当 counter 被引用使用在 foreach 方法里面时,他就不再是驱动(driver)节点上的 counter 了。也就是说在驱动节点的内存中也一直会有一个 counter ,可他对执行器来说,已经不可见了。执行器仅仅能看到序列化了的闭包中的拷贝。事实上, 驱动器上的 final 的 counter 的值在操作执行的时候一直都是0,执行器操作的只是引用的序列化的闭包中的值。

在 local 模式下,foreach 方法实际会运行在作为驱动器的 JVM 中,也就是说运行程序的 JVM 和运行驱动器的 JVM 是同一个。所以操作就会引用到原始的 countercounter 的值就被改变了。

如果想要确保现在说的这种情况有确定的行为,一种就是使用一个 Accumulator(积累器)。Accumulator 常常使用于在执行被分片到不同的 worker 时需要安全的对变量进行更新的情况。 Accumulator 以后详细介绍。

一般情况下,闭包--构建循环或者局部函数,应该不要用于改变一些全局的状态。 Spark 不能确定或者保证修改闭包之外的的对象引用时的行为。一些代码在本地模式下运行的好好的,在放到集群上运行时就可能得不到期望的结果。如果需要使用全局的聚合,就使用一个 Accumulator 来代替他。

输出一个 RDD 的元素

另一个老事件就是试图使用 rdd.foreach(println) 或者 rdd.map(println) 打印出元素的值。在一个机器上,输出 RDD 所有的元素的将会生成你期望的输出。然而,在 cluster 模式下是,stdout 会由执行器来调用,写在了执行器的标准输出上,而不是驱动器上。所以在驱动器上你就看不到 stdout 的输出类了。

为了在驱动器上输出所有的元素,一个你可以使用 collect 方法,先把这个 RDD 带到驱动器节点上: rdd.collect().foreach(println)。不过这中方法容易造成内存不足。因为 collect() 会把 RDD 实体拿进一个单独的机器中;如果仅仅需要输出 RDD 的一小部分元素,最安全的方式是使用 take(): rdd.take().foreach(println).

使用 Key-Value 工作

Scala

RDDs 中包括任何类型的对象,有一些特殊的操作是能用于 RDDs 的键值对上。 最普遍的就是集群上的 “洗牌” 过程,就是使用 key 来进行分组和聚合。

在 Scala 中,这些操作在 包含 ** Tuple2** (二元组)对象的 RDDs 中是自动(直接?)可用的(在本语言中,之间写一个(a,b) 就能创建 tuples )。键值对操作可以在 PairRDDFunction 中得到,他是自动包装了一个元组RDD。

举个例子,下面的代码就在键值对上使用 reduceByKey 操作来计算当前文件的行数。

val lines = sc.textFile("data.txt")
val pairs = line.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我们也可以使用 counts.sortByKey(),在这个例子中,机会按字母排序这些简直对,最后 count.collect() 就将他们带会驱动器程序,作为一个对象数值使用。

注意 :当你使用自定义的对象来作为键值对的键值,你必须保证这个自定义的该对象的 equals 方法和与之联合匹配的 hashCode() 方法。

End !!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容