1.序
我的工程是maven工程,通过maven不需要理会包的加载问题,很是方便。如果你还没有使用maven来管理工程的话那强烈建议你使用maven,尽管前期学习有点麻烦(主要是maven的默认下载镜像是国外)
2.搭建详情
下面是我建工程的截图
3.测试wordcount程序
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.pwsoft</groupId>
<artifactId>SparkStudy</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.4.Final</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.4</version>
</dependency>
</dependencies>
</project>
wordcount程序源码
object MyWordCount {
def main(args: Array[String]) {
//获取SparkContext
val spark = SparkSession
.builder
.appName("Spark Pi").master("local")
.getOrCreate()
var sc = spark.sparkContext
//读取文件,返回这个文件的行数
val count = sc.textFile("F:\\vmware\\share\\soft\\spark-1.6.0-bin-hadoop2.6\\README.md").count()
println(count)
// val lines = spark.sparkContext.textFile("F:\\vmware\\share\\soft\\spark-1.6.0-bin-hadoop2.6\\README.md")
// val wordcount = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).collect()
// wordcount.foreach(pair => println(pair._1 + " : " + pair._2))
/**
* flatMap产生 MapPartitionsRDD
* map 产生 MapPartitionsRDD
* reduceByKey 产生 ShuffledRDD
* sortByKey 产生 ShuffledRDD
*/
spark.sparkContext.textFile("F:\\vmware\\share\\soft\\spark-1.6.0-bin-hadoop2.6\\README.md").flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_+_).map(pair => (pair._2, pair._1)).sortByKey(false).collect()
.map(pair => (pair._2, pair._1)).foreach(pair => println(pair._1 + " : " + pair._2))
//为了可以通过web控制台看到信息,加一个写循环不让程序结束
while (true) {}
spark.stop()
}
}