The Spark Shell is mostly used for testing and validating programs. In a production environment, programs are usually written in an IDE, packaged into a jar, and then submitted to the cluster. The most common approach is to create a Maven project and let Maven manage the jar dependencies.
1 Writing the WordCount Program
1) Create a Maven project named WordCount and add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
<build>
    <finalName>WordCount</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.jackyan.spark.WordCount</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2) Write the code
package com.jackyan.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {

    // 1. Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("wordcount")

    // 2. Create a SparkContext, the entry point for submitting a Spark application
    val sc = new SparkContext(conf)

    // 3. Use sc to create an RDD and run the corresponding transformations and actions
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(args(1))

    // 4. Close the connection
    sc.stop()
  }
}
3) Package the project
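For example, assuming Maven is available on the PATH, running the following from the project root compiles the Scala sources and produces WordCount-jar-with-dependencies.jar under the target/ directory:
mvn clean package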
4) Upload the packaged jar to the cluster
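One way is to copy it with scp; the host and target directory below are assumptions based on the environment shown later (hadoop101, a spark installation directory) and should be adjusted to your cluster:
scp target/WordCount-jar-with-dependencies.jar hadoop@hadoop101:/opt/module/spark/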
5) Run the program
bin/spark-submit \
--class com.jackyan.spark.WordCount \
--master yarn \
--deploy-mode client \
./WordCount-jar-with-dependencies.jar \
input \
output
Note: the input directory is an HDFS directory; upload the files to be counted into it.
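A sketch of preparing the input data, assuming a local file named words.txt (a hypothetical example) containing the words to be counted:
hdfs dfs -mkdir -p input
hdfs dfs -put words.txt input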
6) Check the results
[hadoop@hadoop101 spark]$ hdfs dfs -ls
Found 3 items
drwxr-xr-x - hadoop supergroup 0 2022-01-08 10:26 .sparkStaging
drwxr-xr-x - hadoop supergroup 0 2022-01-08 10:25 input
drwxr-xr-x - hadoop supergroup 0 2022-01-08 10:26 output
[hadoop@hadoop101 spark]$ hdfs dfs -cat output/*
(scala,1)
(world,1)
(hello,5)
(jack,1)
(java,1)
(spark,1)
2 Local Debugging
To debug a Spark program locally, use local submit mode, i.e. treat the local machine as the runtime environment, with both Master and Worker running locally. You can then set breakpoints and debug directly, as follows:
When creating the SparkConf, set an extra property to indicate local execution:
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
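A minimal local-mode sketch that can be run and debugged directly in the IDE; the object name LocalWordCount and the input path D:/input/words.txt are hypothetical examples:
package com.jackyan.spark

import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCount {

  def main(args: Array[String]): Unit = {
    // local[*] runs the driver and executors in this JVM, so breakpoints hit directly
    val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read a local file instead of HDFS; the path is a hypothetical example
    sc.textFile("D:/input/words.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop()
  }
}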
If the local operating system is Windows and the program uses anything Hadoop-related, such as writing files to HDFS, you will run into an exception, typically a java.io.IOException complaining that winutils.exe cannot be located in the Hadoop binaries.
This is not a bug in the program itself; it happens because Hadoop-related services are being used. The fix is to extract the attached hadoop-common-bin-3.2.2-x64.zip to any directory,
then configure the Run Configuration in IDEA and add a HADOOP_HOME environment variable pointing to the directory extracted above.
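As an alternative to the IDEA run configuration, the Hadoop home directory can also be set programmatically before the SparkContext is created; a sketch, assuming the zip was extracted to D:/hadoop-common-bin-3.2.2 (a hypothetical path):
// Must run before any Hadoop classes are used; the path is a hypothetical example
System.setProperty("hadoop.home.dir", "D:/hadoop-common-bin-3.2.2")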