Use the newAPIHadoopRDD method to read HBase records into an RDD.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.hive.HiveContext
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
// Create the HBase configuration and point it at the cluster's ZooKeeper quorum
val hconf = HBaseConfiguration.create()
hconf.set("hbase.zookeeper.quorum", "sparkmaster1")
// Set the table to scan
hconf.set(TableInputFormat.INPUT_TABLE, "wetag")
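Since only the attr column family is read later, the scan can optionally be restricted to that family before the RDD is created. This is a minimal sketch using TableInputFormat's SCAN_COLUMN_FAMILY property; the attr family name is taken from the mapping code below.
// Optional: limit the scan to the attr column family to reduce the data read from HBase
hconf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "attr")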
// Each element of the resulting RDD is an (ImmutableBytesWritable row key, Result) pair
val hbaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
Call Result methods to access the fields of each record:
val cnvRDD = hbaseRDD.map { case (_, result) =>
  // Row key as a string
  val key = Bytes.toString(result.getRow)
  var sex: String = null
  var age: String = null
  var province: String = null
  // Each attribute lives in the attr column family; leave it null if the column is absent
  if (result.containsColumn("attr".getBytes, "population.sex".getBytes)) sex = Bytes.toString(result.getValue("attr".getBytes, "population.sex".getBytes))
  if (result.containsColumn("attr".getBytes, "population.age".getBytes)) age = Bytes.toString(result.getValue("attr".getBytes, "population.age".getBytes))
  if (result.containsColumn("attr".getBytes, "region.work_province".getBytes)) province = Bytes.toString(result.getValue("attr".getBytes, "region.work_province".getBytes))
  // Reverse the row key string before emitting the tuple
  (key.reverse, sex, age, province)
}
// Pull the first 20 converted records back to the driver and print them
val ax = cnvRDD.take(20)
ax.foreach(println)
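As a quick sanity check, the converted RDD can be aggregated directly. The sketch below (not part of the original walkthrough) counts records per sex value, mapping the null produced for a missing column to "unknown".
// Count records per sex value; a null sex means the column was absent in HBase
val sexCounts = cnvRDD.map { case (_, sex, _, _) => (Option(sex).getOrElse("unknown"), 1L) }.reduceByKey(_ + _)
sexCounts.collect().foreach(println)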