etcd-raft源码分析6-kvstore集群变更请求处理

在HttpKVAPI中kvstore的集群增加一个节点请求处理如下:

case r.Method == "POST":
    url, err := ioutil.ReadAll(r.Body)
    if err != nil {
        log.Printf("Failed to read on POST (%v)\n", err)
        http.Error(w, "Failed on POST", http.StatusBadRequest)
        return
    }

    nodeId, err := strconv.ParseUint(key[1:], 0, 64)
    if err != nil {
        log.Printf("Failed to convert ID for conf change (%v)\n", err)
        http.Error(w, "Failed on POST", http.StatusBadRequest)
        return
    }

    cc := raftpb.ConfChange{
        Type:    raftpb.ConfChangeAddNode,
        NodeID:  nodeId,
        Context: url,
    }
    h.confChangeC <- cc

    // As above, optimistic that raft will apply the conf change
    w.WriteHeader(http.StatusNoContent)

处理逻辑是向confChangeC通道写入增加节点消息,下面看下raftNode的routine中对该通道事件的处理:

//集群变更事件
        case cc, ok := <-rc.confChangeC:
            if !ok {
                rc.confChangeC = nil
            } else {
                confChangeCount += 1
                cc.ID = confChangeCount
                rc.node.ProposeConfChange(context.TODO(), cc)
            }
        }

调用了node的ProposeConfChange方法:

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
  data, err := cc.Marshal()
  if err != nil {
      return err
  }
  return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}

调用了node的Step方法,然后调用step方法,触发MsgProp事件:

func (n *node) step(ctx context.Context, m pb.Message) error {
  ch := n.recvc
  if m.Type == pb.MsgProp {
      //当是追加配置请求时ch为n.propc
      ch = n.propc
  }

  select {
  //当是追加配置请求时会向ch(即n.propc)通道写入数据(消息类型和数据)
  case ch <- m:
      return nil
  case <-ctx.Done():
      return ctx.Err()
  case <-n.done:
      return ErrStopped
  }
}

由此可见对于集群变更的处理是将集群变更信息写入n.propc通道,下面看下leader角色的node对于该通道数据的处理,在raft的stepLeader方法中:

case pb.MsgProp:
    //配置追加请求
    if len(m.Entries) == 0 {
        r.logger.Panicf("%x stepped empty MsgProp", r.id)
    }
    if _, ok := r.prs[r.id]; !ok {
        // If we are not currently a member of the range (i.e. this node
        // was removed from the configuration while serving as leader),
        // drop any new proposals.
        return
    }
    if r.leadTransferee != None {
        r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
        return
    }

    for i, e := range m.Entries {
        if e.Type == pb.EntryConfChange {
            if r.pendingConf {
                r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
                m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
            }
            r.pendingConf = true
        }
    }
    r.appendEntry(m.Entries...)
    r.bcastAppend()
    return

有以上代码可知是将该集群变更作为日志追加到本地r.appendEntry(m.Entries...),然后向其他follower发送附加日志rpc:r.bcastAppend()。

当该集群变更日志复制到过半数server后,raftNode提交日志的处理逻辑如下:

case raftpb.EntryConfChange:
        var cc raftpb.ConfChange
        cc.Unmarshal(ents[i].Data)
        rc.confState = *rc.node.ApplyConfChange(cc)
        switch cc.Type {
        case raftpb.ConfChangeAddNode:
            if len(cc.Context) > 0 {
                rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
            }
        case raftpb.ConfChangeRemoveNode:
            if cc.NodeID == uint64(rc.id) {
                log.Println("I've been removed from the cluster! Shutting down.")
                return false
            }
            rc.transport.RemovePeer(types.ID(cc.NodeID))
        }
    }

对于添加一个节点的处理,首先是更新集群变更的状态信息,如下:

func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
  var cs pb.ConfState
  select {
  case n.confc <- cc:
  case <-n.done:
  }
  select {
  case cs = <-n.confstatec:
  case <-n.done:
  }
  return &cs
}

会向n.confc通道写入集群变更消息,下面看node的处理:

case cc := <-n.confc:
        if cc.NodeID == None {
            r.resetPendingConf()
            select {
            case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
            case <-n.done:
            }
            break
        }
        switch cc.Type {
        case pb.ConfChangeAddNode:
            r.addNode(cc.NodeID)
        case pb.ConfChangeRemoveNode:
            // block incoming proposal when local node is
            // removed
            if cc.NodeID == r.id {
                propc = nil
            }
            r.removeNode(cc.NodeID)
        case pb.ConfChangeUpdateNode:
            r.resetPendingConf()
        default:
            panic("unexpected conf type")
        }
        select {
        case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
        case <-n.done:
        }

