Hbase二级索引(BaseRegionObserver 协处理器)

概述

HBase 是一款基于 Hadoop 的 key-value 数据库,它提供了对 HDFS 上数据的高效随机读写服务,完美地填补了 Hadoop MapReduce 仅适于批处理的缺陷,正在被越来越多的用户使用。作为 HBase 的一项重要特性,Coprocessor 在 HBase 0.92 版本中被加入,并广受欢迎

利用协处理器,用户可以编写运行在 HBase Server 端的代码。HBase 支持两种类型的协处理器,Endpoint 和 Observer。Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。

另外一种协处理器叫做 Observer Coprocessor,这种协处理器类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。

开发环境

  • maven-3.3.9
  • jdk 1.7
  • cdh-hbase-1.2.0
  • myeclipse 10

hbase协处理器加载

进入hbase命令行

# hbase shell

hbase(main):> disable 'test'    

hbase(main):> alter 'test',CONFIGURATION => {'hbase.table.sanity.checks'=>'false'}         //-----》建立表后,执行一次就行

hbase(main):> alter 'test','coprocessor'=>'hdfs:///code/jars/regionObserver-put5.jar|com.hbase.observer.App|1001'   //----》加载jar包

hbase(main):> alter 'test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'  //--------》卸载jar包

hbase(main):> desc 'test'    //-------》查看表的属性描述

hbase(main):> enable 'test'

完整工程代码

package com.hbase.observer;

/**
 * hbase 二级索引
 * @author wing
 * @createTime 2017-4-7 
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class App extends BaseRegionObserver {

    private HTablePool pool = null;

    private final static String SOURCE_TABLE = "test";

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c,
            Get get, List<Cell> results) throws IOException {
        HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
        String newRowkey = Bytes.toString(get.getRow());
        String pre = newRowkey.substring(0, 1);

        if (pre.equals("t")) {
            String[] splits = newRowkey.split("_");
            String prepre = splits[0].substring(1, 3);
            String timestamp = splits[0].substring(3);
            String uid = splits[1];
            String mid = "";
            for (int i = 2; i < splits.length; i++) {
                mid += splits[i];
                mid += "_";
            }
            mid = mid.substring(0, mid.length() - 1);
            String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
            System.out.println(rowkey);
            Get realget = new Get(rowkey.getBytes());
            Result result = table.get(realget);

            List<Cell> cells = result.listCells();
            results.clear();
            for (Cell cell : cells) {
                results.add(cell);
            }

        }
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        try {

            String rowkey = Bytes.toString(put.getRow());
            HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));

            String pre = rowkey.substring(0, 2);
            if (pre.equals("aa") || pre.equals("ab") || pre.equals("ac")
                    || pre.equals("ba") || pre.equals("bb") || pre.equals("bc")
                    || pre.equals("ca") || pre.equals("cb") || pre.equals("cc")) {
                String[] splits = rowkey.split("_");
                String uid = splits[0].substring(2);
                String timestamp = splits[1];
                String mid = "";
                for (int i = 2; i < splits.length; i++) {
                    mid += splits[i];
                    mid += "_";
                }
                mid = mid.substring(0, mid.length() - 1);
                String newRowkey = "t" + pre + timestamp + "_" + uid + "_"
                        + mid;
                System.out.println(newRowkey);
                Put indexput2 = new Put(newRowkey.getBytes());
                indexput2.addColumn("relation".getBytes(),
                        "column10".getBytes(), "45".getBytes());
                table.put(indexput2);

            }
            table.close();

        } catch (Exception ex) {

        }

    }

    @Override
    public boolean postScannerNext(
            ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
            List<Result> results, int limit, boolean hasMore)
            throws IOException {
        HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
        List<Result> newresults = new ArrayList<Result>();
        for (Result result : results) {
            String newRowkey = Bytes.toString(result.getRow());

            String pre = newRowkey.substring(0, 1);

            if (pre.equals("t")) {
                String[] splits = newRowkey.split("_");
                String prepre = splits[0].substring(1, 3);
                String timestamp = splits[0].substring(3);
                String uid = splits[1];
                String mid = "";
                for (int i = 2; i < splits.length; i++) {
                    mid += splits[i];
                    mid += "_";
                }
                mid = mid.substring(0, mid.length() - 1);
                String rowkey = prepre + uid + "_" + timestamp + "_" + mid;

                Get realget = new Get(rowkey.getBytes());
                Result newresult = table.get(realget);

                newresults.add(newresult);
            }

        }
         results.clear();
        for (Result result : newresults) {
            results.add(result);
        }

        return hasMore;

    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        pool.close();
    }
    
}

通过maven工程打包后上传到hdfs相应目录,再通过命令加载jar包。
即可完成二级索引。

  • 当用户put操作时,会将原rowkey,转换为新的rowkey,再存一份索引。
  • 当用户get操作时,会将rowkey映射为实际的rowkey,再根据实际的rowkey获取实际的结果。
  • 当用户执行scanner操作时,会将scanner的结果映射为实际rowkey的结果,返回给用户。

通过hbase的BaseRegionObserver 协处理器,可以封装处理很多hbase操作。

BaseRegionObserver的java接口(注意hbase版本)
https://hbase.apache.org/1.2/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339

推荐阅读更多精彩内容

  • HBase那些事 @(大数据工程学院)[HBase, Hadoop, 优化, HadoopChen, hbase]...
    分痴阅读 3,923评论 3 17
  • 最近在逐步跟进Hbase的相关工作,由于之前对Hbase并不怎么了解,因此系统地学习了下Hbase,为了加深对Hb...
    飞鸿无痕阅读 50,158评论 19 271
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,559评论 18 139
  • Hbase架构与原理 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang所撰写的Goo...
    全能程序猿阅读 86,270评论 2 37
  • HBase存储架构图 HBase Master 为Region server分配region 负责Region s...
    kimibob阅读 5,551评论 0 52