运行:
运行spark版的python:pyspark
Scala版:spark-shell
RDD:
数据集会被自动整理成RDD在集群上并行进行
RDD可以有执行一系列的动作(actions),这些动作可以返回值(values),转换(transformations),或者指向新的RDD的指针。
常见行动操作:(会引起实际的计算)
.count()//相当于RDD的len()
.take(n)//取RDD中n个元素
.collect()输出全部数据
.saveAsTextFile()输出到文件
.reduce()对RDD应用一个reduce函数【reduce函数,对所有元素逐一应用,每次的输入是新的一个跟上一次reduce的返回值lambda x,y: x+y】
常见转化操作:(惰性求值所以不会计算)
RDD1.union(RDD2)//连接RDD
spark基本结构:
每一个spark都用一个驱动器程序来发起并行(spark shellsc)
driver通过SparkContext连接集群(sc)
sc的各种方法都是可以自动进行并行计算的
每一个计算图称为一个RDD
专门用来做键值对操作的RDD叫pair RDD
有对应的一套函数用来根据key进行操作(类似SQL,reduceByKey,groupByKey,join,排序)
PageRank例子:
1.每个页面向相邻页面发送(自己的排序值/邻居数量)
2.每个页面把排序值更新为:0.15+0.85*邻居发过来的值之和
python接口:
在python中import并初始化RDD对象,写好后编写sbt(推荐)或者Maven构建文件注明构建的方式,最后用spark-submit程序运行就可以
特别地,一些比较大的包(比如SQL)pyspark里面不会内建,而是要import一下
要看有什么能用的函数,只要sc.然后按Tab
SQL:
使用
context = SQLContext(sc)//一切sql类都是SQLContext的子类
results = context.sql(“SELECT * FROM people”)
发送一条SQL,返回一个RDD,每个元素一行
查询:
df.select($"colA", $"colB")
数据格式:
1.DataFrame
// 创建含有单列数据的 DataFrame
val df = sqlContext.range(0, 16)
2.Dataset:
更新版本的DataFrame,有的时候会重载旧的DataFrame
因此当我们从1.6.x迁移到2.0.0的时候, 无需任何修改就直接用上了DataSet.
能在RDD的基础上扩充功能,要变成Dataset:
SQLContext.toDS()
能在RDD的基础上扩充功能,要变成Dataset:
SQLContext.toDS()
数据倾斜:
每台机器数据量不一样,每个元素处理的数据量不一样...之类
Steaming:
用于把数据分batch
创建Steaming:
1.创建 StreamContext 对象
需要两个参数,一个是 SparkConf 对象(sc),一个是分块间隔时间。val ssc = new StreamingContext(sc, Seconds(2))
2.创建DStream对象(数据流对象)
val lines = ssc.socketTextStream("localhost", 9999)
在 DStream 中,每一条记录都是一行文本
3.对流进行操作
就对DStream 操作就行,用map之类的,返回新的DStream
4.启动流
ssc.start()
DStream还支持滑动窗口之类的
Steaming通过checkpoint保证其24小时连续工作
MlLib:
跟sklearn里面的简洁度差不多:
1.生成LabelPoint(用来保存样本的数据和标签的数据结构)
posExamples=spamFeature.map(lambda features:LabeledPoint(1,features)) //把label设为1
trainingData=posExamples.union(negExamples) //因为有union的存在,所以把正负数据集并在一起比python还要简单
*也可以转化成其他的数据结构,比如稀疏矩阵(Vector.dense(ndarray)),Rating之类,但是这些都不包含数据处理,比如加法都是没有的
2.model=LogisticRegressionWithSGD.train(trainingData)
TFIDF:
HashingTF把每个单词所需要的向量长度S % n =hash值,默认是2的20次方,所以产生的向量长度不超过2的20次方
不能并行的时候:
对于不能并行的算法,比如去掉停用词之类的,不能直接写在pyspark上,需要用sc.map(这里写所需python代码)这样就可以在每个单节点上调用。
缓存RDD以重复使用:
调用MLlib前,写sc.cache()(一开始就写也没关系)
进一步支持的其他数据科学工具:
任何 TensorFlow 程序都可以轻松地移植到 TensorFlowOnSpark 上。通常,只需要修改十行以内的 Python 代码。
Scala基本语法:
定义一个新变量一定要用var
常量用val
有很强的隐式类型转换:比如如果char没有toint方法而string有,你硬是在char上调用tolin就会被转成stri