leveldb源码解析之二Put实现

导读

我们还是按照上一篇博文的那三个函数顺序往下走:PutGetDelete,以自上而下的视角一点一点剖析leveldb。那么本篇就主要讲Put的实现。Put的实现讲的还是偏上层的,因为Put可能引发compaction,而compaction等细节内容还是需要专门一篇文章才能描述清楚。那么我们开始吧!

流程

还记得我们上一篇讲解的Put的流程是咋样的吗?

Put

Put操作首先将操作记录写入log文件,然后写入memtable,返回写成功。整体来看是这样,但是会引发下面的问题:

1. 写log的时候是实时刷到磁盘的吗?
2. 写入的时候memtable过大了咋办?
3. 同时多个线程并发写咋办?
......

下面的分析中就会面对这些问题,有些答案很清晰,有些涉及到超级多细节。

step by step

在开始之前,我们知道Write操作是要记录到log文件中的。那么一个记录它的格式是怎样的呢?看图:

Paste_Image.png

这里特别解释一下,Delete操作也是通过Put实现的,只是图中的类型字段是0,而正常Write的操作类型是1,由此区分写操作和删除操作!

step 1

Put interface

很简单吧。这里讲解一下那三个参数:

  1. WriteOptions:提供一些写操作的配置项,例如要不要写log的时候马上flush磁盘
  2. key和value就是对应的keyValue,slice只是作者自己封装的char数组存储数据而已。<b>大牛喜欢把所有东西都封装一下,赋予数据结构意义!</b>这在大的工程里面是很有意义的,既方便操作,也方便思考(这样就不用思考底层的真实的char数组还是啥)。

step 2

Paste_Image.png

这里的WriteBatch的意思就是多个写并起来操作的意思。这里可以学习一个思维,支持多key写,单key写就是key为1的特例。底层完全按照WriteBatch的实现方式


step 3

下面是真实的各种复杂逻辑的操作了。下面的代码有点长,所以基本都在代码中注释讲解了。部分调用函数的细节下面会继续,请耐心欣赏,毕竟人家也不是几分钟就写出来了,哈哈:


Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync; //log是否马上刷到磁盘,如果false会有数据丢失的风险
  w.done = false;        //标记写入是否完成

  //串行化writer,如果有其他writer在执行则进入队列等待被唤醒执行
  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }

  //writer的任务可能被其他writer帮忙执行了
  //如果是则直接返回
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL); //写入前的各种检查
  //是否该停写,是否该切memtable,是否该compact

  //获取本次写入的版本号,其实就是个uint64
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  //这里writer还是队列中第一个,由于下面会队列前面的writers也可能合并起来,所以last_writer指针会指向被合并的最后一个writer
  if (status.ok() && my_batch != NULL) {                          // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);          //这里会把writers队列中的其他适合的写操作一起执行
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);  //把版本号写入batch中
    last_sequence += WriteBatchInternal::Count(updates);          //updates如果合并了n条操作,版本号也会向前跳跃n

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates)); //写log了,不容易啊,等那么久
      bool sync_error = false;
      if (status.ok() && options.sync) { //马上要flush到磁盘
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);  //恩,是时候插入memtable中了
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) { //在这里唤醒已经帮它干完活的writer线程,让它早早回家,别傻傻等了
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) { //唤醒还没干活的等待的第一位,叫醒它自己去干活,老子干完了,哈哈,典型FIFO,公平
    writers_.front()->cv.Signal();
  }

  return status;
}

嗯,核心的write代码已经在这里欣赏完了。基本逻辑分为五步:

  1. 队列化请求
  1. 写入的前期检查和保证
  2. 按格式组装数据为二进制
  3. 写入log文件和memtable
  4. 唤醒队列的其他人去干活,自己返回

这个函数的内容就这么多,但是有些内容调用我们还没有讲,下面会选择部分重中之重的讲。例如添加前的检查保证,compaction。

step 4 MakeRoomForWrite函数

我们在step 3的时候知道了MakeRoomForWrite函数的存在,就是写入前的各种检查,例如:

  1. memtable满了达到大小的限制了没?没有直接可以插入
  1. memtable达到太大之后需要切换为Imuable memtable,这时候需要检查旧的Imuable memtable已经load到磁盘没
  2. 由于Imuable需要load入磁盘,level 0的文件数超过限制没
    。。。。。。

在进入逻辑代码之前需要解释一个概念:<b>compaction</b>
<b>compaction</b>是指:Imuable memtablecompact到level 0 或者level n compact 到level n + 1, 就是数据向搞level流动的操作,这个操作会保证数据在level中是全局有序的,除了level 0.

这里只是简单列举几个需要考虑的逻辑,具体检查看下面MakeRoomForWrite的实现:

