etcd raft模块解析(二)

raft 结构

raft.png
网络层

首先etcd最外层有一个网络层,负责与集群其他节点通信或者接受客户端的请求,这里我们主要学习raft模块不详细解读,使用就用网络层来代替。

node

node负责raft于网络层的交互,交互使用go的chan

propc:      make(chan msgWithResult), //接收网络层MsgProp类型消息
recvc:      make(chan pb.Message), //接收网络层除MsgProp类型以外的消息
confc:      make(chan pb.ConfChange),//接收EntryConfChange类型消息比如动态添加节点
confstatec: make(chan pb.ConfState),
readyc:     make(chan Ready),//向上层返回 ready 
advancec:   make(chan struct{}),//上层处理往ready后返回给raft的消息
// make tickc a buffered chan, so raft node can buffer some ticks when the node
// is busy processing raft messages. Raft node will resume process buffered
// ticks when it becomes idle.
tickc:  make(chan struct{}, 128),//管理超时的管道
done:   make(chan struct{}), 
stop:   make(chan struct{}),
status: make(chan chan Status),

可以看到定义来许多管道来交互信息

raft

raft主要处理raft算法的实现,和日志的复制。
raft会存储当前节点的id,任期,投票等。
其中raftlog用来管理日志,可以看到raftlog下有两个类。
一个日志的流程是这样的:

  • 客户端发送请求,生成日志会先进入到unstable模块,如名字所表示,它是不安全的,因为日志还没有进行持久化。
  • 将unstable中的日志同步给其他节点,同时交给上层持久化。node也负责和上层通信。
  • 如果日志已经被半数以上的节点复制成功了,那么这部分日志将会被认为提交成功。raft会修改raftlog中的committed记录提交的index。
  • 提交成功后raft会将提交成功的日志返回给上层,上层会应用日志,然后响应给客户端成功,同时raft也会同步给其他节点让他们也应用日志,然后修改自己的applied记录应用的index。

启动node

创建node

创建node之前先会读取持久化这磁盘中的日志,根据是否有日志和是否是一个新集群,有三种情况

  • 没有日志并且是新集群,直接调用startNode
  • 没有日志不是新集群,会先与其他节点通信然后更新配置调用straNode
  • 有日志,说明是节点重启,读取日志调用restartNode
    startNode 和 restartNode其实差别不大,都会构建一个raft的配置
c := &raft.Config{
        ID:              uint64(id),//当前节点id
        ElectionTick:    cfg.ElectionTicks,//选举用超时时间
        HeartbeatTick:   1,//心跳间隔
        Storage:         s,//就是MemorySorage
        MaxSizePerMsg:   maxSizePerMsg, //每次发消息的最大size
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
        PreVote:         cfg.PreVote,//上文提到的pre模式
    }

除了Storage参数,其他都是一些配置的属性。在straNode中
Storage是调用 s = raft.NewMemoryStorage() 来创建的一个空的初始对象。而restartNode中会用Storage加载从日志中读取的数据。来还原服务宕机前的状态。

    s := raft.NewMemoryStorage()
    if snapshot != nil {
        s.ApplySnapshot(*snapshot)
    }

创建raft

接下来调用raft.StartNode(c, peers)来创建raft对象。c就是上面的raft.Config,peers记录集群中所有节点的id。

func StartNode(c *Config, peers []Peer) Node {
    r := newRaft(c)
    // become the follower at term 1 and apply initial configuration
    // entries of term 1
    r.becomeFollower(1, None)
    for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        if err != nil {
            panic("unexpected marshal error")
        }
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
    }
    // Mark these initial entries as committed.
    // TODO(bdarnell): These entries are still unstable; do we need to preserve
    // the invariant that committed < unstable?
    r.raftLog.committed = r.raftLog.lastIndex()
  • 先调用newRaft创建raft对象,newRaft方法中会先Storage中读取信息。然后创建raft对象,然后判断Storage是不是空,如果是重启节点,那这里Storage就会有数据,然后更新raft的数据。
if !isHardStateEqual(hs, emptyState) {
        r.loadState(hs)
    }
    if c.Applied > 0 {
        raftlog.appliedTo(c.Applied)
    }
