十余天没有学习Spark了,不是我在偷懒,而是前段时间一直在研究形态学算法提取波形的问题。现在算法差不多搞定啦,但是用Python开发的,有空有能力时再补上Spark的程序。还是先来记录一下分布式矩阵的用法吧~
一般采用分布式矩阵进行存储都在数据量非常大的情况下进行,处理速度和效率与其存储格式息息相关。MLlib提供了四种分布式矩阵存储形式,分别为:行矩阵,带有行索引的行矩阵,坐标矩阵和块矩阵,据说分块矩阵并不常用。
行矩阵
行矩阵以行作为基本的矩阵存储格式,每一行的内容都可以单独取出来进行操作,列的作用相较小。
带索引的行矩阵
为了方便在系统调试的过程中对行矩阵的内容进行观察和显示,MLlib提供了带索引的行矩阵。
坐标矩阵
坐标矩阵是一种带有坐标标记的矩阵,其中的每一个具体数据都有一组坐标进行标示,类型格式如下:
(x: Long, y: Long, values: Double)
分块矩阵
顾名思义,就是将矩阵分块(好废话哦)。分块矩阵可由带索引的行矩阵IndexedRowMatrix或坐标矩阵CoordinateMatrix调用toBlockMatrix()方法来进行转换。
例程
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Matrix, Matrices, Vectors, Vector}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry}
object RDDMatrix {
def main(args: Array[String]): Unit = {
println("--------------------------本地矩阵-------------------------------")
val mx = Matrices.dense(2, 3, Array(1, 2, 3, 4, 5, 6)) // 创建2行3列的本地矩阵,Matrices.dense是矩阵重组的调用方法
println(mx)
println("--------------------------分布式行矩阵------------------------------")
val conf = new SparkConf().setAppName("Distributed matrix").setMaster("local")
val sc = new SparkContext(conf)
val path = "F:/ScalaProject/test/collaborativeFilter/src/main/resources/Kmeans.txt"
val rdd = sc.textFile(path).map(_.split(" ").map(_.toDouble)) // 转化成Double类型的向量存储
val rdd1 = rdd.map(line => Vectors.dense(line)) // 转换成向量存储
val rm = new RowMatrix(rdd1) // 读入行矩阵
// 如果打印rm中的具体内容,结果显示是数据的内存地址。这表明RowMatrix只是一个转化操作,并不运行最终结果。
println(rm.numRows()) // 打印行数
println(rm.numCols()) // 打印列数
println("--------------------------带索引的行矩阵---------------------------")
val rdd2 = rdd1.map(vd => new IndexedRow(vd.size, vd)) // 转化格式
val irm = new IndexedRowMatrix(rdd2) // 建立索引行矩阵实例
println(irm.getClass) // 打印类型
irm.rows.foreach(println)// 打印内容数据
println("---------------------------坐标矩阵--------------------------------")
val rdd3 = rdd.map(vue => (vue(0).toLong, vue(1).toLong, vue(2))). // 转化成坐标格式
map(vue2 => new MatrixEntry(vue2._1, vue2._2, vue2._3)) // 转化成坐标矩阵格式
// vue(0)和vue(1)分别是行和列坐标的坐标轴标号,vue(2)是具体内容
// ._1 和 ._2 是scala语句中元组参数的序数专用标号,分别是传入第二个和第三个值
val crm = new CoordinateMatrix(rdd3) // 直接打印CoordinateMatrix实例的对象也仅仅是内存地址
crm.entries.foreach(println)
println("--------------------------分块矩阵---------------------------")
// 将坐标矩阵转换成2x2的分块矩阵并存储,尺寸通过参数传入
val matA = irm.toBlockMatrix(2,2).cache()
// 查看其分块情况
matA.blocks.collect.foreach(println)
println(matA.numColBlocks)
println(matA.numRowBlocks)
}
}
原始数据:
1 2 2
1 1 1
1 3 3
2 2 2
3 4 5
4 3 3
2 2 2
4 4 1
运行结果:
--------------------------本地矩阵-------------------------------
1.0 3.0 5.0
2.0 4.0 6.0
--------------------------分布式行矩阵------------------------------
8
3
--------------------------带索引的行矩阵---------------------------
class org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
IndexedRow(3,[1.0,2.0,2.0])
IndexedRow(3,[1.0,1.0,1.0])
IndexedRow(3,[1.0,3.0,3.0])
IndexedRow(3,[2.0,2.0,2.0])
IndexedRow(3,[3.0,4.0,5.0])
IndexedRow(3,[4.0,3.0,3.0])
IndexedRow(3,[2.0,2.0,2.0])
IndexedRow(3,[4.0,4.0,1.0])
---------------------------坐标矩阵--------------------------------
MatrixEntry(1,2,2.0)
MatrixEntry(1,1,1.0)
MatrixEntry(1,3,3.0)
MatrixEntry(2,2,2.0)
MatrixEntry(3,4,5.0)
MatrixEntry(4,3,3.0)
MatrixEntry(2,2,2.0)
MatrixEntry(4,4,1.0)
--------------------------分块矩阵---------------------------
((1,1),2 x 1 CSCMatrix
(1,0) 19.0)
((1,0),2 x 2 CSCMatrix
(1,0) 18.0
(1,1) 21.0)
2
2
至于分块矩阵为什么会输出这么奇怪的结果,还没有研究明白,等搞明白再补上吧!