Consul Raft协议源码分析上篇——日志复制

背景

前面一篇文章我们描述了raft 协议的实现数据一致性的基础知识,有了前面的基础知识背景,能很好的帮助我们理解consul 基于raft算法的实现,理论指导实践,永远不过时。

我们以consul key value 的一个例子来理清整个流程,以写一个key value来看,是我们日常开发中用的最多的一个例子,让我们来一起看看consul server到底是怎么实现的,背后的逻辑是什么。

Consul Agent 请求

客户端发起一个put key value的http请求,由kvs_endpoint.go 的KVSEndpoint func 处理,put的方法会路由给KVSPut 处理,除了一些校验外和请求标识,比如是否有获取锁acquire或者release,这里提下一个检查,就是value的大小检查,和web 容器一样检查防止请求数据太大,可以通过参数kv_max_value_size 控制,如果超过返回状态码413,标准的http 状态码。

检查都OK后,consul agent就开始请求consul server了,当然还是rpc 操作

// Copy the value
buf := bytes.NewBuffer(nil)
// 这里才开始读请求的数据。
if _, err := io.Copy(buf, req.Body); err != nil {
   return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()

// Make the RPC
var out bool
// 开始请求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
   return nil, err
}

// Only use the out value if this was a CAS
// 没有出错的话,这里就成功返回了
if applyReq.Op == api.KVSet {
        return true, nil
}

请求的是consul 下面的kvs_endpoint.go 下面的Apply 方法,所以我们的重点要来了

Server Apply

consul server的 apply方法,代码还是show下,这里还有两个逻辑说明下。

// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
   // 检查机房dc是否匹配,不是就转发到对应到dc的server。
   if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
      return err
   }
   // 中间不重要的去了,省得太多...
   // 对权限token 应用ACL policy
   ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
   if err != nil {
      return err
   }
   if !ok {
      *reply = false
      return nil
   }

   // Apply the update.
   // 这里是开启raft 算法的之旅的入口。
   resp, err := k.srv.raftApply(structs.KVSRequestType, args)
   if err != nil {
      k.logger.Error("Raft apply failed", "error", err)
      return err
   }
   if respErr, ok := resp.(error); ok {
      return respErr
   }

   // Check if the return type is a bool.
   if respBool, ok := resp.(bool); ok {
      *reply = respBool
   }
   return nil
}

在真正开始执行raft 算法前,主要做了如下两件事:

先检查了dc是否是当前dc,如果不是会路由到正确的dc,这页是consul 支持多机房部署的一个很好的特性,路由很方便,这也是多机房部署consul是很好的选择。
检查是否启用了acl策略,如果有,需要检查,没有对应的token是不能操作的。
上面2件事都没有问题后,开始执行raft apply操作,我们真正感兴趣的就要出来了,下面让我们开始盘apply,

经过一盘,在真正执行raft前,consul还做了一些加工,不能蛮搞,是非常严谨的,上面通过raftApply,经过几跳后,会执行到raftApplyWithEncoder方法,这里做的工作是很重要的,所以还是拿出来说下,是涨知识的地方,代码如下:

// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
   if encoder == nil {
      return nil, fmt.Errorf("Failed to encode request: nil encoder")
   }
   // 对请求编码。
   buf, err := encoder(t, msg)
   if err != nil {
      return nil, fmt.Errorf("Failed to encode request: %v", err)
   }

   // Warn if the command is very large
   if n := len(buf); n > raftWarnSize {
      s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
   }

   var chunked bool
   var future raft.ApplyFuture
   switch {
   case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
      //请求的数据大小如果小于512 * 1024 即512k,则做一次log执行。
      future = s.raft.Apply(buf, enqueueLimit)
   default:
      //超过了512k,则需要分chunk,每个chunk做为一个log来应用。
      chunked = true
      //这里就是每个log一次future。
      future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
   }

   //阻塞,等待raft协议完成。
   if err := future.Error(); err != nil {
      return nil, err
   }

   resp := future.Response()

   //...
   return resp, nil
}

这里通过注释,你也可以看出,主要关心4件事情:

  1. 把请求编码,这个不是我们的重点,后面有时间可以单独分析。
  2. 检查是否要拆包,是否要拆成多个raft command 来执行,这里有个参数控制,SuggestedMaxDataSize consul 默认设置是512k,如果超过这个则拆,否则可以一次raft 协议搞定。
  3. 有一个超时时间,默认是30秒,后面会用到。
  4. 最后事阻塞等待完成,是logfuture。

为什么要拆包

这些事raft 算法不会提的,这个事工程实践才会有的一些优化,此时你也和我一样,为啥要做这个优化呢,有什么好处,解决什么问题,这是我们做一个架构师必须要有的思考。

consul的官方就给出了解释,所以阅读优秀的代码就是一种享受,看注释就能知道为啥这样做,下面是他们对SuggestedMaxDataSize的注释:

// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024

理解就是rpc的请求io 不能太大,因为还有非常重要的心跳包,如果发心跳包出现延迟,就而影响leader的稳定,这个事一个非常重要的优化措施。

说完了拆包优化逻辑后,我们看下ApplyLog的逻辑,代码如下:

// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
   metrics.IncrCounter([]string{"raft", "apply"}, 1)

   var timer <-chan time.Time
   if timeout > 0 {
      timer = time.After(timeout)
   }

   // Create a log future, no index or term yet
   logFuture := &logFuture{
      log: Log{
         Type:       LogCommand,
         Data:       log.Data,
         Extensions: log.Extensions,
      },
   }
   logFuture.init()

   select {
   case <-timer:
      return errorFuture{ErrEnqueueTimeout}
   case <-r.shutdownCh:
      return errorFuture{ErrRaftShutdown}
   case r.applyCh <- logFuture:
      return logFuture
   }
}

这里主要关心这个applyCh channel,consul 在初始化leader的时候给创建的一个无缓冲区的通道,所以如果leader的协程在干其他的事情,那这个提交log就阻塞了,时间最长30s,写入成功,就返回了logFuture,也就事前面我们看到future的阻塞。

到这里整个consul leader server的插入请求从接受到阻塞等待的逻辑就完成了,consul server 有个核心的go routine 在watch 这个applyCh,从定义可以看出,是应用raft log的channel。

分组提交

consul leader 在初始化完成后,会启动一个核心的go routine,执行rpc,leader 验证,这个我们前面分析过,还有一个最重要的就事raft log应用了,代码如下:

case newLog := <-r.applyCh://这个是前面我们提交log future的
   if r.getLeadershipTransferInProgress() {
      r.logger.Debug(ErrLeadershipTransferInProgress.Error())
      newLog.respond(ErrLeadershipTransferInProgress)
      continue
   }
   // Group commit, gather all the ready commits
   ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
   for i := 0; i < r.conf.MaxAppendEntries; i++ {
      select {
      case newLog := <-r.applyCh:
         ready = append(ready, newLog)
      default:
         break GROUP_COMMIT_LOOP
      }
   }

   // Dispatch the logs
   if stepDown {
      // we're in the process of stepping down as leader, don't process anything new
     //如果发现我们不是leader了,直接响应失败 
     for i := range ready {
         ready[i].respond(ErrNotLeader)
      }
   } else {
      r.dispatchLogs(ready)
   }

这里的一个重要的点就是组提交,我们在基础篇提过,这里就是实现了,就是读applyCh的log,这个里做了组提交的优化,最多一次发送MaxAppendEntries个,默认位64个,如果并发高的情况下,这里是能读到一个batch的,或者没有了,就不等了,这里是不能等的,因为raft算法要保证顺序,这里是单线程出来的,下面就开始dispatch log了,代码如下:

// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
   now := time.Now()
   defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)

   //获取当前leader的任期编号,这个不会重复是递增的,如果有心的leaer了,会比这个大。
   term := r.getCurrentTerm()
   //log 编号,写一个加1
   lastIndex := r.getLastIndex()

   n := len(applyLogs)
   logs := make([]*Log, n)
   metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))

   //设置每个log的编号和任期
   for idx, applyLog := range applyLogs {
      applyLog.dispatch = now
      lastIndex++
      applyLog.log.Index = lastIndex
      applyLog.log.Term = term
      logs[idx] = &applyLog.log
      r.leaderState.inflight.PushBack(applyLog)
   }

   // Write the log entry locally
   // log先写入本地持久化,consul大部分的版本底层用的是boltdb,boltdb
   // 是一个支持事物的数据库,非常方便,这里会涉及io操作。
   if err := r.logs.StoreLogs(logs); err != nil {
      r.logger.Error("failed to commit logs", "error", err)
      //如果写失败,则直接响应,前面的future阻塞就会唤醒。
      for _, applyLog := range applyLogs {
         applyLog.respond(err)
      }
      //更新自己为follower
      r.setState(Follower)
      return
   }
   //这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
     //这里是记录下leader自己的结果,因为过半leader也算一份。
   r.leaderState.commitment.match(r.localID, lastIndex)

   // Update the last log since it's on disk now
   // 更新最新log entry的编号,写到这里了。
   r.setLastLog(lastIndex, term)

   // Notify the replicators of the new log
   // 开始异步发送给所有的follower,这个leader主go routine的活就干完了。
   for _, f := range r.leaderState.replState {
      asyncNotifyCh(f.triggerCh)
   }
}

这个dispatchlog的逻辑注释里基本写清楚了,核心的go routine 经过一顿操作后,最主要就是两点:

本地持久化log

记录自己写成功,因为计算过半时,leader自己这一份也算在里面,这个很重要。
又异步交给了replicate go routine来处理,他就去继续去分组提交了,大概率如此循环往复,不知疲倦的给replication routine 派活。

复制GoRoutine

replication routine 会监听triggerCh channel,接受领导的任务,这个比较简单,就开始真正发给各自的follower了,代码如下:

case <-s.triggerCh:
   lastLogIdx, _ := r.getLastLog()
   //这个后面没有异步了,就是这个rpc调用,判断
   shouldStop = r.replicateTo(s, lastLogIdx)

replicateTo 就是rpc调研,真正远程rpc给follower,等待响应。对于响应的结果怎么处理,怎么真正应用到本地,还没有分析,带下一篇提交篇,因为插入请求还wait在哪里呢是不是。

总结

写着写着文章又很长了,如果你读到了这里,就给我点个赞,关注下,我会马不停蹄的开始下一篇。

文本注意从consul leader server接受请求,做一些检查,token校验,分配发送,然后异步交给了leader的核心goroutine,核心go routine通过分组合并,计算好log 编号和任期term。就交给了replication routine,replication routine 把log 先本地持久化,然后异步发给所有的follower,等待他们的结果,到底是commit 应用到本地状态机怎么实现的,下面一篇见,欢迎关注和转发。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容