最近进行 HBase 表跨集群迁移,使用组内同事给的方案 : bulkload,但是 bulkload 完之后出现了一系列预料之外的问题,记录如下:
hbase 表跨集群迁移步骤
- 目标集群创建 HBase 表,未进行预分区;
- 源集群 hbase 表对应的 hdfs 路径创建 snaphost ,注意此 snapshot 是 hdfs snaphost;
- distcp 拷贝 hdfs 快照数据到目标集群
- 目标集群遍历拷贝数据目录,逐个 bulkload 到 hbase
问题
bulkload 结束后,这张 700 亿行的大表,只有一个 region,花了近一周的时间 minor compaction,region split 好;
原先以为,拷贝的数据里面除了列族数据,还包含 region 信息(/region_id/.regioninfo),本来以为 bulkload 会自动处理 region,通过了解了一番源码发现,事情并非如此。
源码分析 bulkload 过程
1 初始化一个线程池,线程池 corePoolSize 来源于参数配置 hbase.loadincremental.threads.max,如果未配置,默认取 jvm 可以用到的处理器的个数(Runtime.getRuntime().availableProcessors())。
2 遍历搜索过滤出 HFile 文件:遍历目录,搜索 hfile。途中经过一系列校验,判断是否有 families,family name 是否合法性,跳过非 hfile 文件,例如 _ 开头的文件,引用,HFileLink,最后判断 HFile 格式的有效性。
扫描过程中会检查 HFile 文件的大小是否超出 region 大小的阈值(hbase.hregion.max.filesize,未配置的话默认是 10G),如果超出阈值,会打印提示这可能会导致出现 oversplitting 的问题。
将遍历后的 hfile 以对象 LoadQueueItem(byte[] family, Path hfilePath) 的方式放入队列 :Deque。
这一步就把 .regioninfo 就排除掉了,所以这个拷贝过来的 region 信息对于 bulkload 是无用了。
famliy 存在性校验:再经过一次筛选,判断是否有获取到的 family 是否是即将导入 HBase 表中的 family。
3 groupOrSplitPhase 阶段
这个阶段判断 hfile 判断应该写到表的哪个 region,如果跨 region 了,需要进行 split。
获取当前表所有 region 的 startkeys endkeys
final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
拿到即将导入的 hfile 的 startkey,通过二分查找算法在 startkey 列表里面搜索,如果搜到匹配的 startkey 直接返回数组索引值,没搜到,返回插入点,插入点是第一个大于值的索引位置。
int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
Bytes.BYTES_COMPARATOR);
if (idx < 0) {
// not on boundary, returns -(insertion index). Calculate region it
// would be in.
idx = -(idx + 1) - 1;
}
final int indexForCallable = idx;
通过这个索引值,判断是否有跨 region 的 hfile,有的话需要 split。怎么判断是否不需要 split,也就是有合适的 region,满足其中两个条件之一即可:
- hfile 的 endkey 小于表 region 的 endkey
- 表 region endeky 为空,说明是最后一个 region ,理所当然可以写入
boolean lastKeyInRange =
Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
// split key 即为匹配 region 的 endkey
List<LoadQueueItem> lqis = splitStoreFile(item, table,
startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
return lqis;
}
// group regions.
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");
拆分是把当前 HFile 拆分成两半,top 和 bottom 两部分,保留元数据,重建 bloom 过滤等,生成新的 HFile ,拆分策略是:根据匹配 region 的 endkey 的位置拆分成两个。
/**
* Split a storefile into a top and bottom half, maintaining
* the metadata, recreating bloom filters, etc.
*/
static void splitStoreFile(
Configuration conf, Path inFile,
HColumnDescriptor familyDesc, byte[] splitKey,
Path bottomOut, Path topOut) throws IOException
{
// Open reader with no block cache, and not in-memory
Reference topReference = Reference.createTopReference(splitKey);
Reference bottomReference = Reference.createBottomReference(splitKey);
copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
}
bulkLoadPhase:bulkload 阶段
计算出 region 信息之后,就是正式的 load 阶段,最终定位到 HStore 里面的 bulkLoadFile 方法
通过 StoreFile reader 读取 StoreFile ,获取写锁,往 storefile 中新增数据。
private void bulkLoadHFile(StoreFile sf) throws IOException {
StoreFile.Reader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
// the lock.
this.lock.writeLock().unlock();
}
notifyChangedReadersObservers();
LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
if (LOG.isTraceEnabled()) {
String traceMessage = "BULK LOAD time,size,store size,store files ["
+ EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
+ "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
LOG.trace(traceMessage);
}
}
结论
对于待迁移的 HBase 大表, bulkload 前尽可能在建表时做好预分区