ZooKeeper集群选举

初始化

在集群模式节点启动时,调用QuorumPeer#createElectionAlgorithm方法,创建选举算法。默认的算法类型为3,也就是

case 3:
    qcm = createCnxnManager();
    QuorumCnxManager.Listener listener = qcm.listener;
    if(listener != null){
        listener.start();
        le = new FastLeaderElection(this, qcm);
    } else {
        LOG.error("Null listener when initializing cnx manager");
    }
    break;
  1. 创建QuorumCnxManager实例,该类使用TCP实现了Leader选举的连接管理器。它为每对服务器维护一个连接。并且确保每对正确运行并且可以通过网络进行通信的服务器都有一个连接(彼此之间只有一个连接)。如果两台服务器试图同时启动一个连接,那么连接管理器将使用一种非常简单的连接中断机制,根据双方的Sid大小决定放弃哪个连接。
    对于每个peer,管理器维护要发送的消息队列。如果到任何特定peer的连接中断,则发送方线程将消息放回到列表中。由于此实现目前使用队列实现来维护要发送到另一个peer的消息,因此我们将消息添加到队列尾部,从而更改消息的顺序。

  2. 然后启动连接监听器
    这个连接监听器是专为选举而生的,在QuorumCnxManager#handleConnection方法,可以看到

    // 接收的连接来自比自己的id小的,直接关闭,这样就能保证节点之间只有一个连接
    if (sid < this.mySid) {
      SendWorker sw = senderWorkerMap.get(sid);
      if (sw != null) {
        sw.finish();
      }
    
      closeSocket(sock);
      connectOne(sid);
    } else {
      // 发送线程
      SendWorker sw = new SendWorker(sock, sid);
      // 接收线程
      RecvWorker rw = new RecvWorker(sock, din, sid, sw);
      sw.setRecv(rw);
    
      SendWorker vsw = senderWorkerMap.get(sid);
    
      if(vsw != null)
        vsw.finish();
    
      senderWorkerMap.put(sid, sw);
      queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
    
      sw.start();
      rw.start();
    
      return;
    }
    
    • 接收的连接来自比自己的sid小的,直接关闭,这样就能保证节点之间只有一个连接
    • 为新连接启动两个线程:SendWorkerRecvWorker用于选举投票的接收和发送
  3. 创建选举算法FastLeaderElection
    基于TCP连接实现的Leader选举算法,在FastLeaderElection的构造函数中,调用FastLeaderElection#starter方法启动

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
      this.self = self;
      proposedLeader = -1;
      proposedZxid = -1;
    
      sendqueue = new LinkedBlockingQueue<ToSend>();
      recvqueue = new LinkedBlockingQueue<Notification>();
      this.messenger = new Messenger(manager);
    }
    

    注意到构建了一个Messenger实例,其是选举算法的消息发送、接收器,其构造方法中初始化了发送、接收线程

    Messenger(QuorumCnxManager manager) {
      this.ws = new WorkerSender(manager);
      Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
      t.setDaemon(true);
      t.start();
    
      this.wr = new WorkerReceiver(manager);
    
      t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
      t.setDaemon(true);
      t.start();
    }
    

选举

当集群没有Leader时,或者集群重新启动,集群所有节点状态为LOOKING,节点会进入选举状态

// QuorumPeer#run
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());

以默认的FastLeaderElection#lookForLeader为例,该方法开始新一轮Leader选举。每当QuorumPeer将其状态更改为LOOKING时,就会调用此方法,并向所有其他peers发送通知。