r.addNode的代码如下:

func (r *raft) addNode(id uint64) {
  r.pendingConf = false
  if _, ok := r.prs[id]; ok {
      // Ignore any redundant addNode calls (which can happen because the
      // initial bootstrapping entries are applied twice).
      return
  }

  r.setProgress(id, 0, r.raftLog.lastIndex()+1)
}

就是设置下该peer的发送日志进度信息。再回到raftNode中对于可提交的集群变更日志的处理,在更新完集群信息后执行了rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)}),即把该节点加入到当前集群,建立与该节点的通信,这块代码之前分析过了不再赘述。当该日志在其他follower也提交时,其他follower也会同样处理把这个新节点加入集群。

因此,只有集群变更日志在当前server被提交完成后,当前server才建立与新节点的通信,才知道集群的最新规模,在复制集群变更日志的过程中他们依然不知道集群的最新规模。但对于新节点来说,在启动式会知道老集群的节点信息,因此新节点启动后就知道了集群的最新规模。

总结如下,在ectd的raft实现中,处理集群变更的方案是:每次变更只能添加或删除一个节点,不能一次变更多个节点,因为每次变更一个节点能保证不会有多个leader同时产生,下面以下图为例分析下原因。

image.png

最初节点个数为3,即server1、server2、server3,最初leader为server3,如果有2个节点要加入到集群,那么在原来的3个节点收到集群变更请求前认为集群中有3个节点,确切的说是集群变更日志提交前,假如server3作为leader,把集群变更日志发送到了server4、server5,这样可以提交该集群变更日志了,因此server3、server4、server5的集群变更日志提交后他们知道当前集群有5个节点了。而server1和server2还没收到集群变更日志或者收到了集群变更日志但没有提交,那么server1和server2认为集群中还是有3个节点。假设此时server3因为网络原因重新发起选举,server1也同时发起选举,server1在收到server2的投票赞成响应而成为leader,而server3可以通过server4和server5也可以成为leader,这时出现了两个leader,是不安全且不允许的。

但如果每次只添加或减少1个节点,假设最初有3个节点,有1个节点要加入。最初leader为server1,在server1的集群变更日志提交前,server1、server2、server3认为集群中有3个节点,只有server4认为集群中有4个节点,如果leader在server1、server2、server3中产生,那么必然需要2个server,而server4只能收到server1、server2、server3中1个server的响应,是无法成为leader的,因为server4认为集群中有4个节点,需要3个节点同意才能成为leader。如果在server1是leader时该集群变更日志提交了,那么集群中至少有2个server有该集群变更日志了,假如server1和server2都有该集群变更日志了,server3和server4还没有,那么server3和server4不可能被选为leader,因为他们的日志不够新。对于server4来说需要3个server同时同意才能成为leader,而server1和server2的日志比他新,不会同意他成为leader。对于server3来说,在集群变更日志提交前他认为集群中只有3个server,因此只会把投票请求发送给server1和server2,而server1和server2因为日志比他新不会同意;如果server3的集群变更日志也提交了,那么他人为集群中有4个节点,这时与server4一样,需要3个server同时同意才能成为leader,如果server1通过server2成为leader了,那么server1和server2都不会参与投票了。

因此每次一个节点的加入不管在集群变更日志提交前、提交过程中还是提交后都不会出现两个leader的情况。

提交前是因为原来的节点不知道这个新的节点,不会发送投票给他,也不会处理新节点的投票请求;

提交后是因为大家都知道集群的最新规模了,不会产生两个大多数的投票;

提交过程中是因为没有这条集群变更日志的server由于日志不够新也不能成为leader,比如最初集群规模是2n+1,现在有1个新节点加入,如果集群变更日志复制到了过半数server,因为之前的leader是老的集群的,因此过半数是n+1,假如这个n+1个server中产生了一个leader,那么对于新的节点来说,因为集群变更日志还没有应用到状态机所以只有这个新节点认为集群中有2n+2个server,因此需要n+2个server同意投票他才能成为leader,但这是不可能的,因为已经有n+1个节点已经投过票了,而对于老集群中的剩下的没有投票的n个节点中,他们任何一个server都需要n+1个server同意才能成为leader,而他们因为还没有把集群变更日志真正提交即应用到状态机,还不知道新节点的存在,也就不能收到n+1个server投票,最多只能收到n个节点的投票,因此也不能成为leader,保证了只能有一个leader被选出来。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容