Consul源码分析——Raft实现

背景

raft协议是维持整个consul生态中最重要的一个协议,它负责维护consul server之间的强一致性

作为基于 Paxos 的一种变种算法,它简化了状态,通过加入时间差、领导选举等概念使一致性算法变得更简单、易懂

相关名词:

  • Leader election: 领导选举,是目前落地的一致性算法不可避免的一个步骤,为了达到一致性,必须要已一个共同认可的节点做出所有操作。而raft协议的节点都会处于三种状态之一:
    • Leader: 操作的真正处理者,它负责将改动确认生效,并将其同步到其他节点。
    • Follower: 负责将来自 Leader 的改动请求写入本地 Log,返回成功。
    • Candidate: 如果 Leader 发生了故障,没有收到心跳的 Followers 们会进入 Candidate 状态,并开始选主,并保持该状态直到选主结束。
  • Log Replication: 当 Leader 被选举出来以后,所有的写请求都必须要到 Leader 执行,Leader 会将其作为 Log entry 追加到日志中,然后给其他 Follower 发送请求,当绝大部分的 ((N/2)+1)的 Follower replicated 成功后,就代表该写入事件成功了,就会返回结果状态到客户端。
  • Log Compaction: 在实际的应用场景中,因为磁盘空间是有限的,日志不能无限地增长,否则即使系统需要重启也需要耗费大量的时间。所以, raft 会对节点进行 snapshot 操作,执行成功后,会将 snapshot 之前的日志丢弃掉。

更详细的raft工作流程参见这个动画


关键数据结构

Raft node结构体

type Raft struct {
    raftState //保存状态变量结构体
    protocolVersion ProtocolVersion
    applyCh chan *logFuture //负责异步发送logs到主线程去提交以及应用到有限状态机
    conf Config //提供Raft初始化所需配置信息
    fsm FSM //负责客户端执行命令的有限状态机
    fsmMutateCh chan interface{} //负责发送状态改变更新到有限状态机
    fsmSnapshotCh chan *reqSnapshotFuture //负责触发一个新的快照
    lastContact     time.Time //最后一次和主通信时间,用来计算主是否还可用
    lastContactLock sync.RWMutex
    leader     ServerAddress //当前的主的地址
    leaderLock sync.RWMutex
    leaderCh chan bool //负责通知主的改变
    leaderState leaderState //只有状态为leader的时候使用该字段
    localID ServerID //储存自己的server ID,避免发送RPC请求给自己
    localAddr ServerAddress //储存自己的地址
    logger *log.Logger //日志
    logs LogStore //可靠地日志存储
    configurationChangeCh chan *configurationChangeFuture //负责通知leader做配置更替
    configurations configurations //从log/snapshot里记录最新的配置和最新的committed配置
    rpcCh <-chan RPC //传输层RPC通道
    shutdown     bool
    shutdownCh   chan struct{} //负责退出
    shutdownLock sync.Mutex //并发锁
    snapshots SnapshotStore //负责存储和读取snapshots
    userSnapshotCh chan *userSnapshotFuture //负责响应用户发起的快照操作
    userRestoreCh chan *userRestoreFuture //负责响应用户发起的恢复操作
    stable StableStore //状态持久化存储,负责持久化raftState的部分字段
    trans Transport //传输层
    verifyCh chan *verifyFuture //发送异步确认消息到主线程,确认当前仍然是主
    configurationsCh chan *configurationsFuture //安全的获取主线程以外发送过来的配置数据
    bootstrapCh chan *bootstrapFuture //用于尝试从主线程以外触发初始化操作
    observersLock sync.RWMutex //保护观察者的锁
    observers     map[uint64]*Observer //观察者列表
}

raftState 结构体

type raftState struct {
    currentTerm uint64 //当前项,StableStore的缓存
    commitIndex uint64 //最高的已提交的日志入口
    lastApplied uint64 //最新以应用到有限状态机的日志
    lastLock sync.Mutex //保护接下来的四个字段的锁
    lastSnapshotIndex uint64 //缓存的最新的快照索引
    lastSnapshotTerm  uint64 //缓存的快照项
    lastLogIndex uint64 //LogStore中缓存的最新的日志索引
    lastLogTerm  uint64 //LogStore中缓存的最新的日志项
    routinesGroup sync.WaitGroup //记录运行中的goroutines
    state RaftState //当前的RaftState(0-Follower 1-Candidate 2-Leader 3-Shutdown)
}

出生点

raft作为Consul server之间的共识机制,在agent创建时就应该初始化,因此以agent.go为入口:

  1. 在agent的启动方法中:
func (a *Agent) Start() error {}

根据agent的运行时配置(RuntimeConfig)中的ServerMode参数,若该字段为true则

  1. 创建新的Consul server
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
  1. 该函数会根据传入配置构造一个新的Consul server,内部会初始化当前server的raft server:
if err := s.setupRaft(); err != nil {
    s.Shutdown()
    return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
  1. 在setupRaft()方法中做了一系列的raft相关操作:
    1. 创建一个有限状态机
    2. 创建一个raft的传输层
    3. 根据情况创建后端(all in-memory或者full disk-based)
    4. 根据情况初始化(bootstrap字段置为true或者为dev模式)
    5. 创建一个channel可靠的传输leader的通知
    6. 将前面创建的状态机,日志,存储后端,传输层等组装为server的raft node,并返回,完成raft的初始化创建:
  s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
  if err != nil {
      return err
  }
  return nil
  1. NewRaft()函数负责创建raft,并运行:
    1. 尝试恢复老的状态,如果存在(snapshots, logs, peers)
    2. 确保server地址和ID有效
    3. 创建Raft node 结构体实例
    4. 初始化为一个follower
    5. 尝试从集群中恢复数据(term,log,snapshot)
    6. 设置传输层心跳handler
    7. 开启后台工作,并返回。goFunc()方法开启一个goroutine并适当的处理一个routine的开启,增加,存在,减少之间的竞争。
  r.goFunc(r.run)
  r.goFunc(r.runFSM)
  r.goFunc(r.runSnapshots)
  return r, nil

运行流程

r.run()

  1. r.run()方法,开启一个长期运行的goroutine跑Raft有限状态机:
    1. 在收到退出信号之前,一直在Follower,Candidate,Leader三种状态之间转换

若当前作为Follower

1. 创建一个定时器设定超时时间为配置的HeartbeatTimeout参数的1~2倍里面随机的值`heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)`
1. Follower可以处理RPC请求
1. Follower拒绝configurationChangeCh、applyCh、verifyCh、userRestoreCh通道传来的请求,(这些操作只能是Leader来完成),统一返回`v.respond(ErrNotLeader)`
1. Follower可以处理配置查询请求:
```
case c := <-r.configurationsCh:
        c.configurations = r.configurations.Clone()
        c.respond(nil)
```
5. Follower可以处理liveBootStrap请求:
```
case b := <-r.bootstrapCh:
        b.respond(r.liveBootstrap(b.configuration))
```

6. 若定时器到期,则重置定时器,之后比较最后一次与Leader通信的时间到现在,是否超出定时器的时间
    1. 若没有超出,则继续等待
    1. 若超出,则清除当前raft节点的Leader信息,进入Candidate状态

若当前作为Candidate

  1. 创建一个选举channel,投自己一票,同时发送rpc请求给其他的server请求给自己投票
  2. 创建一个定时器设定超时时间为配置的HeartbeatTimeout参数的1~2倍里面随机的值heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
  3. 如果从vote channel传回消息:
    1. 如果当前的任期值大于自己的任期值,则放弃Candiate,回到Follower,并返回
    2. 如果是一张有效的投票,则自己的票数+1
    3. 如果自己的总票数大于需要的票数(所有能投票的voters数/2 + 1),则自己当选Leader,并返回
  4. Follower拒绝configurationChangeCh、applyCh、verifyCh、userRestoreCh通道传来的请求,(这些操作只能是Leader来完成),统一返回v.respond(ErrNotLeader)
  5. Follower可以处理配置查询请求:
  case c := <-r.configurationsCh:
          c.configurations = r.configurations.Clone()
          c.respond(nil)
  1. 如果定时器超时,则返回,重新发起选举流程

若当前作为Leader

  1. 通知其他节点,我现在是Leader
  2. 初始化leaderState
  3. 为每一个节点创建一个channel,负责异步的复制replication到其他的节点
  4. 进入leader loop
    1. 创建一个租期通道,时间为LeaderLeaseTimeout字段配置
    2. 如果自己的状态为Leader则处理rpcCh、commitCh、verifyCh、userRestoreCh、configurationsCh、configurationChangeChIfStable、bootstrapCh、applyCh通道的请求
    3. 如果租期到了,check是否在最近的lease周期里面,能够和半数以上的节点通信,如果不能,让出Leader位置,如果能够,则按一定算法续一个租期

r.runFSM()

  1. r.runFSM()方法是一个长期运行的goroutine,负责应用logs到有限状态机
    1. 根据请求的不同分别做commit、restore、snapshot操作:
  for {
      select {
      case ptr := <-r.fsmMutateCh:
          switch req := ptr.(type) {
          case *commitTuple:
              commit(req)
          case *restoreFuture:
              restore(req)
          default:
              panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
          }
      case req := <-r.fsmSnapshotCh:
          snapshot(req)

      case <-r.shutdownCh:
          return
      }
  }

r.runSnapshots()

  1. r.runSnapshots()方法是一个长期运行的goroutine,负责管理给有限状态机创建快照
    1. 根据配置的快照间隔时间,创建一个定时器,时间为间隔时间的1~2倍中的任意数值,若定时器到时间,则判断是否需要做快照,若需要,则触发快照操作
    2. 用户可以手动触发快照操作
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342