etcd-raft源码分析4-leader心跳和日志追加

当server启动后,如果成为server,那么会向其他server定期发送心跳请求,并且在每次收到follower的心跳回复后,会根据follower与leader自己的日志对比将没发送的日志发送给follower,流程图如下:

leader心跳和日志追加.png

由此可见,在leader每次发送给follower的心跳请求如下:

pb.Message{
    To:      to,
    Type:    pb.MsgHeartbeat,
    Commit:  commit,
    Context: ctx,
}

其中包含leader的日志当前提交的索引,在follower收到该心跳请求时,会进行如下处理:

func (r *raft) handleHeartbeat(m pb.Message) {
  r.raftLog.commitTo(m.Commit)
  r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

即follower会根据leader的心跳请求中的日志提交位置信息,将自己的日志提交索引设置到对应的位置,并发送心跳响应给leader。
当leader收到follower的心跳响应后,会比较该follower与leader日志的匹配位置pr.Match与leader日志的最新位置,如果两个位置不相等,说明还有日志需要发送给该follower,最终使得该follower的日志追上leader的日志。

if pr.Match < r.raftLog.lastIndex() {
        r.sendAppend(m.From)
    }

sendAppend方法的核心代码如下:

m := pb.Message{}
//消息的发送目的server的id
m.To = to

term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
...

//消息类型:附加日志消息
m.Type = pb.MsgApp
//上一次发送给该follower的日志索引
m.Index = pr.Next - 1
//上一次发送给该follower的日志的任期号
m.LogTerm = term
//要发送的日志条目
m.Entries = ents
//leader的日志提交位置
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
    switch pr.State {
    // optimistically increase the next when in ProgressStateReplicate
    case ProgressStateReplicate:
        last := m.Entries[n-1].Index
        pr.optimisticUpdate(last)
        pr.ins.add(last)
    case ProgressStateProbe:
        pr.pause()
    default:
        r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
    }
}
r.send(m)

pr.Next表示要发给该follower的下一条日志的索引,pr.Next-1表示上一次发给该follower的日志,因此r.raftLog.term(pr.Next - 1)表示上次发给该follower的日志的任期号,r.raftLog.entries(pr.Next, r.maxMsgSize)表示从从leader日志中取出要发给该follower的日志条目。

下面看下follower如何处理附加日志请求的,见raft.stepFollower方法,代码段如下:

case pb.MsgApp:
    r.electionElapsed = 0
    r.lead = m.From
    r.handleAppendEntries(m)

handleAppendEntries方法如下:

func (r *raft) handleAppendEntries(m pb.Message) {
    // m.Index表示leader发送给follower的上一条日志的索引位置,
    // 如果当前follower在该位置的日志已经提交过了(有可能该leader是刚选举产生的,没有follower的日志信息,所以设置m.Index=0),
    // 则把follower当前提日志交的索引位置告诉leader,让leader从该follower提交位置的下一条位置的日志开始发送给follower
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }
    //将日志追加到follower的日志中,可能存在冲突,因此需要先找到冲突的位置,然后用leader发送来的日志中从冲突位置开始覆盖follower的日志
    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        //mlastIndex为follower已经追加好的最新日志的位置,追加成功后要把该信息告诉leader,以便leader会把该位置之后的日志再发送给该follower
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
        r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
        // 如果leader与follower的日志还没有匹配上,那么把follower的最新日志的索引位置告诉leader,
        // 以便leader下一次从该follower的最新日志位置之后开始尝试发送附加日志,直到leader与follower的日志匹配上了就能追加日志成功了
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}

