Etcd 源码阅读
本文是 etcd-raft 库源码的阅读笔记。希望通过阅读 etcd-raft 库的源码,学习工业场景下对 raft 算法的设计和实现,加深对 raft 的理解。阅读的 etcd 版本为 v3.4.7,在阅读过程中加入了部分注释,同时为了搞清楚一些实现的细节考量,查阅了很多相关 pr,并在源码中标注了链接,地址为:etcd-raft 源码注释 - v3.4.7
【文章尚未完成,持续更新中...】
一、从 raftexample 开始
etcd-raft 的特点在于其设计屏蔽了网络、存储等其他模块,只提供接口交由上层应用者来实现,这样的设计思路使其具有很高的可定制性和可测试性。这也决定了想要使用 etcd,用户需要自己补全核心协议之外的部分。raftexample 是 etcd 官方提供的一个示例程序,简单展示了 etcd-raft 的使用方式。通过大概预览示例,缕清 etcd-raft 模块之间的协作方式,为深入学习 raft 协议的实现奠定基础。
入口
func main() {
//------------初始化参数:cluster,id,kvport,join-------------//
//--------------建立应用层和raft之间的通信通道 --------------//
proposeC := make(chan string)
defer close(proposeC)
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
// raft provides a commit stream for the proposals from the http api
// ---------------启动 raft ------------------------//
var kvs *kvstore
getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
//--------------------启动 kv存储服务---------------------//
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
// the key-value http handler will propose updates to raft
//-------------------启动 http 对外服务-------------------//
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
由此可知,example 中搭建的服务总共由3个部分组成:
- kvstore:持久化模块
- raftnode:核心 raft 组件
- httpKVAPI:http对外服务
服务之间主要通过 3 个通道来进行交互:
- proposeC:用于日志更新
- confChangeC:提交配置变更信息
- commitC:用于将 entries 提交到持久化模块中
httpKVAPI
我们从 httpKVAPI 开始看起,该服务是外界与 etcd 实例交互的接口,通过实现 ServeHTTP
接口来启动网络服务。提供的服务有:
- PUT 方法,通过 proposeC 通道向持久化模块提交键值对变更信息;
- 键值对提交后不需要等待 raft 的响应,返回状态码 204
- GET 方法,对持久化模块发起查询操作;
- POST方法和 DELETE 方法,通过 confChangeC 通道向 raft 组件提交配置变更信息,信息使用 pb 协议进行压缩;
- 信息提交后不需要等待 raft 响应,返回状态码 204
httpKVAPI 通过对外提供 http 服务与用户进行交互,对内,通过 proposeC 和 confChangeC 两个通道与 etcd 核心模块进行交互,而对于只读的 GET 操作,直接访问本地数据库获取结果
kvstore
接下来我们来看一看持久化存储部分。
调用 newKVStore 初始化持久化模块时,会首先从 raft 中读取快照,然后再后台启动一个 goroutine 持续监听 commitC 通道。
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
for data := range commitC {
if data == nil { // data 为 nil,重放快照
// done replaying log; new data incoming
// OR signaled to load snapshot
snapshot, err := s.snapshotter.Load()
//-----错误处理-----
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
continue
}
// 否则将收到的数据进行持久化
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
// .......
}
- RaftNode 中如果有 entry 需要提交,则会写入 commitC 通道。kvstore 从通道中读取要持久化的值,通过 gob 压缩编码后存入数据库中;
- 如果写入 nil,则说明要求该节点的 kvstore 加载快照
除此之外,该模块还提供了对数据库的查询和提交的 API 接口:Lookup(key string)
和 Propse(k, v string)
raftnode
最后是 newRaftNode 部分。
初始化节点
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter)
- 初始化节点时传入了以下参数:
- id:当前节点的 id
- peers:集群中所有节点的 URL 地址
- join:是否加入集群
- getSnapshot:函数类型的参数,支持自定义获取快照的方式
- proposeC 和 confChangeC:与其余两个组件交互的通道
- 在初始化节点时,创建了一个内部的提交通道 commitC,用于提交待持久化的 entries
- commitC 通道与 raft 节点的生命周期互相绑定,停止服务时,从内部关闭 commitC 通道
- 除此之外,该模块还维护了与快照,底层 raft 模块,WAL日志相关的变量,这里暂不展开
- 节点初始化的最后一步是以 goroutine 的形式调用 startRaft 方法,继续完成后续的初始化工作
启动 Raft
func (rc *raftNode) startRaft() {
// 创建 snapshotter,通过 snapshotterReady 通道返回创建的实例,用于管理快照
if !fileutil.Exist(rc.snapdir) {
if err := os.Mkdir(rc.snapdir, 0750); err != nil {
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
rc.snapshotterReady <- rc.snapshotter
// 创建 WAL
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL() // 加载快照,并重放WAL
//--------创建 raft.config-------------------
// 判断当前节点是首次启动还是重新启动
if oldwal {
rc.node = raft.RestartNode(c)
} else {
startPeers := rpeers
if rc.join {
startPeers = nil
}
rc.node = raft.StartNode(c, startPeers)
}
// 创建并启动 transport 实例,该实例负责节点之间的网络通信
rc.transport = &rafthttp.Transport{
Logger: zap.NewExample(),
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
rc.transport.Start()
// 与集群中的其他节点建立连接
for i := range rc.peers {
if i+1 != rc.id {
rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
go rc.serveRaft()
go rc.serveChannels()
}
serveRaft
func (rc *raftNode) serveRaft() {
//-----------获取当前节点的 url-------------
//--------------创建了一个 listener---------------------
ln, err := newStoppableListener(url.Host, rc.httpstopc)
//-------------------开启连接----------------------
err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
select {
case <-rc.httpstopc:
default:
log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
}
close(rc.httpdonec)
}
serveRaft 用于监听当前节点的指定端口,处理与其他节点的网络连接
-
newStoppableListener()
方法返回的是一个自定义的结构体,该结构体实现了 Accept 接口,同时封装了一个停止通道,用于结束连接,该用法值的学习type stoppableListener struct { *net.TCPListener stopc <-chan struct{} } func newStoppableListener(addr string, stopc <-chan struct{}) (*stoppableListener, error) { ln, err := net.Listen("tcp", addr) //---------- 错误处理 --------------- return &stoppableListener{ln.(*net.TCPListener), stopc}, nil } func (ln stoppableListener) Accept() (c net.Conn, err error) { connc := make(chan *net.TCPConn, 1) errc := make(chan error, 1) go func() { tc, err := ln.AcceptTCP() if err != nil { errc <- err return } connc <- tc }() select { case <-ln.stopc: return nil, errors.New("server stopped") case err := <-errc: return nil, err case tc := <-connc: tc.SetKeepAlive(true) tc.SetKeepAlivePeriod(3 * time.Minute) return tc, nil } }
serveChannels
该函数主要分为两大部分:
第一部分负责监听 proposeC 和 confChangeC 两个通道,将从通道传来的信息传给底层 raft 处理
第二部分负责处理底层 raft 返回的数据,这些数据被封装在 Ready 结构体中,包括了已经准备好读取、持久化、提交或者发送给 follower 的 entries 和 messages 等信息
-
配置了一个定时器,定时器到期时调用节点的 Tick() 方法推动 raft 逻辑时钟前进
func (rc *raftNode) serveChannels() { //...... ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() //...... for { select { case <-ticker.C: // 定时器到期,调用 Tick() 方法推动 raft 逻辑时钟前进 rc.node.Tick() //...... } } }
小结
通过以上概览,我们可以得到 raftexample 实现的总体结构为:
- raftnode 居中协调,其余组件通过 channel 与 raftnode 进行交互
- 用户可以自定义实现各个组件,底层核心库 etcd-raft 由 etcd 负责维护,这样每个 etcd-raft 都是一个独立的状态机,极大的增强了应用的灵活性
- 不同模块分开实现,增强了程序的可测试性
- 底层的 etcd-raft 只实现了状态转移,整个应用的性能由用户来决定
二、etcd-raft
由之前的 raftexample 分析可知:etcd 的 raft 核心库与用户的数据传输实际上是通过 readyC 通道中传递的 Ready 结构体来承载的。Ready 结构体定义在 raft/node.go
文件中,除了 Ready 结构体之外,该文件中还定义了一个 Node 接口。如其名字所示,该接口代表了 raft 集群中的一个节点,其实就是用户与 raft 状态机交互的接口。
由此,etcd-raft 的实现可以看成是两个主要部分组成:
- raft 算法由 raft/raft.go 来实现,其中的 raft 结构体维护了相关的状态,实现了算法的细节;
- 用户通过调用 node 接口及其实现提供的方法,与底层 raft 状态机进行交互。
接下来就分别对这两部分内容进行学习。
Node 接口
-
node 接口定义在
raft/node.go
文件中,通过调用 raft 提供的方法来推进状态机状态的变更,包含了以下方法:type Node interface { // raft 的逻辑时钟,替代真实时间 Tick() // 节点调用 campaign,从 follower 转为 candidate,发起选举 Campaign(ctx context.Context) error // 调用 propose 来发起一个新提案 Propose(ctx context.Context, data []byte) error // 调用 ProposeConfChange 来提交一次集群成员变更的提案,特别注意,除非 leader 可以确定在集群成员变更 entries 之前 // 的所有日志条目都已经被提交,否则该条 entries 有可能被丢弃(覆写) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error // 将 message 运用到状态机 Step(ctx context.Context, msg pb.Message) error // 与应用层交互的通道 Ready() <-chan Ready // 在从 Ready 通道中读取完值后调用,用来通知底层 raft 可以继续下一步操作 Advance() // 提交配置变更 ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState // 用于 leadership transfer TransferLeadership(ctx context.Context, lead, transferee uint64) // 调用 ReadIndex 来发起一个只读请求 ReadIndex(ctx context.Context, rctx []byte) error // 返回状态机当前的状态 Status() Status ReportUnreachable(id uint64) ReportSnapshot(id uint64, status SnapshotStatus) Stop() }
-
用户通过调用
func StartNode(c *Config, peers []Peer) Node
方法,传入配置选项和集群中节点的信息,即可创建一个 Node 实例func StartNode(c *Config, peers []Peer) Node { if len(peers) == 0 { panic("no peers given; use RestartNode instead") } // 生成一个 rawNode,rawNode 直接与 raft 底层库交互,是线程不安全的 rn, err := NewRawNode(c) if err != nil { panic(err) } // 状态转换为 follower,发送配置变更 RPC rn.Bootstrap(peers) // 用 node 封装一个 rawnode,node 是线程安全的 n := newNode(rn) go n.run() // 后台启动 run 方法,监听各类事件 return &n }
-
由代码可知,在 StartNode 通过调用
newNode()
创建了 node 实例,该实例实际上是通过 rawNode 与 raft 进行交互的- rawNode 是线程不安全的 Node,用来实现 multiraft,node 通过封装 rawNode 后,通过各种 channel 将整个处理流程串行化
-
node 实例的成员实际上是一系列的通道
func newNode(rn *RawNode) node { return node{ // node 的所有成员都是 channel propc: make(chan msgWithResult), recvc: make(chan pb.Message), confc: make(chan pb.ConfChangeV2), confstatec: make(chan pb.ConfState), readyc: make(chan Ready), advancec: make(chan struct{}), // tickc 是buffer channel,用于解决潜在的活锁的情况 tickc: make(chan struct{}, 128), done: make(chan struct{}), stop: make(chan struct{}), status: make(chan chan Status), rn: rn, // 封装 rawNode } }
-
通过调用 run 方法,在后台单线程中对这些所有的通道进行监听
func (n *node) run() { //-------定义变量--------- for { if advancec != nil { // 不为空,说明还在等待应用调用 Advance 接口通知应用已经完成之前 ready 的数据 readyc = nil } else if n.rn.HasReady() { // 如果存在一个未被处理的 Ready,为其创建一个 Ready 结构体 rd = n.rn.readyWithoutAccept() readyc = n.readyc // 设置 readyc 不为空,用于后续接收消息 } //------处理中途leader变更的情况 // 监听各通道 select { case pm := <-propc: // 处理本地提交的信息 case m := <-n.recvc: // 处理其他节点发送过来的消息 case cc := <-n.confc: // 处理配置变更的消息 case <-n.tickc: // 处理 tick n.rn.Tick() // 调用 raft.tick() 方法驱动逻辑时钟前进一次 case readyc <- rd: // 处理未处理的 Ready 信息 n.rn.acceptReady(rd) // 标记 Ready 消息已经处理完成 advancec = n.advancec // 用节点的 advancec 初始化 advancec,后续可以从 advancec 接收消息 case <-advancec: // 收到 advancec 通道传来的信息,说明之前的 Ready 已经被处理,通知 raft 可以进行下一轮处理 n.rn.Advance(rd) rd = Ready{} // 重置 rd 为空 advancec = nil // 重置 advancec case c := <-n.status: // 处理请求状态的信息 case <-n.stop: // 处理 stop 信号 } } }
- 由代码可知,所有的消息都是通过封装的 rawNode 传递给 raft 底层状态机的
-
对于接口中的其他方法,通过调用封装的
func (n *node) Step(ctx context.Context, m pb.Message) error
方法发送对应的消息给状态机,实现推动状态机状态的变更
raft 算法的实现
对于该部分内容的学习将结合 raft 大论文来进行
初始化:newRaft()
创建一个新的 raft 节点,主要做了以下工作:
- 初始化各项属性;
- 检查 log 相关的配置,确定是否需要使用已有数据进行恢复,修正 applyIndex 等;
- 设置状态为 follower;
节点的状态
状态汇总
定义了以下几种状态:
const (
StateFollower StateType = iota
StateCandidate
StateLeader
StatePreCandidate
numStates
)
- 常规状态三种:Follower,Candidate,Leader
- 优化:PreCandidate,用于预选举
状态转换
- 通过
becomeFollower,becomeCandidate,becomePreCandidate 和 becomeLeader
几个方法来实现。这些函数的主要逻辑基本是:- 更新 state 切换到对应状态;
- 更新 tick 函数检查是否需要触发不同的事件;
- 调用
reset()
重置超时和设置相关状态(如任期); - 更新 step 函数处理不同状态的消息;
逻辑时钟的实现
在之前介绍的 Node 接口部分我们提到过,Node 接口实现了 Tick()
方法,在节点的 tickc
通道中有消息触发时,会调用底层 rawNode
的 Tick()
方法推动逻辑时钟前进:
// 调用 Tick() 方法会向 tickc 通道发送一个信号,在信号处理端执行驱动 tick 的方法调用
func (n *node) Tick() {
select {
case n.tickc <- struct{}{}:
case <-n.done:
//......
}
}
// Tick advances the internal logical clock by a single tick.
func (rn *RawNode) Tick() {
rn.raft.tick()
}
type raft struct {
//......
tick func()
//......
}
- 实际调用的是 raft 的
tick
成员,该成员是一个函数类型的变量,对应不同的节点状态,可以设置为以下两个函数- 对于 Candidate,preCandidate 和 Follower:
func (r *raft) tickElection()
- 对于 Leader:
func (r *raft) tickHeartbeat()
- 对于 Candidate,preCandidate 和 Follower:
- 设置
tick
成员的时机是在节点身份变更的一系列becomeXXX
方法中完成的,即节点的身份变更后,将其计时器更新为对应身份的计时器
而 Node 接口的 Tick()
方法则是由用户设置了一个定时器,在定时器到期后触发的,具体实例见 raftexample 解析的 serveChannels
部分 。在 raft 结构体中维护了以下几个成员变量:
type raft struct {
//......
electionElapsed int
heartbeatElapsed int
heartbeatTimeout int
electionTimeout int
randomizedElectionTimeout int
//......
}
-
heartbeatTimeout
、electionTimeout
和randomizedElectionTimeout
三个成员变量指定了逻辑时钟的超时值,该值是通过Config
来设置的type Config struct { //...... // ElectionTick is the number of Node.Tick invocations that must pass between // elections. That is, if a follower does not receive any message from the // leader of current term before ElectionTick has elapsed, it will become // candidate and start an election. ElectionTick must be greater than // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid // unnecessary leader switching. ElectionTick int // HeartbeatTick is the number of Node.Tick invocations that must pass between // heartbeats. That is, a leader sends heartbeat messages to maintain its // leadership every HeartbeatTick ticks. HeartbeatTick int //...... }
每次调用 Tick() 函数都会触发一次逻辑时钟的推进,用
electionElapsed
和heartbeatElapsed
两个变量来记录 Tick() 函数调用的次数,当调用次数超过了逻辑时钟的超时值时,即触发超时-
每次节点的身份转换时都会通过调用
reset()
方法来重置超时时间func (r *raft) reset(term uint64) { //...... r.electionElapsed = 0 r.heartbeatElapsed = 0 r.resetRandomizedElectionTimeout() // 重置选举超时 //...... } func (r *raft) resetRandomizedElectionTimeout() { r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) }
消息的处理
etcd-raft 中将所有的操作都封装到一个消息结构体中,通过 RPC 来在不同的节点之间传输。
消息的类型和结构体定义在 raft/raftpb/raft.proto 文件中,消息处理的入口是 func (r *raft) Step(m pb.Message) error
方法。
消息的处理过程与节点的状态密切相关:
- 不同的节点状态可以处理不同类型的消息;
- 同一类型的消息在不同的节点状态中处理逻辑也可能不同;
消息的类型可以分为两类:驱动节点状态变更(选举相关)和其他业务处理。etcd-raft 将消息的处理过程分为两部分:
- 在入口函数
Step()
中处理状态变更的相关消息,用以确定节点当前的状态; - 后续的消息根据角色的状态不同,通过调用
r.step()
来继续执行;
消息的预处理
func (r *raft) Step(m pb.Message) error {
switch {
case m.Term == 0:
// ......
case m.Term > r.Term:
// ......
case m.Term < r.Term:
// ......
}
switch m.Type{
case pb.MsgHup:
// ......
case pb.MsgVote, pb.MsgPreVote:
// ......
default:
// ......
}
return nil
}
- 该方法的实现逻辑由两部分组成:
- 第一部分先对任期进行检查,这是因为任期在 raft 中充当着逻辑时钟的角色,任期影响节点的状态,而不同的状态对应了执行特定操作(处理不同类型消息)的权限,故而在进一步处理消息前需要先对任期进行检查。检查后的结果为:
- 当前节点任期为0,继续第二部分;
- 当前节点任期大于收到的消息的任期:直接返回,这也是论文中对 server 行为的要求。除此以外,针对以下两种特别场景,etcd 进行了优化;
- 当前节点任期小于收到的消息的任期:按照论文要求变为 follower 后继续第二部分;
- 注意,由于 etcd 加入了 quorum、预投票和只读请求三个可选优化,导致开启不同优化选项时可能会出现问题,故而对任期检查的逻辑进行了修正,这部分内容在优化部分进行分析;
- 第二部分则是分离状态变更消息和其他业务消息的处理逻辑:
- MsgHup:用于本节点发起选举
- 针对 learner 的情况作了优化
- 调用
r.campaign()
方法触发选举
- MsgVote 和 MsgPreVote:用于发起投票
- 投票的规则基于论文的描述,由于加入了预投票以及其他 bug 的修正,当前的投票条件为:
- 之前已经为同一节点投过票
- 或还没有投过票且不知道当前的 leader 是谁
- 或收到的是预投票消息且消息的任期大于节点的任期
- 并且请求方的日志至少和当前节点一样新
- 验证可以投票后,调用
r.send()
方法发起投票; - 如果投票成功,则重置计时器,并记录下是为哪个节点投的票
- 投票的规则基于论文的描述,由于加入了预投票以及其他 bug 的修正,当前的投票条件为:
- 对于其他类型的消息,调用
r.step()
方法来进行处理
- MsgHup:用于本节点发起选举
- 第一部分先对任期进行检查,这是因为任期在 raft 中充当着逻辑时钟的角色,任期影响节点的状态,而不同的状态对应了执行特定操作(处理不同类型消息)的权限,故而在进一步处理消息前需要先对任期进行检查。检查后的结果为:
消息处理的实现
通过预处理过程之后,节点会根据自身的状态调用对应的方法来处理收到的消息。etcd-raft 通过回调函数 r.step()
来实现这一点:
type stepFunc func(r *raft, m pb.Message) error
type raft struct {
// ......
// 回调函数,对应 stepFollower, stepCandidate 和 stepLeader 三个具体方法,
// 用于不同状态的节点对各种 Msg 进行处理
step stepFunc
//......
}
func (r *raft) Step(m pb.Message) error {
//--------- 任期检查 --------------
switch m.Type{
//-------------选举和投票相关消息处理----------
default:
err := r.step(r, m) // 调用回调函数处理其他消息
if err != nil {
return err
}
}
return nil
}
选举
选举流程
etcd-raft 中使用的是逻辑时钟而不是真实的时间
-
选举是由 Follower 或者 Candidate 发起的,发起选举的条件是:选举超时到期。
func (r *raft) tickElection() { r.electionElapsed++ // 如果可以被提升为 leader,同时也满足选举超时,发送一个 MsgHup 消息触发选举 if r.promotable() && r.pastElectionTimeout() { //fmt.Printf("trigger, electionElapsed is: %d, random timeout is: %d\n", // r.electionElapsed, r.randomizedElectionTimeout) r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) // 选举超时是给自己发送 MsgHup 消息 } }
-
之后在
Step
方法中处理 MsgHup 消息:case pb.MsgHup: // 用于本节点发起选举 if r.state != StateLeader { if !r.promotable() { // 该部分用于确保 learner 不会发起选举 return nil } // 取出还未提交到状态机的信息 [applied + 1, committed] ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) if err != nil { r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) } // 在调用 Campaign 之前,需要保证所有之前的配置变更都已经被应用 if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { return nil } // 开始选举 if r.preVote { // 对于预投票,调用预投票选举 r.campaign(campaignPreElection) } else { r.campaign(campaignElection) } } else { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) } // .....
-
最终的选举过程是在
campaign
方法中实现的,大概流程为:-
切换到 preCandidate/Candidate 状态,设置任期,更新 tick 函数为
r.tickElection
,更新 Vote 变量为自己的 id,更新状态为 StatePreCandidate/StateCandidate- 预投票状态是论文中提到的一种优化方式,后文有详细讲解
-
调用
r.poll()
方法为自己投票- 如果是单节点集群,则立刻成为 leader
-
向集群中的其他节点发送 MsgPreVote/MsgVote 消息
- 发送的消息中包括了一些额外的字段:
- 当前任期,当前节点的最后一条日志索引(lastIndex),最后一条日志索引对应的任期(lastTerm),附加的Context(用于 leadership transfer)
- 发送的消息中包括了一些额外的字段:
-
之后进入集票部分,原理是收到 majority 的票数,则选举成功转为 leader,具体代码注释参见预投票部分
func (r *raft) campaign(t CampaignType) { //...... // 根据选举的类型更新节点的状态 if t == campaignPreElection { r.becomePreCandidate() voteMsg = pb.MsgPreVote // PreVote RPCs are sent for the next term before we've incremented r.Term. term = r.Term + 1 } else { r.becomeCandidate() voteMsg = pb.MsgVote term = r.Term } // 调用 poll() 方法给自己投票,如果是单节点系统,则立刻成为 leader,如果是预投票,则发起投票 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { r.campaign(campaignElection) } else { r.becomeLeader() } return } //...... for _, id := range ids { if id == r.id { // 跳过自己 continue } var ctx []byte if t == campaignTransfer { // 支持 leadership transfer ctx = []byte(t) } // 向其他节点发送请求投票 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) } }
-
-
当其他节点收到投票请求后,会对是否投票进行判断:
func (r *raft) Step(m pb.Message) error { //............ // 首先进行任期判断 case m.Term > r.Term: // 消息任期大于当前节点的任期 // -----------优化部分,避免不必要的干扰-------------- //-------------优化部分,单独判断预投票的流程------------- // 当前节点因为收到一条更高任期的消息需要变为 follower,此时需要更新 r.leader,为了让更新更明确,在知晓消息是由当前 leader 发来时,可以直接更新 r.leader 为 r.From,这类消息是:AppendEntries, heartbeat 以及快照相关,对于其他类型的消息,因为不知道发送方是否是 leader,将 r.leader 更新为 none if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { r.becomeFollower(m.Term, m.From) } else { r.becomeFollower(m.Term, None) } } case m.Term < r.Term: //-------------优化部分-------------------- // 消息任期小于当前任期,直接返回 return nil } // 可以继续执行第二部分的情况: // - 消息任期和节点任期相同 // - 消息任期大于节点任期:预投票场景或者当前节点转为 follower 之后继续 switch m.Type { //.................... case pb.MsgVote, pb.MsgPreVote: // We can vote if this is a repeat of a vote we've already cast... canVote := r.Vote == m.From || // ...we haven't voted and we don't think there's a leader yet in this term... (r.Vote == None && r.lead == None) || // ...or this is a PreVote for a future term... (m.Type == pb.MsgPreVote && m.Term > r.Term) // 确定是否可以投票: // - 之前已经投过票的,再次收到了同一节点的请求投票 // - 还没有投过票且不知道当前的 leader 是谁 // - 收到的是预投票消息且消息的任期大于节点的任期 // 满足以上三条中的任意一条,则 canVote = true if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { // isUpToDate 中进行日志比较 // 满足投票条件 // 注意,响应 MsgPreVote 时的任期使用的是来自消息的任期,而不是本地的任期 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) if m.Type == pb.MsgVote { // 投票成功,清空计时器,并保存为哪个节点投票 // Only record real votes. r.electionElapsed = 0 r.Vote = m.From // 同一任期内只会给一个节点进行投票 } } else { // 拒绝投票 r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) } //...... }
- 除去额外的优化部分,具体的判定规则与 raft 论文中描述的规则一致;
-
请求投票完成后,Candidate 开始集票
func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). var myVoteRespType pb.MessageType if r.state == StatePreCandidate { myVoteRespType = pb.MsgPreVoteResp } else { myVoteRespType = pb.MsgVoteResp } // ...... switch m.Type { // ...... case myVoteRespType: // 处理投票 gr, rj, res := r.poll(m.From, m.Type, !m.Reject) // 计算当前收到多少投票 r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) switch res { case quorum.VoteWon: // 如果quorum 都选择了投票 if r.state == StatePreCandidate { r.campaign(campaignElection) // 预投票发起正式投票 } else { r.becomeLeader() // 变成 leader r.bcastAppend() // 发送 AppendEntries RPC } case quorum.VoteLost: // 集票失败,转为 follower // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) // 注意,此时任期没有改变 } // ...... }
投票成功后,调用 r.becomeLeader 执行身份转换,然后立刻发送 AppendEntries RPC
心跳检测
维护了 heartbeatElapsed 和 electionElapsed 两个计时器,主要完成以下功能:
- 当 electionTimeout 超时:(实现论文中提到的 quorum 和 leadership transfer 两个优化)
- 如果开启了 quorum 模式,则给自己发送一条 MsgCheckQuorum 消息,检测是否处在网络隔离状态中
- 如果当前正处在 leadership transfer 过程中,则中断执行,自己重新变为 leader
- 当 heartbeatTimeout 超时:
- 发起心跳检测
MsgBeat 消息的处理
-
心跳计时超时后,leader 重置计时器,给自己发送一个心跳包
func (r *raft) tickHeartbeat() { // ...... if r.heartbeatElapsed >= r.heartbeatTimeout { // 如果心跳计时超时,重置计时器,发送心跳包 r.heartbeatElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } }
-
调用
r.bcastHeartbeat()
将心跳包发给其余节点,该方法中判定了是否有额外的上下文需要追加(用于标记只读请求),最终是调用r.sendHeartbeat()
来构建心跳包并发送func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. // 心跳包中附带了 follower 的 matchIndex 索引和 master 的 commitIndex 中的最小值 commit := min(r.prs.Progress[to].Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, } r.send(m) }
-
心跳包的处理:
-
心跳包的接受同样是从
Step()
方法开始,然后调用对应的回调函数stepFollower()
和stepCandidate()
,也就是说,对于心跳包,除 leader 外的其余节点只做任期检查,而不进行日志的 up-to-date 比对- 这是由 raft 的规则决定的:每个任期内只能有一个有效 leader
- 对于日志的操作,Leader 永远不会删除自己的已有日志,而 Follower 则是根据 Leader 的日志情况修正自身日志
-
对于 Candidate 和 preCandidate:
case pb.MsgHeartbeat: // 收到心跳,说明当前有leader,转为 follower r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m)
-
对于 Follower:
case pb.MsgHeartbeat: // 收到心跳检测,重置选举超时 r.electionElapsed = 0 r.lead = m.From r.handleHeartbeat(m)
-
构建响应
func (r *raft) handleHeartbeat(m pb.Message) { r.raftLog.commitTo(m.Commit) // 在心跳消息中传入 committedId,使用该值来更新当前节点的 committedId // 要将携带的Context原样发回,实现 readOnly r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) }
- 检查并根据检查结果更新当前节点的提交索引
- 响应 leader
-
-
leader 收到心跳包响应后的处理:
case pb.MsgHeartbeatResp: // 处理心跳响应 pr.RecentActive = true // quorum 机制,收到来自follower的心跳包响应后,将其 RecentActive 设置为 true //---------- Progress 流控机制的处理----------- // 心跳包中附带了对 follower 日志的检查,如果 follower 的日志落后于 leader,进行补全 if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } //-------------- 只读模式优化部分------------------
基本选举策略存在的问题
由于网络的问题存在 partition 的情况,导致:
- 同时存在多个 leader:旧 leader 被隔离后,majority 选出新的 leader,而旧 leader 因为无法收到新 leader 的消息依旧以为自己还是 leader。可能会导致客户端访问到旧 leader 接收到过期信息的情况;
- 节点重新加入集群,或者由于集群配置变更等导致正常工作的 leader 因为不合时宜的 RequestVote RPC 而 step down;
- 解法:引入 quorum 和预投票机制进行优化,见优化部分。
日志复制
Storage 接口
etcd 只负责核心 raft 模块的维护,将存储等其他部分交由用户自行实现。Storage 接口提供了查询持久化数据的相关方法,用户通过实现这些接口来操作持久化数据
type Storage interface {
// 返回保存的 HardState 和 ConfState 信息
InitialState() (pb.HardState, pb.ConfState, error)
// 范围查询 [lo, hi),注意有 maxSize 限制
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// 返回指定索引对应条目的任期
Term(i uint64) (uint64, error)
// 返回最后一条 Entry 的索引
LastIndex() (uint64, error)
// 返回第一条数据的索引
FirstIndex() (uint64, error)
// 返回最近一次快照的数据
Snapshot() (pb.Snapshot, error)
}
unstable
用于保存还未被用户层持久化的数据
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
// entries 数组中第一条 entry 的实际 index
// 因为日志压缩和快照机制,执行快照后,包含在快照中的 entry 会被清理掉,但是 log 中 entry 的索引是单调递增的,所以用 offset 来记录下之前被清理掉的最后一条 entry 的 index
offset uint64
logger Logger
}
- 因为存在快照截断日志,所以引入了偏移量 offset;
- 当前 unstable.entries 中的第 i 条日志,对应到 raft 日志中应为 i + offset
raftLog
etcd-raft 中日志相关的操作
type raftLog struct {
// 保存最近一次快照后提交的数据
storage Storage
// 保存还未持久化的记录和快照
unstable unstable
// 已提交记录的最高索引
committed uint64
// Invariant: applied <= committed
// 被状态机使用的记录的最高索引
applied uint64
logger Logger
// maxNextEntsSize is the maximum number aggregate byte size of the messages
// returned from calls to nextEnts.
// 根据 https://github.com/etcd-io/etcd/pull/9982,maxNextEntsSize 参数是用来防止一次提交的 raft 日志过大导致OOM
maxNextEntsSize uint64
Progress
etcd 将 raft 设计为纯状态机模型。从 leader 的视角来看,其余所有的节点都在预定好的几个状态之间不停切换。Progress 结构用于管理节点的状态。论文中的日志复制部分也是通过 Progress 机制来完成的
-
节点状态:共有三类节点状态
const ( // StateProbe indicates a follower whose last index isn't known. Such a // follower is "probed" (i.e. an append sent periodically) to narrow down // its last index. In the ideal (and common) case, only one round of probing // is necessary as the follower will react with a hint. Followers that are // probed over extended periods of time are often offline. StateProbe StateType = iota // StateReplicate is the state steady in which a follower eagerly receives // log entries to append to its log. StateReplicate // StateSnapshot indicates a follower that needs log entries not available // from the leader's Raft log. Such a follower needs a full snapshot to // return to StateReplicate. StateSnapshot )
-
StateProbe:当节点拒绝了最近一次 AppendEntries RPC 时,就会进入探测状态。leader 通过探测状态来确定节点的日志情况,找到匹配条目和冲突条目。
- 新当选的 leader 会将所有 follower 的状态设置为 StateProbe,更新 match = 0,next = 该 leader 的最后一条日志的索引;
- 在探测状态中,leader 在每个心跳间隔内至多发送一条 AppendEntries RPC,用以确定节点的日志情况;
- 当 follower 不再拒绝 MsgApp 时,意味着已经匹配到了正确的日志条目,则该 follower 转换至 StateReplicate 状态;
-
StateReplicate:复制状态,leader 将自己的日志复制给 follower。采用快速复制的方法,直接更新 follower 的 next 索引至 leader 发送的最新条目。
- 当 follower 拒绝了 msgApp 或者链路层报告 follower 不可达时,将该 follower 的转态调整为 StateProbe;
-
StateSnapshot:当 leader 发现该 follower 的日志远落后于当前日志时,转入快照状态加载快照;
- 快照状态下,leader 不会发送任何 AppendEntries 到该 follower;
- 在收到 follower 应用了快照的响应后,将该 follower 的状态转为 StateProbe;
-
状态转换图:
-
集群成员变更
理论基础
- 将集群从一个配置直接转为另一个配置会导致不安全:因为各个节点应用转换的时间点不同,总会存在一个时间点,使得同一任期内出现两个 leader,一个由旧配置节点选出,另一个由新配置节点选出
-
raft 论文中提出的解决方式是:限制一次只允许增加或者删除单个节点。只有上一轮的配置变更完成后,才能继续下一轮。
-
该方法能确保安全性的原因是:只增删单个节点,新旧配置中 majority 的部分必然有重叠
-
-
raft 的实现方式:将配置变更的指令放在日志条目中,当成特殊条目来处理。优点:
- 沿用了之前的日志同步提交机制,无需额外增加系统的复杂度,同时保证了安全性
- 整个过程可以继续处理客户端请求
-
存在的问题:
- 节点配置需要能够回退,因为未提交的条目有可能被覆盖
- TODO:这里有疑问,我理解的场景是:如果发生网络隔离,出现了A,B两个分组,各自选主后执行了不同的配置变更,再次合并后达成一致,有一组成员的数据会被修改
- 要处理不在当前配置内的节点的消息,即 servers 处理收到的 RPC 请求时不会去分辨发送方是否还在当前配置内:
- 对于不在当前集群内的 leader 发起的 AppendEntries RPC,如果不处理其 RPC,则新加入的节点将永远无法加入集群
- 要支持给非集群内成员节点投票
- 如:当前有3个节点,leader 宕机后,新加入一个节点,需要新节点加入投票才能完成选举
- 被移除的节点在停机前可能会发起新的请求投票,影响当前集群的稳定,使用 check quorum 和 Pre-Vote 来处理
- 节点配置需要能够回退,因为未提交的条目有可能被覆盖
-
新增节点还会带来一个问题:如果新增节点的 log 与原集群中节点的 log 数相差过大,则需要时间来补全新节点的 log,这个过程称为 catching up。而在这个过程中如果发生故障,则可能会降低集群的容错性,因为需要更长的时间来达成一致。
-
raft 的实现方式:
对于新加入的节点,增加一个阶段,不将该节点加入到投票或提交条目的 majority 中,而是由 leader 为其复制日志
leader 通过多轮同步的方式来为新节点复制日志,这期间新加入到 leader 中的条目也可以在后续的轮次中完成复制
- 使用多轮复制时为了降低应用出现不稳定的可能性,同时设置固定的同步轮数,如果最近后一次同步持续时间超过选举超时,则返回 false,客户端将在之后重试
-
-
移除当前集群的 leader 节点:
- 两种实现方式:
- 方案一:leader 需要管理不包含自己的集群,直到修改配置的条目被提交之后,再下线,之后通过选举超时来选出新集群的 leader
- 方案二:使用 leadership transfer
- 两种实现方式:
优化
预投票
该部分的内容在 《CONSENSUS BRIDGING THEORY AND PRACTICE》这篇论文的 9.6 节中描述,etcd-raft 在 pr #6624(https://github.com/etcd-io/etcd/pull/6624) 中实现,通过设置 raft.preVote = true 来开启该优化。
- 背景:节点进入分隔状态后,重新加入集群会扰乱集群的稳定性
- 算法:节点只有收到 majority 的投票后,才会增加任期进入正常选举
- 节点进入分隔状态后,由于没法收到 majority 的投票,不会增加自己的任期
- 节点重新加入集群后,由于大部分的节点都能正常收到心跳,所以不会发起投票,在收到来自当前 leader 的常规心跳检查后,变为 follower,更新自己的任期
etcd 的实现:
在任期判断时,对于预投票的消息,如果当前节点收到任期高于自己的消息,不会更新自己的任期
-
开始选举时,如果开启了预投票,则先进行预投票
func (r *raft) Step(m pb.Message) error { // ----- 任期判断 ------ switch m.Type { case pb.MsgHup: if r.preVote { r.campaign(campaignPreElection) } else { r.campaign(campaignElection) } // ...... }
-
节点状态变为 preCandidate
func (r *raft) becomePreCandidate() { // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == StateLeader { panic("invalid transition [leader -> pre-candidate]") } // Becoming a pre-candidate changes our step functions and state, // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. // 注意:预投票不更新任期 r.step = stepCandidate r.prs.ResetVotes() r.tick = r.tickElection r.lead = None r.state = StatePreCandidate r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) }
-
之后的投票流程和正常投票相同,区别在于其他节点收到 MsgPreVote 时不会改变自己的状态
- 注意:可以给多个预投票的 candidate 投票
-
在收集到 majority 的预投同意后,转到正常投票流程
func stepCandidate(r *raft, m pb.Message) error { //...... case myVoteRespType: // 处理投票 gr, rj, res := r.poll(m.From, m.Type, !m.Reject) // 计算当前收到多少投票 switch res { case quorum.VoteWon: // 如果quorum 都选择了投票 if r.state == StatePreCandidate { r.campaign(campaignElection) // 预投票发起正式投票 } else { r.becomeLeader() // 变成 leader r.bcastAppend() // 发送 AppendEntries RPC } case quorum.VoteLost: // 集票失败,转为 follower // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) // 注意,此时任期没有改变 } // ...... }
- 预投失败会转换为 follower,任期没有改变
-
注意:
-
预投票不会增加任期,但是发送 MsgPreVote 时填入的任期是竞选的任期
func (r *raft) campaign(t CampaignType) { // ...... // 根据选举的类型更新节点的状态 if t == campaignPreElection { r.becomePreCandidate() voteMsg = pb.MsgPreVote // PreVote RPCs are sent for the next term before we've incremented r.Term. term = r.Term + 1 } // ......
-
响应 MsgPreVote 时发送的任期是来自消息的任期,而不是本地节点的信息。两个原因:
一个是因为本地节点可能会被隔离,本地任期可能不正确;
-
第二个是因为 PreCandidate 没有修改自身的任期,只是使用竞选任期来填入 MsgPreVote 中,而节点会忽略落后的消息
// When responding to Msg{Pre,}Vote messages we include the term // from the message, not the local term. To see why, consider the // case where a single node was previously partitioned away and // it's local term is now out of date. If we include the local term // (recall that for pre-votes we don't update the local term), the // (pre-)campaigning node on the other end will proceed to ignore // the message (it ignores all out of date messages). // The term in the original message and current local term are the // same in the case of regular votes, but different for pre-votes. // 注意,响应 MsgPreVote 时的任期使用的是来自消息的任期,而不是本地的任期 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
预投票成功也无法保证节点会成为 leader,因为预投的下一步是进入正常投票,有可能会面临同样的集票失败的问题(选票瓜分,集群成员变化等)
-
Quorum
etcd-raft 在pr #3917(https://github.com/etcd-io/etcd/pull/3917)中实现
- 背景:如果出现 Partition 的情况,可能会导致出现多个 leader,而此时过期的 leader 可能还维持着和客户端的请求,就可能会出现延迟客户端请求的情况,因为无法将对应的日志复制给其他节点,达成 majority 一致的条件进行提交。为了避免该问题,etcd-raft支持开启 CheckQuorum 选项,在 election timeout 时发送一个 MsgCheckQuorum 信息来判断自己当前是否还处于 majority 的集群中;如果检查发现自己不在集群中,则 step down。
etcd 的实现:
-
election timout,给自己发送 MsgCheckQuorum
func (r *raft) tickHeartbeat() { r.heartbeatElapsed++ r.electionElapsed++ if r.electionElapsed >= r.electionTimeout { // 如果选举计时超时 r.electionElapsed = 0 // 重置计时器 if r.checkQuorum { // 给自己发送一条 MsgCheckQuorum 消息,检测是否出现网络隔离 r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) } // ...... }
-
处理 MsgCheckQuorum:
func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { //--------------------其他消息处理---------------------- case pb.MsgCheckQuorum: // 将 leader 自己的 RecentActive 状态设置为 true if pr := r.prs.Progress[r.id]; pr != nil { pr.RecentActive = true } if !r.prs.QuorumActive() { // 如果当前 leader 发现其不满足 quorum 的条件,则说明该 leader 有可能处于隔离状态,step down r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } // Mark everyone (but ourselves) as inactive in preparation for the next // CheckQuorum. r.prs.Visit(func(id uint64, pr *tracker.Progress) { if id != r.id { pr.RecentActive = false } }) return nil // ......
Quorum VS 预投票
- Quorum 解决的是由于网络分隔造成存在多个 leader 导致可能延迟客户端请求的情况;
- 预投票解决的是节点由于不同原因重新加入网络后发起不恰当的 Request Vote 导致当前可用 leader step down 的问题;
Leadership transfer
该部分的内容在 《CONSENSUS BRIDGING THEORY AND PRACTICE》这篇论文的 3.10 节中描述,etcd 在 pr #5809(https://github.com/etcd-io/etcd/pull/5809) 中实现.
- 背景:Leadership transfer 在以下两种类型的场景中可能有用:
- leader 必须关停,如需要维护,或者需要从当前集群中移除
- 当有更适合的节点来承当 leader 角色的时候
- 主动的移交 leader 角色有助于增强集群的鲁棒性,避免由于选举造成的时间和性能上的额外开销
- 算法:
- 前任 leader 停止接收新的客户端请求,使用常规日志复制机制发送它的日志条目到目标服务器;
- 前任 leader 发送一个 TimeoutNow 请求给目标服务器,目标服务器发起选举;
- 前任服务器保证目标服务器有其任期内所有的日志条目,选举投票保证了集群的安全性和可维护性;
- 存在的问题:
- 目标服务器可能在 leadership transfer 完成前失效:
- 如果在 election timeout 时间内未完成 leadership transfer,前任 leader 终止转换,并恢复接收客户端请求;
- 目标服务器可能在 leadership transfer 完成前失效:
etcd 的实现:
-
leader 在收到 leadership transfer 的消息后,在执行请求前先进行检查:
case pb.MsgTransferLeader: // 收到 leadership transfer if pr.IsLearner { r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) return nil } leadTransferee := m.From lastLeadTransferee := r.leadTransferee if lastLeadTransferee != None { // 如果当前已经设置了 transfer 对象 if lastLeadTransferee == leadTransferee { // 如果相同的节点已经在执行 transfer 了,直接返回 return nil } r.abortLeaderTransfer() // 中断之前的 transfer } if leadTransferee == r.id { return nil } // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed. r.electionElapsed = 0 // 重置 electionElapsed,用于检测 transfer 超时 r.leadTransferee = leadTransferee if pr.Match == r.raftLog.lastIndex() { // 判断如果日志已经完成复制,则发起 sendTimeoutNow r.sendTimeoutNow(leadTransferee) } else { r.sendAppend(leadTransferee) // 否则先完成日志复制 }
-
在此期间,leader 不再接受来自客户端的新请求
case pb.MsgProp: // 处理用户提交到 raft 的建议数据(propose) // ...... if r.leadTransferee != None { // 说明正在执行 leadership transferring,不能处理该消息 return ErrProposalDropped } // ......
-
目标节点收到来自 leader 的 MsgTimeoutNow 信息后,调用
campaign
方法开始选举,加入上下文来表示该轮选举是 leader trasferring,强制进行投票func stepFollower(r *raft, m pb.Message) error { switch m.Type { //...... case pb.MsgTimeoutNow: if r.promotable() { // 如果当前节点超时,则发起选举 // 判断 r.promotable 的原因:如果一个被踢出集群的节点收到了 MsgTimeoutNow,会增加任期,发起选举,扰乱集群的稳定 // Refer to:https://github.com/etcd-io/etcd/pull/6815 r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) // Leadership transfers never use pre-vote even if r.preVote is true; we // know we are not recovering from a partition so there is no need for the // extra round trip. // 常规选举超时应该通过 tickElection 触发 MsgHup,然后在 Step 函数中转为 candidate 状态并集票 // 这里是强制的 leadership transfer r.campaign(campaignTransfer) } else { r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From) }
func (r *raft) campaign(t CampaignType) { //-------------校验----------- // 实现 leadership transfer var ctx []byte if t == campaignTransfer { ctx = []byte(t) } r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) } }
-
如果 leadership transfer 超时,则 leader 中断执行,自己重新成为 leader
func (r *raft) tickHeartbeat() { r.heartbeatElapsed++ r.electionElapsed++ if r.electionElapsed >= r.electionTimeout { // 如果选举计时超时 r.electionElapsed = 0 // 重置计时器 // ...... // leadership transfer 执行超时,中断执行,自己重现变为 leader if r.state == StateLeader && r.leadTransferee != None { r.abortLeaderTransfer() } } // ...... }
只读请求
该部分的内容在 《CONSENSUS BRIDGING THEORY AND PRACTICE》这篇论文的 3.10 节中描述,etcd 在 pr #6275(https://github.com/etcd-io/etcd/pull/6275) 中实现;
- 背景:应用中只读请求占了很大的比例。与写日志相比,读请求不会导致服务器状态机状态的改变,因此可以绕过写日志这一步,从而获得大量的性能提升。
- 存在的问题:
- 绕过写日志部分可能会导致读取到过期的结果
- 算法:
- 如果 leader 在当前任期内还未提交过日志,则要先等待 leader 至少执行一次提交:
- Leader完整性只保证了leader必须拥有所有已经提交的日志,但是当某个节点刚刚成为 leader 时,它并不知道它的日志中到底有哪些是已经提交了的。因此必须执行一次提交来确认已提交日志的索引,同时也会将之前未提交的待提交日志进行提交;
- raft 通过给新 leader 提交一个空的 no-op entry 来完成提交索引的确认;
- 保存当前 commitIndex 为 readIndex;
- leader 给其他节点发送心跳检测,通过收到 majority 的成功响应来确认自己仍然是有效的 leader,则此时的 readIndex 依旧是整个集群中最大的提交索引
- leader 等待状态机执行提交的条目,直到 applied index >= readIndex,这保证了满足 linearizability
- leader 将结果返回到客户端
- 如果 leader 在当前任期内还未提交过日志,则要先等待 leader 至少执行一次提交:
etcd 的实现
-
以下结构用于保存 ReadIndex 的相关状态:
type readOnly struct { // 读请求的执行类型 option ReadOnlyOption // 保存所有未执行完成读请求的状态,键是请求的具体内容,值是请求的相关信息 pendingReadIndex map[string]*readIndexStatus // 保存未完成的读请求,读请求的队列,每一个读请求都会被加入到队列中 readIndexQueue []string } type readIndexStatus struct { req pb.Message // 请求的原始信息 index uint64 // 收到读请求时状态机的提交索引 acks map[uint64]bool // 对每个请求的最终响应 }
-
客户端发送读请求:
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) }
- rctx 是对该读请求的唯一标识,等价于 uid
-
在 StepLeader() 中处理 MsgReadIndex:
case pb.MsgReadIndex: // 处理只读请求 // ...... // Reject read only request when this leader has not committed any log entry at its term. if !r.committedEntryInCurrentTerm() { // leader 要先发送一条日志来确定最新 commitIndex return nil } // ...... // If more than the local vote is needed, go through a full broadcast. case ReadOnlySafe: r.readOnly.addRequest(r.raftLog.committed, m) // 保存 readIndex // The local node automatically acks the request. r.readOnly.recvAck(r.id, m.Entries[0].Data) // 对于 leader 节点,由于已经添加过读请求,所以这一步会自动响应 ack r.bcastHeartbeatWithCtx(m.Entries[0].Data) // 带上读请求的上下文通过心跳广播给其他节点 // ......
-
在 StepLeader() 中处理收到的心跳广播:
case pb.MsgHeartbeatResp: // ...... if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { return nil } // 返回 m 对应的 readIndex 之前的所有未响应的只读请求的结果,并更新readOnly结构体 rss := r.readOnly.advance(m) // 响应客户端 for _, rs := range rss { if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None { r.send(resp) } } // ......
调用 recvAck 是为了确定当前 leader 仍旧是集群的 leader,这意味着当前消息 m 对应的 readIndex 之前的所有消息都是集群确认过的。
-
r.readOnly.advance(m)
返回的是当前消息 m 对应的 readIndex 之前的所有未响应的读请求的结果,同时会更新 readOnly 相关的结构体// advance advances the read only request queue kept by the readonly struct. // It dequeues the requests until it finds the read only request that has // the same context as the given `m`. // advance 会将当前m对应的 readIndex 之前的所有未响应的消息都一次性返回 func (ro *readOnly) advance(m pb.Message) []*readIndexStatus { var ( i int found bool ) ctx := string(m.Context) rss := []*readIndexStatus{} for _, okctx := range ro.readIndexQueue { // 遍历队列 i++ rs, ok := ro.pendingReadIndex[okctx] if !ok { panic("cannot find corresponding read state from pending map") } rss = append(rss, rs) // 将遍历到的所有读请求加入到 rss 中 if okctx == ctx { // 一直读取到当前请求的 ctx,退出遍历 found = true break } } if found { ro.readIndexQueue = ro.readIndexQueue[i:] // 更新队列 for _, rs := range rss { // 删除已经处理过的读请求 delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) } return rss } return nil }
-
响应客户端:
// ReadState provides state for read only query. // It's caller's responsibility to call ReadIndex first before getting // this state from ready, it's also caller's duty to differentiate if this // state is what it requests through RequestCtx, eg. given a unique id as // RequestCtx type ReadState struct { Index uint64 RequestCtx []byte } // responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer // itself, a blank value will be returned. func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message { if req.From == None || req.From == r.id { // 当前节点是 leader,读请求直接发给 leader,则 leader 直接添加响应 r.readStates = append(r.readStates, ReadState{ Index: readIndex, RequestCtx: req.Entries[0].Data, }) return pb.Message{} } // 当前节点是 leader,读请求发给了follower,由follower转到 leader,则 leader 应该响应follower,由follower添加对客户端的响应 return pb.Message{ Type: pb.MsgReadIndexResp, To: req.From, Index: readIndex, Entries: req.Entries, } }
-
发给 follower 的读请求:
case pb.MsgReadIndex: // 发送给客户端的读请求,转发给 leader 来获取最新数据 if r.lead == None { return nil } m.To = r.lead r.send(m) // 转发给 leader case pb.MsgReadIndexResp: // leader 处理完由 follower 转发的读请求,将响应交给 follower,follower 响应客户端 if len(m.Entries) != 1 { return nil } r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) }
- 根据 raft 的论文,读请求的前 3 步依旧要由 leader 来处理,所以客户端先将请求转发给 leader,leader执行完前 3 步后,将结果交给 follower,由 follower 执行后续步骤
租约
在《CONSENSUS BRIDGING THEORY AND PRACTICE》这篇论文中出现过两次租约的概念,一次在集群配置变更时防止被移出配置的节点影响新集群中的节点,另一次则用于只读请求的优化
集群成员变更中投票规则优化
该部分内容在《CONSENSUS BRIDGING THEORY AND PRACTICE》的 4.2.3 节中描述,etcd 在引入 Lease Read 优化时,为保障 Lease Read 的正确执行而引入该部分优化
-
背景:
节点因为配置变更被移除新集群时,因为无法知晓自己已经被移除集群,被移除节点在下线前可能因为收不到心跳而发起请求投票,导致当前 leader step down,虽然最终新集群中的节点会被选为 leader,但是无疑浪费了很多时间和系统资源,这种情况是应该避免的
-
采用预投票的方式来确认自己是否有资格参与选举,这种办法无法彻底解决问题,因为存在这样一种情况:待下线的节点满足日志 up-to-date 的条件
- 如图所示,旧集群成员为 S1 - S4,S4 是 leader;
- 集群成员变更完成后,S1被移出新集群,S4当选为新集群的 leader,此时 S4 收到一个新条目,还未来得及提交,S1 也还未下线。S1 选举超时后发起选举,增加任期后发送 Request RPC,强制 S4 step down
算法:修改请求投票的判定机制,如果 leader 能够维持心跳,则说明该 leader 是合法的,不允许发起新的选举。即如果在minimum election timeout 时间内还收到来自当前 leader 的心跳检查,则拒绝投票
-
存在的问题:与 leadership transfer 的情况冲突
- 解决方式:在请求中加上上下文(special flag),用于在 RequestVote RPC 处理时进行判断
加入租约机制的只读请求
该部分的内容在 《CONSENSUS BRIDGING THEORY AND PRACTICE》这篇论文的 6.4.1 节中描述,etcd 在 pr #5468(https://github.com/etcd-io/etcd/pull/5468) 中对该优化方式进行了讨论和实现,在 pr #8525(https://github.com/etcd-io/etcd/pull/8525) 中对其进行了完善。
- 背景:之前对只读请求的优化绕过了写日志的磁盘操作,但是对每一条读请求都需要与 quorum 进行一次通信,增加了请求的延迟。通过在心跳机制中引入租约的概念来实现在一次心跳间隔内执行多条只读请求;
- 算法:
- 通过开启 quorum,一旦 leader 的心跳包被集群内的 majority 节点响应,则 leader 可以假设,在同一个 election timeout 间隔内,其他节点都不可能成为 leader。因此 leader 可以在心跳间隔内直接响应只读请求而不需要额外的通信;
- 存在的问题:
- 非常依赖时钟,不能保证线性一致;
- 收到心跳的节点可以保证在 election timeout 时间内不发起选举,但是没有收到的节点仍然有可能超时发起选举;
etcd 的实现
-
在加入租约机制之前,etcd 在
Step()
函数中,如果收到消息的任期小于当前节点的任期,则该消息会被直接丢弃。但是在实现了 quorum 和 Lease Read 之后,如果没有实现或开启预投票,会出现以下问题:- 如果某个节点因为网络分隔后任期增加,其重新加入集群后会发起 RequestVote RPC,此时如果没有实现或开启预投票,则投票 RPC 会打破 Lease Read 中心跳间隔内其余节点不可能成为 leader 的前提条件;
- 为解决该问题,变更了投票规则,引入了上一条中描述的请求投票的修正
func (r *raft) Step(m pb.Message) error { // ...... case m.Term > r.Term: if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { // 判断是否满足强制发起选举的条件(用于 leader transfer) force := bytes.Equal(m.Context, []byte(campaignTransfer)) inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout if !force && inLease { // If a server receives a RequestVote request within the minimum election timeout // of hearing from a current leader, it does not update its term or grant its vote return nil } } // -------其他 m.Term > r.Term 时需执行的逻辑 case m.Term < r.Term: // ignore other cases r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) } // ......
-
修改请求投票判定机制后的实现依旧存在问题:当前实现中,如果收到消息的任期小于当前节点的任期,则该消息会被抛弃;
存在的问题为:某个节点在 partition 后重新加入集群,此时该节点拥有一个更高的任期,因此会忽略任何来自当前 leader 的消息(m.Term < r.Term),而对于开启了 quorum 和 Lease Reade 的其他节点,inLease 机制会使他们忽略来自 partition 节点的请求投票信息,故该节点会被永久的 stuck 住。
另外一个问题在 https://github.com/etcd-io/etcd/issues/8501 中描述,可能会产生集群死锁,选不出 leader;
-
为解决上述问题,etcd 对消息任期小于当前节点任期的情况的判定机制也进行了修正
case m.Term < r.Term: if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) { // 节点收到任期比自己小的节点的 heartbeat 或者 Append 消息,可能是自己因为网络(延迟或者分区)问题 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp}) } else if m.Type == pb.MsgPreVote { // Before Pre-Vote enable, there may have candidate with higher term, // but less log. After update to Pre-Vote, the cluster may deadlock if // we drop messages with a lower term. r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}) } else { // ignore other cases } // 其他情况下,消息任期小于当前任期,直接返回 return nil
-
有了以上机制的保障后,在处理只读请求时就可以使用基于租约的只读请求模式:
case ReadOnlyLeaseBased: if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { r.send(resp) } }
学习到的技巧和方法汇总
该部分总结了在学习过程中学到的方法和技巧(包括自己未使用过,或者曾经使用过但是没有很好掌握的)
设计思路
- etcd-raft 对不同状态的节点处理不同类型消息的实现方式非常值得学习;
- 引入入口函数:
- 整个 raft 被设计为一个大的状态机,任期是状态变更的关键属性,而不同的状态决定了处理消息的具体逻辑。即:任期判断是所有后续执行的前提;
- 入口函数的作用有两个:节点状态的判定和两类消息(状态变更相关消息和其他业务消息)处理逻辑的分离,其中节点状态变更又是由任期变化来驱动的,故而有了两阶段的设计,先判断任期,再处理消息;
- 这种设计思路有效的将代码按功能进行分块,按逻辑进行编排,避免了重复代码,也便于后续修改
- 使用回调函数实现不同角色对各类消息的处理逻辑:
- 复用了入口函数的部分,将所有前置判断交给入口函数来做,自己只需实现自己消息处理的部分;
- 引入入口函数:
常用方法
-
对于配置类,可以为该类添加一个 validate 方法,在方法中既可以完成默认值的设置,也可以完成参数的校验检查
type Config struct {} func (c *Config)validate() error {} func newRaft(c *Config) *raft { if err := c.validate(); err != nil { // c.validate() 中执行校验 panic(err.Error()) } }
-
枚举类型的使用:
type StateType uint64 const ( StateFollower StateType = iota StateCandidate StateLeader StatePreCandidate numStates ) var stmap = [...]string { "StateFollower", "StateCandidate", "StateLeader", "StatePreCandidate", } func (st StateType) String() { return stmap[uint64(st)] }
-
字符串的构造:
var buf strings.Builder for _, id := range ids { fmt.Fprintf(&buf, "%d: %s\n", id, m[id]) } return buf.String()
-
实现优雅的关停节点:
func (n *node) Stop() { select { case n.stop <- struct{}{}: // Not already stopped, so trigger it case <-n.done: // Node has already been stopped - no need to do anything return } // Block until the stop has been acknowledged by run() <-n.done } // run 方法中监听各个通道发来的信息 func (n *node) run() { //...... for { select { //-------监听其他通道------- case <- n.stop: close(n.done) return } } } // 关闭 n.done,会同时广播到其他监听 done 通道的地方 func (n *node) Tick() { select { //------监听其他通道------ case <-n.done: }
-
全局随机数的实现:
// lockedRand is a small wrapper around rand.Rand to provide // synchronization among multiple raft groups. Only the methods needed // by the code are exposed (e.g. Intn). type lockedRand struct { mu sync.Mutex rand *rand.Rand } func (r *lockedRand) Intn(n int) int { r.mu.Lock() v := r.rand.Intn(n) r.mu.Unlock() return v } var globalRand = &lockedRand{ rand: rand.New(rand.NewSource(time.Now().UnixNano())), }
- 单例模式,线程安全
参考资料
《CONSENSUS: BRIDGING THEORY AND PRACTICE》
《In Search of an Understandable Consensus Algorithm》
Raft 笔记