笔者关注ZooKeeper有一段时间,从ZooKeeper提供的API中,发现有一个比较有意思的API,叫sync。但一直不太明白sync API的含义。
从官方的文档里,找到了这段话:
Sometimes developers mistakenly assume one other guarantee that ZooKeeper does not in fact make. This is:
Simultaneously Consistent Cross-Client Views
ZooKeeper does not guarantee that at every instance in time, two different clients will have identical views of ZooKeeper data. Due to factors like network delays, one client may perform an update before another client gets notified of the change. Consider the scenario of two clients, A and B. If client A sets the value of a znode /a from 0 to 1, then tells client B to read /a, client B may read the old value of 0, depending on which server it is connected to. If it is important that Client A and Client B read the same value, Client B should should call the sync() method from the ZooKeeper API method before it performs its read.
So, ZooKeeper by itself doesn't guarantee that changes occur synchronously across all servers, but ZooKeeper primitives can be used to construct higher level functions that provide useful client synchronization.
于是我们知道,sync是使得client当前连接着的ZooKeeper服务器,和ZooKeeper的Leader节点同步(sync)一下数据。
另外我也同时看了一下源代码,当follower收到到sync请求时,会将这个请求添加到一个pendingSyncs队列里,然后将这个请求发送给leader,直到收到leader的Leader.SYNC消息时,才将这个请求从pendingSyncs队列里移除,并commit这个请求。
FollowerRequestProcessor.java:
@Override
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
// 可以看到sync请求和其它事务型请求的的区别在于,除了发送给leader之外,还要记录到pendingSyncs里
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
zks.getFollower().request(request);
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}
FollowerZooKeeperServer.java:
// 接收到leader发送的Leader.SYNC消息后,才真正commit这个请求
synchronized public void sync(){
if(pendingSyncs.size() ==0){
LOG.warn("Not expecting a sync.");
return;
}
Request r = pendingSyncs.remove();
commitProcessor.commit(r);
}
当Leader收到一个sync请求时,如果leader当前没有待commit的决议,那么leader会立即发送一个Leader.SYNC消息给follower。否则,leader会等到当前最后一个待commit的决议完成后,再发送Leader.SYNC消息给Follower。
Leader.java:
synchronized public void processSync(LearnerSyncRequest r){
if(outstandingProposals.isEmpty()){
sendSync(r);
} else {
List<LearnerSyncRequest> l = pendingSyncs.get(lastProposed);
if (l == null) {
l = new ArrayList<LearnerSyncRequest>();
}
l.add(r);
pendingSyncs.put(lastProposed, l);
}
}
其实这里面有一个隐含的逻辑,就是如果leader和follower之间的消息通信,是严格按顺序来发送的(TCP保证),因此,当follower接收到Leader.SYNC消息时,说明follower也一定接收到了leader之前(在leader接收到sync请求之前)发送的所有提案或者commit消息。这样,就可以确保follower和leader是同步的了。