r.raftLog.maybeAppend方法是将leader发送过来的日志追加到follower本地的方法,但如果leader发送过来的日志与follower的日志无法匹配(follower在index位置的日志的任期号不是logTerm),则要把拒绝追加日志信息发送给leader,包含follower的最新日志位置,以便leader下一次从该位置尝试匹配,如下:

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    //index,logTerm为leader上次发送给该follower的日志索引和日志的term,committed是可以提交的日志索引,ents为发过来的日志条目
    //只有follower能够匹配eader上次发送的日志索引和term,才能正常响应
    if l.matchTerm(index, logTerm) {
        //最新的日志索引
        lastnewi = index + uint64(len(ents))
        //获取冲突的日志索引,有些情况下leader发过来的日志不能直接追加,索引需要找到最新匹配的位置,从该位置之后的日志全部被leader覆盖
        ci := l.findConflict(ents)
        switch {
        case ci == 0:
        case ci <= l.committed:
            l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
        default:
            offset := index + 1
            //取出从冲突的位置开始的日志,覆盖自己的日志,即出现冲突时以leader的日志为准
            l.append(ents[ci-offset:]...)
        }
        //如果leader的已提交的日志索引大于leader复制给当前follower的最新日志的索引,说明follower落后了,对于这次复制来的日志全都直接提交,否则提交leader已经提交的日志索引的日志
         l.commitTo(min(committed, lastnewi))
        return lastnewi, true
    }
    return 0, false
}

下面看下leader收到follower的日志追加成功消息时如何处理,在raft的stepLeader方法中:

case pb.MsgAppResp:
    pr.RecentActive = true

    if m.Reject {
        r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
            r.id, m.RejectHint, m.From, m.Index)
        //如果处于ProgressStateReplicate状态,则将pr.Next降低为pr.Match + 1,以便去和follower的日志进行匹配
        if pr.maybeDecrTo(m.Index, m.RejectHint) {
            r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
            //如果处于ProgressStateReplicate状态,则转变为ProgressStateProbe状态,去探测follower的匹配位置
            if pr.State == ProgressStateReplicate {
                pr.becomeProbe()
            }
            r.sendAppend(m.From)
        }
    } else {
        oldPaused := pr.IsPaused()
        //m.Index为follower的最新日志索引位置,根据该位置更新pr.Match和pr.Next, pr.Match=m.Index, pr.Next=m.Index+1
        if pr.maybeUpdate(m.Index) {
            switch {
            //一旦追加日志成功,则从ProgressStateProbe状态转变为ProgressStateReplicate状态,加快日志追加过程
            case pr.State == ProgressStateProbe:
                pr.becomeReplicate()
            case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
                r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
                pr.becomeProbe()
            case pr.State == ProgressStateReplicate:
                //pr.ins用于限制发送消息的速率,当发送时将日志索引写入到pr.ins,pr.ins有数量限制,当发送消息收到回复后再把pr.ins中该发送成功日志的索引在pr.ins中移除掉
                pr.ins.freeTo(m.Index)
            }

            //收到follower的日志追加成功响应后判断是否能commit一部分日志
            if r.maybeCommit() {
                //向其他follower发送commit日志消息
                r.bcastAppend()
            } else if oldPaused {
                // update() reset the wait state on this node. If we had delayed sending
                // an update before, send it now.
                r.sendAppend(m.From)
            }
            // Transfer leadership is in progress.
            if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
                r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                r.sendTimeoutNow(m.From)
            }
        }
    }

当leader收到附加日志拒绝消息时,说明p.Next太大了,而follower在p.Next-1位置的日志没有与follower匹配上,需要将p.Next降低为p.Match+1。然后转变为ProgressStateProbe状态,去探测follower与leader的日志匹配位置。

当leader收到附加日志成功消息时,则要更新pr.Match和pr.Next,m.Index是follower的最新日志位置,要设置pr.Match=m.Index, pr.Next=m.Index+1。每次附加日志成功,就尝试提交下可以提交的日志(r.maybeCommit()),如果日志复制到了过半数server,说明可以提交了,便向其他follower发送日志提交请求(r.bcastAppend())。

下面看下r.maybeCommit()方法:

func (r *raft) maybeCommit() bool {
   mis := make(uint64Slice, 0, len(r.prs))
  for id := range r.prs {
      mis = append(mis, r.prs[id].Match)
  }
  //mis中保存着复制到每个server节点的日志索引,这里进行从大到小排序
  sort.Sort(sort.Reverse(mis))
  //如果节点数量为5,r.quorum()-1=2,则在5个节点中日志第三新的节点的最新日志索引就是复制到过半数节点的日志索引,这个位置的日志可以提交啦
  mci := mis[r.quorum()-1]
  //尝试提交日志
  return r.raftLog.maybeCommit(mci, r.Term)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容