拆解lookForLeader方法

  1. 创建接受的投票和发送投票的集合
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
  1. 更新自己的投票,也就是优先给自己投票
    同时在增加逻辑时钟,表明开始新纪元

    synchronized(this){
     //增加逻辑时钟
      logicalclock.incrementAndGet();
      updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    
  2. 更新本地投票后,给其他节点发送通知

    sendNotifications();
    

    这一步对每个peer节点创建一个消息,存放在sendqueue中,这时候messenger实例中WorkerSender线程将发送数据

    // Messenger.WorkerSender#run
    public void run() {
      while (!stop) {
        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
        if(m == null) continue;
        process(m);
      }
    }
    void process(ToSend m) {
      ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
                                          m.leader,
                                          m.zxid, 
                                          m.electionEpoch, 
                                          m.peerEpoch);
      manager.toSend(m.sid, requestBuffer);
    }
    

    将调用QuorumCnxManager#toSend方法发送数据(加入队列)

    public void toSend(Long sid, ByteBuffer b) {
      if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
      } else {
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
        if (bqExisting != null) {
          addToSendQueue(bqExisting, b);
        } else {
          addToSendQueue(bq, b);
        }
        connectOne(sid);
      }
    }
    

    可以看到将消息加入到queueSendMap中所对应的sid的队列中,那这个队列的数据肯定由QuorumCnxManager#handleConnection方法中为每个peer创建的发送线程SendWorker发送。在QuorumCnxManager.SendWorker#run方法中

    ByteBuffer b = null;
    try {
      ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
      if (bq != null) {
        b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
      } else {
        break;
      }
    
      if(b != null) {
        // lastMessageSent是为了确保消息正确发送
        lastMessageSent.put(sid, b);
        // 发送消息
        send(b);
      }
    } 
    

    可以看到到这里将消息发送给对应的其他的peer。

    于此同时RecvWorker也在接收其他peer的投票,并将投票加入recvQueue队列

    FastLeaderElection.Messenger.WorkerReceiver线程会调用QuorumCnxManager#pollRecvQueue方法,将Message从中取出,并封装成Notification,投递到recvqueue队列

  3. 进入循环,不断交换投票,直到找到新的Leader

    1. 从接收队列中取出一个通知
      recvQueue由上面WorkerReceiver线程封装并投递的

      Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
      
    2. 处理通知
      处理投票要先判断epoch

      if (n.electionEpoch > logicalclock.get()) {
        // 1 确保大家都在同一轮投票中
        logicalclock.set(n.electionEpoch);
        recvset.clear();
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                               getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
          updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
          updateProposal(getInitId(),
                         getInitLastLoggedZxid(),
                         getPeerEpoch());
        }
        sendNotifications();
      } else if (n.electionEpoch < logicalclock.get()) {
        // 2
        break;
      } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                     proposedLeader, proposedZxid, proposedEpoch)) {
        //3
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
      }
      
      • 外部投票的epoch大于内部投票。更新自己的logicalclock,并且清空所有已经收到的投票,然后进行选票比较。最终再将内部投票发送出去。更新logicalclock的目的是为了确保大家都在同一轮投票。

      • 外部投票的epoch小于内部投票。还是上一轮投票,忽略

      • 外部投票的epoch等于内部投票。同一轮投票,进行选票比较。如果为其投票,则更新本地投票,同时给其他节点发送通知

      只有在同一个选举轮次的投票才是有效的投票。同时比较的优先级为

      • epoch
      • zxid
      • sid
    3. 投票添加到recvset

      recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
      

      recvset存放的是所有服务器的投票,如果有半数的机器与本机投票的节点相同则选举结束。

    4. 判断自己投票的peer是否选为Leader

      // 当前投票的节点选为Leader
      if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                                 logicalclock.get(), proposedEpoch))) {
        // 最后再看看还有没有其他的节点投票比当前Leader投票更高
        while((n = recvqueue.poll(finalizeWait,
                                  TimeUnit.MILLISECONDS)) != null){
          if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
             proposedLeader, proposedZxid, proposedEpoch)){
            recvqueue.put(n);
            break;
          }
        }
      
        // 没有为其他的节点的投票,Leader选举完成,更改节点状态
        if (n == null) {
          self.setPeerState((proposedLeader == self.getId()) ?
                            ServerState.LEADING: learningState());
      
          Vote endVote = new Vote(proposedLeader,
                                  proposedZxid,
                                  logicalclock.get(),
                                  proposedEpoch);
          leaveInstance(endVote);
          return endVote;
        }
      }
      

    至此,集群选举就完成了。

询问Leader

除了整个集群启动或者集群Leader与集群其他节点失去通信(宕机、网络等),需要进行选举,单独的某个Learner重新启动或者网络闪断重连是不需要选举的,但是新的Learner需要从集群中得知新的Leader是哪个节点。这时该Learner同样调用FastLeaderElection#lookForLeader方法,返回回来的投票状态是FOLLOWING或者LEADING

case FOLLOWING:
case LEADING:
    ...

这种状况下,只要去收集其他节点的通知,然后根据收到的通知,取出正确的Leader,做Follow即可。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容