虽说hbase适合写多读少,但是hbase的读性能也是非常强悍的,hbase有如此好的读性能其中少不了BlockCache。BlockCache是regionserver级别的一种缓存,目前有三种实现方式:LruBlockCache、SlabCache和BucketCache,本文只对BucketCache的实现方式进行剖析。本文从BucketCache的内存模型、读写流程以及使用配置三方面进行说明。
BucketCache可以指定三种不同的存储介质:onHeap(java堆上内存)、offHeap(java堆外内存)和file(文件),不管使用哪种存储介质,内部的内存模型、读写流程都是一致的。
内存模型
关于内存模型,BucketCache初始化时默认会申请14个不同大小的Bucket,一种Bucket存储一种指定BlockSize的数据块,每个Bucket的大小默认为2M,不同大小的Bucket之间的内存是可以互相使用的,从而保证的内存的使用率。BucketCache的内存模型如下图所示:
读写流程
BucketCache中一共包括5个模块:ramCache、backingMap、ioEngine、writerThreads和BucketAllocator。
ramCache:block在写入BucketCache中指定的存储介质之前会先存储在ramCache这map中。
backingMap:记录写入BucketCache的BlockKey和对应Block在BucketCache中的offset。
ioEngine:实际写入存储介质的类,将Block数据写入对应地址的空间中。
writerThreads:多个线程,主要负责异步将Block写入存储介质中,每个线程都有一个支持并发的队列,用来存储Block。
BucketAllocator:为Block分配存储介质上的空间,主要就是获取一个存储介质上的offset,不同Bucket大小有对应的BucketAllocator。
BucketCache中的读写流程如下图所示:
写入流程:
1.进入到BucketCache类中的Block会首先将BlockKey和对应的Block存入到ramCache这个map,之后将该Block存入到对应writerThread线程对应的队列中。
2.writerThread线程持续地从队列中获取所有的Block。
3.调用对应Bucket大小的BucketAllocator为对应大小的Block分配内存,也就是获取一个存储介质上的offset。
4.调用ioEngine模块将Block写入到分配好的空间上。
cacheBlockWithWait:
cacheBlockWithWait方法是BucketCache写入Block的入口函数。
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) {
if (!cacheEnabled) {
return;
}
if (backingMap.containsKey(cacheKey)) {
return;
}
//(1)将传入的Block构造成RAMQueueEntry对象,并存入ramCache中。
RAMQueueEntry re =
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
if (ramCache.putIfAbsent(cacheKey, re) != null) {
return;
}
//(2)将Block构造成的RAMQueueEntry对象,存入写线程的并发队列中。
int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
successfulAddition = bq.offer(re);
}
// (3)如果存入并发队列失败,则将 ramCache中的RAMQueueEntry对象移出,并记录失败次数。
if (!successfulAddition) {
ramCache.remove(cacheKey);
cacheStats.failInsert();
} else {
this.blockNumber.incrementAndGet();
this.heapSize.addAndGet(cachedItem.heapSize());
blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
}
}
WriterThread.run
异步写入存储介质的后台线程。
public void run() {
List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();
try {
while (cacheEnabled && writerEnabled) {
try {
try {
// Blocks
//(1) 获取并发队列中存放的所有RAMQueueEntry(Block)
entries = getRAMQueueEntries(inputQueue, entries);
} catch (InterruptedException ie) {
if (!cacheEnabled) break;
}
//(2)BucketAllocator分配内存,并调用ioEngine模块将Block写入指定的存储介质中,写入成后存入backingMap中。
doDrain(entries);
} catch (Exception ioe) {
LOG.error("WriterThread encountered error", ioe);
}
}
} catch (Throwable t) {
LOG.warn("Failed doing drain", t);
}
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
}
doDrain
将队列中的Block写入ioEngine模块指定的存储介质中,并将对应的entry写入backingMap中。
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
final int size = entries.size();
BucketEntry[] bucketEntries = new BucketEntry[size];
//(1)循环遍历所有的RAMQueueEntry,调用对应大小Bucket的bucketAllocator分配空间,并使用ioEngine将block写入指定的介质。
//如果bucketAllocator分配空间时报错则跳过该entry,并在后面会将ramCache中对应的Block移除。
//如果因为存储介质内存满了,则会调用存储介质的释放空间,如果该存储介质正常释放空间则进行休眠,稍后重试。
int index = 0;
while (cacheEnabled && index < size) {
RAMQueueEntry re = null;
try {
re = entries.get(index);
if (re == null) {
LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
index++;
continue;
}
//(2)通过bucketAllocator获取block在ioEngine中的偏移值,之后使用ioEngine将block写入指定的介质。
BucketEntry bucketEntry =
re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
ioErrorStartTime = -1;
}
index++;
}
//(3)如果bucketAllocator获取到block在ioEngine的offset(可能是由于block的len大于定义的所有Bucket指定的大小),
//则跳过该block,之后ramCache会将该block移除。
catch (BucketAllocatorException fle) {
LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
// Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
bucketEntries[index] = null;
index++;
}
//(4)如果ioEngine指定的介质内存满了则调用freeSpace,同LRU算法进行释放内存,如果正在释放内存就调用休眠,之后再重试。
catch (CacheFullException cfe) {
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
if (!freeInProgress) {
freeSpace("Full!");
} else {
Thread.sleep(50);
}
} catch (IOException ioex) {
// Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
LOG.error("Failed writing to bucket cache", ioex);
checkIOErrorIsTolerated();
}
}
// Make sure data pages are written are on media before we update maps.
// (5)如果ioEngine指定的介质是磁盘,则需要先同步到磁盘之后再添加到backingMap
try {
ioEngine.sync();
} catch (IOException ioex) {
LOG.error("Failed syncing IO engine", ioex);
checkIOErrorIsTolerated();
// Since we failed sync, free the blocks in bucket allocator
for (int i = 0; i < entries.size(); ++i) {
if (bucketEntries[i] != null) {
bucketAllocator.freeBlock(bucketEntries[i].offset());
bucketEntries[i] = null;
}
}
}
// Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
// success or error.
for (int i = 0; i < size; ++i) {
BlockCacheKey key = entries.get(i).getKey();
// Only add if non-null entry.
if (bucketEntries[i] != null) {
backingMap.put(key, bucketEntries[i]);
}
// Always remove from ramCache even if we failed adding it to the block cache above.
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) {
heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
} else if (bucketEntries[i] != null){
// Block should have already been evicted. Remove it and free space.
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
try {
lock.writeLock().lock();
if (backingMap.remove(key, bucketEntries[i])) {
blockEvicted(key, bucketEntries[i], false);
}
} finally {
lock.writeLock().unlock();
}
}
}
//(6)已使用的内存大于总内存的95%则进行内存释放
long used = bucketAllocator.getUsedSize();
if (used > acceptableSize()) {
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
return;
}
读取流程:
1. 首先从RAMCache中查找。对于还没有来得及写入到bucket的缓存block,一定存储在RAMCache中。
2. 如果在RAMCache中没有找到,再在BackingMap中根据blockKey找到对应物理偏移地址offset。
3. 根据物理偏移地址offset可以直接从内存中查找对应的block数据。
getBlock:
从Bucketcache中获取对应的block的入口方法。
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
if (!cacheEnabled) {
return null;
}
//(1)从ramCache中获取对应的Block,获取到了则将统计缓存击中加1,并返回。
RAMQueueEntry re = ramCache.get(key);
if (re != null) {
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
}
re.access(accessCount.incrementAndGet());
return re.getData();
}
//(2)从backingMap中获取对应Block在存储介质上的offset。
BucketEntry bucketEntry = backingMap.get(key);
if (bucketEntry != null) {
long start = System.nanoTime();
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
try {
lock.readLock().lock();
//(3)在获取锁期间,有可能backingMap发生了变化,所以这里需要再次校验。
if (bucketEntry.equals(backingMap.get(key))) {
int len = bucketEntry.getLength();
ByteBuffer bb = ByteBuffer.allocate(len);
//(4)根据backingMap中记录的offset以及len,对应的存储介质中读取指定的字节。
int lenRead = ioEngine.read(bb, bucketEntry.offset());
if (lenRead != len) {
throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
}
CacheableDeserializer<Cacheable> deserializer =
bucketEntry.deserializerReference(this.deserialiserMap);
//(5)将从介质中读取出来的字节序列化为Cacheable 对象。
Cacheable cachedBlock = deserializer.deserialize(bb, true);
long timeTaken = System.nanoTime() - start;
//(6)统计缓存命中次数。
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
cacheStats.ioHit(timeTaken);
}
bucketEntry.access(accessCount.incrementAndGet());
if (this.ioErrorStartTime > 0) {
ioErrorStartTime = -1;
}
return cachedBlock;
}
} catch (IOException ioex) {
LOG.error("Failed reading block " + key + " from bucket cache", ioex);
checkIOErrorIsTolerated();
} finally {
lock.readLock().unlock();
}
}
//(7)ramCache和backingMap中都没有找到对应的Block,则将未命中统计加1.
if (!repeat && updateCacheMetrics) {
cacheStats.miss(caching, key.isPrimary(), key.getBlockType());
}
return null;
}
使用配置
BucketCache分为三种存储介质:onHeap、offHeap、file。下面对这三种配置进行分别说明。
onHeap模式
<hbase.bucketcache.ioengine>heap</hbase.bucketcache.ioengine>
//bucketcache占用整个jvm内存大小的比例
<hbase.bucketcache.size>0.4</hbase.bucketcache.size>
//bucketcache在combinedcache中的占比
<hbase.bucketcache.combinedcache.percentage>0.9</hbase.bucketcache.combinedcache.percentage>
offHeap模式
<hbase.bucketcache.ioengine>offheap</hbase.bucketcache.ioengine>
<hbase.bucketcache.size>0.4</hbase.bucketcache.size>
<hbase.bucketcache.combinedcache.percentage>0.9</hbase.bucketcache.combinedcache.percentage>
file模式
<hbase.bucketcache.ioengine>file:/cache_path</hbase.bucketcache.ioengine>
//bucketcache缓存空间大小,单位为MB
<hbase.bucketcache.size>10 * 1024</hbase.bucketcache.size>
//高速缓存路径
<hbase.bucketcache.persistent.path>file:/cache_path</hbase.bucketcache.persistent.path>
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。