In the previous section we performed a word count interactively from the Spark shell. This section shows how to do the same word count with Java code in a Maven project inside IDEA.
1 System, software, and prerequisites
- CentOS 7 64-bit workstation. The author's machine has IP 192.168.100.200 and hostname danji; readers should adjust these to their own environment
- The interactive Scala word count has already been completed on Linux:
https://www.jianshu.com/p/92257e814e59
- The file to be counted, named /word, has already been uploaded to HDFS
- IDEA 2018.2
2 Procedure
- 1 Create a Maven project and modify the dependencies in pom.xml
<repositories>
    <repository>
        <id>cloudera</id>
        <name>cloudera</name>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.7.0</version>
    </dependency>
</dependencies>
- 2 Create a SparkWordCount.java under src
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
public class SparkWordCount {
    public static void main(String[] args) {
        // When running on Windows, the local Hadoop installation path must be set;
        // when packaged as a jar and uploaded to Linux, this line is not needed
        System.setProperty("hadoop.home.dir", "C:\\hadoop2.7.2");
        SparkConf conf = new SparkConf();
        // set the master
        conf.setMaster("local[*]");
        // set the application name
        conf.setAppName(SparkWordCount.class.getSimpleName());
        // obtain a JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // read the data from HDFS
        JavaRDD<String> lines = sc.textFile("hdfs://192.168.100.200:9000/word");
        // split each line into words
        JavaRDD<String> words = lines.flatMap(t -> Arrays.asList(t.split(" ")).iterator());
        // map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> new Tuple2<>(word, 1));
        // sum the counts per word
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey((a, b) -> a + b);
        // swap to (count, word), sort by count descending, then swap back
        JavaPairRDD<Integer, String> beforeSwap = result.mapToPair(tp -> tp.swap());
        JavaPairRDD<Integer, String> sorted = beforeSwap.sortByKey(false);
        JavaPairRDD<String, Integer> finalRes = sorted.mapToPair(tp -> tp.swap());
        finalRes.saveAsTextFile("hdfs://192.168.100.200:9000/outputjava");
        sc.stop();
    }
}
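The flatMap → mapToPair → reduceByKey → sortByKey pipeline above can be mirrored with plain Java collections, which is a handy way to sanity-check the expected output before submitting the Spark job. This is a minimal sketch, not part of the Spark program; the class name and the sample input line are made up for illustration:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalWordCount {
    // Count words and order by count descending, mirroring the Spark pipeline:
    // flatMap (split) -> mapToPair + reduceByKey (group and sum) -> sortByKey(false)
    static Map<String, Long> count(String text) {
        return Arrays.stream(text.split(" "))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()))
                .entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // hypothetical input; substitute a line from your own /word file
        System.out.println(count("hello spark hello hadoop hello"));
    }
}
```

Because the result is collected into a LinkedHashMap after sorting, iterating over it yields words from most to least frequent, matching the order of the files Spark writes to /outputjava.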
- 3 Run the program, then check the results under /outputjava on the HDFS service.
The above completes a Spark word count in IDEA using Maven and Java.