Hbase RowKey 设计
使用Spark或通过REST/API 方式存取Hbase,性能影响最大的因素在于Hbase 的结构设计。Hbase 结构设计包括两个方面
- rowKey 的设计
- rowKey 和Hbase 表预分区
rowKey 数据尽量保持短小精悍,同时还要能与业务数据的主键有关联。
同时尽量散列。这样才能保证数据均匀的存储到Hbase 的Region里。数据均匀分不到Hbase Region 中,检索的速度才够快。
rowKey 设计有以下几种思路
- 拼接业务主键,转换为md5
- 拼接业务主键,按照一个Hash 算法截取前缀再拼接业务主键
第一种方法:数据分布比较平均,处理简单,当无法从rowKey 中抽取业务主键,因此需在Hbase 数据列中额外花费额外的空间存储。
第二种方法:数据分布也比较平均,需专门实现Hash 算法和抽取业务主键的方法,当节省了数据存成空间。
几种构造RowKey 的方法
// 对指定的列构造rowKey,采用Hash前缀拼接业务主键的方法
def rowKeyWithHashPrefix(column: String*): Array[Byte] = {
val rkString = column.mkString("")
val hash_prefix = getHashCode(rkString)
val rowKey = Bytes.add(Bytes.toBytes(hash_prefix), Bytes.toBytes(rkString))
rowKey
}
// 对指定的列构造rowKey, 采用Md5 前缀拼接业务主键方法,主要目的是建表时采用MD5 前缀进行预分区
def rowKeyWithMD5Prefix(separator:String,length: Int,column: String*): Array[Byte] = {
val columns = column.mkString(separator)
var md5_prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(columns))
if (length < 8){
md5_prefix = md5_prefix.substring(0, 8)
}else if (length >= 8 || length <= 32){
md5_prefix = md5_prefix.substring(0, length)
}
val row = Array(md5_prefix,columns)
val rowKey = Bytes.toBytes(row.mkString(separator))
rowKey
}
// 对指定的列构造RowKey,采用MD5方法
def rowKeyByMD5(column: String*): Array[Byte] = {
val rkString = column.mkString("")
val md5 = MD5Hash.getMD5AsHex(Bytes.toBytes(rkString))
val rowKey = Bytes.toBytes(md5)
rowKey
}
// 直接拼接业务主键构造rowKey
def rowKey(column:String*):Array[Byte] = Bytes.toBytes(column.mkString(""))
// Hash 前缀的方法:指定列拼接之后与最大的Short值做 & 运算
// 目的是预分区,尽量保证数据均匀分布
private def getHashCode(field: String): Short ={
(field.hashCode() & 0x7FFF).toShort
}
Hbase RowKey 设计和Hbase 建表
为了提高Hbase 写入速度,预分区是一种非常重要的技术手段。预分区之后,数据会被均匀分散到不同的region 中,这样不会出现写热点,从而提高Hbase写入速度。
/**
* Hbase自带了两种pre-split的算法,分别是 HexStringSplit 和 UniformSplit
* 如果我们的row key是十六进制的字符串作为前缀的,就比较适合用HexStringSplit
* @param tablename 表名
* @param regionNum 预分区数量
* @param columns 列簇数组
*/
def createHTable(connection: Connection, tablename: String,regionNum: Int, columns: Array[String]): Unit = {
val hexsplit: HexStringSplit = new HexStringSplit()
// 预先构建分区,指定分区的start key
val splitkeys: Array[Array[Byte]] = hexsplit.split(regionNum)
val admin = connection.getAdmin
val tableName = TableName.valueOf(nameSpace + ":" + tablename)
if (!admin.tableExists(tableName)) {
if(!admin.getNamespaceDescriptor(nameSpace).getName.equals(nameSpace))
admin.createNamespace(NamespaceDescriptor.create(nameSpace).build())
val tableDescriptor = new HTableDescriptor(tableName)
if (columns != null) {
columns.foreach(c => {
val hcd = new HColumnDescriptor(c.getBytes()) //设置列簇
hcd.setMaxVersions(1)
hcd.setCompressionType(Algorithm.GZ) //设定数据存储的压缩类型.默认无压缩(NONE)
tableDescriptor.addFamily(hcd)
})
}
admin.createTable(tableDescriptor,splitkeys)
}
}
/**
* short预分区建表:0X0000~0X7FFF
* @param connection
* @param tablename 表名
* @param regionNum 预分区数量
* @param columns 列簇数组
*/
def createHTable(connection: Connection, tablename: String,regionNum: Short, columns: Array[String]): Unit = {
val admin = connection.getAdmin
val tableName = TableName.valueOf(nameSpace+ ":" + tablename)
if (!admin.tableExists(tableName)) {
if(!admin.getNamespaceDescriptor(nameSpace).getName.equals(nameSpace))
admin.createNamespace(NamespaceDescriptor.create(nameSpace).build())
val tableDescriptor = new HTableDescriptor(tableName)
if (columns != null) {
columns.foreach(c => {
val hcd = new HColumnDescriptor(c.getBytes()) //设置列簇
hcd.setMaxVersions(1)
hcd.setCompressionType(Algorithm.GZ) //设定数据存储的压缩类型.默认无压缩(NONE)
tableDescriptor.addFamily(hcd)
})
}
val start = (0x7FFF / regionNum).toShort
val end = (0x7FFF - start).toShort
admin.createTable(tableDescriptor,Bytes.toBytes(start),Bytes.toBytes(end),regionNum)
}
}
第一种建表方式,需要在存取数据时采用MD5 算法构造rowKey, 第二种需要构造Hash前缀的rowKey.
通过以上方式建表和查询能大幅提高Hbase 写入和读取速度,并且不会出现热点region。
可参考我在Github 上实现:https://github.com/Smallhi/example