借助于spark的分布式特性,机器学习与spark的结合可以解决数据规模大、复杂运算时间久的问题。
spark提供MLlib组件用于满足机器学习的需求。
本文将从机器学习数据读取、数据操作、特征处理、模型训练、结果评估、模型保存六个方面展开。
一、基础操作
1、sparksession
(1) 创建SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能[1]。
任何Spark程序的第一步都是先创建SparkSession。
From pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()
(2) SparkSession与SparkContext[1]
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context(例如对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext)。
但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext
至于图中的RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
这些概念在编码过程中偶尔会出现,对于spark分布式运行的详细架构本文就不深入了,只需对SparkSession、SparkContext有个大致的了解。
2、数据加载
(1) 本地数据读取
使用spark方式读取本地csv。
df = spark.read.csv('XXX.csv',inferSchema=True,header=True)
使用pandas方式读取本地csv,转换pandas dataframe为spark dataframe。
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
pandas_df = pd.read_csv('/home/logsaas/pyspark/lalafile/movie_ratings_df.csv')
sc = SparkContext()
sqlContest = SQLContext(sc)
df = sqlContest.createDataFrame(pandas_df)
(2) 数据库读取
# hive数据库读取
spark.sql('select * from XX')
(3) Spark_DataFrame与Pandas_DataFrame区别[2]
Spark中DataFrame与Pandas中DataFrame的主要区别是工作方式的不同。
区别 | pandas | spark |
---|---|---|
工作方式 | 单机模式,没有并行机制,不支持Hadoop,处理大量数据有瓶颈 | 分布式并行计算框架,所有的数据和操作自动并行分布在各个集群结点上,支持Hadoop,能处理大量数据 |
DataFrame可变性 | 可变 | Spark中RDDs是不可变的,因此DataFrame也是不可变的 |
相互转换 | 从spark_df转换:pandas_df = spark_df.toPandas() | 从pandas_df转换:spark_df = SQLContext.createDataFrame(pandas_df) |
3、数据操作
由于spark dataframe和python dataframe的区别,随之而来的操作差别也比较大。
操作 | pandas | spark |
---|---|---|
取列信息 | df[“name”] | df.select(“name”) |
取满足条件的信息 | df[df[‘age’]>21] | df.filter(df[‘age’]>21) |
groupby | df.groupby(“A”).avg(“B”) | from pyspark.sql import functions df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() |
data合并 | append/concat等 | df.join() |
去重统计 | len(set(df['title'])) | df.select('title').distinct().count() |
新增一列 | df['newcol']=lists | df.withColumn('newcol',lists) |
其他 | ··· | ··· |
常见的spark dataframe操作如下:
# **基础描述**
spark_df.count() # 行数
spark_df.columns # 列名称
spark_df.printSchema() # 结构及属性显示
spark_df.show(5,False) # truncate=False表示不压缩显示
spark_df.describe().show() # 均值/最值等描述
# **dataframe操作**
# 取'age','mobile'两列
spark_df.select('age','mobile').show(5)
# 新增一列:age_after_10_yrs
spark_df.withColumn("age_after_10_yrs",(spark_df["age"]+10)).show(10,False)
# 新建一列age_double,将age转换为double属性
spark_df.withColumn('age_double',spark_df['age'].cast(DoubleType())).show(10,False)
# 筛选mobile==Vivo的信息
spark_df.filter(spark_df['mobile']=='Vivo').show()
spark_df.filter(spark_df['mobile']=='Vivo').select('age','ratings','mobile').show()
spark_df.filter(spark_df['mobile']=='Vivo').filter(spark_df['experience'] >10).show()
# 去重统计
spark_df.select('mobile').distinct().show()
# 行去重
spark_df=spark_df.dropDuplicates()
# 删除列
df_new=spark_df.drop('mobile')
# groupby操作
spark_df.groupBy('mobile').count().show(5,False)
spark_df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
spark_df.groupBy('mobile').agg({'experience':'sum'}).show(5,False) # 根据mobile分区,计算experience的sum
# udf 自建sql函数
from pyspark.sql.functions import udf
def price_range(brand):
if brand in ['Samsung','Apple']:
return 'High Price'
elif brand =='MI':
return 'Mid Price'
else:
return 'Low Price'
brand_udf=udf(price_range,StringType()) # create udf using python function # 输出为string格式
# 新建一列price_range
spark_df.withColumn('price_range',brand_udf(spark_df['mobile'])).show(10,False)
# 使用lamba创建udf
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType()) # using lambda function
# 新建一列age_group
spark_df.withColumn("age_group", age_udf(spark_df.age)).show(10,False)
在spark中很少会用for循环去处理一个个特征,一般使用函数/自建UDF,批量处理掉了。
比如计算Review列每个数据的长度。
python模式
review_length = []
for info in text_df['Review']:
···· review_length.apend(length(info))
text_df['length'] = review_length
pyspark模式
from pyspark.sql.functions import length text_df=text_df.withColumn('length',length(text_df['Review']))
3、特征处理
以下为pyspark.ml.feature提供的特征处理功能,满足了大部分机器学习的特征处理需求。
标准化与归一化
函数 | 备注 |
---|---|
NGram | 正则标准化,不需要fit 直接transform |
Normalizer | 归一化函数,使它的范数或者数值范围在一定的范围内 |
MaxAbsScaler | 归一化函数,将列标准化到[0,1]之间,每一个值都除以本列的绝对值最大的数,先fit然后 transform() |
MinMaxScaler | 最大最小归一化,先fit然后 transform() |
StandardScaler | 对列进行标准化,先fit再transform |
分箱处理
函数 | 备注 |
---|---|
Binarizer | 将数值型特征的二值化。将数据框中的某一列按照阈值划分为只包含0,1的列 |
Bucketizer | 将连续特征按照splits值进行分箱 |
QuantileDiscretizer | 将连续列进行分箱操作,numBuckets 表示分箱数目 |
文本特征处理
函数 | 备注 |
---|---|
CountVectorizer | 只考虑词汇在文本中出现的频率 |
HashingTF | 自然语言处理的场景中,hashingTF使用的比较多 |
IDF | TF-IDF 作为特征提取 |
StringIndexer | 针对单个类别型特征进行转换。把字符串的列按照出现频率进行排序,出现次数最高的对应的Index为0,依次下排 |
IndexToString | 索引转字符串 |
Tokenizer | 将字符串列转换成小写并按空格切分 |
RegexTokenizer | 基于正则的方式进行文档切分成单词组 |
Word2Vec | 将words转换成一个vectorSize维的向量 |
OneHotEncoderEstimator | 独热编码相关 |
RFormula | 文本类特征处理,先 StringIndexer 再 OneHotEncoderEstimator |
特征操作
函数 | 备注 |
---|---|
Imputer | 缺失值填补,默认使用均值或中值(“median”)填补,要计算均值所以要先 fit(),然后再transfrom() |
StopWordsRemover | 英文停用词移除 |
SQLTransformer | 使用SQL语句创建新的列,直接transform() |
BucketedRandomProjectionLSH | 基于欧几里德距离的空间度量 |
MinHashLSH | 基于Jaccard距离的空间度量 |
ElementwiseProduct | 计算inputCol与scaling内积,不需要训练,直接transform |
PolynomialExpansion | 特征变换,将特征拓展比如[x,y],如果degree=2,则拓展成[x,xy,y,xx,y*y],所以直接transform即可 |
OneHotEncoder | 独热编码,特征用一个二进制数字来表示。例如[0, 0, 3]处理后为[ 1., 0., 0., 1., 0., 0., 0., 0., 1.] |
ChiSqSelector | 依据卡方检验,计算类别特征与分类标签的关联性。该函数只有先训练才能知道挑选哪些特征值,所以要先fit,应用的时候再transform |
vector 组合
函数 | 备注 |
---|---|
VectorIndexer | 数据集中的类别特征转换。可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引 |
VectorSizeHint | 允许用户显式指定列的向量大小 |
VectorAssembler | 用于将多个列合并为一个向量列,直接transform即可,经常用的 |
VectorSlicer | 通过对这些索引的值进行筛选得到新的向量集 |
压缩降维
函数 | 备注 |
---|---|
PCA | 对特征进行PCA降维,先fit然后 transform() |
DCT | 离散余弦变换(Discrete Cosine Transform),用于将数据或图像的压缩 |
FeatureHasher | 特征哈希,相当于一种降维技巧 |
# 正则标准化,不需要fit 直接transform
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df)
# 归一化函数,使它的范数或者数值范围在一定的范围内
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).head().features
# 归一化函数,将列标准化到[0,1]之间
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()
# 最大最小归一化,先fit然后 transform()
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
model.transform(df)
# 对列进行标准化,先fit再transform
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
model.transform(df)
# 按阈值1划分values,结果输出到features,结果为0/1
binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")
new_df = binarizer.transform(df)
# 将连续特征按照splits值进行分箱
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],inputCol="values", outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)
# 将连续列进行分箱操作,numBuckets 表示分箱数目
qds = QuantileDiscretizer(numBuckets=2,inputCol="values", outputCol="buckets", relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df)
# 只考虑词汇在文本中出现的频率
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df) # model.vocabulary
new_df = model.transform(df)
# 词频统计
hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
hashingTF.transform(df)
# 文本tf-idf计算
idf = IDF(minDocFreq=3, inputCol="tf", outputCol="idf")
model = idf.fit(df) # model.idf
model.transform(df)
# 针对单个类别型特征进行转换,把字符串的列按照出现频率进行排序
stringIndexer = StringIndexer(inputCol="label",outputCol="indexed", handleInvalid="error",stringOrderType="frequencyDesc")
model = stringIndexer.fit(stringIndDf)
td = model.transform(stringIndDf)
# 将字符串列转换成小写并按空格切分
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df)
# 基于正则的方式进行文档切分成单词组
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df)
# Word2Vec
word2Vec = Word2Vec(vectorSize=5, seed=42,inputCol="sentence", outputCol="model")
model = word2Vec.fit(doc)
model.transform(doc)
# OneHotEncoderEstimator
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df)
# RFormula 文本类特征处理
rf = RFormula(formula="y ~ x + s")
model = rf.fit(df)
model.transform(df)
# 缺失值填补,要计算均值所以要先 fit(),然后再transfrom()
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
# 移除停顿词
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df)
# 使用SQL语句创建新的列
sqlTrans = SQLTransformer(statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df)
# 欧几里德距离的空间度量
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
model = brp.fit(df)
new_df = model.transform(df)
# 基于Jaccard距离的空间度量
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model = mh.fit(df)
model.transform(df)
# 计算inputCol与scaling内积,不需要训练,直接transform
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]), inputCol="values", outputCol="eprod")
new_df = ep.transform(df)
# PolynomialExpansion特征变换
px = PolynomialExpansion(degree=2, inputCol="dense", outputCol="expanded")
px.transform(df)
# 独热编码
encoder = OneHotEncoder(inputCol="indexed", outputCol="features")
encoder.transform(td)
# 依据卡方检验特征处理
selector = ChiSqSelector(numTopFeatures=1,outputCol="selectedFeatures")
model = selector.fit(df)
new_df = model.transform(df)
# 数据集中的类别特征转换
indexer = VectorIndexer(maxCategories=2, inputCol="a", outputCol="indexed")
model = indexer.fit(df)
model.transform(df)
# 允许用户显式指定列的向量大小
sizeHint = VectorSizeHint(inputCol="vector", size=3, handleInvalid="skip") # 先指定大小
vecAssembler = VectorAssembler(inputCols=["vector", "float"], outputCol="assembled") # 整合成一列
pipeline = Pipeline(stages=[sizeHint, vecAssembler])
pipelineModel = pipeline.fit(df) # 功能模块合并
pipelineModel.transform(df)
# 用于将多个列合并为一个向量列,直接transform即可,经常用的
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df)
# 通过对这些索引的值进行筛选得到新的向量集
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).head().sliced
# PCA
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df)
# 离散余弦变换压缩数据
dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec")
df2 = dct.transform(df1)
# 特征哈希,相当于一种降维技巧
hasher = FeatureHasher(inputCols=cols, outputCol="features")
hasher.transform(df).head().features
4、模型训练
分类模块
model | 备注 |
---|---|
LinearSVC | 线性分类支持向量机 |
LogisticRegression | 逻辑回归 |
DecisionTreeClassifier | 决策树分类 |
GBTClassifier | GBDT梯度提升决策树 |
RandomForestClassifier | 随机森林 |
NaiveBayes | 朴素贝叶斯 |
MultilayerPerceptronClassifier | 多层感知机分类器 |
OneVsRest | 将多分类问题简化为二分类问题 |
# 线性分类支持向量机
svm = LinearSVC(maxIter=5, regParam=0.01) # maxIter最大迭代次数5次,regParam正则化参数
model = svm.fit(df)
# 逻辑回归
blor = LogisticRegression(regParam=0.01, weightCol="weight")
blorModel = blor.fit(bdf)
# 决策树分类
dt = DecisionTreeClassifier(maxDepth=2, labelCol="indexed") # 限定决策树的最大可能深度为2
model = dt.fit(td)
# GBDT梯度提升决策树
gbt = GBTClassifier(maxIter=5, maxDepth=2, labelCol="indexed", seed=42)
model = gbt.fit(td)
# 随机森林
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
# 朴素贝叶斯
nb = NaiveBayes(smoothing=1.0, modelType="multinomial", weightCol="weight")
model = nb.fit(df)
# 多层感知机
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[2, 2, 2], blockSize=1, seed=123)
model = mlp.fit(df)
# OneVsRest
lr = LogisticRegression(regParam=0.01)
ovr = OneVsRest(classifier=lr)
model = ovr.fit(df)
聚类模块
model | 备注 |
---|---|
BisectingKMeans | 二分类KMeans |
KMeans | k均值聚类算法 |
GaussianMixture | 高斯混合模型 |
LDA | LDA主题聚类 |
PowerIterationClustering | 幂迭代聚类 |
# 二分类KMeans
bkm = BisectingKMeans(k=2, minDivisibleClusterSize=1.0)
model = bkm.fit(df)
transformed = model.transform(df)
# k均值聚类算法
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(df)
transformed = model.transform(df)
# 高斯混合模型
gm = GaussianMixture(k=3, tol=0.0001,maxIter=10, seed=10)
model = gm.fit(df)
transformed = model.transform(df)
# LDA主题聚类
lda = LDA(k=2, seed=1, optimizer="em")
model = lda.fit(df)
# 幂迭代聚类
pic = PowerIterationClustering(k=2, maxIter=40,weightCol="weight")
assignments = pic.assignClusters(df)
回归模块
model | 备注 |
---|---|
AFTSurvivalRegression | 生存分析的对数线性模型 |
DecisionTreeRegressor | 决策树回归模型 |
GBTRegressor | 全称梯度下降树回归模型 |
IsotonicRegression | 保序回归 |
# 生存分析的对数线性模型
aftsr = AFTSurvivalRegression()
model = aftsr.fit(df)
# 决策树回归模型
dt = DecisionTreeRegressor(maxDepth=2, varianceCol="variance")
model = dt.fit(df)
# 全称梯度下降树回归模型
gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
model = gbt.fit(df)
# 保序回归
ir = IsotonicRegression()
model = ir.fit(df)
推荐模块
model | 备注 |
---|---|
ALS | (Alternatingleastsquares。交替最小二乘法 |
# 推荐系统
als = ALS(rank=10, maxIter=5, seed=0)
model = als.fit(df)
5、结果评估
model | 备注 |
---|---|
BinaryClassificationEvaluator | 二分类评估 |
RegressionEvaluator | 回归评估 |
MulticlassClassificationEvaluator | 多分类评估 |
ClusteringEvaluator | 聚类评估 |
evaluator = BinaryClassificationEvaluator(rawPredictionCol="raw")
evaluator.evaluate(dataset)
evaluator = RegressionEvaluator(predictionCol="raw")
evaluator.evaluate(dataset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)
evaluator = ClusteringEvaluator(predictionCol="prediction")
evaluator.evaluate(dataset)
6、模型保存
直接使用save保存模型,使用load加载训练结果。
# rf_classifier为RandomForestClassificationModel训练完的模型
rf_classifier.save("xxx/RF_model")
# 模型调用
rf=RandomForestClassificationModel.load("xxx/RF_model")
model_preditions=rf.transform(test_df)
二、运行案例
对上面的各个过程的方法进行组装,以随机森林代码为例:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler # 特征处理
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel
spark=SparkSession.builder.appName('random_forest').getOrCreate()
# 数据读入
pandas_df = pd.read_csv('xxx/affairs.csv')
sc = SparkContext() # 初始化;SparkContext则是客户端的核心
sqlContest = SQLContext(sc) # SQLContext是Spark SQL进行结构化数据处理的入口
df = sqlContest.createDataFrame(pandas_df) # pandas df 转换为 spark df 格式
# 数据属性
print((df.count(),len(df.columns)))
df.printSchema()
df.describe().select('summary','rate_marriage','age','yrs_married','children','religious').show()
df.groupBy('affairs').count().show()
df.groupBy('rate_marriage').count().show()
df.groupBy('rate_marriage','affairs').count().orderBy('rate_marriage','affairs','count',ascending=True).show()
df.groupBy('religious','affairs').count().orderBy('religious','affairs','count',ascending=True).show()
df.groupBy('children','affairs').count().orderBy('children','affairs','count',ascending=True).show()
df.groupBy('affairs').mean().show()
# 特征处理
df_assembler = VectorAssembler(inputCols=['rate_marriage', 'age', 'yrs_married', 'children', 'religious'], outputCol="features") # 把特征组装成一个list
df = df_assembler.transform(df)
df.printSchema()
df.show(5,truncate=False)
# 数据集划分
model_df=df.select(['features','affairs'])
train_df,test_df=model_df.randomSplit([0.75,0.25])
train_df.count()
train_df.groupBy('affairs').count().show()
test_df.groupBy('affairs').count().show()
# 模型构建
rf_classifier=RandomForestClassifier(labelCol='affairs',numTrees=50).fit(train_df)
rf_predictions=rf_classifier.transform(test_df)
rf_predictions.show()
# 结果查看
rf_classifier.featureImportances # 各个特征的权重
# 模型效果
rf_predictions.groupBy('prediction').count().show()
rf_predictions.select(['probability','affairs','prediction']).show(10,False)
# 多分类模型——准确率
rf_accuracy=MulticlassClassificationEvaluator(labelCol='affairs',metricName='accuracy').evaluate(rf_predictions)
print('The accuracy of RF on test data is {0:.0%}'.format(rf_accuracy))
print(rf_accuracy)
# 多分类模型——精确率
rf_precision=MulticlassClassificationEvaluator(labelCol='affairs',metricName='weightedPrecision').evaluate(rf_predictions)
print('The precision rate on test data is {0:.0%}'.format(rf_precision))
# AUC
rf_auc=BinaryClassificationEvaluator(labelCol='affairs').evaluate(rf_predictions)
print(rf_auc)
# 模型保存
rf_classifier.save("/home/logsaas/pyspark/lalafile/RF_model")
# 模型调用
rf=RandomForestClassificationModel.load("/home/logsaas/pyspark/lalafile/RF_model")
model_preditions=rf.transform(test_df)
model_preditions.show()
参考资料
[1] SparkSession与SparkContext:https://blog.csdn.net/qq_35495339/article/details/98119422
[2] Spark中DataFrame与Pandas中DataFrame的区别:https://blog.csdn.net/u013129944/article/details/80019546
[3] pyspark 官网解释:http://spark.apache.org/docs/latest/api/python/pyspark.ml.html
[4] pyspark.ml.feature函数中文简介:https://blog.csdn.net/yw_vine/article/details/80117759
[5] Spark特征工程:https://blog.csdn.net/u012050154/article/details/60766387
[6] http://dblab.xmu.edu.cn/blog/1709-2/ 子雨
[7] Machine Learning with PySpark by Pramod Singh
[8] https://github.com/Apress/machine-learning-with-pyspark