前面讲到Hbase的时候可以通过Java API的方式操作Hbase数据库,由于Java和Scala可以互相调用,本节使用Scala语言通过Spark平台来实现分布式操作Hbase数据库,并且打包部署到Spark集群上面。这样我们对Spark+Scala项目开发有一个完整的认识和实际工作场景的一个体会。
我们创建一个Spark的工程,然后创建一个HbaseJob的object类文件,项目的功能是从Hbase批量读取课程商品表数据然后存储到Hadoop的HDFS上的功能,如代码3.15所示:
【代码3.15】 HbaseJob.scala
package com.chongdianleme.mailimport org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.client.{Result,Get,HConnectionManager}import org.apache.hadoop.hbase.util.{ArrayUtils,Bytes}import org.apache.spark._import scopt.OptionParserimport scala.collection.mutable.ListBuffer/**
* Created by 充电了么App - 陈敬雷
* Spark分布式操作Hbase实战
* 网站:http://www.chongdianleme.com
* 充电了么App - 专业上班族职业技能提升的在线教育平台
*/object HbaseJob{caseclassParams(//输入目录的数据就是课程ID,每行记录就一个课程ID,后面根据课程ID作为rowKey从Hbase里查询数据inputPath:String="file:///D:\\chongdianleme\\Hbase项目\\input",outputPath:String="file:///D:\\chongdianleme\\Hbase项目\\output",table:String="chongdianleme_kc",minPartitions:Int=1,mode:String="local")defmain(args:Array[String]){val defaultParams=Params()val parser=new OptionParser[Params]("HbaseJob"){head("HbaseJob: 解析参数.")opt[String]("inputPath").text(s"inputPath 输入目录, default: ${defaultParams.inputPath}}").action((x,c)=>c.copy(inputPath=x))opt[String]("outputPath").text(s"outputPath 输出目录, default: ${defaultParams.outputPath}").action((x,c)=>c.copy(outputPath=x))opt[Int]("minPartitions").text(s"minPartitions , default: ${defaultParams.minPartitions}").action((x,c)=>c.copy(minPartitions=x))opt[String]("table").text(s"table table, default: ${defaultParams.table}").action((x,c)=>c.copy(table=x))opt[String]("mode").text(s"mode 运行模式, default: ${defaultParams.mode}").action((x,c)=>c.copy(mode=x))note("""|For example,the following command runs this app on a HbaseJob dataset:""".stripMargin)}parser.parse(args,defaultParams).map{params=>{println("参数值:"+params)readFilePath(params.inputPath,params.outputPath,params.table,params.minPartitions,params.mode)}}getOrElse{System.exit(1)}println("充电了么App - Spark分布式批量操作Hbase实战 -- 计算完成!")}defreadFilePath(inputPath:String,outputPath:String,table:String,minPartitions:Int,mode:String)={val sparkConf=newSparkConf().setAppName("HbaseJob")sparkConf.setMaster(mode)val sc=newSparkContext(sparkConf)//加载数据文件val data=sc.textFile(inputPath,minPartitions)data.mapPartitions(batch(_,table)).saveAsTextFile(outputPath)sc.stop()}defbatch(keys:Iterator[String],hbaseTable:String)={val lineList=ListBuffer[String]()import scala.collection.JavaConversions._val conf=HBaseConfiguration.create()//每批数据创建一个Hbase连接,多条数据操作共享这个连接val connection=HConnectionManager.createConnection(conf)//获取表val table=connection.getTable(hbaseTable)keys.foreach(rowKey=>{try{//根据rowKey主键也就是课程ID查询数据val get=newGet(rowKey.getBytes())//指定需要获取的列蔟和列get.addColumn("kcname".getBytes(),"name".getBytes())get.addColumn("saleinfo".getBytes(),"price".getBytes())get.addColumn("saleinfo".getBytes(),"issale".getBytes())val result=table.get(get)var nameRS=result.getValue("kcname".getBytes(),"name".getBytes())var kcName="";if(nameRS!=null&&nameRS.length>0){kcName=newString(nameRS);}val priceRS=result.getValue("saleinfo".getBytes,"price".getBytes)var price=""if(priceRS!=null&&priceRS.length>0)price=newString(priceRS)val issaleRS=result.getValue("saleinfo".getBytes,"issale".getBytes)var issale=""if(issaleRS!=null&&issaleRS.length>0)issale=newString(issaleRS)lineList+=rowKey+"\001"+kcName+"\001"+price+"\001"+issale}catch{casee:Exception=>e.printStackTrace()}})//每批数据操作完毕,别忘了关闭表和数据库连接table.close()connection.close()lineList.toIterator}}
代码开发完成后,我们看看怎么部署到Spark集群上去运行,运行的方式和我们的Spark集群怎么部署的有关,Spark集群部署有三种方式:单独Standalone集群部署,Spark on yarn部署,local本地模式三种灵活部署方式,前两种都是分布式部署,后面的是单机方式。一般大数据部门都有Hadoop集群,所以推荐Spark on Yarn部署,这样更方便服务器资源的统一管理和分配。
Spark on Yarn部署非常简单,主要是把Spark的包解压就可以用了,每台服务器上放一份,并且放在相同的目录下。步骤如下:
1)配置scala环境变量
#解压Scala的包,然后vim /etc/profile
export SCALA_HOME=/home/hadoop/software/scala-2.11.8
2)解压tar xvzf spark--bin-hadoop.tgz,每台hadoop服务器上放在同一个目录下
不用任何配置值即可,用spark-submit提交就行。
Spark环境部署好之后,把我们的操作Hbase项目编译打包,一个是项目本身的jar,另一个是项目依赖的jar集合,分别上传到任意一台服务器就行,不要每台服务器都传,在哪台服务器运行就在哪台服务器上上传就行,依赖的jar包放在这个目录/home/hadoop/chongdianleme/chongdianleme-spark-task-1.0.0/lib/下,项目本身的jar包放在这里目录下/home/hadoop/chongdianleme/,然后通过spark-submit提交如下脚本即可:
hadoop fs -rmr /ods/kc/dim/ods_kc_dim_hbase/;
/home/hadoop/software/spark21/bin/spark-submit --jars $(echo /home/hadoop/chongdianleme/chongdianleme-spark-task-1.0.0/lib/*.jar | tr ’ ’ ‘,’) --master yarn --queue hadoop --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 --class com.chongdianleme.mail.HbaseJob /home/hadoop/chongdianleme/hbase-task.jar --inputPath /mid/kc/dim/mid_kc_dim_kcidlist/ --outputPath /ods/kc/dim/ods_kc_dim_hbase/ --table chongdianleme_kc --minPartitions 6 --mode yarn
其中hadoop fs -rmr /ods/kc/dim/ods_kc_dim_hbase/;是为了下次执行这个任务避免输出目录已经存在,我们提前先把输出先删掉,执行完之后输出目录会重新生成。
脚本参数说明:
–jars 是你程序依赖的所有jar存放的目录
–master 是指定在哪里跑,在Hadoop的Yarn上跑写Yarn,本地方式写Local。
– queue 如果是Yarn方式,指定分配到哪个队列的资源上。
– num-executors 指定跑几个Task。
–driver.maxResultSize driver的最大内存设置,默认1G比较小。超过了会OOM,可以根据情况设置大一些。
– executor-memory 每个Task分配内存。
– executor-cores每个Task分配几个虚拟CPU。
– class 你的程序的入口类,后面跟jar包,在后面是Java或Scala的main函数的业务参数。
这就是我们从编程,编译打包、部署到服务器如何分布式运行的完整过程,后面章节讲的Spark分布式机器学习也是这么来打包和部署的。
此文章有对应的配套视频,除了Spark项目案例实战和分布式部署,其它更多精彩文章请大家下载充电了么app,可获取千万免费好课和文章,配套新书教材请看陈敬雷新书:《分布式机器学习实战》(人工智能科学与技术丛书)
其它深度学习框架也有不错的开源实现,比如MXNet,后面请大家关注充电了么app,课程,微信群,更多内容请看新书《分布式机器学习实战(人工智能科学与技术丛书)》
【新书介绍】
《分布式机器学习实战》(人工智能科学与技术丛书)【陈敬雷编著】【清华大学出版社】https://item.jd.com/12743009.html
新书特色:深入浅出,逐步讲解分布式机器学习的框架及应用配套个性化推荐算法系统、人脸识别、对话机器人等实战项目
【新书介绍视频】
分布式机器学习实战(人工智能科学与技术丛书)新书【陈敬雷】https://ke.qq.com/course/3067704?flowToken=1029963
视频特色:重点对新书进行介绍,最新前沿技术热点剖析,技术职业规划建议!听完此课你对人工智能领域将有一个崭新的技术视野!职业发展也将有更加清晰的认识!
【精品课程】
《分布式机器学习实战》大数据人工智能AI专家级精品课程https://ke.qq.com/course/393750?flowToken=1028919
【免费体验视频】
人工智能百万年薪成长路线/从Python到最新热点技术https://ke.qq.com/course/package/31251?flowToken=1029962
从Python编程零基础小白入门到人工智能高级实战系列课
https://ke.qq.com/course/package/29782?flowToken=1028733
视频特色:本系列专家级精品课有对应的配套书籍《分布式机器学习实战》,精品课和书籍可以互补式学习,彼此相互补充,大大提高了学习效率。本系列课和书籍是以分布式机器学习为主线,并对其依赖的大数据技术做了详细介绍,之后对目前主流的分布式机器学习框架和算法进行重点讲解,本系列课和书籍侧重实战,最后讲几个工业级的系统实战项目给大家。课程核心内容有互联网公司大数据和人工智能那些事、大数据算法系统架构、大数据基础、Python编程、Java编程、Scala编程、Docker容器、Mahout分布式机器学习平台、Spark分布式机器学习平台、分布式深度学习框架和神经网络算法、自然语言处理算法、工业级完整系统实战(推荐算法系统实战、人脸识别实战、对话机器人实战)、就业/面试技巧/职业生涯规划/职业晋升指导等内容。
【充电了么App】
本书在充电了么App里有对应的视频课程,更多学习资源也可以通过下载充电了么App客户端,也可以从各大应用商店里搜索“充电了么”自行下载。充电了么是专注上班族职业技能提升的在线教育平台。这里有海量免费课程,在这里你可以学习牛人的实际工作经验,也能够大幅提升职业技能,提高工作效率,带来经济效益!除了陈敬雷老师的课以外,还有上千万好课免费分享。全都在充电了么App上。充电了么APP是专注上班族职业培训充电学习的在线教育平台。各大安卓商店和苹果App Store搜索“充电了么”即可下载。按照下图输入网址也可以下载哦~
充电了么官网:http://www.chongdianleme.com/
充电了么App官网下载地址:https://a.app.qq.com/o/simple.jsp?pkgname=com.charged.app
功能特色如下:
【全行业职位】 - 专注职场上班族职业技能提升
覆盖所有行业和职位,不管你是上班族,高管,还是创业都有你要学习的视频和文章。其中大数据智能AI、区块链、深度学习是互联网一线工业级的实战经验。
除了专业技能学习,还有通用职场技能,比如企业管理、股权激励和设计、职业生涯规划、社交礼仪、沟通技巧、演讲技巧、开会技巧、发邮件技巧、工作压力如何放松、人脉关系等等,全方位提高你的专业水平和整体素质。
【牛人课堂】 - 学习牛人的工作经验
1.智能个性化引擎:
海量视频课程,覆盖所有行业、所有职位,通过不同行业职位的技能词偏好挖掘分析,智能匹配你目前职位最感兴趣的技能学习课程。
2.听课全网搜索
输入关键词搜索海量视频课程,应有尽有,总有适合你的课程。
3.听课播放详情
视频播放详情,除了播放当前视频,更有相关视频课程和文章阅读,对某个技能知识点强化,让你轻松成为某个领域的资深专家。
【精品阅读】 - 技能文章兴趣阅读
1.个性化阅读引擎:
千万级文章阅读,覆盖所有行业、所有职位,通过不同行业职位的技能词偏好挖掘分析,智能匹配你目前职位最感兴趣的技能学习文章。
2.阅读全网搜索
输入关键词搜索海量文章阅读,应有尽有,总有你感兴趣的技能学习文章。
【机器人老师】 - 个人提升趣味学习
基于搜索引擎和智能深度学习训练,为您打造更懂你的机器人老师,用自然语言和机器人老师聊天学习,寓教于乐,高效学习,快乐人生。
【精短课程】 - 高效学习知识
海量精短牛人课程,满足你的时间碎片化学习,快速提高某个技能知识点。