初始化
在集群模式节点启动时,调用QuorumPeer#createElectionAlgorithm
方法,创建选举算法。默认的算法类型为3,也就是
case 3:
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
创建
QuorumCnxManager
实例,该类使用TCP实现了Leader选举的连接管理器。它为每对服务器维护一个连接。并且确保每对正确运行并且可以通过网络进行通信的服务器都有一个连接(彼此之间只有一个连接)。如果两台服务器试图同时启动一个连接,那么连接管理器将使用一种非常简单的连接中断机制,根据双方的Sid大小决定放弃哪个连接。
对于每个peer,管理器维护要发送的消息队列。如果到任何特定peer的连接中断,则发送方线程将消息放回到列表中。由于此实现目前使用队列实现来维护要发送到另一个peer的消息,因此我们将消息添加到队列尾部,从而更改消息的顺序。-
然后启动连接监听器
这个连接监听器是专为选举而生的,在QuorumCnxManager#handleConnection
方法,可以看到// 接收的连接来自比自己的id小的,直接关闭,这样就能保证节点之间只有一个连接 if (sid < this.mySid) { SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish(); } closeSocket(sock); connectOne(sid); } else { // 发送线程 SendWorker sw = new SendWorker(sock, sid); // 接收线程 RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY)); sw.start(); rw.start(); return; }
- 接收的连接来自比自己的sid小的,直接关闭,这样就能保证节点之间只有一个连接
- 为新连接启动两个线程:
SendWorker
、RecvWorker
用于选举投票的接收和发送
-
创建选举算法
FastLeaderElection
基于TCP连接实现的Leader选举算法,在FastLeaderElection
的构造函数中,调用FastLeaderElection#starter
方法启动private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; sendqueue = new LinkedBlockingQueue<ToSend>(); recvqueue = new LinkedBlockingQueue<Notification>(); this.messenger = new Messenger(manager); }
注意到构建了一个
Messenger
实例,其是选举算法的消息发送、接收器,其构造方法中初始化了发送、接收线程Messenger(QuorumCnxManager manager) { this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); }
选举
当集群没有Leader时,或者集群重新启动,集群所有节点状态为LOOKING
,节点会进入选举状态
// QuorumPeer#run
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
以默认的FastLeaderElection#lookForLeader
为例,该方法开始新一轮Leader选举。每当QuorumPeer将其状态更改为LOOKING时,就会调用此方法,并向所有其他peers发送通知。
拆解lookForLeader
方法
- 创建接受的投票和发送投票的集合
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
-
更新自己的投票,也就是优先给自己投票
同时在增加逻辑时钟,表明开始新纪元synchronized(this){ //增加逻辑时钟 logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); }
-
更新本地投票后,给其他节点发送通知
sendNotifications();
这一步对每个peer节点创建一个消息,存放在
sendqueue
中,这时候messenger
实例中WorkerSender
线程将发送数据// Messenger.WorkerSender#run public void run() { while (!stop) { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); } } void process(ToSend m) { ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); manager.toSend(m.sid, requestBuffer); }
将调用
QuorumCnxManager#toSend
方法发送数据(加入队列)public void toSend(Long sid, ByteBuffer b) { if (this.mySid == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); } else { ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY); ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq); if (bqExisting != null) { addToSendQueue(bqExisting, b); } else { addToSendQueue(bq, b); } connectOne(sid); } }
可以看到将消息加入到
queueSendMap
中所对应的sid
的队列中,那这个队列的数据肯定由QuorumCnxManager#handleConnection
方法中为每个peer创建的发送线程SendWorker
发送。在QuorumCnxManager.SendWorker#run
方法中ByteBuffer b = null; try { ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); if (bq != null) { b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); } else { break; } if(b != null) { // lastMessageSent是为了确保消息正确发送 lastMessageSent.put(sid, b); // 发送消息 send(b); } }
可以看到到这里将消息发送给对应的其他的peer。
于此同时
RecvWorker
也在接收其他peer的投票,并将投票加入recvQueue
队列FastLeaderElection.Messenger.WorkerReceiver
线程会调用QuorumCnxManager#pollRecvQueue
方法,将Message
从中取出,并封装成Notification
,投递到recvqueue
队列 -
进入循环,不断交换投票,直到找到新的Leader
-
从接收队列中取出一个通知
recvQueue
由上面WorkerReceiver
线程封装并投递的Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
-
处理通知
处理投票要先判断epochif (n.electionEpoch > logicalclock.get()) { // 1 确保大家都在同一轮投票中 logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { // 2 break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { //3 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); }
外部投票的epoch大于内部投票。更新自己的
logicalclock
,并且清空所有已经收到的投票,然后进行选票比较。最终再将内部投票发送出去。更新logicalclock
的目的是为了确保大家都在同一轮投票。外部投票的epoch小于内部投票。还是上一轮投票,忽略
外部投票的epoch等于内部投票。同一轮投票,进行选票比较。如果为其投票,则更新本地投票,同时给其他节点发送通知
只有在同一个选举轮次的投票才是有效的投票。同时比较的优先级为
- epoch
- zxid
- sid
-
投票添加到
recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
recvset
存放的是所有服务器的投票,如果有半数的机器与本机投票的节点相同则选举结束。 -
判断自己投票的peer是否选为Leader
// 当前投票的节点选为Leader if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // 最后再看看还有没有其他的节点投票比当前Leader投票更高 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } // 没有为其他的节点的投票,Leader选举完成,更改节点状态 if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } }
至此,集群选举就完成了。
-
询问Leader
除了整个集群启动或者集群Leader与集群其他节点失去通信(宕机、网络等),需要进行选举,单独的某个Learner重新启动或者网络闪断重连是不需要选举的,但是新的Learner需要从集群中得知新的Leader是哪个节点。这时该Learner同样调用FastLeaderElection#lookForLeader
方法,返回回来的投票状态是FOLLOWING
或者LEADING
case FOLLOWING:
case LEADING:
...
这种状况下,只要去收集其他节点的通知,然后根据收到的通知,取出正确的Leader,做Follow即可。