Spark09 RDD分区与并行调优

向 Spark 传递函数

在 Python 中如果函数比较短可以用 lambda 方式,当然也可以用传递函数
如:

word = rdd.filter(lambda s : 'error' in s)
def containsError(s):
    return 'error' in s
word = radd.filter(containsError)

在 Spark 中我们知道 'error' in s 这段代码是在 executor 中执行的,函数的定义是在 Driver 中。所以 Spark 会把 containsError 这个函数发送到 executor 中。所以如果 containsError 函数调用了其他变量 函数 类 等 都会经过序列化后发送到 executor 中。 lambada 也是一样的,lambda 不过就是不需要定义函数而已,可以理解为 lambda 是一个匿名函数。比如:

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query
    def isMatch(self, s):
        return self.query in s
    def getMachesMemberReference(self, rdd):
        return rdd.filter(lambda x : self.query in x)
    def getMachesFunctionsReference(self, rdd):
        return rdd.filter(self.isMatch)

上述代码在执行是会报错。原因在于 Spark 向 executor 发送函数是发现有self.query 含有 SearchFunctions 的引用字段。 就需要把SearchFunctions 类实例的对象序列化后全部发送到 executor 中去。在 Python 中 对象序列化会报错。所以上述代码会执行报错。
在 Java 或 Scala 中可以让类实现 Serializable 接口 就可以将对象序列化。当然不是所有对象都是可以序列化的。比如 数据库连接,网络通道等。所以在实际中尽量避免将对象序列化传到 executor 中。上述例子可以做如下修改

 def getMachesMemberReference(self, rdd):
        query = self.query
        return rdd.filter(lambda x : query in x) # 该方式不会有类的引用字段 就不需要将对象序列化传递到 executor 中去

数据分区

在执行 RDD 的算子是会把数据分成多组发送到不同 executor 中执行 比如

rdd.filter(lambda x : len(x) > 0) #默认分区 并行执行

如果需要指定分区数用 repartition 算子

rdd.repartition(10).filter(lambda x : len(x) > 0)  # 将数据分为10分区

repartition 算子他会把数据通过网络进行混淆,并创建出新的分区集合。对数据进行重新分区往往代价比较大。

分区并不是对所有的应用都有好处,比如给定的 RDD 只需要扫描一次,就完全没必要分区。

Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键值对的函数对元素进行分区。尽管 Spark 没有给出显示控制每个键具体落在哪一个 executor 节点上的方法,但 Spark 可以确保同一组的键出现在同一个节点上。比如,可以使用哈希分区将一个 RDD 分成100分分区,此时键的哈希值对100取模的结果相同的记录会被放在一个节点上。

举例:有一张很大的用户信息表,由(UserId, UserInfo)组成的RDD。有另外一张表(UserId, LinkInfo)对组成的表。存放过去5分钟内某网站各个用户访问的情况。我们要统计用户访问情况,可以使用 Spark join 方法 关联 UserInfo 和 LinkInfo 的组合。简单代码如下:
Python

userDataRdd =  #通过文件或其他方式加载数据 结构为(UserId, UserInfo)
def processNewLog():
   eventsRdd = #通过文件或其他方式加载过去5分钟用户行为数据 结构为(UserId, LinkInfo)
   userAndLinkInfoRdd = userDataRdd.join(eventsRdd)
   # 可继续对userAndLinkInfoRdd 进行 map filter 等操作  获取需要的数据

上述代码能正确运行但效率不高。每次调用 processNewLog 是都会进行 join 操作。我们对数据集如何分区一无所知,默认情况下,连接操作会将两个数据集中的所有键的 Hash 值都求出来,将该 Hash 值相同的记录通过网络传同一台机器上,然后在那台机器上对所有键相同的记录进行连接。因为 UserInfo 表比 5 分钟一次的 eventsRdd 要大的多,所以会浪费时间做很多无用工作,在每次调用都会对 UserInfo 表进行 Hash 值计算和夸节点数据混洗,虽然这些数据从来不会变化。

要解决这一问题也很简单。在程序开始时对 UserInfo 做 partitionBy 操作

userDataRdd = userDataRdd.partitionBy(100)

processNewLog 方法保持不变。在 Java 或 Scala 中需要将 HashPartitioner 传入partitionBy方法 比如:

userDataRdd = userDataRdd.partitionBy(new HashPartitioner(100))

eventsRdd 数据仅使用一次 所以指定分区没有什么用处。由于 userDataRdd 调用了 partitionBy,Spark 就知道了该 RDD 是根据键的哈希值来分区的。这样在调用 join时,Spark 会利用到这一点。当调用 userDataRdd.jonin(eventsRdd) 时,Spark 只会对 eventsRdd 进行数据混洗操作,将 eventsRdd 中特定 UserId 的记录发送到 UserDataRdd 的对应分区所在的那台机器上,这样需要的网络传输数据就会大大减少,程序运行效率也显著提高了。
注意 partitionBy 是返回一个新的 RDD 不是将原 RDD 数据进行修改。
其他一些算子也会利用到以后的分区信息 如 sortByKey 和 groupByKey,另一方面 诸如 map 等操作会导致新的 RDD 失去父 RDD 的分区 信息。

在 Spark glom 算子 partitioner 算子可以查看 RDD 的分区情况

能从分区中获益的操作 cogroup、groupWith、 join、 leftOuterJoin、 rightOuterJoin、groupByKey、reduceByKey、combineByKey以及lookup

影响分区方式的操作

Spark 内部知道各种操作如何影响分区方式,并将会对数据进行分区的操作的结果RDD 自动设置为对应的分区器。比如 Join 连接两个 RDD; 由于键相同的元素会被哈希到同一台机器上, Spark 知道输出结果也是 Hash 分区的,这样对连接结果进行诸如 reduceByKey 这样操作时就会明显变快。

不过转换算子就不一定了 比如 map 理论上 map 是可以修改key 的。因此结果就不会有固有分区。不过Spark 提供的 mapValues和flatMapValues。它们可以保证二元组的键保持不变。

这里列出会为生成结果 RDD 设好分区方式的操作 cogroup、 groupWith 、join 、leftOuterJoin、 rightOuterJoin、 groupByKey、 reduceByKey、 combineByKey 、partitionBy、 sort、 mapValues(如果父RDD有分区)、 flatMapValues(如果父RDD有分区)、 filter(如果父RDD有分区)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342