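Program entry point
org.apache.hadoop.hbase.master.HMaster defines a MasterRpcServices instance that provides the RPC service.
org.apache.hadoop.hbase.master.MasterRpcServices implements the interface MasterProtos.MasterService.BlockingInterface.
The RPC communication uses Google's protobuf; see the separate article "An analysis of the communication process between HBase and its clients". The createTable interface is ultimately implemented as follows: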
@Override
public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
    throws ServiceException {
  HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
  byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
  try {
    long procId =
        master.createTable(hTableDescriptor, splitKeys, req.getNonceGroup(), req.getNonce());
    return CreateTableResponse.newBuilder().setProcId(procId).build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
HBase's procedure framework
@Override
public long createTable(
    final HTableDescriptor hTableDescriptor,
    final byte [][] splitKeys,
    final long nonceGroup,
    final long nonce) throws IOException {
  ...
  return MasterProcedureUtil.submitProcedure(
    new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        ...
        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
        submitProcedure(new CreateTableProcedure(
          procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch));
        latch.await();
        ...
      }
    });
}
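Two concepts appear here. The first is NonceProcedureRunnable: the code inside it runs only once for a given request (identified by nonceGroup and nonce), which prevents the same create table operation from being executed repeatedly. The second is Procedure: for operations that must be transactional, HBase implements a Procedure framework that makes rollback straightforward.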
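The nonce idea can be illustrated with a minimal sketch. This is not HBase code: the class NonceRegistrySketch, the method submitOnce and the string key format are hypothetical names chosen for illustration, and the sketch assumes that a repeated request should simply get the original procedure id back.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Toy nonce registry (hypothetical, not the HBase implementation).
final class NonceRegistrySketch {
    // Maps "nonceGroup:nonce" to the procedure id assigned on first submission.
    private final Map<String, Long> procIdByNonce = new ConcurrentHashMap<>();
    private final AtomicLong nextProcId = new AtomicLong(1);
    private final AtomicLong runCount = new AtomicLong();

    // Runs the submission at most once per (nonceGroup, nonce) pair.
    // A retry with the same pair gets the previously assigned id back.
    long submitOnce(long nonceGroup, long nonce) {
        String key = nonceGroup + ":" + nonce;
        return procIdByNonce.computeIfAbsent(key, k -> {
            runCount.incrementAndGet();           // stands in for the real work
            return nextProcId.getAndIncrement();
        });
    }

    // Number of times the guarded work actually ran.
    long runs() {
        return runCount.get();
    }

    public static void main(String[] args) {
        NonceRegistrySketch registry = new NonceRegistrySketch();
        long first = registry.submitOnce(7L, 42L);
        long retry = registry.submitOnce(7L, 42L);  // same nonce: not run again
        long other = registry.submitOnce(7L, 43L);  // different nonce: runs
        System.out.println(first + " " + retry + " " + other + " runs=" + registry.runs());
    }
}
```

ConcurrentHashMap.computeIfAbsent applies the mapping function at most once per key, which is what makes the retry safe even when two threads submit the same pair concurrently.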
org.apache.hadoop.hbase.procedure2.ProcedureExecutor
defines several core methods:
- public long submitProcedure(final Procedure proc, final NonceKey nonceKey)
  Submits a Procedure: writes it to the WAL and adds it to the runnable and rollback queues.
- private void execProcedure(final RootProcedureState procStack, final Procedure procedure)
  Executes a procedure; if the procedure has sub-procedures to run next, execution continues with them.
- private boolean executeRollback(final Procedure proc)
  Rolls back a procedure.
- private void load(final boolean abortOnCorruption)
  Restores the state from the WAL after the process has terminated abnormally.
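The write-then-recover idea behind submitProcedure and load can be sketched with a toy log. Everything here is an assumption made for illustration: the class WalRecoverySketch, the "id:STATE" record format and the in-memory list standing in for the WAL are not taken from HBase.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy write-ahead log (hypothetical): an append-only list of "procId:STATE" records.
final class WalRecoverySketch {
    final List<String> wal = new ArrayList<>();

    // Record the submission before any work is done.
    void submit(long procId) {
        wal.add(procId + ":SUBMITTED");
    }

    // Record that the procedure completed.
    void finish(long procId) {
        wal.add(procId + ":FINISHED");
    }

    // Replays the log and returns the procedures that were submitted
    // but never finished, in submission order. These would be resumed.
    List<Long> load() {
        Set<Long> pending = new LinkedHashSet<>();
        for (String record : wal) {
            String[] parts = record.split(":");
            long id = Long.parseLong(parts[0]);
            if (parts[1].equals("SUBMITTED")) {
                pending.add(id);
            } else {
                pending.remove(id);
            }
        }
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        WalRecoverySketch log = new WalRecoverySketch();
        log.submit(1);
        log.submit(2);
        log.finish(1);
        // Simulated crash here: procedure 2 is still pending after replay.
        System.out.println(log.load());
    }
}
```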
org.apache.hadoop.hbase.procedure2.Procedure
execute()
is called each time the procedure is executed. It may be called multiple times in case of failure and restart, so the code must be idempotent. The return value is a set of sub-procedures, or null if the procedure doesn't have sub-procedures. Once the sub-procedures have completed successfully, the execute() method is called again; you should think of it as a stack.
rollback()
is called when the procedure or one of its sub-procedures has failed. The rollback step is supposed to clean up the resources created during the execute() step. In case of failure and restart, rollback() may be called multiple times, so the code must be idempotent.
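The execute()/rollback() contract described above can be illustrated with a small sketch. It is a simplification under stated assumptions, not the HBase classes: ToyProcedure, run, step and parent are hypothetical names, execution is single-threaded, and nothing is persisted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class ProcedureStackSketch {
    // Hypothetical, simplified stand-in for a procedure.
    interface ToyProcedure {
        // Returns sub-procedures to run first, or null when this procedure is done.
        List<ToyProcedure> execute() throws Exception;
        // Undoes what execute() did; must be safe to call more than once.
        void rollback();
    }

    // Runs root and its sub-procedures. On failure, rolls back every
    // procedure that was started, most recent first, and returns false.
    static boolean run(ToyProcedure root) {
        Deque<ToyProcedure> started = new ArrayDeque<>(); // rollback stack
        try {
            runOne(root, started);
            return true;
        } catch (Exception e) {
            while (!started.isEmpty()) {
                started.pop().rollback();
            }
            return false;
        }
    }

    private static void runOne(ToyProcedure p, Deque<ToyProcedure> started) throws Exception {
        started.push(p);
        List<ToyProcedure> subs = p.execute();
        while (subs != null) {
            for (ToyProcedure sub : subs) {
                runOne(sub, started);
            }
            subs = p.execute(); // children finished: the parent is called again
        }
    }

    // A leaf step that "creates" a named resource in a shared set.
    static ToyProcedure step(String name, Set<String> resources, boolean fail) {
        return new ToyProcedure() {
            public List<ToyProcedure> execute() throws Exception {
                if (fail) throw new Exception("step " + name + " failed");
                resources.add(name);   // adding twice is harmless: idempotent
                return null;
            }
            public void rollback() {
                resources.remove(name); // removing twice is harmless: idempotent
            }
        };
    }

    // A parent that hands out its children on the first call and finishes on the second.
    static ToyProcedure parent(List<ToyProcedure> children) {
        return new ToyProcedure() {
            private boolean childrenSpawned = false;
            public List<ToyProcedure> execute() {
                if (!childrenSpawned) {
                    childrenSpawned = true;
                    return children;
                }
                return null;
            }
            public void rollback() { }
        };
    }

    public static void main(String[] args) {
        Set<String> res = new TreeSet<>();
        boolean ok = run(parent(Arrays.asList(step("a", res, false), step("b", res, true))));
        System.out.println(ok + " " + res); // step b fails, so step a is rolled back
    }
}
```

Using set add and remove for the "resource" keeps both directions idempotent, which is the property the framework requires because either method may be invoked again after a restart.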
org.apache.hadoop.hbase.procedure2.StateMachineProcedure
implements a procedure that executes step by step, in order, according to its state
- Once the procedure is running, the procedure-framework will call executeFromState(), using the 'state' provided by the user. The implementor can jump between states using setNextState(MyStateEnum.ordinal()).
- The rollback will call rollbackState() for each state that was executed, in reverse order.
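A toy version of such a state machine is sketched below. It only borrows the method names executeFromState and rollbackState and the Flow values from the description above; the State enum, the failAt switch and the driver loop are assumptions for illustration. One simplification to note: this sketch undoes only the states that completed, and whether the failing state itself is rolled back in HBase is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

final class StateMachineSketch {
    enum Flow { HAS_MORE_STATE, NO_MORE_STATE }
    enum State { PREPARE, WRITE, PUBLISH } // hypothetical states

    final List<String> log = new ArrayList<>();
    private final List<State> completed = new ArrayList<>();
    private final State failAt; // state at which to simulate a failure, or null
    private State next = State.PREPARE;

    StateMachineSketch(State failAt) {
        this.failAt = failAt;
    }

    // Executes one state and chooses the next one.
    private Flow executeFromState(State state) {
        if (state == failAt) {
            throw new IllegalStateException("failed in " + state);
        }
        log.add("exec " + state);
        switch (state) {
            case PREPARE:
                next = State.WRITE;
                return Flow.HAS_MORE_STATE;
            case WRITE:
                next = State.PUBLISH;
                return Flow.HAS_MORE_STATE;
            default:
                return Flow.NO_MORE_STATE;
        }
    }

    // Undoes one state.
    private void rollbackState(State state) {
        log.add("undo " + state);
    }

    // Drives the machine; on failure, undoes completed states in reverse order.
    boolean run() {
        try {
            Flow flow;
            do {
                State current = next;
                flow = executeFromState(current);
                completed.add(current);
            } while (flow == Flow.HAS_MORE_STATE);
            return true;
        } catch (RuntimeException e) {
            for (int i = completed.size() - 1; i >= 0; i--) {
                rollbackState(completed.get(i));
            }
            return false;
        }
    }

    public static void main(String[] args) {
        StateMachineSketch machine = new StateMachineSketch(State.PUBLISH);
        System.out.println(machine.run() + " " + machine.log);
    }
}
```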
Using the procedure framework:
- Implement the procedure class: CreateTableProcedure extends StateMachineProcedure<MasterProcedureEnv, CreateTableState>
- Start a ProcedureExecutor in HMaster and submit the CreateTableProcedure
The create table process in detail
@Override
protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableState state)
    throws InterruptedException {
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + " execute state=" + state);
  }
  try {
    switch (state) {
      case CREATE_TABLE_PRE_OPERATION:
        // Verify if we can create the table
        boolean exists = !prepareCreate(env);
        ProcedurePrepareLatch.releaseLatch(syncLatch, this);
        if (exists) {
          assert isFailed() : "the delete should have an exception here";
          return Flow.NO_MORE_STATE;
        }
        preCreate(env);
        setNextState(CreateTableState.CREATE_TABLE_WRITE_FS_LAYOUT);
        break;
      case CREATE_TABLE_WRITE_FS_LAYOUT:
        newRegions = createFsLayout(env, hTableDescriptor, newRegions);
        setNextState(CreateTableState.CREATE_TABLE_ADD_TO_META);
        break;
      case CREATE_TABLE_ADD_TO_META:
        newRegions = addTableToMeta(env, hTableDescriptor, newRegions);
        setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
        break;
      case CREATE_TABLE_ASSIGN_REGIONS:
        assignRegions(env, getTableName(), newRegions);
        setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
        break;
      case CREATE_TABLE_UPDATE_DESC_CACHE:
        updateTableDescCache(env, getTableName());
        setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
        break;
      case CREATE_TABLE_POST_OPERATION:
        postCreate(env);
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  } catch (HBaseException|IOException e) {
    LOG.error("Error trying to create table=" + getTableName() + " state=" + state, e);
    setFailure("master-create-table", e);
  }
  return Flow.HAS_MORE_STATE;
}
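As the code shows, the flow is as follows:
- CREATE_TABLE_PRE_OPERATION
  Checks whether the table already exists and calls the MasterCoprocessor.
  Note this line: ProcedurePrepareLatch.releaseLatch(syncLatch, this);
  It means the latch is released as soon as the table-existence check has run, at which point the client request can return. From the client's point of view, create table is therefore an asynchronous operation.
- CREATE_TABLE_WRITE_FS_LAYOUT
  Creates the corresponding directories and files on disk.
- CREATE_TABLE_ADD_TO_META
  Adds the table to the meta table; the regions are not yet available.
- CREATE_TABLE_ASSIGN_REGIONS
  Assigns the regions to regionservers. This step marks all regions as offline and writes them under the region-in-transition directory in ZooKeeper, where the regionservers pick them up; it also enables the table.
- CREATE_TABLE_UPDATE_DESC_CACHE
  Updates the in-memory cache.
- CREATE_TABLE_POST_OPERATION
  Calls the MasterCoprocessor.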
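The effect of releasing the latch in the first state can be shown with a minimal sketch based on java.util.concurrent.CountDownLatch. It is not the HBase ProcedurePrepareLatch: the class PrepareLatchSketch, its method names and the fixed return value 1L are hypothetical, and the extra callerReturned latch exists only to make the demo's event order deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

final class PrepareLatchSketch {
    final List<String> events = new CopyOnWriteArrayList<>();
    private final CountDownLatch prepared = new CountDownLatch(1);
    private final CountDownLatch callerReturned = new CountDownLatch(1); // demo only
    private final CountDownLatch finished = new CountDownLatch(1);

    // Starts the work on another thread and returns once the pre-check is done.
    long createTableAsync() {
        Thread worker = new Thread(() -> {
            events.add("pre-check done");
            prepared.countDown();                 // the caller may return now
            awaitQuietly(callerReturned);         // demo only: fix the event order
            events.add("remaining states done");  // stands in for the later states
            finished.countDown();
        });
        worker.start();
        awaitQuietly(prepared);                   // blocks only until the pre-check
        events.add("caller returns");
        callerReturned.countDown();
        return 1L;                                // hypothetical procedure id
    }

    // Blocks until the background work has completed.
    void awaitCompletion() {
        awaitQuietly(finished);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        PrepareLatchSketch sketch = new PrepareLatchSketch();
        long procId = sketch.createTableAsync();
        sketch.awaitCompletion();
        System.out.println(procId + " " + sketch.events);
    }
}
```

The caller gets its id back before the remaining states have run, which mirrors why a client would need to check separately whether the table has actually become available.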