刚完成SparkR的支持,顺手记录下流程…
1 编译SparkR
1.1 生成SparkR的lib包
# where /home/username/R is where R is installed and /home/username/R/bin contains the files R and RScript
export R_HOME=/home/username/R
cd $SPARK_HOME/R
./install-dev.sh
1.2 编译Spark
./dev/make-distribution.sh --tgz -Psparkr \
-Pyarn -Phadoop-2.7 \
-Dhadoop.version=2.7.2-307 -Phive \
-Phive-thriftserver -Pnetlib-lgpl \
-Pspark-ganglia-lgpl -DskipTests \
-Denforcer.skip=true
1.3 local模式运行SparkR
./bin/sparkR --master local[2]
测试用例参见官方文档:R on Spark
2 Yarn模式运行SparkR
依据SparkR的执行原理, 其需要在每个Executor(JVM)中启动一个R进程,因此,若集群节点未安装R环境的需要通过参数指定的方式自定义R环境。其过程如下:
2.1 创建R的可执行程序包
我们的Linux环境为Centos, R官方仅提供了yum安装方式,未提供Binary的完整程序包,因此需要自己制作,此处我们选择使用Conda进行制作(Conda 是一个开源的软件包管理系统和环境管理系统,用于安装多个版本的软件包及其依赖关系,并在它们之间轻松切换)。
2.1.1 Conda 安装
Anaconda 官方网站下载适合节点环境版本的Conda,下载文件为.sh后缀的脚本文件。
//安装
bash Anaconda2-5.0.1-Linux-x86_64.sh
//默认安装位置为用户根目录下
由于官网的镜像在境外,访问速度太慢,为了能够加快访问的速度,需要设置清华镜像清华大学开源软件镜像站-anaconda。
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
conda config --set show_channel_urls yes
2.1.2 使用Conda创建R Essentials
安装R Essentials至当前环境
conda install -c r r-essentials
创建R环境
conda create -p /home/chenchengchen/r_env --copy -y -q r-essentials -c r
命令执行完成时将在"/home/chenchengchen/r_env"目录生成执行R必要的binary及lib.
值得注意的是:Conda生成的R脚本中使用了目录的绝对路径,需要对其进行修改,否则,在Yarn container中将发生找不到文件的错误。
cd /home/chenchengchen/r_env/bin/
# 将R文件中的"/home/chenchengchen"替换为".",替换的路径决定下面的打包路径及spark环境配制,此处一定注意;
# 本例采用sed进行替换(vim命令也可),# sed -i 's/原字符串/新字符串/' filepath; 因其语法原因,若路径中存在"/"需要使用"\"转义
sed -i "s/\/home\/chenchengchen/./g" R
将Conda提取的R环境打包成zip,即可交由Spark使用.
# 进入r_env目录
cd /home/chenchengchen/r_env
# 依据上述sed的重命名的路径,将r_env中文件打包到zip的根目录(解压开没有目录结构,直接为r_env中的众文件)
zip -r -p r_env.zip *
2.2 Yarn模式提交SparkR(shell 脚本,spark-submit类似)
sparkR --master yarn \
--deploy-mode client \
--queue your.queue.name \
--archives ~/r_env.zip#r_env \ #
--conf spark.executorEnv.RHOME=./r_env \ ## R环境配制与打包路径有关
--conf spark.r.command=./r_env/bin/Rscript \
提交成功后,执行如下代码进行测试:
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
测试用例参见官方文档:R on Spark
2.2.1 Yarn Cluster模式
上述举例为Yarn Client模式,若要使用Yarn Cluster模式,还需指定"spark.yarn.appMasterEnv.RHOME=./r_env";
3 第三方 R依赖包安装
在SparkR控制台,以命令行方式安装第三方R依赖包,安装方式有如下两种:
(1)网络安装(以mvtnorm为例)
install.packages(“mvtnorm”, repos=“http://cran.us.r-project.org”)
(2)本地安装
# 首先下载mvtnorm.tar.gz至本地
path <- /home/chenchengchen/mvtnorm.tar.gz
#安装依赖包
install.packages(path, repos=NULL, type="source")
执行安装命令后,第三方依赖包会被安装至$SPARK_HOME/R/lib目录下。
## 目录结构
mvtnorm SparkR sparkr.zip
虽然已安装至lib目录下,但真正上传集群的sparkr.zip中并无mvtnorm库,需要删除sparkr.zip重新打包
# 删除原sparkr.zip
rm sparkr.zip
# 重新打包sparkr.zip文件
# 要在R/lib目录下操作,否则会现打不到文件异常
zip -r sparkr.zip *
重新启动sparkR 即可正常运行
3.1 测试 demo
df <- createDataFrame(list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), c("a", "b", "c"))
showDF(df)
+---+---+---+
| a| b| c|
+---+---+---+
| 1|1.0| 1|
| 2|2.0| 2|
| 3|3.0| 3|
+---+---+---+
schema <- structType(structField("a", "integer"), structField("b", "double"), structField("c", "string"), structField("d", "double"))
df1 <- dapply(df, function(x) {
library(mvtnorm)
x <- cbind(x, x$a * rmvnorm(n=1, mean=c(1))) }, schema)
showDF(df1)
+---+---+---+------------------+
| a| b| c| d|
+---+---+---+------------------+
| 1|1.0| 1|0.8024247855452726|
| 2|2.0| 2|1.6048495710905453|
| 3|3.0| 3| 2.407274356635818|
+---+---+---+------------------+