Status DBImpl::MakeRoomForWrite(bool force) { // force代表需要强制compaction与否
  mutex_.AssertHeld();        //保证进入该函数前已经加锁
  assert(!writers_.empty());
  bool allow_delay = !force;  //allow_delay代表compaction可以延后
  Status s;
  while (true) {
    if (!bg_error_.ok()) {    //如果后台任务已经出错,直接返回错误
      // Yield previous error
      s = bg_error_;
      break;
    } else if (//level0的文件数限制超过8,睡眠1ms,简单等待后台任务执行
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { //level 0超过8个sst文件了

      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { //memtable的大小限制,default为4MB
      // There is room in current memtable
      break;
    } else if (imm_ != NULL) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n"); //等待之前的imuable memtable完成compact到level0
      bg_cv_.Wait();                                                  // bg_cv_为后台程序的条件变量,后台程序就是做compact的
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { //level0的文件数超过12,强制等待
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {                                                            //下面是切换到新的memtable和触发旧的进行compaction
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); //生成新的log文件
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }

      //删除旧的log对象分配新的
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_; //切换memtable到Imuable memtable
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();  //如果需要进行compaction,后台执行
    }
  }
  return s;
}

由上面的代码可知,这是一个while循环,直到确保数据库可以写入了才会返回。
流程大概是:

1. 后台任务有无出现错误?出现错误直接返回错误。(compaction是后台线程执行的)
2. level 0 的文件数超过8个,则等待1s再继续执行,因为level 0的文件数目需要严格控制
3. 如果memtable的大小小于4MB(默认值,可以修改),直接返回可以插入;
4. 到达4说明memtable已经满了,这时候需要切换为Imuable memtable。所以这时候需要等待旧的Imuable memtable compact到level 0,进入等待
5. 到达5说明旧的Imuable memtable已经compact到level 0了,这时候假如level 0的文件数目到达了12个,也需要等待
6. 到达6说明旧的Imuable memtable已经compact到磁盘了,level 0的文件数目也符合要求,这时候就可以生成新的memtable用于数据的写入了。

由此我们知道这里并没有涉及到具体的compaction是如何进行的,只是保证memtable可以写入和可能触发后台的compaction。至于compaction的逻辑,后面需要专门的一篇文章才能说明清楚。

好了,MakeRoomForWrite函数就说这么多。

step 5 BuildBatchGroup函数

还有一个函数我觉得可以说一下,那就是BuildBatchGroup函数。前面我们说到了writer会被队列化,而只有排在第一位的writer才会往下执行。而writer都是乐于助人的,它有可能会把排在它后面的writer的活也拿过来一起干了。那么这段帮忙的逻辑就是在BuildBatchGroup函数中。
请看这个函数做了写啥:

WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  assert(!writers_.empty());
  Writer* first = writers_.front();  //当前执行的writer兄弟
  WriteBatch* result = first->batch;
  assert(result != NULL);

  size_t size = WriteBatchInternal::ByteSize(first->batch); // 计算要自己要写入的byte大小

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
      
  //计算maxsize,避免自己帮太多忙了导致写入数据过大    
  size_t max_size = 1 << 20; 
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) { //sync类型不同,你的活我不帮你了
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {      //这个帮忙数据量过大了,我也不帮忙了
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch); //来吧,你的活我可以帮你干了
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}

看上面代码,这段代码的逻辑比较简单。就是遍历队列后面的writer们,一直到遇到帮不了忙的writer就返回了。那么帮不帮忙的标准是啥?
两个标准:

  1. sync类型是否一样(我不需要马上flush到磁盘而你要,你的活还是自己干吧)
  1. 写入的数据量是不是过大了?(避免单次写入数据量太大)

只要没有符合这两个限制条件,就可以帮忙,合并多条write操作为一条操作!

总结

哇呜,看了这么多代码,是不是对level db的write操作有了一定的了解了。其实就是文章开始说的,

  1. 先写入log文件
  1. 再写入memtable

这两个逻辑而已。然后在写入之前需要保证可以写入啊,不然内存会跪的。然后这个检查会导致后台一系列的compaction可能。
至于什么多个操作合并为一个操作,完全是优化,为了更高的性能!

记住了,我们讲完了Put操作,其实Delete操作也已经讲完了。为什么?不知道的好好看上面的正文内容吧。下一篇文章,我们将迎来Get函数,我们会一起进入level db的查找数据之旅!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,560评论 18 399
  • LevelDB是Google传奇工程师Jeff Dean和Sanjay Ghemawat开源的KV存储引擎,无论从...
    CatKang阅读 4,811评论 5 25
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,382评论 25 707
  • 我喜欢的人,可以不帅不富 却拥有一颗强大的心 他明白自己的责任 也会为了家人去付出一切 我喜欢的人,可以不冷不热 ...
    梅骨赏阅读 147评论 2 1