本案例旨在综合使用Spark Core 和Spark Sql完成业务需求,具有一定的参考价值。
案例需求
- 筛选出符合查询条件的数据
- 统计出每天搜索uv排名前3的搜索词
- 按照每天的top3搜索词的uv搜索总次数,倒序排序
- 将统计结果输出
案例数据
日期 | 搜索词 | 用户 | 城市 | 平台 | 版本 |
---|---|---|---|---|---|
2017-05-17 | Hadoop | user1 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user2 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user2 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user3 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user4 | 北京 | Android | 1.2 |
2017-05-17 | Scala | user1 | 天津 | Android | 1.2 |
2017-05-17 | Hadoop | user3 | 天津 | ISO | 1.2 |
2017-05-17 | Scala | user4 | 天津 | ISO | 1.2 |
2017-05-17 | Scala | user6 | 南京 | Android | 1.2 |
2017-05-18 | Scala | user1 | 天津 | Android | 1.2 |
2017-05-18 | Scala | user3 | 天津 | ISO | 1.2 |
2017-05-18 | Scala | user4 | 天津 | ISO | 1.2 |
2017-05-18 | Scala | user6 | 南京 | Android | 1.2 |
2017-05-18 | Spark | user7 | 天津 | Android | 1.2 |
2017-05-18 | Spark | user9 | 天津 | ISO | 1.2 |
2017-05-18 | Spark | user4 | 天津 | ISO | 1.2 |
2017-05-18 | Spark | user6 | 南京 | Android | 1.2 |
2017-05-18 | Spark | user6 | Android | 1.2 | |
2017-05-18 | Hadoop | user1 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user2 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user5 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user3 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user4 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user1 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user2 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user5 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user3 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user4 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user6 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user7 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user1 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user2 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user5 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user3 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user4 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user2 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user5 | 北京 | Android | 1.2 |
2017-05-18 | Mongodb | user3 | 北京 | Android | 1.2 |
2017-05-18 | redis | user4 | 北京 | Android | 1.2 |
原始数据存储在hdfs中,数据项之间使用\t进行分割,部分数据项可能会有缺失。
实现思路
- 读取hdfs上的原始数据并转换为RDD
- 使用filter算子顾虑有效的数据
- 从系统日志中获取有用的数据项并封装成Row对象
- 将RDD<Row>转换为Dataset<Row>
- 按日期分组统计搜索词的搜索次数
- 使用窗口函数row_number获取每日top 3热词
- 将统计结果输出
示例代码
以本地环境为例,生成环境只需把master和文件路径变更一下即可。
Spark API 实现方式
//创建SparkSession对象
SparkSession session =SparkSession.builder()
.appName("DailyTop3Keyword")
.master("local[*]")
.getOrCreate();
//创建JavaSparkContext对象
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());
//过滤条件
List<String> list = Arrays.asList(new String[]{"北京","天津","南京"});
//使用广播变量进行性能优化
final Broadcast<List<String>> cities = jsc.broadcast(list);
//加载系统日志
JavaRDD<Row> rdd = jsc.textFile("D:/keywords.txt")
//过滤掉无效的日志信息
.filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(String line) throws Exception {
boolean flg = false;
for(String city : cities.value()){
if(line.contains(city)){
flg =true;
break;
}
}
return flg;
}
}).cache()
//从每行日志中获取有用信息并封装成Row对象
.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String line) throws Exception {
String[] values = line.split("\t");
return RowFactory.create(values[0],values[1],values[2]);
}
}
);
//将JavaRDD<Row>转换成Dataset<Row>
Dataset<Row> rows = session.createDataFrame(rdd, new StructType(new StructField[]{
DataTypes.createStructField("date", DataTypes.StringType, true),
DataTypes.createStructField("keyword", DataTypes.StringType, true),
DataTypes.createStructField("user", DataTypes.StringType, true)
}));
//按日期统计关键词的搜索次数
Dataset<Row> kv = rows
.select(new Column("date"),new Column("keyword"),new Column("user"))
.groupBy("date","keyword")
.agg(countDistinct("user").alias("kv"))
.orderBy(new Column("date").asc(),new Column("kv").desc());
//使用窗口函数row_number获取每日排名前三的搜索关键词
kv.select(new Column("date")
,new Column("keyword")
,new Column("kv")
,row_number().over(Window.partitionBy(new Column("date")).orderBy(new Column("kv").desc())).alias("rank"))
.where("rank <=3")
.show();
SQL脚本实现方式
最后两步也可以使用SQL脚本的方式进行实现。
//将倒数第三步的结果注册成一个临时表rows
rows.createOrReplaceTempView("rows");
//按日期统计关键词的搜索次数并将统计结果注册成临时表kv
session.sql("select date,keyword,count(distinct user) kv from rows group by date,keyword order by date asc,kv desc")
.createOrReplaceTempView("kv");
//使用窗口函数row_number获取每日排名前三的搜索关键词
session.sql("select * from (select date,keyword,kv,row_number() over(partition by date order by kv desc) rank from kv) tmp where rank <= 3")
.show();
示例数据统计结果
date | keyword | kv | rank |
---|---|---|---|
2017-05-18 | Hadoop | 6 | 1 |
2017-05-18 | kafka | 5 | 2 |
2017-05-18 | Scala | 4 | 3 |
2017-05-17 | Hadoop | 4 | 1 |
2017-05-17 | Scala | 3 | 2 |
示例中需要引入的class
import static org.apache.spark.sql.functions.countDistinct;
import static org.apache.spark.sql.functions.row_number;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;