当server启动后,如果成为server,那么会向其他server定期发送心跳请求,并且在每次收到follower的心跳回复后,会根据follower与leader自己的日志对比将没发送的日志发送给follower,流程图如下:
由此可见,在leader每次发送给follower的心跳请求如下:
pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
其中包含leader的日志当前提交的索引,在follower收到该心跳请求时,会进行如下处理:
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
即follower会根据leader的心跳请求中的日志提交位置信息,将自己的日志提交索引设置到对应的位置,并发送心跳响应给leader。
当leader收到follower的心跳响应后,会比较该follower与leader日志的匹配位置pr.Match与leader日志的最新位置,如果两个位置不相等,说明还有日志需要发送给该follower,最终使得该follower的日志追上leader的日志。
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
sendAppend方法的核心代码如下:
m := pb.Message{}
//消息的发送目的server的id
m.To = to
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
...
//消息类型:附加日志消息
m.Type = pb.MsgApp
//上一次发送给该follower的日志索引
m.Index = pr.Next - 1
//上一次发送给该follower的日志的任期号
m.LogTerm = term
//要发送的日志条目
m.Entries = ents
//leader的日志提交位置
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
r.send(m)
pr.Next表示要发给该follower的下一条日志的索引,pr.Next-1表示上一次发给该follower的日志,因此r.raftLog.term(pr.Next - 1)表示上次发给该follower的日志的任期号,r.raftLog.entries(pr.Next, r.maxMsgSize)表示从从leader日志中取出要发给该follower的日志条目。
下面看下follower如何处理附加日志请求的,见raft.stepFollower方法,代码段如下:
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
handleAppendEntries方法如下:
func (r *raft) handleAppendEntries(m pb.Message) {
// m.Index表示leader发送给follower的上一条日志的索引位置,
// 如果当前follower在该位置的日志已经提交过了(有可能该leader是刚选举产生的,没有follower的日志信息,所以设置m.Index=0),
// 则把follower当前提日志交的索引位置告诉leader,让leader从该follower提交位置的下一条位置的日志开始发送给follower
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
//将日志追加到follower的日志中,可能存在冲突,因此需要先找到冲突的位置,然后用leader发送来的日志中从冲突位置开始覆盖follower的日志
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
//mlastIndex为follower已经追加好的最新日志的位置,追加成功后要把该信息告诉leader,以便leader会把该位置之后的日志再发送给该follower
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
// 如果leader与follower的日志还没有匹配上,那么把follower的最新日志的索引位置告诉leader,
// 以便leader下一次从该follower的最新日志位置之后开始尝试发送附加日志,直到leader与follower的日志匹配上了就能追加日志成功了
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
r.raftLog.maybeAppend方法是将leader发送过来的日志追加到follower本地的方法,但如果leader发送过来的日志与follower的日志无法匹配(follower在index位置的日志的任期号不是logTerm),则要把拒绝追加日志信息发送给leader,包含follower的最新日志位置,以便leader下一次从该位置尝试匹配,如下:
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
//index,logTerm为leader上次发送给该follower的日志索引和日志的term,committed是可以提交的日志索引,ents为发过来的日志条目
//只有follower能够匹配eader上次发送的日志索引和term,才能正常响应
if l.matchTerm(index, logTerm) {
//最新的日志索引
lastnewi = index + uint64(len(ents))
//获取冲突的日志索引,有些情况下leader发过来的日志不能直接追加,索引需要找到最新匹配的位置,从该位置之后的日志全部被leader覆盖
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
//取出从冲突的位置开始的日志,覆盖自己的日志,即出现冲突时以leader的日志为准
l.append(ents[ci-offset:]...)
}
//如果leader的已提交的日志索引大于leader复制给当前follower的最新日志的索引,说明follower落后了,对于这次复制来的日志全都直接提交,否则提交leader已经提交的日志索引的日志
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
下面看下leader收到follower的日志追加成功消息时如何处理,在raft的stepLeader方法中:
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject {
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
//如果处于ProgressStateReplicate状态,则将pr.Next降低为pr.Match + 1,以便去和follower的日志进行匹配
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
//如果处于ProgressStateReplicate状态,则转变为ProgressStateProbe状态,去探测follower的匹配位置
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldPaused := pr.IsPaused()
//m.Index为follower的最新日志索引位置,根据该位置更新pr.Match和pr.Next, pr.Match=m.Index, pr.Next=m.Index+1
if pr.maybeUpdate(m.Index) {
switch {
//一旦追加日志成功,则从ProgressStateProbe状态转变为ProgressStateReplicate状态,加快日志追加过程
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
//pr.ins用于限制发送消息的速率,当发送时将日志索引写入到pr.ins,pr.ins有数量限制,当发送消息收到回复后再把pr.ins中该发送成功日志的索引在pr.ins中移除掉
pr.ins.freeTo(m.Index)
}
//收到follower的日志追加成功响应后判断是否能commit一部分日志
if r.maybeCommit() {
//向其他follower发送commit日志消息
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
当leader收到附加日志拒绝消息时,说明p.Next太大了,而follower在p.Next-1位置的日志没有与follower匹配上,需要将p.Next降低为p.Match+1。然后转变为ProgressStateProbe状态,去探测follower与leader的日志匹配位置。
当leader收到附加日志成功消息时,则要更新pr.Match和pr.Next,m.Index是follower的最新日志位置,要设置pr.Match=m.Index, pr.Next=m.Index+1。每次附加日志成功,就尝试提交下可以提交的日志(r.maybeCommit()),如果日志复制到了过半数server,说明可以提交了,便向其他follower发送日志提交请求(r.bcastAppend())。
下面看下r.maybeCommit()方法:
func (r *raft) maybeCommit() bool {
mis := make(uint64Slice, 0, len(r.prs))
for id := range r.prs {
mis = append(mis, r.prs[id].Match)
}
//mis中保存着复制到每个server节点的日志索引,这里进行从大到小排序
sort.Sort(sort.Reverse(mis))
//如果节点数量为5,r.quorum()-1=2,则在5个节点中日志第三新的节点的最新日志索引就是复制到过半数节点的日志索引,这个位置的日志可以提交啦
mci := mis[r.quorum()-1]
//尝试提交日志
return r.raftLog.maybeCommit(mci, r.Term)
}