导读
我们还是按照上一篇博文的那三个函数顺序往下走:Put
,Get
和Delete
,以自上而下的视角一点一点剖析leveldb。那么本篇就主要讲Put的实现。Put的实现讲的还是偏上层的,因为Put可能引发compaction,而compaction等细节内容还是需要专门一篇文章才能描述清楚。那么我们开始吧!
流程
还记得我们上一篇讲解的Put的流程是咋样的吗?
Put操作首先将操作记录写入log文件,然后写入memtable,返回写成功。整体来看是这样,但是会引发下面的问题:
1. 写log的时候是实时刷到磁盘的吗?
2. 写入的时候memtable过大了咋办?
3. 同时多个线程并发写咋办?
......
下面的分析中就会面对这些问题,有些答案很清晰,有些涉及到超级多细节。
step by step
在开始之前,我们知道Write操作是要记录到log文件中的。那么一个记录它的格式是怎样的呢?看图:
这里特别解释一下,Delete操作也是通过Put实现的,只是图中的类型字段是0,而正常Write的操作类型是1,由此区分写操作和删除操作!
step 1
很简单吧。这里讲解一下那三个参数:
- WriteOptions:提供一些写操作的配置项,例如要不要写log的时候马上flush磁盘
- key和value就是对应的keyValue,slice只是作者自己封装的char数组存储数据而已。<b>大牛喜欢把所有东西都封装一下,赋予数据结构意义!</b>这在大的工程里面是很有意义的,既方便操作,也方便思考(这样就不用思考底层的真实的char数组还是啥)。
step 2
这里的WriteBatch的意思就是多个写并起来操作的意思。这里可以学习一个思维,支持多key写,单key写就是key为1的特例。底层完全按照WriteBatch的实现方式
step 3
下面是真实的各种复杂逻辑的操作了。下面的代码有点长,所以基本都在代码中注释讲解了。部分调用函数的细节下面会继续,请耐心欣赏,毕竟人家也不是几分钟就写出来了,哈哈:
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync; //log是否马上刷到磁盘,如果false会有数据丢失的风险
w.done = false; //标记写入是否完成
//串行化writer,如果有其他writer在执行则进入队列等待被唤醒执行
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
//writer的任务可能被其他writer帮忙执行了
//如果是则直接返回
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL); //写入前的各种检查
//是否该停写,是否该切memtable,是否该compact
//获取本次写入的版本号,其实就是个uint64
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
//这里writer还是队列中第一个,由于下面会队列前面的writers也可能合并起来,所以last_writer指针会指向被合并的最后一个writer
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer); //这里会把writers队列中的其他适合的写操作一起执行
WriteBatchInternal::SetSequence(updates, last_sequence + 1); //把版本号写入batch中
last_sequence += WriteBatchInternal::Count(updates); //updates如果合并了n条操作,版本号也会向前跳跃n
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); //写log了,不容易啊,等那么久
bool sync_error = false;
if (status.ok() && options.sync) { //马上要flush到磁盘
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_); //恩,是时候插入memtable中了
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
while (true) { //在这里唤醒已经帮它干完活的writer线程,让它早早回家,别傻傻等了
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) { //唤醒还没干活的等待的第一位,叫醒它自己去干活,老子干完了,哈哈,典型FIFO,公平
writers_.front()->cv.Signal();
}
return status;
}
嗯,核心的write代码已经在这里欣赏完了。基本逻辑分为五步:
- 队列化请求
- 写入的前期检查和保证
- 按格式组装数据为二进制
- 写入log文件和memtable
- 唤醒队列的其他人去干活,自己返回
这个函数的内容就这么多,但是有些内容调用我们还没有讲,下面会选择部分重中之重的讲。例如添加前的检查保证,compaction。
step 4 MakeRoomForWrite函数
我们在step 3的时候知道了MakeRoomForWrite函数的存在,就是写入前的各种检查,例如:
- memtable满了达到大小的限制了没?没有直接可以插入
- memtable达到太大之后需要切换为Imuable memtable,这时候需要检查旧的Imuable memtable已经load到磁盘没
- 由于Imuable需要load入磁盘,level 0的文件数超过限制没
。。。。。。
在进入逻辑代码之前需要解释一个概念:<b>compaction</b>
<b>compaction</b>是指:Imuable memtablecompact到level 0 或者level n compact 到level n + 1, 就是数据向搞level流动的操作,这个操作会保证数据在level中是全局有序的,除了level 0.
这里只是简单列举几个需要考虑的逻辑,具体检查看下面MakeRoomForWrite的实现:
Status DBImpl::MakeRoomForWrite(bool force) { // force代表需要强制compaction与否
mutex_.AssertHeld(); //保证进入该函数前已经加锁
assert(!writers_.empty());
bool allow_delay = !force; //allow_delay代表compaction可以延后
Status s;
while (true) {
if (!bg_error_.ok()) { //如果后台任务已经出错,直接返回错误
// Yield previous error
s = bg_error_;
break;
} else if (//level0的文件数限制超过8,睡眠1ms,简单等待后台任务执行
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { //level 0超过8个sst文件了
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { //memtable的大小限制,default为4MB
// There is room in current memtable
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
Log(options_.info_log, "Current memtable full; waiting...\n"); //等待之前的imuable memtable完成compact到level0
bg_cv_.Wait(); // bg_cv_为后台程序的条件变量,后台程序就是做compact的
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { //level0的文件数超过12,强制等待
// There are too many level-0 files.
Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else { //下面是切换到新的memtable和触发旧的进行compaction
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); //生成新的log文件
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
//删除旧的log对象分配新的
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_; //切换memtable到Imuable memtable
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction(); //如果需要进行compaction,后台执行
}
}
return s;
}
由上面的代码可知,这是一个while循环,直到确保数据库可以写入了才会返回。
流程大概是:
1. 后台任务有无出现错误?出现错误直接返回错误。(compaction是后台线程执行的)
2. level 0 的文件数超过8个,则等待1s再继续执行,因为level 0的文件数目需要严格控制
3. 如果memtable的大小小于4MB(默认值,可以修改),直接返回可以插入;
4. 到达4说明memtable已经满了,这时候需要切换为Imuable memtable。所以这时候需要等待旧的Imuable memtable compact到level 0,进入等待
5. 到达5说明旧的Imuable memtable已经compact到level 0了,这时候假如level 0的文件数目到达了12个,也需要等待
6. 到达6说明旧的Imuable memtable已经compact到磁盘了,level 0的文件数目也符合要求,这时候就可以生成新的memtable用于数据的写入了。
由此我们知道这里并没有涉及到具体的compaction是如何进行的,只是保证memtable可以写入和可能触发后台的compaction。至于compaction的逻辑,后面需要专门的一篇文章才能说明清楚。
好了,MakeRoomForWrite函数就说这么多。
step 5 BuildBatchGroup函数
还有一个函数我觉得可以说一下,那就是BuildBatchGroup函数。前面我们说到了writer会被队列化,而只有排在第一位的writer才会往下执行。而writer都是乐于助人的,它有可能会把排在它后面的writer的活也拿过来一起干了。那么这段帮忙的逻辑就是在BuildBatchGroup函数中。
请看这个函数做了写啥:
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
assert(!writers_.empty());
Writer* first = writers_.front(); //当前执行的writer兄弟
WriteBatch* result = first->batch;
assert(result != NULL);
size_t size = WriteBatchInternal::ByteSize(first->batch); // 计算要自己要写入的byte大小
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
//计算maxsize,避免自己帮太多忙了导致写入数据过大
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) { //sync类型不同,你的活我不帮你了
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != NULL) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) { //这个帮忙数据量过大了,我也不帮忙了
// Do not make batch too big
break;
}
// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch); //来吧,你的活我可以帮你干了
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
看上面代码,这段代码的逻辑比较简单。就是遍历队列后面的writer们,一直到遇到帮不了忙的writer就返回了。那么帮不帮忙的标准是啥?
两个标准:
- sync类型是否一样(我不需要马上flush到磁盘而你要,你的活还是自己干吧)
- 写入的数据量是不是过大了?(避免单次写入数据量太大)
只要没有符合这两个限制条件,就可以帮忙,合并多条write操作为一条操作!
总结
哇呜,看了这么多代码,是不是对level db的write操作有了一定的了解了。其实就是文章开始说的,
- 先写入log文件
- 再写入memtable
这两个逻辑而已。然后在写入之前需要保证可以写入啊,不然内存会跪的。然后这个检查会导致后台一系列的compaction可能。
至于什么多个操作合并为一个操作,完全是优化,为了更高的性能!
记住了,我们讲完了Put操作,其实Delete操作也已经讲完了。为什么?不知道的好好看上面的正文内容吧。下一篇文章,我们将迎来Get函数,我们会一起进入level db的查找数据之旅!