I. Preface
The example coprocessor in this article computes count and avg over specified columns according to scan conditions. A detailed article from the HBase technology community (HBase技术社区) is recommended reading: it walks through how to develop and deploy an HBase Endpoint Coprocessor step by step. Building on that article, this one shows how to pass Scan, Filter, and similar objects as request parameters.
II. Environment Preparation
1. Download and install Protobuf. Use the same Protobuf version as your HBase cluster (2.5.0 for HBase 1.x).
2. Configure the environment variables.
3. Create a test table.
A combined sketch of these three steps follows this list.
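The following is a hypothetical walkthrough, assuming a Linux build host, Protobuf 2.5.0, and a test table named test_coprocessor with an info column family holding numeric weight and age columns (the family and column names are assumptions used throughout this article's examples; only the table name reappears in section V):

# 1) Build and install Protobuf 2.5.0 from its source tarball
tar -zxvf protobuf-2.5.0.tar.gz && cd protobuf-2.5.0
./configure --prefix=/usr/local/protobuf
make && make install

# 2) Environment variables, e.g. in ~/.bashrc
export PATH=/usr/local/protobuf/bin:$PATH
protoc --version    # should print: libprotoc 2.5.0

# 3) Test table, created in the HBase shell
create 'test_coprocessor', 'info'
put 'test_coprocessor', 'r1', 'info:weight', '100'
put 'test_coprocessor', 'r1', 'info:age', '30'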
III. Generating Serialization Classes with Protobuf
Protobuf itself supports only a small set of data types. What do you do when a parameter needs to be an object, or a scan range or scan filter? This article focuses on how to pass in a Scan object.
1. Set up the proto environment
In the HBase source tree, the hbase-protocol module contains a protobuf directory (hbase-protocol/src/main/protobuf) with the many proto files HBase has already defined. The Scan object used in this article is defined in Client.proto. Copy the proto files you need, or all of them, to the machine where Protobuf was installed above.
2. Create a file named ClifeProto.proto with the following content:
syntax = "proto2";
option java_package = "com.clife.data.hbase.coprocessor.aggregate";
option java_outer_classname = "ClifeProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "Client.proto";
message ClifeAvgRequest {
required string family = 1;
required string columns = 2;
optional Scan scan = 3;
}
message ClifeAvgResponse {
required int64 count = 1;
required string values = 2;
}
service ClifeService {
rpc getAvg(ClifeAvgRequest)
returns (ClifeAvgResponse);
}
Note that ClifeProto.proto must sit in the same directory as Client.proto (along with the protos that Client.proto itself imports), for example:
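A hypothetical layout on the build machine; Client.proto and its dependencies all come from hbase-protocol/src/main/protobuf, and the exact import list varies slightly by HBase version:

proto/
├── Cell.proto
├── Client.proto
├── Comparator.proto
├── Filter.proto
├── HBase.proto
├── ...
└── ClifeProto.proto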
3. Generate the Java classes
protoc --java_out=./ ClifeProto.proto
The path following --java_out specifies the output directory for the generated Java classes.
After the command runs, the generated ClifeProtos.java can be found in the output directory under com/clife/data/hbase/coprocessor/aggregate, matching the configured java_package.
IV. Server-Side Implementation of the Endpoint Coprocessor
1. Create a Maven project with the following dependencies:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
These dependencies are provided by the HBase/Hadoop runtime on the RegionServers, so they can be excluded when running mvn package to keep the packaged jar small.
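One way to do this (a sketch; a shade/assembly exclusion works just as well) is to declare each of these dependencies with provided scope:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.2</version>
    <!-- available at compile time, left out of the packaged jar -->
    <scope>provided</scope>
</dependency>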
Copy the Java class generated in step III into the com.clife.data.hbase.coprocessor.aggregate package configured in ClifeProto.proto.
2. Implement the Endpoint class
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class ClifeProtosEndPoint extends ClifeProtos.ClifeService implements Coprocessor, CoprocessorService {

    protected static final Log log = LogFactory.getLog(ClifeProtosEndPoint.class);
    private RegionCoprocessorEnvironment env;

    /**
     * Compute per-column sums for averaging.
     * Sums the requested columns over the rows matched by the client's scan
     * conditions and returns the sums together with the row count; the client
     * combines the per-region results and computes the final averages.
     * 1. Request parameters:
     *    1) scan (optional): may carry a rowkey range, time range, filter, etc.
     *    2) family (required): each RPC call may operate on only one column family
     *    3) columns (required): comma-separated columns to aggregate, e.g. "weight,age"
     * 2. Response:
     *    1) count: long, the number of rows scanned
     *    2) values: String, the per-column sums, e.g. "weight:234,age:345"
     * @param controller
     * @param request
     * @param done
     */
    @Override
    public void getAvg(RpcController controller, ClifeProtos.ClifeAvgRequest request,
                       RpcCallback<ClifeProtos.ClifeAvgResponse> done) {
        ClifeProtos.ClifeAvgResponse response = null;
        long counter = 0L;
        List<Cell> results = new ArrayList<>();
        InternalScanner scanner = null;
        try {
            log.info("Start Clife avg endpoint.........................");
            Scan scan;
            // Protobuf getters never return null, so test hasScan() to find out
            // whether the client actually supplied a Scan.
            if (request.hasScan()) {
                scan = ProtobufUtil.toScan(request.getScan());
                byte[] startRow = scan.getStartRow();
                byte[] stopRow = scan.getStopRow();
                if (startRow.length > 0 && stopRow.length > 0)
                    log.info("StartRow = " + Bytes.toStringBinary(startRow)
                            + ", StopRow = " + Bytes.toStringBinary(stopRow));
            } else {
                scan = new Scan();
            }
            byte[] cf = Bytes.toBytes(request.getFamily());
            scan.addFamily(cf);
            // Columns arrive as a comma-separated string, e.g. "weight,age"
            String columns = request.getColumns();
            log.info("Input columns: " + columns);
            Map<String, Long> columnMaps = new HashMap<>();
            for (String column : columns.split(",")) {
                columnMaps.put(column, 0L);
                scan.addColumn(cf, Bytes.toBytes(column));
            }
            scanner = this.env.getRegion().getScanner(scan);
            boolean hasMoreRows;
            do {
                hasMoreRows = scanner.next(results);
                if (results.size() > 0) {
                    counter++;
                }
                for (Cell cell : results) {
                    String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    long temp = Long.parseLong(value);
                    columnMaps.put(column, columnMaps.get(column) + temp);
                }
                results.clear();
            } while (hasMoreRows);
            StringBuilder values = new StringBuilder();
            for (Map.Entry<String, Long> entry : columnMaps.entrySet()) {
                values.append(entry.getKey()).append(":").append(entry.getValue()).append(",");
            }
            log.info("Clife avg server result: " + values);
            response = ClifeProtos.ClifeAvgResponse.newBuilder()
                    .setCount(counter)
                    .setValues(values.toString())
                    .build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        log.info("Row counter from this region is "
                + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
    }

    @Override
    public Service getService() {
        return this;
    }
}
3. Client-Side Implementation of the Endpoint Coprocessor
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

public class ClifeProtosExample {

    /**
     * The most efficient invocation style: calls the endpoint on every region
     * in the scan range via Table.coprocessorService(Class, byte[], byte[],
     * Batch.Call, Batch.Callback), merges the per-region counts and sums,
     * and prints the resulting averages.
     * @param table   the HBase table
     * @param scan    scan conditions (rowkey range, time range, filter, ...)
     * @param family  the column family to read
     * @param columns comma-separated columns to aggregate, e.g. "weight,age"
     * @return the total number of rows matched
     */
    public static long execFastEndpointCoprocessor(Table table, final Scan scan,
                                                   final String family, final String columns) {
        long start_t = System.currentTimeMillis();
        // Accumulators for the total row count and the per-column sums
        final AtomicLong totalRowCount = new AtomicLong();
        final Map<String, AtomicLong> sumMap = new HashMap<>();
        try {
            Batch.Callback<ClifeProtos.ClifeAvgResponse> callback =
                    new Batch.Callback<ClifeProtos.ClifeAvgResponse>() {
                @Override
                public void update(byte[] region, byte[] row,
                                   ClifeProtos.ClifeAvgResponse clifeAvgResponse) {
                    // Merge this region's row count
                    totalRowCount.getAndAdd(clifeAvgResponse.getCount());
                    // Merge this region's per-column sums, e.g. "weight:234,age:345"
                    String values = clifeAvgResponse.getValues();
                    for (String kv : values.split(",")) {
                        if (kv.isEmpty()) {
                            continue; // guards against an empty response (no matching rows)
                        }
                        String[] kvs = kv.split(":");
                        String key = kvs[0];
                        long value = Long.parseLong(kvs[1]);
                        if (!sumMap.containsKey(key)) {
                            sumMap.put(key, new AtomicLong(value));
                        } else {
                            sumMap.get(key).getAndAdd(value);
                        }
                    }
                }
            };
            final ClientProtos.Scan cScan = ProtobufUtil.toScan(scan);
            table.coprocessorService(ClifeProtos.ClifeService.class,
                    scan.getStartRow(), scan.getStopRow(),
                    new Batch.Call<ClifeProtos.ClifeService, ClifeProtos.ClifeAvgResponse>() {
                @Override
                public ClifeProtos.ClifeAvgResponse call(ClifeProtos.ClifeService service)
                        throws IOException {
                    ClifeProtos.ClifeAvgRequest request =
                            ClifeProtos.ClifeAvgRequest.newBuilder()
                                    .setScan(cScan)
                                    .setFamily(family)
                                    .setColumns(columns)
                                    .build();
                    BlockingRpcCallback<ClifeProtos.ClifeAvgResponse> rpcCallback =
                            new BlockingRpcCallback<>();
                    service.getAvg(null, request, rpcCallback);
                    return rpcCallback.get();
                }
            }, callback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        System.out.println("Elapsed: " + (System.currentTimeMillis() - start_t) + " ms");
        System.out.println("totalRowCount: " + totalRowCount.longValue());
        for (Map.Entry<String, AtomicLong> entry : sumMap.entrySet()) {
            double value = entry.getValue().doubleValue();
            System.out.println(entry.getKey() + " avg = " + value / totalRowCount.longValue());
        }
        return totalRowCount.longValue();
    }
}
Notes:
1) The coprocessor performs only the summation on the server side; the averages are computed on the client.
2) The Scan object passed in supports rowkey range, time range, filter, and the other usual scan conditions.
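For illustration, a hypothetical invocation might look like the sketch below; the table, family, and column names (test_coprocessor, info, weight, age) are assumptions consistent with the examples above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ClifeAvgDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("test_coprocessor"))) {
            // Optional scan conditions: rowkey range, time range, filter, ...
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes("r0"));
            scan.setStopRow(Bytes.toBytes("r9"));
            ClifeProtosExample.execFastEndpointCoprocessor(table, scan, "info", "weight,age");
        }
    }
}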
V. Deployment and Invocation
1. Build with Maven (e.g. mvn clean package).
2. Upload the built jar to a directory on HDFS, e.g. /hbase/coprocessor, and make sure the file's owner and group allow HBase to read it.
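For example (the jar name matches the one referenced in the alter command below):

hdfs dfs -mkdir -p /hbase/coprocessor
hdfs dfs -put clife-data-hbase-1.0.8.jar /hbase/coprocessor/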
3. Load the coprocessor
A coprocessor can be loaded either statically (cluster-wide via hbase-site.xml) or dynamically (per table); see the referenced article for details. Because the coprocessor in this article carries heavy business-specific logic, cluster-wide static loading would be a poor fit, so it is loaded dynamically on the target table. The purpose of this article is simply to show how to use Scan and similar objects inside a coprocessor.
alter 'test_coprocessor', METHOD => 'table_att', 'COPROCESSOR' => 'hdfs://nameservice1/hbase/coprocessor/clife-data-hbase-1.0.8.jar|com.clife.data.hbase.coprocessor.aggregate.ClifeProtosEndPoint||'
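To confirm that the coprocessor is attached, or to remove it later, standard HBase shell usage (sketched here) is:

describe 'test_coprocessor'
# remove it again; 'coprocessor$1' is the attribute name shown by describe
alter 'test_coprocessor', METHOD => 'table_att_unset', NAME => 'coprocessor$1'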