背景
raft协议是维持整个consul生态中最重要的一个协议,它负责维护consul server之间的强一致性
作为基于 Paxos 的一种变种算法,它简化了状态,通过加入时间差、领导选举等概念使一致性算法变得更简单、易懂
相关名词:
- Leader election: 领导选举,是目前落地的一致性算法不可避免的一个步骤,为了达到一致性,必须要已一个共同认可的节点做出所有操作。而raft协议的节点都会处于三种状态之一:
- Leader: 操作的真正处理者,它负责将改动确认生效,并将其同步到其他节点。
- Follower: 负责将来自 Leader 的改动请求写入本地 Log,返回成功。
- Candidate: 如果 Leader 发生了故障,没有收到心跳的 Followers 们会进入 Candidate 状态,并开始选主,并保持该状态直到选主结束。
- Log Replication: 当 Leader 被选举出来以后,所有的写请求都必须要到 Leader 执行,Leader 会将其作为 Log entry 追加到日志中,然后给其他 Follower 发送请求,当绝大部分的 ((N/2)+1)的 Follower replicated 成功后,就代表该写入事件成功了,就会返回结果状态到客户端。
- Log Compaction: 在实际的应用场景中,因为磁盘空间是有限的,日志不能无限地增长,否则即使系统需要重启也需要耗费大量的时间。所以, raft 会对节点进行 snapshot 操作,执行成功后,会将 snapshot 之前的日志丢弃掉。
更详细的raft工作流程参见这个动画
关键数据结构
Raft node结构体
type Raft struct {
raftState //保存状态变量结构体
protocolVersion ProtocolVersion
applyCh chan *logFuture //负责异步发送logs到主线程去提交以及应用到有限状态机
conf Config //提供Raft初始化所需配置信息
fsm FSM //负责客户端执行命令的有限状态机
fsmMutateCh chan interface{} //负责发送状态改变更新到有限状态机
fsmSnapshotCh chan *reqSnapshotFuture //负责触发一个新的快照
lastContact time.Time //最后一次和主通信时间,用来计算主是否还可用
lastContactLock sync.RWMutex
leader ServerAddress //当前的主的地址
leaderLock sync.RWMutex
leaderCh chan bool //负责通知主的改变
leaderState leaderState //只有状态为leader的时候使用该字段
localID ServerID //储存自己的server ID,避免发送RPC请求给自己
localAddr ServerAddress //储存自己的地址
logger *log.Logger //日志
logs LogStore //可靠地日志存储
configurationChangeCh chan *configurationChangeFuture //负责通知leader做配置更替
configurations configurations //从log/snapshot里记录最新的配置和最新的committed配置
rpcCh <-chan RPC //传输层RPC通道
shutdown bool
shutdownCh chan struct{} //负责退出
shutdownLock sync.Mutex //并发锁
snapshots SnapshotStore //负责存储和读取snapshots
userSnapshotCh chan *userSnapshotFuture //负责响应用户发起的快照操作
userRestoreCh chan *userRestoreFuture //负责响应用户发起的恢复操作
stable StableStore //状态持久化存储,负责持久化raftState的部分字段
trans Transport //传输层
verifyCh chan *verifyFuture //发送异步确认消息到主线程,确认当前仍然是主
configurationsCh chan *configurationsFuture //安全的获取主线程以外发送过来的配置数据
bootstrapCh chan *bootstrapFuture //用于尝试从主线程以外触发初始化操作
observersLock sync.RWMutex //保护观察者的锁
observers map[uint64]*Observer //观察者列表
}
raftState 结构体
type raftState struct {
currentTerm uint64 //当前项,StableStore的缓存
commitIndex uint64 //最高的已提交的日志入口
lastApplied uint64 //最新以应用到有限状态机的日志
lastLock sync.Mutex //保护接下来的四个字段的锁
lastSnapshotIndex uint64 //缓存的最新的快照索引
lastSnapshotTerm uint64 //缓存的快照项
lastLogIndex uint64 //LogStore中缓存的最新的日志索引
lastLogTerm uint64 //LogStore中缓存的最新的日志项
routinesGroup sync.WaitGroup //记录运行中的goroutines
state RaftState //当前的RaftState(0-Follower 1-Candidate 2-Leader 3-Shutdown)
}
出生点
raft作为Consul server之间的共识机制,在agent创建时就应该初始化,因此以agent.go为入口:
- 在agent的启动方法中:
func (a *Agent) Start() error {}
根据agent的运行时配置(RuntimeConfig)中的ServerMode
参数,若该字段为true则
- 创建新的Consul server
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
- 该函数会根据传入配置构造一个新的Consul server,内部会初始化当前server的raft server:
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
- 在setupRaft()方法中做了一系列的raft相关操作:
- 创建一个有限状态机
- 创建一个raft的传输层
- 根据情况创建后端(all in-memory或者full disk-based)
- 根据情况初始化(bootstrap字段置为true或者为dev模式)
- 创建一个channel可靠的传输leader的通知
- 将前面创建的状态机,日志,存储后端,传输层等组装为server的raft node,并返回,完成raft的初始化创建:
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
if err != nil {
return err
}
return nil
- NewRaft()函数负责创建raft,并运行:
- 尝试恢复老的状态,如果存在(snapshots, logs, peers)
- 确保server地址和ID有效
- 创建Raft node 结构体实例
- 初始化为一个follower
- 尝试从集群中恢复数据(term,log,snapshot)
- 设置传输层心跳handler
- 开启后台工作,并返回。goFunc()方法开启一个goroutine并适当的处理一个routine的开启,增加,存在,减少之间的竞争。
r.goFunc(r.run)
r.goFunc(r.runFSM)
r.goFunc(r.runSnapshots)
return r, nil
运行流程
r.run()
- r.run()方法,开启一个长期运行的goroutine跑Raft有限状态机:
- 在收到退出信号之前,一直在Follower,Candidate,Leader三种状态之间转换
若当前作为Follower
1. 创建一个定时器设定超时时间为配置的HeartbeatTimeout参数的1~2倍里面随机的值`heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)`
1. Follower可以处理RPC请求
1. Follower拒绝configurationChangeCh、applyCh、verifyCh、userRestoreCh通道传来的请求,(这些操作只能是Leader来完成),统一返回`v.respond(ErrNotLeader)`
1. Follower可以处理配置查询请求:
```
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
```
5. Follower可以处理liveBootStrap请求:
```
case b := <-r.bootstrapCh:
b.respond(r.liveBootstrap(b.configuration))
```
6. 若定时器到期,则重置定时器,之后比较最后一次与Leader通信的时间到现在,是否超出定时器的时间
1. 若没有超出,则继续等待
1. 若超出,则清除当前raft节点的Leader信息,进入Candidate状态
若当前作为Candidate
- 创建一个选举channel,投自己一票,同时发送rpc请求给其他的server请求给自己投票
- 创建一个定时器设定超时时间为配置的HeartbeatTimeout参数的1~2倍里面随机的值
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
- 如果从vote channel传回消息:
- 如果当前的任期值大于自己的任期值,则放弃Candiate,回到Follower,并返回
- 如果是一张有效的投票,则自己的票数+1
- 如果自己的总票数大于需要的票数(所有能投票的voters数/2 + 1),则自己当选Leader,并返回
- Follower拒绝configurationChangeCh、applyCh、verifyCh、userRestoreCh通道传来的请求,(这些操作只能是Leader来完成),统一返回
v.respond(ErrNotLeader)
- Follower可以处理配置查询请求:
case c := <-r.configurationsCh:
c.configurations = r.configurations.Clone()
c.respond(nil)
- 如果定时器超时,则返回,重新发起选举流程
若当前作为Leader
- 通知其他节点,我现在是Leader
- 初始化leaderState
- 为每一个节点创建一个channel,负责异步的复制replication到其他的节点
- 进入leader loop
- 创建一个租期通道,时间为LeaderLeaseTimeout字段配置
- 如果自己的状态为Leader则处理rpcCh、commitCh、verifyCh、userRestoreCh、configurationsCh、configurationChangeChIfStable、bootstrapCh、applyCh通道的请求
- 如果租期到了,check是否在最近的lease周期里面,能够和半数以上的节点通信,如果不能,让出Leader位置,如果能够,则按一定算法续一个租期
r.runFSM()
- r.runFSM()方法是一个长期运行的goroutine,负责应用logs到有限状态机
- 根据请求的不同分别做commit、restore、snapshot操作:
for {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case *commitTuple:
commit(req)
case *restoreFuture:
restore(req)
default:
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
}
case req := <-r.fsmSnapshotCh:
snapshot(req)
case <-r.shutdownCh:
return
}
}
r.runSnapshots()
- r.runSnapshots()方法是一个长期运行的goroutine,负责管理给有限状态机创建快照
- 根据配置的快照间隔时间,创建一个定时器,时间为间隔时间的1~2倍中的任意数值,若定时器到时间,则判断是否需要做快照,若需要,则触发快照操作
- 用户可以手动触发快照操作