raft 结构
网络层
首先etcd最外层有一个网络层,负责与集群其他节点通信或者接受客户端的请求,这里我们主要学习raft模块不详细解读,使用就用网络层来代替。
node
node负责raft于网络层的交互,交互使用go的chan
propc: make(chan msgWithResult), //接收网络层MsgProp类型消息
recvc: make(chan pb.Message), //接收网络层除MsgProp类型以外的消息
confc: make(chan pb.ConfChange),//接收EntryConfChange类型消息比如动态添加节点
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),//向上层返回 ready
advancec: make(chan struct{}),//上层处理往ready后返回给raft的消息
// make tickc a buffered chan, so raft node can buffer some ticks when the node
// is busy processing raft messages. Raft node will resume process buffered
// ticks when it becomes idle.
tickc: make(chan struct{}, 128),//管理超时的管道
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
可以看到定义来许多管道来交互信息
raft
raft主要处理raft算法的实现,和日志的复制。
raft会存储当前节点的id,任期,投票等。
其中raftlog用来管理日志,可以看到raftlog下有两个类。
一个日志的流程是这样的:
- 客户端发送请求,生成日志会先进入到unstable模块,如名字所表示,它是不安全的,因为日志还没有进行持久化。
- 将unstable中的日志同步给其他节点,同时交给上层持久化。node也负责和上层通信。
- 如果日志已经被半数以上的节点复制成功了,那么这部分日志将会被认为提交成功。raft会修改raftlog中的committed记录提交的index。
- 提交成功后raft会将提交成功的日志返回给上层,上层会应用日志,然后响应给客户端成功,同时raft也会同步给其他节点让他们也应用日志,然后修改自己的applied记录应用的index。
启动node
创建node
创建node之前先会读取持久化这磁盘中的日志,根据是否有日志和是否是一个新集群,有三种情况
- 没有日志并且是新集群,直接调用startNode
- 没有日志不是新集群,会先与其他节点通信然后更新配置调用straNode
- 有日志,说明是节点重启,读取日志调用restartNode
startNode 和 restartNode其实差别不大,都会构建一个raft的配置
c := &raft.Config{
ID: uint64(id),//当前节点id
ElectionTick: cfg.ElectionTicks,//选举用超时时间
HeartbeatTick: 1,//心跳间隔
Storage: s,//就是MemorySorage
MaxSizePerMsg: maxSizePerMsg, //每次发消息的最大size
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,//上文提到的pre模式
}
除了Storage参数,其他都是一些配置的属性。在straNode中
Storage是调用 s = raft.NewMemoryStorage() 来创建的一个空的初始对象。而restartNode中会用Storage加载从日志中读取的数据。来还原服务宕机前的状态。
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
创建raft
接下来调用raft.StartNode(c, peers)来创建raft对象。c就是上面的raft.Config,peers记录集群中所有节点的id。
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
- 先调用newRaft创建raft对象,newRaft方法中会先Storage中读取信息。然后创建raft对象,然后判断Storage是不是空,如果是重启节点,那这里Storage就会有数据,然后更新raft的数据。
if !isHardStateEqual(hs, emptyState) {
r.loadState(hs)
}
if c.Applied > 0 {
raftlog.appliedTo(c.Applied)
}
/-------------------------------------------/
func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
}
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
/------------------------------------------/
func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
}
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
}
}
r.loadState(hs) 里面会更新raft记录的任期和投票,raftLog中的提交index。
appliedTo 会一个raftLog的应用index。
之后会调用r.becomeFollower(1, None)修改节点状态变成follower
同样其他状态也有相应的函数:
func (r *raft) becomeFollower(term uint64, lead uint64)
func (r *raft) becomeCandidate()
func (r *raft) becomePreCandidate()
func (r *raft) becomeLeader()
然后如果是新集群,则会将集群的节点信息追加到日志中,并提交追加的日志,这部分用于重启时能从日志中读取到集群的节点信息。
最后创建node 开启协程启动node ,创建node没有什么操作就是创建对象。
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
run
func (n *node) run(r *raft) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready
lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
if lead != r.lead {
if r.hasLeader() {
if lead == None {
propc = n.propc
} else {
propc = nil
}
lead = r.lead
}
select {
case pm := <-propc:
err := r.Step(m)
case m := <-n.recvc:
r.Step(m)
case cc := <-n.confc:
case <-n.tickc:
r.tick()
case readyc <- rd:
advancec = n.advancec
case <-advancec:
advancec = nil
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
}
}
}
上面的代码是删除来很多逻辑只保留基本结构的。
主要用于从管道读取消息然后传递给raft。
这里重点就是,raft是不能主动给发消息的,只能是上层应用自己来拉取。
第一次进入循环advancec为空,会调用
newReady(r, prevSoftSt, prevHardSt) 这个函数的返回就是raft的状态变化和要发送的消息。
rd := Ready{
Entries: r.raftLog.unstableEntries(),//unstable中的日志交给上层持久化
CommittedEntries: r.raftLog.nextEnts(),//已经提交待应用的日志,交给上层应用
Messages: r.msgs,//raft要发送的消息
}
node拿到这个对象后会通过readyc通道发送给上层,然后记录这次的状态已用于下次调用Ready的时候判断状态是否变化。
之后会等待advancec通道,如果要消息则说明上次的消息已经处理完成,可以修改自己的状态了,比如日志应用完成修改自己的应用index。
这就是raft启动的过程。下一个文章讲解选举的流程