之前阅读了Spark的ML API文档,也看了里面介绍的example,正好之前自己写过Logistic Regression的算法并预测了下Kaggle上的新手村任务之一:Titanic。所以这里也想用Spark MLlib自带的LR算法也实践下,一是对比下各自的预测结果,二是实践出真知,读API文档不实践乃憾事也。
第一步:特征工程
这个其实是解决一个实际大数据问题的重中之重,不过因为这个文档重在实践Spark,所以特征工程我还是沿用之前自己写的LR算法的时候的做法,简单的从Kaggle官网的训练集和测试集上生成能够喂给算法的训练集以及对应的测试集。
下面是对数据的一个展示,方便接下来代码展示时的理解:
数据集都是csv格式的,首先是训练集(Survived是标签):
SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled,Survived
1,0,1,0,0,0,1,0,1,0,0,1,-0.56136323207,-0.502445171436,0
1,0,0,1,1,0,0,1,0,1,0,0,0.613181832266,0.786845293588,1
0,0,1,0,0,0,1,1,0,0,0,1,-0.267726965986,-0.488854257585,1
1,0,0,1,0,0,1,1,0,1,0,0,0.392954632703,0.420730236069,1
0,0,1,0,0,0,1,0,1,0,0,1,0.392954632703,-0.486337421687,0
0,0,1,0,0,1,0,0,1,0,0,1,-0.427101530014,-0.478116428909,0
然后是测试集(没有标签Survived):
SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled
0,0,1,0,0,1,0,0,1,0,0,1,0.307534608854,-0.496637106488
1,0,1,0,0,0,1,1,0,0,0,1,1.25623006816,-0.511497104137
0,0,1,0,0,1,0,0,1,0,1,0,2.39466461933,-0.463334726327
0,0,1,0,0,0,1,0,1,0,0,1,-0.261682666729,-0.481703633213
1,1,1,0,0,0,1,1,0,0,0,1,-0.641160850452,-0.416740425935
第二步:用Spark MLlib自带LR建模并预测
直接上代码吧:
public class TitanicLogisticRegressionWithElasticNet {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaLogisticRegressionWithElasticNetExample")
.getOrCreate();
// $example on$
// Load training data
Dataset<Row> training = spark.read().format("csv").option("header", true).option("inferSchema", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
// System.out.println("\n------- Read csv data:");
// training.printSchema();
// training.show(5, false);
String origStr = "SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled";
String[] arrOrig = origStr.split(",");
VectorAssembler vectorAssem = new VectorAssembler()
.setInputCols(arrOrig).setOutputCol("features");
Dataset<Row> feaTrain = vectorAssem.transform(training);
// System.out.println("\n------- assembled out:");
// feaTrain.printSchema();
// feaTrain.show(5, false);
feaTrain = feaTrain.select("features", "Survived");
System.out.println("\n------- after selected:");
feaTrain.printSchema();
feaTrain.show(5, false);
LogisticRegression lr = new LogisticRegression()
.setLabelCol("Survived")
.setMaxIter(10000)
.setRegParam(0.0)
.setElasticNetParam(0.8);
// Fit the model
LogisticRegressionModel lrModel = lr.fit(feaTrain);
// Print the coefficients and intercept for logistic regression
System.out.println("\n+++++++++ Binomial logistic regression's Coefficients: "
+ lrModel.coefficients() + "\nBinomial Intercept: " + lrModel.intercept());
Dataset<Row> testData = spark.read().format("csv").option("header", true).option("inferSchema", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_test_data.csv");
Dataset<Row> feaTest = vectorAssem.transform(testData);
feaTest = feaTest.select("features");
Dataset<Row> result = lrModel.transform(feaTest);
// System.out.println("\n------- after predict:");
// result.printSchema();
// result.show(5, false);
//result = result.withColumn("PassengerId", result.col("prediction"));
result = result.withColumnRenamed("prediction", "Survived");
System.out.println("\n====== after add and rename:");
result.printSchema();
result.show(5, false);
result.select("Survived").write().mode("overwrite").option("header", true).csv("mllib_LR_TitanicResult");
spark.stop();
}
}
列举写代码时遇到的问题
虽然看起来是小问题但是第一次遇到解决起来可不少花时间。
-
问题1:读取csv格式文件
虽然之前阅读DataFrame的API文档的时候官网上有讲DataFrame可以从csv格式的文件中生成DataFrame,但是我在实际写代码中遇到了以下几个问题:
最开始代码是这么写的
Dataset<Row> training = spark.read().format("csv")
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
这个会导致一个问题,在DataFrame.show()的时候发现第一行的列名称也出现在数据当中,而且列的名称变为"_C0 ... _C14",通过google和阅读源码的注释发现这里需要增加一个配置:
Dataset<Row> training = spark.read().format("csv").option("header", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
不过问题还没有结束,再接下来想要使用VectorAssembler来产生features的列向量时遇到一个问题,刚才前面得到的training的schema每一列的数据类型都是string。这会引起VectorAssembler的一个异常(原因见API介绍:不接受string格式的列作为输入),于是还需要想办法把string转成int或者double。然后继续google和看源码注释,发现还需要增加一个配置:
Dataset<Row> training = spark.read().format("csv").option("header", true).option("inferSchema", true)
.load("/home/paul/share/mySparkJavaApiLearning/src/main/resources/kaggle/Titanic/gen_LR_train_data.csv");
至此,我们就得到了有列名称以及有具体非string类型的schema了。
-
问题2: 怎么产生LogisticRegression所需要的特征列向量
在用LogisticRegression训练产生LogisticRegressionModel时,只要简单调用fit()方法即可,不过如果直接对上面的training调用这个方法,会提示没有列向量(默认名为features)。一开始我的思路是查找LogisticRegression,看看是否有setXXXXXX这样的方法可以让LogisticRegression将多个列作为特征列向量来使用,最终我没有找到这样的设置方法(如果确实有,麻烦在评论区回复下,非常感谢!);然后再换个思路,怎么对training进行Transformation产生一个新的DataFrame来满足我们的需求。经过google终于找到了一个特征转换的方法:VectorAssembler,这个API可以实现我们上面的需求:
String origStr = "SibSp,Parch,Cabin_No,Cabin_Yes,Embarked_C,Embarked_Q,Embarked_S,Sex_female,Sex_male,Pclass_1,Pclass_2,Pclass_3,Age_scaled,Fare_scaled";
String[] arrOrig = origStr.split(",");
VectorAssembler vectorAssem = new VectorAssembler()
.setInputCols(arrOrig).setOutputCol("features");
Dataset<Row> feaTrain = vectorAssem.transform(training);
// System.out.println("\n------- assembled out:");
// feaTrain.printSchema();
// feaTrain.show(5, false);
feaTrain = feaTrain.select("features", "Survived");
这样我们就得到了可以用来fit LogisticRegression的训练集,来产生LogisticRegressionModel。然后同样的方式处理测试集,最后通过刚刚训练得到的模型来预测测试集的结果。
第三步:对比Spark的MLlib LogisticRegression结果
具体训练LogisticRegression时设置的参数为:
LogisticRegression lr = new LogisticRegression()
.setLabelCol("Survived")
.setMaxIter(10000)
.setRegParam(0.0)
.setElasticNetParam(0.8);
最终很巧合的是Spark预测出的结果与我之前自己手写的Java版的LogisticRegression的预测竟完全一致。最终得分都是:
虽然这个结果看起来很挫,但是这个一致的结果让我觉得我之前做的工作在正确性上还是可以的。而且我也知道这个得分比较挫的原因是特征工程太low了,只是目前还没有细化下去,毕竟特征工程是预测结果的上限,在这个之上所有不同的机器学习方法只是在用各自的努力去接近这个上限而已。
另外还需要注意的是,Spark的LogisticRegression在训练时明显比我自己写的代码计算速度快,我本以为虚拟机环境加上Spark的各种杂七杂八的流程处理肯定会不如我自己的代码运行的快的,但实际结果却Piapia打脸。。。
总结:
- 感谢google、感谢Stack Overflow、感谢Spark官网的Programming Guide以及example
- 强烈鄙视http://spark.apache.org/docs/latest/api/java/index.html, 如果我打开的方式没有错误的话,感觉它完全没啥用(如果是我打开的方式不对也麻烦在评论区帮我指正,先感谢了!)。
- 源码也很有帮助,就是scala的语法还要再学学,不然看起来不顺畅。
- 对于RDD或者DataFrame的Transformation玩的不够溜,这个确实是基础,也接下来需要找机会多学习和实践。