/-------------------------------------------/
func (r *raft) loadState(state pb.HardState) {
    if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
        r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
    }
    r.raftLog.committed = state.Commit
    r.Term = state.Term
    r.Vote = state.Vote
/------------------------------------------/
func (l *raftLog) appliedTo(i uint64) {
    if i == 0 {
        return
    }
    if l.committed < i || i < l.applied {
        l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
    }
    l.applied = i
}
}

r.loadState(hs) 里面会更新raft记录的任期和投票,raftLog中的提交index。
appliedTo 会一个raftLog的应用index。
之后会调用r.becomeFollower(1, None)修改节点状态变成follower
同样其他状态也有相应的函数:

func (r *raft) becomeFollower(term uint64, lead uint64) 
func (r *raft) becomeCandidate() 
func (r *raft) becomePreCandidate() 
func (r *raft) becomeLeader() 

然后如果是新集群,则会将集群的节点信息追加到日志中,并提交追加的日志,这部分用于重启时能从日志中读取到集群的节点信息。
最后创建node 开启协程启动node ,创建node没有什么操作就是创建对象。

    n := newNode()
    n.logger = c.Logger
    go n.run(r)
    return &n

run

func (n *node) run(r *raft) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

lead := None
prevSoftSt := r.softState()
  prevHardSt := emptyState

  for {
     if advancec != nil {
        readyc = nil
     } else {
        rd = newReady(r, prevSoftSt, prevHardSt)
        if rd.containsUpdates() {
           readyc = n.readyc
        } else {
           readyc = nil
        }
     }

     if lead != r.lead {
        if r.hasLeader() {
           if lead == None {
           propc = n.propc
        } else {
           propc = nil
        }
        lead = r.lead
     }

  select {
     case pm := <-propc:
        err := r.Step(m)
     case m := <-n.recvc:
           r.Step(m)
     case cc := <-n.confc:
     case <-n.tickc:
        r.tick()
     case readyc <- rd:
        advancec = n.advancec
     case <-advancec:
        advancec = nil
     case c := <-n.status:
        c <- getStatus(r)
     case <-n.stop:
        close(n.done)
        return
}
  }
}

上面的代码是删除来很多逻辑只保留基本结构的。
主要用于从管道读取消息然后传递给raft。
这里重点就是,raft是不能主动给发消息的,只能是上层应用自己来拉取。
第一次进入循环advancec为空,会调用
newReady(r, prevSoftSt, prevHardSt) 这个函数的返回就是raft的状态变化和要发送的消息。

rd := Ready{
   Entries:          r.raftLog.unstableEntries(),//unstable中的日志交给上层持久化
 CommittedEntries: r.raftLog.nextEnts(),//已经提交待应用的日志,交给上层应用
 Messages:         r.msgs,//raft要发送的消息
}

node拿到这个对象后会通过readyc通道发送给上层,然后记录这次的状态已用于下次调用Ready的时候判断状态是否变化。
之后会等待advancec通道,如果要消息则说明上次的消息已经处理完成,可以修改自己的状态了,比如日志应用完成修改自己的应用index。

这就是raft启动的过程。下一个文章讲解选举的流程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • 寻找一种易于理解的一致性算法(扩展版) 摘要 Raft 是一种为了管理复制日志的一致性算法。它提供了和 Paxos...
    枝叶君阅读 2,625评论 0 15
  • 寻找一种易于理解的一致性算法(扩展版) 摘要 Raft 是一种为了管理复制日志的一致性算法。它提供了和 Paxos...
    yflau阅读 940评论 0 1
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,813评论 0 5
  • 你们有没有遇到过一种很聊得来的人? 她说她拉小提琴,你说你喜欢大卫格瑞特的《Viva La Vida》,还喜欢2c...
    王龙Loong阅读 426评论 0 0
  • a市,一个繁华又危险的地方 很不幸,墨初月就出生在这里,作为“黑色媚影”慕韵雪的独生子,他从小就接受杀手...
    大杯口口可乐阅读 156评论 0 0