Spark Streaming之前项目中用过一段时间,最近正好闲下来做一下梳理。
一. 简介:
Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持多种数据源获取数据:
Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后,处理结构保存在HDFS、DataBase等各种地方。
二 流程梳理:
- 1 架构:
python脚本模拟生成日志 + flume + kafka + Spark Streaming
2.2 实现流程:
(1)使用Python脚本生成日志
(2)使用linux crontab定时任务运行脚本
(3)使用flume采集生成的日志
(4)采集到日志放入kafka中
(5)Spark Streaming从kafka中pull数据,进行微批处理
(6)将处理结果存入hbase中
注意:要分步测试,逐层排查问题。
三. 流程详解:
3.1 使用Python脚本编写日志生成器,模拟日志系统产生日志generate_log.py
#coding=UTF-8
import random
import time
url_paths = [
"class/112.html",
"class/128.html",
"class/145.html",
"class/146.html",
"class/131.html",
"class/130.html",
"learn/821",
"course/list"
]
ip_slices = [132,168,175,10,23,179,187,224,73,29,90,169,48,89,120,67,138,168,220,221,98]
http_referers = [
"http://www.baidu.com/s?wd={query}",
"https://www.sogou.com/web?query={query}",
"http://cn.bing.com/search?q={query}",
"https://search.yahoo.com/search?p={query}"
]
search_keyword = [
"Spark实战",
"Storm实战",
"Flink实战",
"Bean实战",
"Spark Streaming实战",
"Spark SQL实战"
]
status_codes = ["200","404","500"]
def sample_url():
return random.sample(url_paths,1)[0]
def sample_ip():
slice = random.sample(ip_slices,4)
return ".".join([str(item) for item in slice])
def sample_referer():
if random.uniform(0,1) > 0.2:
return "-"
refer_str = random.sample(http_referers,1)
query_str = random.sample(search_keyword,1)
return refer_str[0].format(query=query_str[0])
def sample_status_code():
return random.sample(status_codes,1)[0]
def generate_log(count = 10):
time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
f = open("/usr/local/hadoop/data/logs/access.log","w+")
while count >= 1:
query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(local_time=time_str,url= sample_url(),ip=sample_ip(),referer=sample_referer(),status_code=sample_status_code())
#print(query_log)
f.write(query_log + "\n")
count = count - 1
if __name__ == '__main__':
generate_log(100)
3.2 使用linux crontab定时任务运行脚本:
参考网站: http://tool.lu/crontab
每一分钟执行一次的crontab表达式: */1 * * * *
crontab相关linux命令:
service crond start ---查看crontab服务是否启动
crontab -u root -l ---查看root用户当前是否有自动执行计划
3.2.1. 编写shell脚本,调用执行generate_log.py
vi log_generator.sh
! /bin/sh
python /usr/local/hadoop/data/generate_log.py
3.2.2. 增加操作权限
chmod u+x log_generator.sh
3.2.3. 设置crontab定时任务
crontab -e
*/1 * * * * /usr/local/hadoop/data/project/log_generator.sh
3.2.4. 检查确认日志是否成功
3.3 使用flume采集日志生成器产生的日志
就像前面所说的,每一步都要分步进行测试,测试没有问题后,再进行对接使用。
3.3.1. 初期测试flume选型
access.log ==> 控制台输出
source.type --> exec
channel.type --> memory
sink.type -->logger
(1)配置文件 streaming_project.conf
agent.sources=r1
agent.channels=c1
agent.sinks=k1
agent.sources.r1.type=exec
agent.sources.r1.command=tail -F /usr/local/hadoop/data/logs/access.log
agent.sources.r1.shell=/bin/bash -c
agent.sources.r1.channels=c1
agent.sinks.k1.type=logger
agent.sinks.k1.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
(2)启动flume:
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming_project.conf --name agent -Dflume.root.logger=INFO,console
(3)观察flume日志,检查是否能够采集日志。
2.3.2. flume对接kafka
初步调通flume之后,就可以开始使用flume对接kafka了。
(1)更新flume配置文件:streaming_project2.conf
agent.sources=r1
agent.channels=c1
agent.sinks=k1
agent.sources.r1.type=exec
agent.sources.r1.command=tail -F /usr/local/hadoop/data/logs/access.log
agent.sources.r1.shell=/bin/bash -c
agent.sources.r1.channels=c1
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = streamingtopic
agent.sinks.k1.brokerList = slave2:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 20
agent.sinks.k1.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
(2)启动zk(kafka需要)
zkServer.sh start $ZK_HOME/conf/zoo.cfg
(3)启动kafka server
kafka-server-start.sh $KAFKA_HOME/config/server.properties
(4)查看topic是否创建成功
kafka-topics.sh --list --zookeeper slave2:2181
(5)若未创建,则创建topic
kafka-topics.sh --create --zookeeper slave2:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
(6)查看topic是否创建成功或已存在
kafka-topics.sh --list --zookeeper slave2:2181
(7)启动flume
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming_project2.conf --name agent -Dflume.root.logger=INFO,console
(8)启动kafka consumer 测试是否接收成功
kafka-console-consumer.sh --zookeeper slave2:2181 --topic streamingtopic --from-beginning
如果能成功消费消息,则说明flume对接kafka成功,那下面就可以开始使用spark Streaming对接kafka进行实时处理了。
3.4 kafka对接spark Streaming
Spark Streaming对接kafka使用生产中最常用的方式:Direct Approach(直接方法)(No Receivers)
初步接收消息创建input DStream
代码如下:
package com.crn.spark.project
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ProjectStreaming {
def main(args: Array[String]): Unit = {
if(args.length != 2){
System.err.println("Usage ProjectStreaming: <brokers> <topics>")
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("ProjectStreaming").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(60))
val Array(brokers,topics) = args
val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val topicSet = topics.split(",").toSet
val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
message.map(_._2).count().print()
ssc.start()
ssc.awaitTermination()
}
}
代码中需要的参数,在启动时以参数的形式进入传入。
测试可以成功接收之后,就要正式进入项目核心部分的开发工作了。
3.5 项目开发:
3.5.1. 需求分析:
实战开发首先要关注就是需求,只有明确了需求,才能真正开始开发。
功能需求1: 统计今天到现在为止实战课程的访问量
这个可以做到每天实时的查看当天网站的访问量.
功能需求2: 今天到现在为止从搜索引擎引流过来的实现课程的访问量
这个可以为不同渠道做广告投资,提供一定的决策分析资料.
3.5.2. 技术实现分析:
【流程】: 数据清洗 > 统计分析 > 统计结果入库
1)数据清洗:
就是按照需求对实时产生的点击日志数据进行筛选过滤,只保留我们需要的内容字段。
下面观察日志信息,分析如何取数据
保留IP,访问时间,课程编号,状态码,搜索引擎地址。
2)统计分析:
为了更好的进行统计分析,我们需要定义一个实体类来封装我们需要保存的日志信息:
下面看一下需求功能1如何实现。功能1: 今天到现在为止,实战课程的访问量
(1) rowkey设计
因为是统计一天的实战课程访问量:故rowkey可以设计为 day_coureseid,比如20180808_118.
注意:rowkey有设计原则,但设计方法并不是一成不变的。RowKey的设计要强烈依靠业务,要考虑到如何设计,更要考虑到如何更好的查询。
因为是要统计每天的课程访问量,使用日期+课程id的方式,后期就可以直接以日期为前缀条件进行筛选查询。
依次类推,可以用来统计***电商每天不同电商品类的销售额情况,而在电商交易信息中,有包含商品品类的信息。同样设计rowkey时,就可以使用日期+商品品类的方式进行设计。
而查询时,就可以依靠日期前缀进行查询。
按照rowkey的设计规则,现在日志中的时期格式,还不符合我们的规范,所以需要我们进行一下转换:编写日期时间工具类DateUtils,将日志中的时间转换为自己想要的格式.
数据清洗结果类似如下: ClickLog(102.10.55.22,2017110812121201,128,-)
功能1的rowkey设计为:yyyyMMdd_courseid,示例:20180808_118.
rowkey设计好后,我们就可以使用数据库来进行存储我们的统计结果了。spark streaming把统计结果写入到数据库中,可视化前端根据:yyyyMMdd把数据库里面的统计结果展示出来。
(2)储存介质选择
选择什么数据库作为统计结果的储存呢?
a. 关系型数据库RDBMS: mysql,oracle
day course_id click_count
20171111 1 10
存储在关系型数据库,当一个批次的数据过来之后,需要先将之前的数据查出来进行加和,然后再执行更新操作。
b. 非常关系型数据库Nosql: Hbase,redis
使用hbase只需要调用一个API即可搞定更新操作.
通过对比,明显就可以看出来使用hbase存储更方便。那咱就确定选用hbase作为存储介质。
(3)实现流程
接下来我们需要做的就是:
1) 进行Hbase表设计
create 'course_clickcount', 'info'
2)编写操作hbase的数据访问层DAO代码
操作Hbase的工具类(HBaseUtils.java):
package com.crn.spark.project.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import java.io.IOException;
/**
* HBase操作工具类:Java工具类建议采用单例模式封装
*/
public class HBaseUtils {
HBaseAdmin admin = null;
Configuration configuration = null;
/**
* 私有构造方法
*/
private HBaseUtils(){
configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum","master,slave2");
configuration.set("hbase.rootdir","hdfs://master:9000/hbase");
//ctrl+alt+T
try {
admin = new HBaseAdmin(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
private static HBaseUtils instance = null;
public static synchronized HBaseUtils getInstance(){
if(null == instance){
instance = new HBaseUtils();
}
return instance;
}
/**
* 根据表名获取到HTable实例
* @param tableName
* @return
*/
public HTable getTable(String tableName){
HTable table = null;
try {
table = new HTable(configuration,tableName);
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
/**
* 添加一条记录到HBase表
* @param tableName 表名
* @param rowkey rowkey
* @param cf columnFamily
* @param column 列
* @param value 写入的值
*/
public void put(String tableName,String rowkey,String cf,String column,String value){
HTable table = getTable(tableName);
Put put = new Put(rowkey.getBytes());
put.add(cf.getBytes(),column.getBytes(),value.getBytes());
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String tableName = "course_clickcount";
String rowkey = "20171111_188";
String cf = "info";
String column = "click_count";
String value ="2";
HBaseUtils.getInstance().put(tableName,rowkey,cf,column,value);
}
}
封装实战课程点击量的实体类(ClickCourseCount.scala):
package com.crn.spark.project.domain
case class ClickCourseCount(dayCourse:String,clickCount:Long)
保存操作hbase的DAO(ClickCourseCountDao.scala):
package com.crn.spark.project.dao
import com.crn.spark.project.domain.ClickCourseCount
import com.crn.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer
object ClickCourseCountDao {
val tableName = "course_clickcount"
val cf = "info"
val column = "clickcount"
def save(list:ListBuffer[ClickCourseCount]): Unit ={
val htable = HBaseUtils.getInstance().getTable(tableName)
for(clk <- list){
htable.incrementColumnValue(clk.dayCourse.getBytes,
cf.getBytes,
column.getBytes,
clk.clickCount)
}
}
def count(dayCourse:String):Long={
val htable = HBaseUtils.getInstance().getTable(tableName)
val get = new Get(dayCourse.getBytes)
val value = htable.get(get).getValue(cf.getBytes,column.getBytes())
if(null == value){
0L
}else{
Bytes.toLong(value)
}
}
def main(args: Array[String]): Unit = {
val listBuffer = new ListBuffer[ClickCourseCount]
listBuffer.append(ClickCourseCount("20171024_88",1L))
listBuffer.append(ClickCourseCount("20171024_88",2L))
listBuffer.append(ClickCourseCount("20171024_88",3L))
save(listBuffer)
println(count("20171024_88")+"---"+count("20171024_88"))
}
}
- 完善核心代码(ProjectStreaming.scala):
package com.crn.spark.project.spark
import com.crn.spark.project.dao.{ClickCourseCountDao, ClickCourseSearchCountDao}
import com.crn.spark.project.domain.{ClickCourceSearchCount, ClickCourseCount, ClickLog}
import com.crn.spark.project.utils.DateUtils
import com.crn.spark.project.utils.DateUtils.{formatDateToYYYYMMDDStr, getDateFromTimeStr}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.status.api.v1.RDDPartitionInfo
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer
object ProjectStreaming {
def main(args: Array[String]): Unit = {
if(args.length != 2){
System.err.println("Usage ProjectStreaming: <brokers> <topics>")
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("ProjectStreaming").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(60))
val Array(brokers,topics) = args
val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
val topicSet = topics.split(",").toSet
val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
//message.map(_._2).count().print()
//132.168.89.224 2018-07-13 05:53:02 "GET /class/145.html HTTP/1.1" 200 https://search.yahoo.com/search?p=Flink实战
val cleanData = message.map(_._2).map{x =>
val strArr = x.split("\t")
strArr(1)
val ip = strArr(0)
val time = DateUtils.formatDateToYYYYMMDDStr(DateUtils.getDateFromTimeStr(strArr(1)))
val refer = strArr(2).split(" ")(1)
val status = strArr(3).toInt
val searchArr = strArr(4).replaceAll("//","/").split("/")
var searchUrl = ""
if(searchArr.length > 2){
searchUrl =searchArr(1)
}else{
searchUrl = searchArr(0)
}
(ip,time,refer,status,searchUrl)
}.filter(_._3.startsWith("/class")).map{x =>
//145.html
val referStr= x._3.split("/")(2)
val refer = referStr.substring(0,referStr.lastIndexOf("."))
ClickLog(x._1,x._2,refer,x._4,x._5)
}
//功能1: 统计今天到现在为止,实战课程的访问量
cleanData.map(x=>(x.time+"_"+x.course,1)).reduceByKey(_+_).foreachRDD{RDD =>
RDD.foreachPartition{rddPartition =>
val list = new ListBuffer[ClickCourseCount]
rddPartition.foreach{pair =>
list.append(ClickCourseCount(pair._1,pair._2))
}
ClickCourseCountDao.save(list)
}
}
//功能2: 统计今天到现在为止从搜索引擎引流过来的,实战课程的访问量
cleanData.filter{x =>x.search != "-"}.map(x=>(x.time+"_"+x.search+"_"+x.course,1)).reduceByKey(_+_).foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val list = new ListBuffer[ClickCourceSearchCount]
partition.foreach { pair =>
list.append(ClickCourceSearchCount(pair._1, pair._2))
}
ClickCourseSearchCountDao.save(list)
}
}
ssc.start()
ssc.awaitTermination()
}
}
需求功能2的功能流程,同样流程实现。这里交代一下rowkey的设计规则:yyyyMM_searchUrl_courseId.