本文理论部份基于Andew Ng的公开课,工程实践来自 Spark ML。我是个愚钝的人,纯理论学不动。
Linear Regression
机器学习是一个归纳与演绎的过程,根据已有数据集,训练出模型(归纳),去预测未知的世界(演绎)。在公开课里以波士顿房价为第一个例子,我在赶集房产频道爬了部份小区数据,用于学习
总价(w) | 面积(m^2) | 卧室 |
---|---|---|
498 | 125 | 3 |
370 | 87 | 2 |
498 | 125 | 3 |
510 | 116 | 2 |
1185 | 350 | 5 |
上面只是部分数据,房价一般和地理位置,面积,朝向,是否学区相关,这些指标统称为 feature , 如果只考滤面积这个特征,我们画一下数据分布图
如果我们再把卧室这个 feature 考虑进去,那么新的散点图就是三维的
对于多维 N 个特征,虽然无法对应现实世界物理模型,但是可以对应矩阵,大家脑补
公式推导
对于只有一个特征的线性公式
h(x)=𝜃0 +𝜃1𝑥1
简书没有数学公式编辑,上面𝜃0中的0是小标,实际上应该是𝜃0𝑥0,但是我们默认第0个特征值为1,即 𝑥0=1,所以可以如此简写。
h(x)=𝜃0 +𝜃1𝑥1 +𝜃2𝑥2 =∑𝜃𝑖𝑥𝑖 =𝜃T𝑥
对于拥有多个特征的线性公式也很好理解,其中𝜃和𝑥均为列向量,𝜃T是𝜃的转置向量,T是𝜃的上标。函数 h(x) 可以表示为两个向量的点积(内积),我们最终就是要解出列向量𝜃,使得这条直线更合理。
这里明显有个离群点,[170,96 3] 这组数据异常,涉及到数据抽取和清洗,暂时不提。
如何衡量哪组𝜃更合理
我们定义一个函数,叫做 cost function , 或是 loss function . 这个函数来衡量某一特定𝜃时,所拟合的失真程度,这个程度越小越合理。
J(θ) = 1/2 ∑(h𝜃(𝑥(𝑖)) − 𝑦(𝑖))^2
其中 J 是向量𝜃的函数,h𝜃(𝑥(𝑖)) 为给定𝜃预测出的𝑦(𝑖)值。函数为预测值与对应真实值差的平方和,最后要乘以1/2, 方便求导时使系统为1.
初始默认 𝜃 是一个N维的零(列)向量,那么此时 J 肯定非常大,目的就是找到一组 𝜃 使这个损失函数 J 取最小值。所以我们需要每次改变𝜃 值,不断试错。
上图迭代了四次,终于找到最优解。
梯度下降
公开课使用梯度下降法求近似解,求 θ 最终变成了求 J(θ) 极小值。梯度方向由 J(θ) 对 θ 的偏导数确定,由于极小值,所以为偏导数的反方向。
θj :=θj −α ( J(θ))'
公式中 θj 是每次迭代前的值减去一个变量,α 称为学习速率,这个值不能过大,可能跳过极值,过小也会影响收敛速度。( J(θ))' 是 J(θ) 对 θ 的偏导数(没找到好的公式编辑器)。当我们只考滤一个样本的时候,对J(θ)链式求导
由上图可知导数结果是一个矢量与向量积。h𝜃(𝑥) − 𝑦 是估计值与真实值之差,𝑥j 是当前样本特征值,为列量向。同理推导出当我们有 m 个样本时的梯度为:
θj:=θj+α∑(y(i)−hθ(x(i)))x(i)
就是说我们每次迭代 θ,迭代值 α∑(y(i)−hθ(x(i)))x(i), 这块初次接触有点绕口,通过阅读源码加深理解。
Spark 训练模型
Python 也有很好的机器学习库,相比更通用适合学习。Spark mllib 也比较成熟,借助 RDD 可以实现大数据分布式计算,训练模型效率更高效。先来看看用 Spark 实现例子中的 Linear Regression
准备数据
数据格式为 Label, feature1,feature2,feature3.... 本文中只有2个特征,其中术语 Label 对应 price, feature1 对应面积,feature2 对应卧室数量。
498,125 3
670,141 3
600,137 3
650,150 3
打开 spark-shell 加载数据
scala> val data = sc.textFile("/Users/dzr/code/spark-mllib-data/house.data")
data: org.apache.spark.rdd.RDD[String] = /Users/dzr/code/spark-mllib-data/house.data MapPartitionsRDD[1] at textFile at <console>:27
加载数据文件,生成 RDD[String],生产环境数据一般从HDFS中获取,本机只是用来演示和训练模型。加载数据后,要把数据做特殊处理。
scala> import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.linalg.Vectors
scala> val trainSet = data.map {line=>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble,Vectors.dense(parts(1).split(' ').map(_.toDouble)))}
trainSet: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[3] at map at <console>:31
scala> trainSet.take(2).foreach(println)
(498.0,[125.0,3.0])
(670.0,[141.0,3.0])
Spark 线性回归术语
- SGD: Stochastic Gradient Descent 随机梯度下降
- LabeledPoint: Class that represents the features and labels of a data point, 线性回归属于监督学习,在给定观测特征对应的值叫做 label
- Weight: 权重是一个列向量,就是我们要求得的 𝜃
- Intercept: 简单的理解为 y 轴的截距,即当各特征均为零值时的观测默认值。这项的意义在于,他能捕捉到未观察到的误差(比如我们可能忽略了其它有影响的特征)
失败的建模
scala> import org.apache.spark.mllib.regression._
scala> val numIterations = 100 // 设置迭代次数
numIterations: Int = 100
scala> val stepSize = 1 // 每次迭代步长
stepSize: Int = 1
scala> val minBatchFraction = 1.0 // 样本参与迭代比例
minBatchFraction: Double = 1.0
scala> val model = LinearRegressionWithSGD.train(trainSet, numIterations,stepSize,minBatchFraction)
model: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 2
scala> model.weights
res48: org.apache.spark.mllib.linalg.Vector = [NaN,NaN]
scala> model.intercept
res49: Double = 0.0
很诡异的事情诞生了,model.weights 列向量居然是 NaN, 也就是说不是有效的 double 浮点数,model.intercept 是0.0也不很正常。为什么用 Matlab 就能拟合出结果,而 Spark 就失败了?
学习速率
咨询了(靓丽青春无敌的美少女学)同事,给两个建义:特征数据做归一化处理,调整学习速率。首先做的是归一化处理,将特征标准差标准化(均值0, 标准差1),还是得到 NaN 值。然后调整 stepSize ,从1, 0.1, 0.01, 0.001,0.0001开始测试,终于在0.001时得到拟合值。
scala> val model = LinearRegressionWithSGD.train(trainSet, 100,0.0001)
model: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 2
scala> model.weights //拟合
res227: org.apache.spark.mllib.linalg.Vector = [4.225755672693632,0.09458364813972062]
scala> model.predict(new DenseVector(Array(98,3))) // 进行预测,98平,3居的房子大概414万人民币
res228: Double = 414.4078068683951
这里还有个问题,在迭代次数不变,不断调小 stepSize 时,会取不到极小值。当数据已经做归一化处理时,学习速率可以稍大一些。最后预测房价函数为
h(x)= 4.22 * 𝑥1 + 0.09 * 𝑥2
可以直观的看到房价每平4.22W,并且和第二个特征rooms关系不大。
loss cost
拟合的效果好不好,最终要看损失值和均方根误差
scala> val prediction = model.predict(trainSet.map(_.features))
prediction: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[10732] at mapPartitions at GeneralizedLinearAlgorithm.scala:69
scala> val predictionAndLabel = prediction.zip(trainSet.map(_.label))
predictionAndLabel: org.apache.spark.rdd.RDD[(Double, Double)] = ZippedPartitionsRDD2[10734] at zip at <console>:45
scala> for(i<-0 to prediction.collect.length -1) {
| println(predictionAndLabel.collect()(i)._1 + "\t" + predictionAndLabel.collect()(i)._2)}
scala> val loss = predictionAndLabel.map {
| case (p, l) =>
| val err = p - l
| err * err }.reduce(_+_)
loss: Double = 13322.402139229554
scala> val rmse = math.sqrt(loss/prediction.collect.length)
rmse: Double = 28.85567766838698
预测值 真实值
528.5032100311231 498.0
596.1153007942212 670.0
579.2122781034467 600.0
634.1471018484639 650.0
295.99206438483367 320.0
579.2122781034467 550.0
可以看到 rmse 为28.8
小结
边学边动手实践还是蛮快的,终于看到了ML的冰山一角。还有分类算法什么的,慢慢看吧。