RDD 操作二 常用的 Transformations 与 Actions 方法
原文地址: http://spark.apache.org/docs/latest/programming-guide.html#transformations
仅限交流使用,转载请注明出处。如有错误,欢迎指出!
Henvealf/译
Transformations 转换
这里只有 Scala 的API,Java 与 Python 请看官方API。
Transformation | 啥意思啊 |
---|---|
map(func) | 返回每个元素经过 func 方法处理后,所生成的新元素所组成的分布式数据集合(RDD)。 |
filter(func) | 返回一个通过了方法 func 筛选的元素所组成的数据集合;func 返回 true,即代表通过筛选。 |
flatMap(func) | 与 map 相似,不过每一个输入项能够被映射为0个或多个输出项(所以 func 应该返回一个Seq(队列) 而不是一个单项)。 |
mapPartitions(func) | 与 map 相似,但是他是单独运行在 RDD 的每一个分区(块)上,所以假如 RDD 是类型 T 时,则 func 的类型必须为 Iterator<T> => Iterator<U> 。 |
mapPartitionsWithIndex(func) | 与 mapPartitions 相似,但是会另外提供一个整数值代表分区的下标(index), 所以假如 RDD 是类型 T 时,func 的类型必须为 (Int, Iterator<T>) => Iterator<U>. |
sample(withReplacement, fraction, seed) | 第一参数是指明是否放回,第二个参数是抽取数量,第三个是随机种子。 |
union(otherDataset) | 返回一个包含了源数据集和参数的新数据集 |
intersection(otherDataset) | 返回一个元数据集与参数的相交之后的新的 RDD(就是交集)。 |
distinct([numTasks]) | 对数据集进行去重操作 |
groupByKey([numTasks]) | 当在一个(K,V)数据集上调用他,会返回一个 (K, Iterator<V>) 数据集。 注意 :当你分组他们是为了在每个Key上聚合他们(比如求和或者平均值),使用 reduceByKey 或者 aggregateByKey 将会收益到更高的性能。注意 :输出的并发水平依赖于父 RDD 分区的个数。你能通过可选的 numTasks 参数设置不同的 task 数目。 |
reduceByKey(func, [numTasks]) | 当在一个(K,V)数据集上调用他时,将会返回一个在不同的key上进行了聚合了Value的新的 (K,V) 数据集,聚合的方式是使用 func 方法指定,且必须是 (V,V) => V 类型的。 与 groupByKey 很像,reduce task 的个数是使用第二个可选参数指定的。 |
aggregateByKey(zeroValue)(seqOp, combOp, [NumTasks]) | 当在一个(K,V)类型的数据集上调用他时,他就会返回一个按照 key 来进行聚合后生成的 (K,U) 对,聚合的方式是通过提供一个合并方法和一个中立的“零”值来完成的。允许被聚合的 value 的类型与输入的 value 的类型不一样,这样可以禁止没必要的分配。和 groupByKey相似,reduce task 的个数是使用第二个可选参数指定的。 |
sortByKey([ascending],[numTasks]) | 在(K,V)类型上调用他以实现按 K 排序。返回排序后的键值对。使用第一参数(boolean)值来决定是否为升序。 |
join(otherDataset, [numTasks]) | 通过该方法可以按照 K 来将源数据集合与另一数据集合进行 join 操作。(K, V) 和 (K, W) join结果就是 (K,(V,W))。outer join 就直接使用 leftOuterJoin,rightOuterJoin,fullOuterJoin 。 |
cogroup(otherDataset,[numTasks]) | 当在 (K, V) 类型与 (K, W)类型调用该方法时,会返回一个元祖 (K, (Iterable<V>, Iterable<W>)) 。也可以使用 groupWith 来调用。 |
cartesian(otherDataset) | 当在 T 类型与 U类型 的数据集上调用他,就返回一个 (T,U) 类型的数据集(所有元素组成的键值对)。 |
pipe(command,[envVars]) | 通过 shell 命令将 RDD 的每个分区进行导流, RDD 元素就可以写入到进程的 stdin(标准输入) ,也可以按照字符串的形式将其来按行输出的 stdout(标准输入) |
repatition(numPartitions) | 通过重新修改 partition 的个数来对 RDD 中的数据重新进行洗牌,以平衡分区。他总是对网络上的所有数据进行重洗。 |
Actions 动作
Action | 用法 |
---|---|
reduce(func) | 使用一个 func 来聚合一个数据集,方法应该为两个参数,一个返回值。这个方法必须能够翻转与连接,以至于能够在并发运行时的计算是正确的。 |
collect() | 以数组的形式返回在驱动器上的数据集的所有元素。当一个过滤器或者其他操作返回了一个小的子集时,使用这个方法会变得非常高效。 |
first() | 返回数据集行的第一个元素。 |
take(n) | 以数组的形式返回数据集上的前 n 个元素。 |
takeSample(withReplacement, num, [seed]) | 返回抽样后的数据组成的数组,第一个参数是时候放回取样,第二个就是取样的个数,第三个可选的参数是取样种子。 |
takeOrdered(n, [ordering]) | 返回 RDD 排序后的前 n 个元素。排序方式要么就使用原生的排序方式,要么使用自定义的比较器排序,使用第二个可选参数传入。 |
saveAsTextFile(path) | 将数据集中的元素写成一个文本文件,或者多个文本问价,参数就是文件路径,可以写在本地文件系统,HDFS,或者其他 Hadoop 支持的文件系统中。Spark 会在每一个元素上调用 toString 方法,转成文本文件中的一行文本。 |
saveAsSequenceFile(path) --Java and Scala | 就是将 RDD 中的元素写成 Hadoop SequenceFile到本地文件系统,HDFS,或者其他 Hadoop 支持的文件系统。 且 RDDs 中可用的的键值对必须实现 Hadoop 的 Writable 接口。在 Scala 中,也有许多可用的能够隐含的转型为 Writable 的类型,例如 Int, Double, String。 |
savaAsObjectFile(path) --Java and Scala | 使用 Java 简单的序列化方式,将 RDD 中的元素进行序列化。 可以使用 SparkContext.objectFile() 方法来加载。 |
countByKey() | 按照 数据集中的 Key 进行分组,计算各个 K 的对应的个数。(K,V)返回 (K,Int) |
foreach(func) | 在数据集上的每个元素上运行 func 方法。上面提到了。 |
Spark RDD API 也提供了一些 action 的异步版本,比如 foreach 对应的 foreachAsync 。