SparkSQL读取HBase数据

这里的SparkSQL是指整合了Hive的spark-sql cli(关于SparkSQL和Hive的整合,见文章后面的参考阅读).

本质上就是通过Hive访问HBase表,具体就是通过hive-hbase-handler .

环境篇

hadoop-2.3.0-cdh5.0.0

apache-hive-0.13.1-bin

spark-1.4.0-bin-hadoop2.3

hbase-0.96.1.1-cdh5.0.0

部署情况如下图:


测试集群,将Spark Worker部署在每台DataNode上,是为了最大程度的任务本地化,Spark集群为Standalone模式部署。

其中有三台机器上也部署了RegionServer。

这个部署情况对理解后面提到的任务本地化调度有帮助。


配置篇


1. 拷贝以下HBase的相关jar包到Spark Master和每个Spark Worker节点上的$SPARK_HOME/lib目录下.

(我尝试用–jars的方式添加之后,不work,所以采用这种土办法)

$HBASE_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar

$HBASE_HOME/lib/htrace-core-2.01.jar

$HBASE_HOME/lib/protobuf-java-2.5.0.jar

$HBASE_HOME/lib/guava-12.0.1.jar


$HIVE_HOME/lib/hive-hbase-handler-0.13.1.jar


2.配置每个节点上的$SPARK_HOME/conf/spark-env.sh,将上面的jar包添加到SPARK_CLASSPATH

export SPARK_CLASSPATH=$SPARK_HOME/lib/hbase-client-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/hbase-common-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/hbase-protocol-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/hbase-server-0.96.1.1-cdh5.0.0.jar:

$SPARK_HOME/lib/htrace-core-2.01.jar:

$SPARK_HOME/lib/protobuf-java-2.5.0.jar:

$SPARK_HOME/lib/guava-12.0.1.jar:

$SPARK_HOME/lib/hive-hbase-handler-0.13.1.jar:

${SPARK_CLASSPATH}

3.将hbase-site.xml拷贝至${HADOOP_CONF_DIR},由于spark-env.sh中配置了Hadoop配置文件目录${HADOOP_CONF_DIR},因此会将hbase-site.xml加载。

hbase-site.xml中主要是以下几个参数的配置:

hbase.zookeeper.quorum

zkNode1:2181,zkNode2:2181,zkNode3:2181

HBase使用的zookeeper节点

hbase.client.scanner.caching

5000

HBase客户端扫描缓存,对查询性能有很大帮助

另外还有一个参数:zookeeper.znode.parent=/hbase

是HBase在zk中的根目录,默认为/hbase,视实际情况进行配置。

4.重启Spark集群。

 大数据学习交流群:724693112 欢迎想学习大数据和需要大数据学习资料的同学来一起学习。

使用篇

hbase中有表lxw1234,数据如下:

hbase(main):025:0* scan 'lxw1234'

ROW COLUMN+CELL

lxw1234.com column=f1:c1, timestamp=1435624625198, value=name1

lxw1234.com column=f1:c2, timestamp=1435624591717, value=name2

lxw1234.com column=f2:c1, timestamp=1435624608759, value=age1

lxw1234.com column=f2:c2, timestamp=1435624635261, value=age2

lxw1234.com column=f3:c1, timestamp=1435624662282, value=job1

lxw1234.com column=f3:c2, timestamp=1435624697028, value=job2

lxw1234.com column=f3:c3, timestamp=1435624697065, value=job3

1 row(s) in 0.0350 seconds

进入spark-sql,使用如下语句建表:

CREATE EXTERNAL TABLE lxw1234 (

rowkey string,

f1 map,

f2 map,

f3 map

) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,f1:,f2:,f3:")

TBLPROPERTIES ("hbase.table.name" = "lxw1234");

建好之后,就可以查询了:

spark-sql> select * from lxw1234;

lxw1234.com {"c1":"name1","c2":"name2"} {"c1":"age1","c2":"age2"} {"c1":"job1","c2":"job2","c3":"job3"}

Time taken: 4.726 seconds, Fetched 1 row(s)

spark-sql> select count(1) from lxw1234;

1

Time taken: 2.46 seconds, Fetched 1 row(s)

spark-sql>

大表查询,消耗的时间和通过Hive用MapReduce查询差不多。

spark-sql> select count(1) from lxw1234_hbase;

53609638

Time taken: 335.474 seconds, Fetched 1 row(s)

在spark-sql中通过insert插入数据到HBase表时候报错:

INSERT INTO TABLE lxw1234

SELECT 'row1' AS rowkey,

map('c3','name3') AS f1,

map('c3','age3') AS f2,

map('c4','job3') AS f3

FROM lxw1234_a

limit 1;


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 4 times,

most recent failure: Lost task 0.3 in stage 10.0 (TID 23, slave013.uniclick.cloud):

java.lang.ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat

at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat$lzycompute(hiveWriterContainers.scala:74)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.outputFormat(hiveWriterContainers.scala:73)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.getOutputName(hiveWriterContainers.scala:93)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.initWriters(hiveWriterContainers.scala:117)

at org.apache.spark.sql.hive.SparkHiveWriterContainer.executorSideSetup(hiveWriterContainers.scala:86)

at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:99)

at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)

at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:83)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)


Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

at scala.Option.foreach(Option.scala:236)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

这个还有待分析。


关于Spark任务本地化运行

先看这张图,该图为运行select * from lxw1234_hbase;这张大表查询时候的任务运行图。


Spark和Hadoop MapReduce一样,在任务调度时候都会考虑数据本地化,即”任务向数据靠拢”,尽量将任务分配到数据所在的节点上运行。

基于这点,lxw1234_hbase为HBase中的外部表,Spark在解析时候,通过org.apache.hadoop.hive.hbase.HBaseStorageHandler获取到表lxw1234_hbase在HBase中的region所在的RegionServer,即:slave004、slave005、slave006 (上面的部署图中提到了,总共只有三台RegionServer,就是这三台),所以,在调度任务时候,首先考虑要往这三台节点上分配任务。

表lxw1234_hbase共有10个region,因此需要10个map task来运行。

再看一张图,这是spark-sql cli指定的Executor配置:



每台机器上Worker的实例为2个,每个Worker实例中运行的Executor为1个,因此,每台机器上运行两个Executor.

那么salve004、slave005、slave006上各运行2个Executor,总共6个,很好,Spark已经第一时间将这6个Task交给这6个Executor去执行了(NODE_LOCAL Tasks)。

剩下4个Task,没办法,想NODE_LOCAL运行,但那三台机器上没有剩余的Executor了,只能分配给其他Worker上的Executor,这4个Task为ANY Tasks。

正如那张任务运行图中所示。


写在后面

通过Hive和spark-sql去访问HBase表,只是为统计分析提供了一定的便捷性,个人觉得性能上的优势并不明显。

可能Spark通过API去读取HBase数据,性能更好些吧,以后再试。

另外,spark-sql有一点好处,就是可以先把HBase中的数据cache到一张内存表中,然后在这张内存表中,

通过SQL去统计分析,那就爽多了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容