向 Spark 传递函数
在 Python 中如果函数比较短可以用 lambda 方式,当然也可以用传递函数
如:
word = rdd.filter(lambda s : 'error' in s)
def containsError(s):
return 'error' in s
word = radd.filter(containsError)
在 Spark 中我们知道 'error' in s 这段代码是在 executor 中执行的,函数的定义是在 Driver 中。所以 Spark 会把 containsError 这个函数发送到 executor 中。所以如果 containsError 函数调用了其他变量 函数 类 等 都会经过序列化后发送到 executor 中。 lambada 也是一样的,lambda 不过就是不需要定义函数而已,可以理解为 lambda 是一个匿名函数。比如:
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMachesMemberReference(self, rdd):
return rdd.filter(lambda x : self.query in x)
def getMachesFunctionsReference(self, rdd):
return rdd.filter(self.isMatch)
上述代码在执行是会报错。原因在于 Spark 向 executor 发送函数是发现有self.query 含有 SearchFunctions 的引用字段。 就需要把SearchFunctions 类实例的对象序列化后全部发送到 executor 中去。在 Python 中 对象序列化会报错。所以上述代码会执行报错。
在 Java 或 Scala 中可以让类实现 Serializable 接口 就可以将对象序列化。当然不是所有对象都是可以序列化的。比如 数据库连接,网络通道等。所以在实际中尽量避免将对象序列化传到 executor 中。上述例子可以做如下修改
def getMachesMemberReference(self, rdd):
query = self.query
return rdd.filter(lambda x : query in x) # 该方式不会有类的引用字段 就不需要将对象序列化传递到 executor 中去
数据分区
在执行 RDD 的算子是会把数据分成多组发送到不同 executor 中执行 比如
rdd.filter(lambda x : len(x) > 0) #默认分区 并行执行
如果需要指定分区数用 repartition 算子
rdd.repartition(10).filter(lambda x : len(x) > 0) # 将数据分为10分区
repartition 算子他会把数据通过网络进行混淆,并创建出新的分区集合。对数据进行重新分区往往代价比较大。
分区并不是对所有的应用都有好处,比如给定的 RDD 只需要扫描一次,就完全没必要分区。
Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键值对的函数对元素进行分区。尽管 Spark 没有给出显示控制每个键具体落在哪一个 executor 节点上的方法,但 Spark 可以确保同一组的键出现在同一个节点上。比如,可以使用哈希分区将一个 RDD 分成100分分区,此时键的哈希值对100取模的结果相同的记录会被放在一个节点上。
举例:有一张很大的用户信息表,由(UserId, UserInfo)组成的RDD。有另外一张表(UserId, LinkInfo)对组成的表。存放过去5分钟内某网站各个用户访问的情况。我们要统计用户访问情况,可以使用 Spark join 方法 关联 UserInfo 和 LinkInfo 的组合。简单代码如下:
Python
userDataRdd = #通过文件或其他方式加载数据 结构为(UserId, UserInfo)
def processNewLog():
eventsRdd = #通过文件或其他方式加载过去5分钟用户行为数据 结构为(UserId, LinkInfo)
userAndLinkInfoRdd = userDataRdd.join(eventsRdd)
# 可继续对userAndLinkInfoRdd 进行 map filter 等操作 获取需要的数据
上述代码能正确运行但效率不高。每次调用 processNewLog 是都会进行 join 操作。我们对数据集如何分区一无所知,默认情况下,连接操作会将两个数据集中的所有键的 Hash 值都求出来,将该 Hash 值相同的记录通过网络传同一台机器上,然后在那台机器上对所有键相同的记录进行连接。因为 UserInfo 表比 5 分钟一次的 eventsRdd 要大的多,所以会浪费时间做很多无用工作,在每次调用都会对 UserInfo 表进行 Hash 值计算和夸节点数据混洗,虽然这些数据从来不会变化。
要解决这一问题也很简单。在程序开始时对 UserInfo 做 partitionBy 操作
userDataRdd = userDataRdd.partitionBy(100)
processNewLog 方法保持不变。在 Java 或 Scala 中需要将 HashPartitioner 传入partitionBy方法 比如:
userDataRdd = userDataRdd.partitionBy(new HashPartitioner(100))
eventsRdd 数据仅使用一次 所以指定分区没有什么用处。由于 userDataRdd 调用了 partitionBy,Spark 就知道了该 RDD 是根据键的哈希值来分区的。这样在调用 join时,Spark 会利用到这一点。当调用 userDataRdd.jonin(eventsRdd) 时,Spark 只会对 eventsRdd 进行数据混洗操作,将 eventsRdd 中特定 UserId 的记录发送到 UserDataRdd 的对应分区所在的那台机器上,这样需要的网络传输数据就会大大减少,程序运行效率也显著提高了。
注意 partitionBy 是返回一个新的 RDD 不是将原 RDD 数据进行修改。
其他一些算子也会利用到以后的分区信息 如 sortByKey 和 groupByKey,另一方面 诸如 map 等操作会导致新的 RDD 失去父 RDD 的分区 信息。
在 Spark glom 算子 partitioner 算子可以查看 RDD 的分区情况
能从分区中获益的操作 cogroup、groupWith、 join、 leftOuterJoin、 rightOuterJoin、groupByKey、reduceByKey、combineByKey以及lookup
影响分区方式的操作
Spark 内部知道各种操作如何影响分区方式,并将会对数据进行分区的操作的结果RDD 自动设置为对应的分区器。比如 Join 连接两个 RDD; 由于键相同的元素会被哈希到同一台机器上, Spark 知道输出结果也是 Hash 分区的,这样对连接结果进行诸如 reduceByKey 这样操作时就会明显变快。
不过转换算子就不一定了 比如 map 理论上 map 是可以修改key 的。因此结果就不会有固有分区。不过Spark 提供的 mapValues和flatMapValues。它们可以保证二元组的键保持不变。
这里列出会为生成结果 RDD 设好分区方式的操作 cogroup、 groupWith 、join 、leftOuterJoin、 rightOuterJoin、 groupByKey、 reduceByKey、 combineByKey 、partitionBy、 sort、 mapValues(如果父RDD有分区)、 flatMapValues(如果父RDD有分区)、 filter(如果父RDD有分区)