In "ZooKeeper Source Code Analysis (1) - Server Startup Flow" we saw that the servers of a cluster run a leader election at startup. This post walks through the election process and the subsequent startup flows of the leader, follower and observer servers.
Leader Election
First, some election-related terminology:
- SID: the server ID, identical to the value of myid
- ZXID: the transaction ID, which reflects how up to date the server's data is; a larger value means newer data
- Vote: the vote object, with the following fields:
final private long id; // SID of the proposed leader
final private long zxid; // zxid of the proposed leader
final private long electionEpoch; // logical clock, used to tell whether votes belong to the same election round; incremented for every new round of voting
final private long peerEpoch; // epoch of the proposed leader
final private ServerState state; // state of the server that cast this vote
The server states, ServerState:
public enum ServerState {
LOOKING, // looking for a leader; a server in this state runs the election
FOLLOWING, // follower: serves non-transactional requests itself and forwards transactional requests to the leader
LEADING, // leader
OBSERVING; // observer: does not take part in elections; serves non-transactional requests and forwards transactional requests to the leader
}
- QuorumCnxManager:
every server starts a QuorumCnxManager when it creates its FastLeaderElection object. It handles the low-level network communication between servers during leader election. The class maintains a set of queues for received and pending messages; on the sending side a separate queue is kept per remote server, so sends to different servers do not interfere. The core fields are:
// receive queue: messages received from the other servers
public final ArrayBlockingQueue<Message> recvQueue;
// send queues, grouped by SID: pending outgoing messages, so sends to different servers do not block each other
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// SendWorker is the message sender; this map holds one sender per SID
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// the last message sent to each SID
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
QuorumCnxManager creates one SendWorker thread and one RecvWorker thread for each remote server.
- Sending: each SendWorker keeps taking messages from its send queue, sends them, and records each one in lastMessageSent. If the queue is empty it re-sends the last message in lastMessageSent, which covers the case where the receiver failed to receive or process the previous message (see the sketch after this list).
- Receiving: each RecvWorker keeps reading messages from its TCP connection and puts them into recvQueue.
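To make the resend behavior concrete, here is a minimal model of the per-peer send loop. It is a simplified sketch, not the ZooKeeper source: the class name PeerSender and the writeToSocket helper are made up, while sendQueue and lastMessageSent mirror the fields listed above.
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Minimal model of the per-peer send loop (hypothetical class, not the ZooKeeper source).
class PeerSender implements Runnable {
    private final long sid;                                            // remote server id
    private final ArrayBlockingQueue<ByteBuffer> sendQueue;            // queueSendMap.get(sid)
    private final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent; // shared "last sent" map
    private volatile boolean running = true;

    PeerSender(long sid, ArrayBlockingQueue<ByteBuffer> sendQueue,
               ConcurrentHashMap<Long, ByteBuffer> lastMessageSent) {
        this.sid = sid;
        this.sendQueue = sendQueue;
        this.lastMessageSent = lastMessageSent;
    }

    @Override
    public void run() {
        try {
            // On (re)start, if nothing is queued, resend the last message so that a
            // notification lost when the previous connection dropped is not lost for good.
            if (sendQueue.isEmpty()) {
                ByteBuffer last = lastMessageSent.get(sid);
                if (last != null) writeToSocket(last);
            }
            while (running) {
                ByteBuffer b = sendQueue.poll(1000, TimeUnit.MILLISECONDS);
                if (b != null) {
                    lastMessageSent.put(sid, b); // remember it for a possible resend
                    writeToSocket(b);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void writeToSocket(ByteBuffer b) {
        // stand-in for writing length + payload to the peer's TCP socket
    }
}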
Now let's look at how the connections between servers are created:
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
Notice the rule used when the pairwise connections are created: only the server with the larger SID is allowed to initiate a connection; a connection initiated towards a peer with a larger SID is dropped. Correspondingly, in the receiveConnection path a server only keeps incoming connections whose remote SID is larger than its own. This prevents duplicate connections between any two servers (see the sketch below).
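For the receiving side, here is a simplified sketch of that check, modeled on QuorumCnxManager.handleConnection. The method name handleIncoming is hypothetical, and the protocol-version handshake as well as the registration of the workers in senderWorkerMap/queueSendMap are omitted.
// Simplified sketch (not the verbatim source): the incoming connection survives
// only if the remote SID is larger than our own.
private void handleIncoming(Socket sock, long remoteSid) throws IOException {
    if (remoteSid < self.getId()) {
        // The remote peer has the smaller SID, so it may not keep a connection it
        // initiated: drop its socket and initiate a connection towards it ourselves.
        closeSocket(sock);
        connectOne(remoteSid);
    } else {
        // The initiator has the larger SID: accept the connection and wire up the
        // worker threads, just like the else-branch of startConnection above.
        SendWorker sw = new SendWorker(sock, remoteSid);
        RecvWorker rw = new RecvWorker(sock, new DataInputStream(sock.getInputStream()), remoteSid, sw);
        sw.setRecv(rw);
        sw.start();
        rw.start();
    }
}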
The leader election algorithm is implemented as follows.
The main election method is FastLeaderElection.lookForLeader:
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
//records all external votes this server has received in the current election round
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
* Only peer epoch is used to check that the votes come
* from the same ensemble. This is because there is at
* least one corner case in which the ensemble can be
* created with inconsistent zxid and election epoch
* info. However, given that only one ensemble can be
* running at a single point in time and that each
* epoch is used only once, using only the epoch to
* compare the votes is sufficient.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state
+ " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
The election flow:
1. Increment the election round
//make sure all valid votes belong to the same round
logicalclock.incrementAndGet();
2. Initialize the vote
Before the first vote is cast, every server nominates itself as leader:
//updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
//the proposed leader is this server's own myid
synchronized void updateProposal(long leader, long zxid, long epoch){
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
3. Send the initial vote
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
//message type: notification
ToSend.mType.notification,
//myid of the proposed leader
proposedLeader,
//zxid of the proposed leader
proposedZxid,
//current election round (logical clock)
logicalclock.get(),
//this server's current state
QuorumPeer.ServerState.LOOKING,
//SID of the server this notification is sent to
sid,
//proposed leader's epoch; initially the value read from the currentEpoch file
proposedEpoch,
//the current quorum config (the servers taking part in the election), serialized
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
The server sends its own vote to every server taking part in the election.
Note: when the FastLeaderElection object is created, its start method is called:
public void start() {
this.messenger.start();
}
// Starts instances of WorkerSender and WorkerReceiver
void start(){
this.wsThread.start();
this.wrThread.start();
}
This starts two threads, wsThread and wrThread, which wrap a WorkerSender and a WorkerReceiver. The WorkerSender keeps taking messages from FastLeaderElection.sendqueue and hands them to QuorumCnxManager's queueSendMap to be sent; the WorkerReceiver keeps taking messages from QuorumCnxManager's recvQueue and adds them to FastLeaderElection.recvqueue (see the sketch below).
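Conceptually the two threads are just bridges between the election layer and the connection layer. A rough sketch of both loops follows; manager.toSend and manager.pollRecvQueue are the real QuorumCnxManager entry points, while buildNotificationBuffer and parseNotification stand in for the serialization code.
// WorkerSender: election layer -> connection layer (simplified sketch)
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) continue;
            ByteBuffer b = buildNotificationBuffer(m);          // serialize the vote
            if (m.sid == self.getId()) {
                recvqueue.offer(parseNotification(b, m.sid));   // vote addressed to ourselves: short-circuit
            } else {
                manager.toSend(m.sid, b);                       // hand over to QuorumCnxManager
            }
        } catch (InterruptedException e) {
            break;
        }
    }
}

// WorkerReceiver: connection layer -> election layer (simplified sketch)
public void run() {
    while (!stop) {
        try {
            QuorumCnxManager.Message msg = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (msg == null) continue;
            Notification n = parseNotification(msg.buffer, msg.sid);
            if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                recvqueue.offer(n);                             // feed lookForLeader()
            }
            // (the real code also sends its own current vote back when the sender is
            //  still LOOKING but this server has already finished its election)
        } catch (InterruptedException e) {
            break;
        }
    }
}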
The implementation flow chart is as follows.
While the server is in the election state ServerState.LOOKING, it keeps running the election loop.
4. Receive an external vote (Notification n)
If no external vote arrives, the server checks QuorumCnxManager: if everything previously queued in queueSendMap has already been delivered, it simply re-sends its own vote; otherwise it calls connectAll to establish any missing connections to the other servers, and the queued votes go out once those connections are up.
5. Compare election rounds (when an external vote has been received)
- If the external vote's round is greater than ours (n.electionEpoch > logicalclock.get()), immediately update our own round with logicalclock.set(n.electionEpoch), clear all votes received so far with recvset.clear(), compare the external vote against our freshly initialized vote, and send the internal vote out again.
- If the external vote's round is smaller than ours, ignore it and go back to step 4.
- If the rounds are equal, compare the two votes.
6. Compare the votes: FastLeaderElection#totalOrderPredicate
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
The factors are compared in this priority order:
1. the proposed leader's epoch (peerEpoch), 2. its ZXID, 3. its SID; at every level the larger value wins. A hypothetical example follows.
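The example below uses made-up values (recall that the high 32 bits of a zxid hold the epoch and the low 32 bits the counter):
// Two votes from the same election round, both carrying peerEpoch = 1:
// vote A proposes leader sid=2 with zxid 0x100000005,
// vote B proposes leader sid=3 with zxid 0x100000003.
boolean aBeatsB = totalOrderPredicate(2, 0x100000005L, 1,   // new vote: A
                                      3, 0x100000003L, 1);  // current vote: B
// aBeatsB == true: the epochs are equal, but A's zxid is larger,
// so A wins even though its SID is smaller.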
7. Update the vote and broadcast the change
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
8. Archive the votes
recvset records every external vote this server has received in the current round of leader election, keyed by SID.
9. Count the votes and update the server state
If termPredicate returns true, a quorum of servers in this round has endorsed the current internal vote. The server then keeps draining recvqueue for up to finalizeWait milliseconds to check whether a vote arrives that would beat the current proposal; if one does, it is put back on the queue and the flow goes back to step 4. If nothing better arrives, the election is finished: the server sets its state to LEADING if the proposed leader is itself, otherwise to FOLLOWING or OBSERVING, and returns the final vote. A simplified sketch of the quorum check follows.
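The sketch below shows what termPredicate checks, assuming a plain majority quorum. The method name hasQuorumFor is made up; the real code delegates the counting to the configured QuorumVerifier, which also covers hierarchical quorums and pending reconfigurations.
// Simplified majority-only version of the quorum check (not the verbatim source).
private boolean hasQuorumFor(Map<Long, Vote> votes, Vote proposal, int voterCount) {
    int acks = 0;
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // count every server whose archived vote matches our current proposal
        // (same proposed leader, zxid, election epoch and peer epoch)
        if (proposal.equals(entry.getValue())) {
            acks++;
        }
    }
    return acks > voterCount / 2;   // strictly more than half of the voters
}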
With that, the election analysis is complete. Once each server knows its role, let's look at how the different kinds of servers start up.
First, a diagram of the interaction between the Leader and Follower servers during startup:
Leader server startup
The main method is Leader.lead():
void lead() throws IOException, InterruptedException {
········ election-time statistics and JMX registration code omitted ········
try {
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized(this){
lastProposed = zk.getZxid();
}
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
try {
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ newLeaderProposal.ackSetsToString() + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for(LearnerHandler f : getLearners()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
followerSet.add(f.getSid());
}
}
boolean initTicksShouldBeIncreased = true;
for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
initTicksShouldBeIncreased = false;
break;
}
}
if (initTicksShouldBeIncreased) {
LOG.warn("Enough followers present. "+
"Perhaps the initTicks need to be increased.");
}
return;
}
startZkServer();
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.setZooKeeperServer(zk);
}
self.adminServer.setZooKeeperServer(zk);
boolean tickSkip = true;
// If not null then shutdown this leader
String shutdownMessage = null;
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!tickSkip) {
self.tick.incrementAndGet();
}
// We use an instance of SyncedLearnerTracker to
// track synced learners to make sure we still have a
// quorum of current (and potentially next pending) view.
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
syncedAckSet.addQuorumVerifier(self
.getLastSeenQuorumVerifier());
}
syncedAckSet.addAck(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced()) {
syncedAckSet.addAck(f.getSid());
}
}
// check leader running status
if (!this.isRunning()) {
// set shutdown flag
shutdownMessage = "Unexpected internal error";
break;
}
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
// Lost quorum of last committed and/or last proposed
// config, set shutdown flag
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
+ syncedAckSet.ackSetsToString() + " ]";
break;
}
tickSkip = !tickSkip;
}
for (LearnerHandler f : getLearners()) {
f.ping();
}
}
if (shutdownMessage != null) {
shutdown(shutdownMessage);
// leader goes in looking state
}
} finally {
zk.unregisterJMX(this);
}
}
1. Reload the snapshot and transaction log data; see "ZooKeeper Source Code Analysis (6) - Data and Storage".
- Start the learner acceptor LearnerCnxAcceptor
LearnerCnxAcceptor accepts the connection requests of all non-leader servers; these connections carry the cluster's non-election traffic.
LearnerCnxAcceptor.run()
public void run() {
while (!stop) {
Socket s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
}
······· exception handling code omitted ·······
}
Whenever a connection request from another server is accepted, a LearnerHandler instance is created. It handles all message exchange and data synchronization between the leader and that server. Right after the connection is set up it receives an OBSERVERINFO or FOLLOWERINFO message from the peer; the message types are described in "ZooKeeper inter-cluster message types".
5. The leader parses the learner's message and computes the new epoch (getEpochToPropose)
The logic: if a learner's accepted epoch is at least as large as the leader's current proposal, the leader sets epoch_of_leader = epoch_of_learner + 1. The LearnerHandler then waits until a quorum of participants (including the leader itself) has reported in, at which point the leader's new epoch is settled. A simplified sketch follows.
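The sketch below models that barrier on Leader.getEpochToPropose; the initLimit timeout bookkeeping and the participant check of the real method are trimmed.
// Simplified sketch (not the verbatim source): each learner reports its accepted
// epoch; the leader's new epoch becomes max(reported epochs) + 1 and is published
// only once a quorum (including the leader itself) has checked in.
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
    synchronized (connectingFollowers) {
        if (waitingForNewEpoch) {
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;          // bump past every reported epoch
            }
            connectingFollowers.add(sid);
            if (connectingFollowers.contains(self.getId())
                    && self.getQuorumVerifier().containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;             // the epoch is now final
                connectingFollowers.notifyAll();
            } else {
                while (waitingForNewEpoch) {            // real code bounds this wait with initLimit and throws on timeout
                    connectingFollowers.wait(self.getInitLimit() * self.getTickTime());
                }
            }
        }
        return epoch;
    }
}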
6. The leader sends its leader state to the other servers
LearnerHandler.run():
public void run() {
try {
leader.addLearnerHandler(this);
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
}
if (leader.self.getView().containsKey(this.sid)) {
LOG.info("Follower sid: " + this.sid + " : info : "
+ leader.self.getView().get(this.sid).toString());
} else {
LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
}
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
// Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
LOG.debug("Sending NEWLEADER message to " + sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
.toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
LearnerSnapshot snapshot =
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
snapshot.close();
}
}
// Start thread that blast packets in the queue to learner
startSendingPackets();
/*
* Have to wait for the first ACK, wait until
* the leader is ready, and only then we can
* start processing messages.
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK,"
+ " but received packet: {}", packetToString(qp));
return;
}
if(LOG.isDebugEnabled()){
LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
/*
* Wait until leader starts up
*/
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
LOG.debug("Sending UPTODATE message to " + sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
while(true){
········ from this point on, leader and learner are in sync, the zkServer has started, and inter-server packets are processed in this loop ················
}
After the LearnerHandler starts, it sends a LEADERINFO packet to the learner; at this point both the leader thread and the LearnerHandler thread block in leader.waitForEpochAck(this.getSid(), ss).
7. The learner replies with ACKEPOCH
Once a quorum of the participating learners has replied with ACKEPOCH, the leader starts synchronizing data with each learner: boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
see "ZooKeeper Source Code Analysis (6) - Data and Storage".
8. Start the zkServer
After synchronization begins, the leader thread blocks in waitForNewLeaderAck(self.getId(), zk.getZxid()). Whenever a LearnerHandler thread finishes synchronizing with its learner it sends a NEWLEADER packet, and the learner answers with an ACK. Once a quorum of the participating servers has ACKed, the leader starts the LeaderZooKeeperServer, and each LearnerHandler sends an UPTODATE packet to its synchronized learner, telling it that synchronization is complete and it can start serving clients. A compressed sketch of the quorum-ack wait follows.
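The NEWLEADER ACKs are collected with the same barrier pattern as getEpochToPropose above; the sketch below is modeled on Leader.waitForNewLeaderAck, with the zxid check and the timeout handling left out.
// Compressed sketch (not the verbatim source): collect ACKs for the NEWLEADER
// proposal and release everyone once a quorum has acknowledged it.
public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    synchronized (newLeaderProposal.qvAcksetPairs) {
        if (quorumFormed) {
            return;
        }
        newLeaderProposal.addAck(sid);                  // record this server's ACK
        if (newLeaderProposal.hasAllQuorums()) {
            quorumFormed = true;                        // quorum reached: the leader can start LeaderZooKeeperServer
            newLeaderProposal.qvAcksetPairs.notifyAll();
        } else {
            while (!quorumFormed) {                     // real code bounds this wait with initLimit and throws on timeout
                newLeaderProposal.qvAcksetPairs.wait(self.getTickTime());
            }
        }
    }
}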
Note: the leader keeps two quorum verifiers:
//last committed quorum verifier
public QuorumVerifier quorumVerifier;
//last proposed quorum verifier
public QuorumVerifier lastSeenQuorumVerifier = null;
When interacting with the learners before synchronization, what gets passed around is always lastSeenQuorumVerifier. My understanding is that this leaves the committed quorumVerifier (whose version normal transaction commits rely on) untouched; if this reading is wrong, corrections from readers are welcome~
Follower server startup
The main flow is Follower.followLeader():
void followLeader() throws InterruptedException {
········· JMX registration and sanity-check code omitted ·········
QuorumServer leaderServer = findLeader();
connectToLeader(leaderServer.addr, leaderServer.hostname);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
}
}
1. Connect to and register with the leader, sending a FOLLOWERINFO message
2. Once the TCP connection is up, the follower receives the LEADERINFO message from the leader and replies with ACKEPOCH; this happens in Learner.registerWithLeader(Leader.FOLLOWERINFO):
/**
* Once connected to the leader, perform the handshake protocol to
* establish a following / observing connection.
* @param pktType
* @return the zxid the Leader sends for synchronization purposes.
* @throws IOException
*/
protected long registerWithLeader(int pktType) throws IOException{
/*
* Send follower info, including last zxid and sid
*/
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
/*
* Add sid to payload
*/
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);
readPacket(qp);
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
if (newEpoch > self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
// since we have already acked an epoch equal to the leaders, we cannot ack
// again, but we still need to send our lastZxid to the leader so that we can
// sync with it if it does assume leadership of the epoch.
// the -1 indicates that this reply should not count as an ack for the new epoch
wrappedEpochBytes.putInt(-1);
} else {
throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
} else {
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
}
return qp.getZxid();
}
}
3. Start data synchronization with syncWithLeader(newEpochZxid); see "ZooKeeper Source Code Analysis (6) - Data and Storage".
4. When synchronization completes, start the LearnerZooKeeperServer and set up the request processor chain; a sketch of the follower's chain follows.
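For reference, here is how the follower wires its request processor chain after synchronization. This sketch follows FollowerZooKeeperServer.setupRequestProcessors roughly as it looks in the 3.5.x source, so the exact constructor arguments may differ in other versions.
// Sketch of the follower's request processor chain (close to, but not verbatim, the 3.5.x source).
// Non-transactional requests are answered locally; transactional requests are forwarded to the
// leader by FollowerRequestProcessor and only applied here once the leader commits them (via the
// CommitProcessor). In parallel, proposals from the leader are logged by SyncRequestProcessor and
// acknowledged with SendAckRequestProcessor.
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()),
            true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}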
Observer server startup
The main flow is Observer.observeLeader():
void observeLeader() throws Exception {
········· JMX registration and sanity-check code omitted ·········
try {
QuorumServer leaderServer = findLeader();
connectToLeader(leaderServer.addr, leaderServer.hostname);
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
syncWithLeader(newLeaderZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
}
1. Connect to and register with the leader, sending an OBSERVERINFO message
2. Once the TCP connection is up, the observer receives the LEADERINFO message from the leader and replies with ACKEPOCH, which mainly tells the leader its current lastLoggedZxid and epoch; this happens in Learner.registerWithLeader(Leader.OBSERVERINFO);
3. Start data synchronization with syncWithLeader(newEpochZxid); see "ZooKeeper Source Code Analysis (6) - Data and Storage".
4. When synchronization completes, start the LearnerZooKeeperServer and set up the request processor chain
From then on, if the leader goes down or loses contact with a quorum of followers, the underlying inter-server communication throws an exception, so Leader.lead() or Follower.followLeader() breaks out of its loop and returns to QuorumPeer.run(); each server then closes its connections, sets its election state back to LOOKING, and a new round of election begins.
Thanks for reading! I'm Monica23334 || Monica2333, and I've pledged to publish one original article every week; follow me and hold me to it~