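ZooKeeper's read and write requests are handled by a chain of RequestProcessors. Depending on the server's role there are four slightly different cases: standalone, leader, follower, and observer. We go through them in turn below.
Standalone mode:
As the earlier analysis of the client/server protocol showed, incoming requests are handled by ZooKeeperServer's processPacket: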
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
//
// Be aware that we're actually checking the global outstanding
// request before this request.
//
// It's fine if the IOException thrown before we decrease the count
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
// Through the magic of byte buffers, txn will now be
// pointing to the start of the txn
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
// a different server to connect to.
authReturn = ap.handleAuthentication(
new ServerAuthenticationProvider.ServerObjs(this, cnxn),
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.debug("Authentication succeeded for scheme: {}", scheme);
LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn(
"No authentication provider for scheme: {} has {}",
scheme,
ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
processSasl(incomingBuffer, cnxn, h);
} else {
if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
cnxn.sendResponse(replyHeader, null, "response");
cnxn.sendCloseSession();
cnxn.disableRecv();
} else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
int length = incomingBuffer.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
}
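For a data request (anything other than auth or sasl), a Request object is instantiated and populated, and submitRequest is called. submitRequest places the Request on RequestThrottler's submittedRequests blocking queue. RequestThrottler runs a thread that takes requests off submittedRequests and calls ZooKeeperServer's submitRequestNow to process each one: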
public void submitRequestNow(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type {}", si.type);
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
LOG.debug("Dropping request.", e);
// Update request accounting/throttling limits
requestFinished(si);
} catch (RequestProcessorException e) {
LOG.error("Unable to process request", e);
// Update request accounting/throttling limits
requestFinished(si);
}
}
The key line is firstProcessor.processRequest, which calls processRequest on the first processor in the chain.
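The hand-off in front of submitRequestNow (submitRequest only enqueues, and a dedicated thread dequeues and forwards) is an ordinary producer/consumer arrangement. Here is a minimal sketch of that pattern, not ZooKeeper's implementation: the class QueueHandOff, its string requests, and the STOP sentinel are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration, not ZooKeeper code: submit() only enqueues,
// and a single worker thread drains the queue in arrival order.
class QueueHandOff {
    // Sentinel compared by identity, so a caller submitting the text "STOP"
    // cannot stop the worker by accident.
    private static final String STOP = new String("STOP");

    private final LinkedBlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::drain, "hand-off-worker");

    void start() {
        worker.start();
    }

    // Called by connection threads; returns without waiting for processing.
    void submit(String request) {
        submitted.add(request);
    }

    private void drain() {
        try {
            while (true) {
                String request = submitted.take(); // blocks until work arrives
                if (request == STOP) {
                    break;
                }
                handled.add(request); // stand-in for submitRequestNow(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Enqueue the sentinel, wait for the worker to exit, return what it saw.
    List<String> shutdownAndGetHandled() {
        submitted.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    static List<String> demo() {
        QueueHandOff q = new QueueHandOff();
        q.start();
        q.submit("create /a");
        q.submit("getData /a");
        return q.shutdownAndGetHandled();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because there is a single consumer thread and a FIFO queue, requests reach the processing step in the order they were submitted, which is the property the real server relies on.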
The processor chain is initialized in ZooKeeperServer's setupRequestProcessors method:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor) syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor) firstProcessor).start();
}
A Request is handled first by PrepRequestProcessor, then by SyncRequestProcessor, and finally by FinalRequestProcessor.
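The wiring in setupRequestProcessors is a chain of responsibility built back to front: each processor is constructed with a reference to the one that follows it. The sketch below shows only that wiring. SketchProcessor, Stage, and ChainSketch are hypothetical names, and the stages call each other synchronously, whereas the real Prep and Sync processors hand requests over through queues and their own threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the RequestProcessor interface.
interface SketchProcessor {
    void processRequest(String request);
}

class ChainSketch {
    // A stage records that it saw the request, then forwards it.
    static final class Stage implements SketchProcessor {
        private final String name;
        private final SketchProcessor next; // null for the last stage
        private final List<String> trace;

        Stage(String name, SketchProcessor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        // Built back to front, as in setupRequestProcessors: the last
        // stage is created first so earlier stages can point at it.
        SketchProcessor fin = new Stage("final", null, trace);
        SketchProcessor sync = new Stage("sync", fin, trace);
        SketchProcessor first = new Stage("prep", sync, trace);
        first.processRequest(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create"));
    }
}
```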
Start with PrepRequestProcessor's processRequest method:
public void processRequest(Request request) {
request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
It only enqueues the request. Next, look at the run method of PrepRequestProcessor (a Thread subclass):
public void run() {
LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
try {
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}
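The logic is simple: the loop keeps taking requests off the queue and calling pRequest on each one until it sees requestOfDeath. Here is pRequest: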
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);
if (!request.isThrottled()) {
pRequestHelper(request);
}
request.zxid = zks.getZxid();
long timeFinishedPrepare = Time.currentElapsedTime();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
nextProcessor.processRequest(request);
ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}
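It first clears the Request's hdr (transaction header) and txn (transaction body), calls pRequestHelper to process the Request unless it is throttled, sets the request's zxid, and finally passes the request to the next RequestProcessor. Next, pRequestHelper: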
private void pRequestHelper(Request request) throws RequestProcessorException {
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiOperationRecord multiRequest = new MultiOperationRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;
//Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
for (Op op : multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;
/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
/* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = op.getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting"
+ " remaining multi ops. Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}
// TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
txns.add(new Txn(type, bb.array()));
}
}
request.setTxn(new MultiTxn(txns));
if (digestEnabled) {
setTxnDigest(request);
}
break;
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break;
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getAllChildrenNumber:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.setWatches2:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
case OpCode.addWatch:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info(
"Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if (bb != null) {
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
}
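For requests that produce a transaction (data changes, plus creating or closing a non-local session), pRequestHelper calls pRequest2Txn. For all other requests it only checks that the session is valid and returns. Next, pRequest2Txn: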
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
}
PrecalculatedDigest precalculatedDigest;
switch (type) {
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
pRequest2TxnCreate(type, request, record, deserialize);
break;
}
case OpCode.deleteContainer: {
String path = new String(request.request.array());
String parentPath = getParentPathAndValidate(path);
ChangeRecord nodeRecord = getRecordForPath(path);
if (nodeRecord.childCount > 0) {
throw new KeeperException.NotEmptyException(path);
}
if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
throw new KeeperException.BadVersionException(path);
}
ChangeRecord parentRecord = getRecordForPath(parentPath);
request.setTxn(new DeleteTxn(path));
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount--;
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
break;
}
case OpCode.delete:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
DeleteRequest deleteRequest = (DeleteRequest) record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
}
String path = deleteRequest.getPath();
String parentPath = getParentPathAndValidate(path);
ChangeRecord parentRecord = getRecordForPath(parentPath);
zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
ChangeRecord nodeRecord = getRecordForPath(path);
checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
if (nodeRecord.childCount > 0) {
throw new KeeperException.NotEmptyException(path);
}
request.setTxn(new DeleteTxn(path));
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount--;
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
break;
case OpCode.setData:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetDataRequest setDataRequest = (SetDataRequest) record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
}
path = setDataRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(newVersion);
nodeRecord.stat.setMtime(request.getHdr().getTime());
nodeRecord.stat.setMzxid(zxid);
nodeRecord.data = setDataRequest.getData();
nodeRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
break;
case OpCode.reconfig:
if (!zks.isReconfigEnabled()) {
LOG.error("Reconfig operation requested but reconfig feature is disabled.");
throw new KeeperException.ReconfigDisabledException();
}
if (ZooKeeperServer.skipACL) {
LOG.warn("skipACL is set, reconfig operation will skip ACL checks!");
}
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
LeaderZooKeeperServer lzks;
try {
lzks = (LeaderZooKeeperServer) zks;
} catch (ClassCastException e) {
// standalone mode - reconfiguration currently not supported
throw new KeeperException.UnimplementedException();
}
QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier();
// check that there's no reconfig in progress
if (lastSeenQV.getVersion() != lzks.self.getQuorumVerifier().getVersion()) {
throw new KeeperException.ReconfigInProgress();
}
ReconfigRequest reconfigRequest = (ReconfigRequest) record;
long configId = reconfigRequest.getCurConfigId();
if (configId != -1 && configId != lzks.self.getLastSeenQuorumVerifier().getVersion()) {
String msg = "Reconfiguration from version "
+ configId
+ " failed -- last seen version is "
+ lzks.self.getLastSeenQuorumVerifier().getVersion();
throw new KeeperException.BadVersionException(msg);
}
String newMembers = reconfigRequest.getNewMembers();
if (newMembers != null) { //non-incremental membership change
LOG.info("Non-incremental reconfig");
// Input may be delimited by either commas or newlines so convert to common newline separated format
newMembers = newMembers.replaceAll(",", "\n");
try {
Properties props = new Properties();
props.load(new StringReader(newMembers));
request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false);
request.qv.setVersion(request.getHdr().getZxid());
} catch (IOException | ConfigException e) {
throw new KeeperException.BadArgumentsException(e.getMessage());
}
} else { //incremental change - must be a majority quorum system
LOG.info("Incremental reconfig");
List<String> joiningServers = null;
String joiningServersString = reconfigRequest.getJoiningServers();
if (joiningServersString != null) {
joiningServers = StringUtils.split(joiningServersString, ",");
}
List<String> leavingServers = null;
String leavingServersString = reconfigRequest.getLeavingServers();
if (leavingServersString != null) {
leavingServers = StringUtils.split(leavingServersString, ",");
}
if (!(lastSeenQV instanceof QuorumMaj)) {
String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
LOG.warn(msg);
throw new KeeperException.BadArgumentsException(msg);
}
Map<Long, QuorumServer> nextServers = new HashMap<Long, QuorumServer>(lastSeenQV.getAllMembers());
try {
if (leavingServers != null) {
for (String leaving : leavingServers) {
long sid = Long.parseLong(leaving);
nextServers.remove(sid);
}
}
if (joiningServers != null) {
for (String joiner : joiningServers) {
// joiner should have the following format: server.x = server_spec;client_spec
String[] parts = StringUtils.split(joiner, "=").toArray(new String[0]);
if (parts.length != 2) {
throw new KeeperException.BadArgumentsException("Wrong format of server string");
}
// extract server id x from first part of joiner: server.x
Long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
QuorumServer qs = new QuorumServer(sid, parts[1]);
if (qs.clientAddr == null || qs.electionAddr == null || qs.addr == null) {
throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have 3 ports specified");
}
// check duplication of addresses and ports
for (QuorumServer nqs : nextServers.values()) {
if (qs.id == nqs.id) {
continue;
}
qs.checkAddressDuplicate(nqs);
}
nextServers.remove(qs.id);
nextServers.put(qs.id, qs);
}
}
} catch (ConfigException e) {
throw new KeeperException.BadArgumentsException("Reconfiguration failed");
}
request.qv = new QuorumMaj(nextServers);
request.qv.setVersion(request.getHdr().getZxid());
}
if (QuorumPeerConfig.isStandaloneEnabled() && request.qv.getVotingMembers().size() < 2) {
String msg = "Reconfig failed - new configuration must include at least 2 followers";
LOG.warn(msg);
throw new KeeperException.BadArgumentsException(msg);
} else if (request.qv.getVotingMembers().size() < 1) {
String msg = "Reconfig failed - new configuration must include at least 1 follower";
LOG.warn(msg);
throw new KeeperException.BadArgumentsException(msg);
}
if (!lzks.getLeader().isQuorumSynced(request.qv)) {
String msg2 = "Reconfig failed - there must be a connected and synced quorum in new configuration";
LOG.warn(msg2);
throw new KeeperException.NewConfigNoQuorum();
}
nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, null, null);
SetDataTxn setDataTxn = new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1);
request.setTxn(setDataTxn);
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(-1);
nodeRecord.stat.setMtime(request.getHdr().getTime());
nodeRecord.stat.setMzxid(zxid);
nodeRecord.data = setDataTxn.getData();
// Reconfig is currently a noop from digest computation
// perspective since config node is not covered by the digests.
nodeRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.NOOP, ZooDefs.CONFIG_NODE, nodeRecord.data, nodeRecord.stat);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
break;
case OpCode.setACL:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetACLRequest setAclRequest = (SetACLRequest) record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
}
path = setAclRequest.getPath();
validatePath(path, request.sessionId);
List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
nodeRecord = getRecordForPath(path);
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo, path, listACL);
newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
request.setTxn(new SetACLTxn(path, listACL, newVersion));
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setAversion(newVersion);
nodeRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
break;
case OpCode.createSession:
request.request.rewind();
int to = request.request.getInt();
request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
case OpCode.closeSession:
// We don't want to do this check since the session expiration thread
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges) {
// need to move getEphemerals into zks.outstandingChanges
// synchronized block, otherwise there will be a race
// condition with the on flying deleteNode txn, and we'll
// delete the node again here, which is not correct
Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
if (digestEnabled) {
parentPath = getParentPathAndValidate(path2Delete);
parentRecord = getRecordForPath(parentPath);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
}
nodeRecord = new ChangeRecord(
request.getHdr().getZxid(), path2Delete, null, 0, null);
nodeRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.REMOVE, path2Delete);
addChangeRecord(nodeRecord);
}
if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
break;
case OpCode.check:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
}
path = checkVersionRequest.getPath();
validatePath(path, request.sessionId);
nodeRecord = getRecordForPath(path);
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo, path, null);
request.setTxn(new CheckVersionTxn(
path,
checkAndIncVersion(nodeRecord.stat.getVersion(), checkVersionRequest.getVersion(), path)));
break;
default:
LOG.warn("unknown type {}", type);
break;
}
// If the txn is not going to mutate anything, like createSession,
// we just set the current tree digest in it
if (request.getTxnDigest() == null && digestEnabled) {
setTxnDigest(request);
}
}
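The main logic is:
- Generate the hdr for the request. The hdr stores the session ID, cxid, zxid, timestamp, and operation type.
- Generate the txn for the request. The txn holds the operation's parameters, such as the path.
- Compute the txnDigest for the request when digests are enabled. This is a digest of the data tree, not a signature; for operations that change nothing it is simply the current tree digest.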
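To make the three outputs concrete, here is a heavily simplified sketch of the prep step for a single setData request. Everything in it is an assumption for illustration: PrepSketch and its nested classes are hypothetical, the version check is modeled on the checkAndIncVersion call in the listing (assuming an expected version of -1 matches any version), and the CRC32 checksum is only a stand-in, since ZooKeeper computes its digests differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical, simplified model of the prep stage for a setData request.
class PrepSketch {
    static final class Header {
        final long sessionId;
        final int cxid;
        final long zxid;
        final String type;

        Header(long sessionId, int cxid, long zxid, String type) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.zxid = zxid;
            this.type = type;
        }
    }

    static final class SetDataTxn {
        final String path;
        final byte[] data;
        final int version;

        SetDataTxn(String path, byte[] data, int version) {
            this.path = path;
            this.data = data;
            this.version = version;
        }
    }

    static final class Prepared {
        final Header hdr;
        final SetDataTxn txn;
        final long digest;

        Prepared(Header hdr, SetDataTxn txn, long digest) {
            this.hdr = hdr;
            this.txn = txn;
            this.digest = digest;
        }
    }

    private long lastZxid;

    PrepSketch(long lastZxid) {
        this.lastZxid = lastZxid;
    }

    // Optimistic version check: -1 is assumed to mean "any version".
    static int checkAndIncVersion(int current, int expected) {
        if (expected != -1 && expected != current) {
            throw new IllegalStateException(
                "bad version: expected " + expected + ", found " + current);
        }
        return current + 1;
    }

    // Stand-in checksum only; not ZooKeeper's digest calculation.
    static long checksum(String path, byte[] data, int version) {
        CRC32 crc = new CRC32();
        crc.update(path.getBytes(StandardCharsets.UTF_8));
        crc.update(data);
        crc.update(version); // folds in the low 8 bits of the version
        return crc.getValue();
    }

    Prepared prepSetData(long sessionId, int cxid, String path, byte[] data,
                         int currentVersion, int expectedVersion) {
        long zxid = ++lastZxid; // each write takes the next transaction id
        Header hdr = new Header(sessionId, cxid, zxid, "setData");
        int newVersion = checkAndIncVersion(currentVersion, expectedVersion);
        SetDataTxn txn = new SetDataTxn(path, data, newVersion);
        return new Prepared(hdr, txn, checksum(path, data, newVersion));
    }
}
```

For example, `new PrepSketch(10).prepSetData(1L, 7, "/a", new byte[] {1}, 3, -1)` yields a header with zxid 11 and a txn carrying version 4, while passing an expected version of 2 against a current version of 3 is rejected.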
That completes PrepRequestProcessor. Next comes SyncRequestProcessor:
public void processRequest(final Request request) {
Objects.requireNonNull(request, "Request cannot be null");
request.syncQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
Again it only enqueues the request. Next, look at the run method of SyncRequestProcessor (a Thread subclass):
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
/* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
si = queuedRequests.take();
}
if (si == REQUEST_OF_DEATH) {
break;
}
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read or a throttled request(which doesn't need to be written to the disk),
// and there are no pending flushes (writes), then just pass this to the next processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
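The loop keeps taking requests off the queue and handles each one as follows:
- If polling the queue times out, call flush to write all pending requests to disk, then block until the next request arrives. flush also passes the flushed requests to the next RequestProcessor.
- If the request is not throttled and is successfully appended to the transaction log, check whether a new snapshot is due. If so, roll the transaction log and take a new snapshot in a separate thread (skipping the snapshot if one is already in progress). Snapshot files have names like snapshot.800000000, and transaction log files have names like log.800000001.
- Otherwise (the request is throttled, or zks.getZKDatabase().append(si) returned false because the request does not belong in the transaction log, typically a read or a watcher request), if toFlush is empty, pass the request straight to the next RequestProcessor and move on to the next request.
- In all remaining cases, add the Request to toFlush.
- Check whether a flush is due. If so, call flush to write all pending requests to disk; flush then passes the flushed requests to the next RequestProcessor.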
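The steps above amount to group commit with a fast path for reads. The following single-threaded sketch models just that decision logic. BatchFlushSketch, its size-only flush threshold, and the explicit flush() call standing in for the poll timeout are assumptions; the real shouldFlush and the disk I/O are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the batching decisions, without threads or disk I/O.
class BatchFlushSketch {
    private final int maxBatch;                              // assumed flush threshold
    private final List<String> log = new ArrayList<>();      // stand-in for the txn log
    private final List<String> toFlush = new ArrayList<>();  // appended, not yet synced
    final List<String> forwarded = new ArrayList<>();        // seen by the next processor
    int flushCount;                                          // simulated disk syncs

    BatchFlushSketch(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    void handle(String request, boolean isWrite) {
        if (isWrite) {
            log.add(request); // writes are appended to the log first
        } else if (toFlush.isEmpty()) {
            forwarded.add(request); // fast path: nothing pending, skip the batch
            return;
        }
        // A read arriving while writes are pending queues behind them,
        // so responses keep the order in which requests arrived.
        toFlush.add(request);
        if (toFlush.size() >= maxBatch) {
            flush();
        }
    }

    // One simulated sync covers the whole batch, then the batch moves on.
    void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        flushCount++;
        forwarded.addAll(toFlush);
        toFlush.clear();
    }

    static BatchFlushSketch demo() {
        BatchFlushSketch s = new BatchFlushSketch(3);
        s.handle("get /x", false); // forwarded immediately
        s.handle("set /a", true);  // pending
        s.handle("get /a", false); // queued behind the pending write
        s.handle("set /b", true);  // batch reaches 3 and is flushed
        s.handle("set /c", true);  // pending
        s.flush();                 // models the poll timeout
        return s;
    }

    public static void main(String[] args) {
        BatchFlushSketch s = demo();
        System.out.println(s.forwarded + " flushes=" + s.flushCount);
    }
}
```

In the demo, five requests reach the next stage in arrival order with only two simulated syncs, which is the point of batching: several writes share the cost of one flush.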
That completes SyncRequestProcessor. Next comes FinalRequestProcessor:
public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request);
if (LOG.isTraceEnabled()) {
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
ProcessTxnResult rc = null;
if (!request.isThrottled()) {
rc = applyRequest(request);
}
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
int responseSize = 0;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
AuditHelper.addAuditLog(request, rc, true);
/*
* When local session upgrading is disabled, leader will
* reject the ephemeral node creation due to session expire.
* However, if this is the follower that issue the request,
* it will have the correct error code, so we should use that
* and report to user
*/
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
}
KeeperException ke = request.getException();
if (ke instanceof SessionMovedException) {
throw ke;
}
if (ke != null && request.type != OpCode.multi) {
throw ke;
}
LOG.debug("{}", request);
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
}
if (request.isThrottled()) {
throw KeeperException.create(Code.THROTTLEDOP);
}
AuditHelper.addAuditLog(request, rc);
switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
updateStats(request, lastOp, lastZxid);
responseSize = cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid);
zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
rsp = new MultiResponse();
for (ProcessTxnResult subTxnResult : rc.multiResult) {
OpResult subResult;
switch (subTxnResult.type) {
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
break;
case OpCode.delete:
case OpCode.deleteContainer:
subResult = new DeleteResult();
break;
case OpCode.setData:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err);
if (subTxnResult.err == Code.SESSIONMOVED.intValue()) {
throw new SessionMovedException();
}
break;
default:
throw new IOException("Invalid type of op");
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
try {
Record rec;
switch (readOp.getType()) {
case OpCode.getChildren:
rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
break;
case OpCode.getData:
rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
GetDataResponse gdr = (GetDataResponse) rec;
subResult = new GetDataResult(gdr.getData(), gdr.getStat());
break;
default:
throw new IOException("Invalid type of readOp");
}
} catch (KeeperException e) {
subResult = new ErrorResult(e.code().intValue());
}
((MultiResponse) rsp).add(subResult);
}
break;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
rsp = new GetDataResponse(
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(),
rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
Collections.emptyList(),
Collections.emptyList(),
cnxn);
break;
}
case OpCode.setWatches2: {
lastOp = "STW2";
SetWatches2 setWatches = new SetWatches2();
// TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
setWatches.getPersistentWatches(),
setWatches.getPersistentRecursiveWatches(),
cnxn);
break;
}
case OpCode.addWatch: {
lastOp = "ADDW";
AddWatchRequest addWatcherRequest = new AddWatchRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
addWatcherRequest);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path,
null);
Stat stat = new Stat();
List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());
try {
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.ADMIN,
request.authInfo,
path,
null);
rsp = new GetACLResponse(acl, stat);
} catch (KeeperException.NoAuthException e) {
List<ACL> acl1 = new ArrayList<ACL>(acl.size());
for (ACL a : acl) {
if ("digest".equals(a.getId().getScheme())) {
Id id = a.getId();
Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
acl1.add(new ACL(a.getPerms(), id1));
} else {
acl1.add(a);
}
}
rsp = new GetACLResponse(acl1, stat);
}
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo,
path,
null);
int number = zks.getZKDatabase().getAllChildrenNumber(path);
rsp = new GetAllChildrenNumberResponse(number);
break;
}
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
zks.checkACL(
request.cnxn,
zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo, path,
null);
List<String> children = zks.getZKDatabase()
.getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.getEphemerals: {
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) {
ephemerals.addAll(allEphems);
} else {
for (String p : allEphems) {
if (p.startsWith(prefixPath)) {
ephemerals.add(p);
}
}
}
rsp = new GetEphemeralsResponse(ephemerals);
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
err = Code.MARSHALLINGERROR;
}
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
updateStats(request, lastOp, lastZxid);
try {
if (path == null || rsp == null) {
responseSize = cnxn.sendResponse(hdr, rsp, "response");
} else {
int opCode = request.type;
Stat stat = null;
// Serialized read and get children responses could be cached by the connection
// object. Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
switch (opCode) {
case OpCode.getData : {
GetDataResponse getDataResponse = (GetDataResponse) rsp;
stat = getDataResponse.getStat();
responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
case OpCode.getChildren2 : {
GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;
stat = getChildren2Response.getStat();
responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
default:
responseSize = cnxn.sendResponse(hdr, rsp, "response");
}
}
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG", e);
} finally {
ServerMetrics.getMetrics().RESPONSE_BYTES.add(responseSize);
}
}
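- If the request has not been throttled, call applyRequest to apply it.
- In the switch statement, a write request builds its response from the result returned by applyRequest. A read request is executed on the spot and its response is built from what it reads.
- Send the response back to the requester.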
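The three steps above can be pictured with a small toy model. This is only a sketch under simplifying assumptions: `FinalStepSketch`, its `tree` map, and the string results are hypothetical stand-ins, not ZooKeeper classes or return codes.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the three steps; none of these names are ZooKeeper classes.
class FinalStepSketch {
    // Stand-in for the in-memory data tree.
    static final Map<String, String> tree = new HashMap<>();

    static String process(String op, String path, String data, boolean throttled) {
        // Step 1: apply the request unless it was throttled.
        if (!throttled && op.equals("setData")) {
            tree.put(path, data);
        }
        if (throttled) {
            return "THROTTLEDOP";
        }
        // Step 2: build the response. Writes report the apply result,
        // reads are served directly from the tree.
        String response;
        switch (op) {
            case "setData":
                response = "OK";
                break;
            case "getData":
                response = tree.containsKey(path) ? tree.get(path) : "NONODE";
                break;
            default:
                response = "UNIMPLEMENTED";
        }
        // Step 3: returning the string stands in for sending the response.
        return response;
    }

    public static void main(String[] args) {
        System.out.println(process("setData", "/demo", "v1", false));
        System.out.println(process("getData", "/demo", null, false));
        System.out.println(process("getData", "/demo", null, true));
    }
}
```

A throttled request skips the apply step entirely, which is why the real switch statement is never reached for it.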
Next, the code of applyRequest:
private ProcessTxnResult applyRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request);
// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// was not being queued — ZOOKEEPER-558) properly. This happens, for example,
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return rc;
}
}
if (request.getHdr() != null) {
/*
* Request header is created only by the leader, so this must be
* a quorum request. Since we're comparing timestamps across hosts,
* this metric may be incorrect. However, it's still a very useful
* metric to track in the happy case. If there is clock drift,
* the latency can go negative. Note: headers use wall time, not
* CLOCK_MONOTONIC.
*/
long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
if (propagationLatency >= 0) {
ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
}
}
return rc;
}
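- ProcessTxnResult rc = zks.processTxn(request) hands the request to ZooKeeperServer.
- If it is a closeSession request and the client has already closed the connection, call closeSession on the plain and the secure connection factory, and return as soon as one of them succeeds.
- If the request carries a transaction header (a quorum request), record the propagation latency, as long as it is not negative.
Next, ZooKeeperServer's processTxn method: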
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
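- processTxnForSessionEvents handles session creation and session close.
- processTxnInDB performs the actual write: it updates the DataTree and triggers the relevant watch events.
- Remove the changes that have now been applied from outstandingChanges and outstandingChangesForPath.
- If it is a quorum request, serialize it and append it to commitLog (a queue). When a new node later joins as a follower and is not too far behind, it can catch up directly from commitLog instead of loading a snapshot.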
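The cleanup of outstandingChanges is the least obvious of these steps, so here is a minimal sketch of it. The field names mirror the listing above, but `OutstandingChangesSketch`, `addPending`, and `applied` are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch: drop every pending change whose zxid is covered by the applied txn.
class OutstandingChangesSketch {
    static final class ChangeRecord {
        final long zxid;
        final String path;
        ChangeRecord(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();

    // Record a change that has been prepared but not yet applied.
    void addPending(long zxid, String path) {
        ChangeRecord cr = new ChangeRecord(zxid, path);
        outstandingChanges.add(cr);
        outstandingChangesForPath.put(path, cr); // latest record per path wins
    }

    // Called after the txn with this zxid was applied; returns how many records were dropped.
    int applied(long zxid) {
        int removed = 0;
        while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
            ChangeRecord cr = outstandingChanges.remove();
            removed++;
            // Only clear the per-path entry if it still points at this record.
            if (outstandingChangesForPath.get(cr.path) == cr) {
                outstandingChangesForPath.remove(cr.path);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        OutstandingChangesSketch s = new OutstandingChangesSketch();
        s.addPending(1, "/a");
        s.addPending(2, "/a");
        s.addPending(3, "/b");
        System.out.println(s.applied(1) + " removed, /a still tracked: "
                + s.outstandingChangesForPath.containsKey("/a"));
    }
}
```

The identity check matters: a later pending change to the same path must stay visible in the per-path map until its own transaction is applied.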
That concludes the analysis of standalone mode. Next, the leader.
Leader mode:
The processor chain is initialized in LeaderZooKeeperServer's setupRequestProcessors method:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
For requests sent by clients:
For requests sent by followers or observers:
First, LeaderRequestProcessor's processRequest method:
public void processRequest(Request request) throws RequestProcessorException {
// Screen quorum requests against ACLs first
if (!lzks.authWriteRequest(request)) {
return;
}
// Check if this is a local session and we are trying to create
// an ephemeral node, in which case we upgrade the session
Request upgradeRequest = null;
try {
upgradeRequest = lzks.checkUpgradeSession(request);
} catch (KeeperException ke) {
if (request.getHdr() != null) {
LOG.debug("Updating header");
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(ke.code().intValue()));
}
request.setException(ke);
LOG.warn("Error creating upgrade request", ke);
} catch (IOException ie) {
LOG.error("Unexpected error in upgrade", ie);
}
if (upgradeRequest != null) {
nextProcessor.processRequest(upgradeRequest);
}
nextProcessor.processRequest(request);
}
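The main flow is:
- Check the ACL to see whether the request is permitted.
- Check whether this is a local session that has to be upgraded to a global session. If so, create a createSession request and hand it to the next processor first.
- Hand the original request to the next processor.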
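The ordering in the last two steps can be sketched as follows. This is a simplified model that assumes an upgrade is triggered only by an ephemeral create on a local session; `SessionUpgradeSketch` and its members are hypothetical and do not reproduce checkUpgradeSession.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: an upgrade request is emitted before the request that triggered it.
class SessionUpgradeSketch {
    final Set<Long> localSessions = new HashSet<>();
    // Stand-in for nextProcessor.processRequest(...), in call order.
    final List<String> forwarded = new ArrayList<>();

    void process(long sessionId, String op, boolean ephemeral) {
        boolean needsUpgrade = localSessions.contains(sessionId)
                && op.equals("create") && ephemeral;
        if (needsUpgrade) {
            localSessions.remove(sessionId); // the session becomes global
            forwarded.add("createSession:" + sessionId);
        }
        forwarded.add(op + ":" + sessionId);
    }

    public static void main(String[] args) {
        SessionUpgradeSketch p = new SessionUpgradeSketch();
        p.localSessions.add(7L);
        p.process(7L, "create", true);
        p.process(7L, "create", true);
        System.out.println(p.forwarded);
    }
}
```

Because the createSession request goes down the chain first, the global session exists by the time the ephemeral create is proposed.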
PrepRequestProcessor was analyzed earlier, so let's look at how ProposalRequestProcessor handles a request:
public void processRequest(Request request) throws RequestProcessorException {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");
/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
if (shouldForwardToNextProcessor(request)) {
nextProcessor.processRequest(request);
}
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request);
}
}
}
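The main flow is:
- If the request is a sync request sent by a follower or observer (a LearnerSyncRequest), pass it to the leader's processSync, which handles the sync on behalf of that learner.
- shouldForwardToNextProcessor returns false when the optimization zookeeper.forward_learner_requests_to_commit_processor_disabled is enabled and the request came from a follower or observer; otherwise it returns true. When it returns true the request is handed to the next processor, otherwise it is not.
- If the request is a write (it carries a transaction header), generate a proposal and send it to the followers connected to this server. Each follower persists the proposal to disk and replies with an ACK. Once more than half of the voting members have acknowledged, the leader sends COMMIT to its followers, informs its observers of the committed transaction, and hands the request to its own CommitProcessor. A follower or observer applies the transaction to its database when that packet arrives.
- Pass the request to SyncRequestProcessor, which flushes it to the leader's own transaction log.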
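The "more than half" rule in the third step can be made concrete with a short sketch. It assumes a plain majority quorum over voting members; `AckQuorumSketch` is a hypothetical class, not the quorum verifier ZooKeeper uses.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: a proposal is committable once strictly more than half of the voters acked it.
class AckQuorumSketch {
    private final int votingMembers;
    private final Set<Long> acks = new HashSet<>();

    AckQuorumSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    // Record an ACK from a voter (the leader acks its own proposal too).
    // Duplicate ACKs from the same server are counted once.
    boolean ack(long serverId) {
        acks.add(serverId);
        return hasQuorum();
    }

    boolean hasQuorum() {
        return acks.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        AckQuorumSketch proposal = new AckQuorumSketch(5);
        System.out.println(proposal.ack(1));
        System.out.println(proposal.ack(2));
        System.out.println(proposal.ack(3));
    }
}
```

With an even ensemble size, exactly half is not enough: four voters need three acknowledgements.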
Next, how CommitProcessor handles a request:
public void processRequest(Request request) {
if (stopped) {
return;
}
LOG.debug("Processing request:: {}", request);
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request);
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) {
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
}
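The logic is simple: put the request into the queuedRequests queue, and if it needs a commit, also put it into the queuedWriteRequests queue.
Next, the commit method, which is called once the proposal has been acknowledged by a quorum (more than half of the voting members).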
public void commit(Request request) {
if (stopped || request == null) {
return;
}
LOG.debug("Committing request:: {}", request);
request.commitRecvTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
committedRequests.add(request);
wakeup();
}
The logic is simple: put the request into the committedRequests queue.
Next, the run method:
public void run() {
try {
/*
* In each iteration of the following loop we process at most
* requestsToProcess requests of queuedRequests. We have to limit
* the number of request we poll from queuedRequests, since it is
* possible to endlessly poll read requests from queuedRequests, and
* that will lead to a starvation of non-local committed requests.
*/
int requestsToProcess = 0;
boolean commitIsWaiting = false;
do {
/*
* Since requests are placed in the queue before being sent to
* the leader, if commitIsWaiting = true, the commit belongs to
* the first update operation in the queuedRequests or to a
* request from a client on another server (i.e., the order of
* the following two lines is important!).
*/
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
// Avoid sync if we have something to do
if (requestsToProcess == 0 && !commitIsWaiting) {
// Waiting for requests to process
synchronized (this) {
while (!stopped && requestsToProcess == 0 && !commitIsWaiting) {
wait();
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
}
}
}
ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());
long time = Time.currentElapsedTime();
/*
* Processing up to requestsToProcess requests from the incoming
* queue (queuedRequests). If maxReadBatchSize is set then no
* commits will be processed until maxReadBatchSize number of
* reads are processed (or no more reads remain in the queue).
* After the loop a single committed request is processed if
* one is waiting (or a batch of commits if maxCommitBatchSize
* is set).
*/
Request request;
int readsProcessed = 0;
while (!stopped
&& requestsToProcess > 0
&& (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
&& (request = queuedRequests.poll()) != null) {
requestsToProcess--;
if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) {
// Add request to pending
Deque<Request> requests = pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
requests.addLast(request);
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
} else {
readsProcessed++;
numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(request);
}
/*
* Stop feeding the pool if there is a local pending update
* and a committed request that is ready. Once we have a
* pending request with a waiting committed request, we know
* we can process the committed one. This is because commits
* for local requests arrive in the order they appeared in
* the queue, so if we have a pending request and a
* committed request, the committed request must be for that
* pending write or for a write originating at a different
* server. We skip this if maxReadBatchSize is set.
*/
if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) {
/*
* We set commitIsWaiting so that we won't check
* committedRequests again.
*/
commitIsWaiting = true;
break;
}
}
ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
if (!commitIsWaiting) {
commitIsWaiting = !committedRequests.isEmpty();
}
/*
* Handle commits, if any.
*/
if (commitIsWaiting && !stopped) {
/*
* Drain outstanding reads
*/
waitForEmptyPool();
if (stopped) {
return;
}
int commitsToProcess = maxCommitBatchSize;
/*
* Loop through all the commits, and try to drain them.
*/
Set<Long> queuesToDrain = new HashSet<>();
long startWriteTime = Time.currentElapsedTime();
int commitsProcessed = 0;
while (commitIsWaiting && !stopped && commitsToProcess > 0) {
// Process committed head
request = committedRequests.peek();
if (request.isThrottled()) {
LOG.error("Throttled request in committed pool: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
/*
* Check if this is a local write request is pending,
* if so, update it with the committed info. If the commit matches
* the first write queued in the blockedRequestQueue, we know this is
* a commit for a local write, as commits are received in order. Else
* it must be a commit for a remote write.
*/
if (!queuedWriteRequests.isEmpty()
&& queuedWriteRequests.peek().sessionId == request.sessionId
&& queuedWriteRequests.peek().cxid == request.cxid) {
/*
* Commit matches the earliest write in our write queue.
*/
Deque<Request> sessionQueue = pendingRequests.get(request.sessionId);
ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
/*
* Can't process this write yet.
* Either there are reads pending in this session, or we
* haven't gotten to this write yet.
*/
break;
} else {
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
/*
* Generally, we want to send to the next processor our version of the request,
* since it contains the session information that is needed for post update processing.
* In more details, when a request is in the local queue, there is (or could be) a client
* attached to this server waiting for a response, and there is other bookkeeping of
* requests that are outstanding and have originated from this server
* (e.g., for setting the max outstanding requests) - we need to update this info when an
* outstanding request completes. Note that in the other case, the operation
* originated from a different server and there is no local bookkeeping or a local client
* session that needs to be notified.
*/
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.setTxnDigest(request.getTxnDigest());
topPending.zxid = request.zxid;
topPending.commitRecvTime = request.commitRecvTime;
request = topPending;
if (request.isThrottled()) {
LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
// Only decrement if we take a request off the queue.
numWriteQueuedRequests.decrementAndGet();
queuedWriteRequests.poll();
queuesToDrain.add(request.sessionId);
}
}
/*
* Pull the request off the commit queue, now that we are going
* to process it.
*/
committedRequests.remove();
commitsToProcess--;
commitsProcessed++;
// Process the write inline.
processWrite(request);
commitIsWaiting = !committedRequests.isEmpty();
}
ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR
.add(Time.currentElapsedTime() - startWriteTime);
ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
/*
* Process following reads if any, remove session queue(s) if
* empty.
*/
readsProcessed = 0;
for (Long sessionId : queuesToDrain) {
Deque<Request> sessionQueue = pendingRequests.get(sessionId);
int readsAfterWrite = 0;
while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) {
numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(sessionQueue.poll());
readsAfterWrite++;
}
ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
readsProcessed += readsAfterWrite;
// Remove empty queues
if (sessionQueue.isEmpty()) {
pendingRequests.remove(sessionId);
}
}
ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
}
ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
endOfIteration();
} while (!stoppedMainLoop);
} catch (Throwable e) {
handleException(this.getName(), e);
}
LOG.info("CommitProcessor exited loop!");
}
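- Check whether there is anything to process. If not, wait.
- The first while loop takes requests from the head of queuedRequests. If a request needs no commit (usually a read) and no earlier request from the same session is still waiting for a commit, it is handed straight to the next processor; otherwise it is parked in that session's queue in pendingRequests. The loop ends when queuedRequests is drained, when a pending request and a committed request are both present, or when more than maxReadBatchSize such reads have been processed in one iteration.
- The second while loop handles committed requests. It looks at the head of committedRequests and checks whether it belongs to a write submitted by a client of this server. If it does, that write is the first element of queuedWriteRequests.
- For such a local write, check whether the same session still has earlier reads that have not been processed, or whether queuedRequests has not been consumed up to this write yet. If so, leave the loop. Otherwise process the write and add its session to queuesToDrain.
- The final for loop processes the reads that could not run earlier because a write of their session was not finished.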
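The per-session ordering rule is easier to follow in a stripped-down, single-threaded model. The sketch below keeps the queue names from the listing but drops the worker pool, batching limits, metrics, and header copying; `CommitOrderSketch`, `Req`, `submit`, and `runOnce` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: reads pass through unless their session has a write waiting for a commit.
class CommitOrderSketch {
    static final class Req {
        final long sessionId;
        final int cxid;
        final boolean write;
        Req(long sessionId, int cxid, boolean write) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.write = write;
        }
        @Override
        public String toString() {
            return sessionId + ":" + cxid + ":" + (write ? "W" : "R");
        }
    }

    final Deque<Req> queuedRequests = new ArrayDeque<>();
    final Deque<Req> queuedWriteRequests = new ArrayDeque<>();
    final Deque<Req> committedRequests = new ArrayDeque<>();
    final Map<Long, Deque<Req>> pendingRequests = new HashMap<>();
    final List<String> processed = new ArrayList<>(); // stands in for the next processor

    void submit(Req r) {
        queuedRequests.add(r);
        if (r.write) {
            queuedWriteRequests.add(r);
        }
    }

    void commit(Req r) {
        committedRequests.add(r);
    }

    void runOnce() {
        // Loop 1: pass reads through, park writes and anything queued behind them.
        Req r;
        while ((r = queuedRequests.poll()) != null) {
            if (r.write || pendingRequests.containsKey(r.sessionId)) {
                pendingRequests.computeIfAbsent(r.sessionId, s -> new ArrayDeque<>()).addLast(r);
            } else {
                processed.add(r.toString());
            }
        }
        // Loop 2: match commits against the oldest local write.
        Set<Long> queuesToDrain = new LinkedHashSet<>();
        while (!committedRequests.isEmpty()) {
            Req c = committedRequests.peek();
            Req head = queuedWriteRequests.peek();
            if (head != null && head.sessionId == c.sessionId && head.cxid == c.cxid) {
                Deque<Req> q = pendingRequests.get(c.sessionId);
                if (q == null || q.isEmpty() || !q.peek().write) {
                    break; // earlier reads of this session have not been released yet
                }
                q.poll();
                queuedWriteRequests.poll();
                queuesToDrain.add(c.sessionId);
            }
            committedRequests.remove();
            processed.add(c.toString());
        }
        // Loop 3: release reads that were waiting behind the committed writes.
        for (long sid : queuesToDrain) {
            Deque<Req> q = pendingRequests.get(sid);
            while (!q.isEmpty() && !q.peek().write) {
                processed.add(q.poll().toString());
            }
            if (q.isEmpty()) {
                pendingRequests.remove(sid);
            }
        }
    }

    public static void main(String[] args) {
        CommitOrderSketch cp = new CommitOrderSketch();
        cp.submit(new Req(1, 1, false));
        cp.submit(new Req(1, 2, true));
        cp.submit(new Req(1, 3, false));
        cp.runOnce();
        cp.commit(new Req(1, 2, true));
        cp.runOnce();
        System.out.println(cp.processed);
    }
}
```

In this model a read issued after a write in the same session is only released once that write's commit has been processed, which is the ordering guarantee the three loops exist to preserve.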
Next, the flow of ToBeAppliedRequestProcessor:
public void processRequest(Request request) throws RequestProcessorException {
next.processRequest(request);
// The only requests that should be on toBeApplied are write
// requests, for which we will have a hdr. We can't simply use
// request.zxid here because that is set on read requests to equal
// the zxid of the last write op.
if (request.getHdr() != null) {
long zxid = request.getHdr().getZxid();
Iterator<Proposal> iter = leader.toBeApplied.iterator();
if (iter.hasNext()) {
Proposal p = iter.next();
if (p.request != null && p.request.zxid == zxid) {
iter.remove();
return;
}
}
LOG.error("Committed request not found on toBeApplied: {}", request);
}
}
The logic is simple: hand the request to the next processor, then remove the request that was placed on toBeApplied at commit time.
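A minimal sketch of that removal, assuming toBeApplied is modeled as a queue of zxids and that entries are applied in the order they were committed. `ToBeAppliedSketch` and `applied` are hypothetical names.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: only the head of toBeApplied may match the transaction just applied.
class ToBeAppliedSketch {
    final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();

    // Returns true if the head entry matched and was removed.
    boolean applied(long zxid) {
        Iterator<Long> it = toBeApplied.iterator();
        if (it.hasNext() && it.next() == zxid) {
            it.remove();
            return true;
        }
        return false; // the real processor logs an error in this case
    }

    public static void main(String[] args) {
        ToBeAppliedSketch s = new ToBeAppliedSketch();
        s.toBeApplied.add(5L);
        s.toBeApplied.add(6L);
        System.out.println(s.applied(5L));
        System.out.println(s.toBeApplied);
    }
}
```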
Last comes FinalRequestProcessor, which was analyzed above and is not repeated here.
Follower mode:
The processor chain is initialized in FollowerZooKeeperServer's setupRequestProcessors method:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
For requests sent by clients:
For requests sent by the leader:
First, FollowerRequestProcessor:
public void processRequest(Request request) {
processRequest(request, true);
}
void processRequest(Request request, boolean checkForUpgrade) {
if (!finished) {
if (checkForUpgrade) {
// Before sending the request, check if the request requires a
// global session and what we have is a local session. If so do
// an upgrade.
Request upgradeRequest = null;
try {
upgradeRequest = zks.checkUpgradeSession(request);
} catch (KeeperException ke) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(ke.code().intValue()));
}
request.setException(ke);
LOG.warn("Error creating upgrade request", ke);
} catch (IOException ie) {
LOG.error("Unexpected error in upgrade", ie);
}
if (upgradeRequest != null) {
queuedRequests.add(upgradeRequest);
}
}
queuedRequests.add(request);
}
}
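The main flow is:
- Check whether this is a local session that has to be upgraded to a global session. If so, create a createSession request.
- Enqueue the request. Either one request is enqueued, or two when a createSession request has to go in front of it.
Next, the run method: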
public void run() {
try {
while (!finished) {
ServerMetrics.getMetrics().LEARNER_REQUEST_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// Screen quorum requests against ACLs first
if (!zks.authWriteRequest(request)) {
continue;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
maybeSendRequestToNextProcessor(request);
if (request.isThrottled()) {
continue;
}
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getFollower().request(request);
break;
case OpCode.createSession:
case OpCode.closeSession:
// Don't forward local sessions to the leader.
if (!request.isLocalSession()) {
zks.getFollower().request(request);
}
break;
}
}
} catch (RuntimeException e) { // spotbugs require explicit catch of RuntimeException
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}
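- Take the first request from the head of the queue.
- Check permissions against the ACL.
- Check whether zookeeper.follower.skipLearnerRequestToNextProcessor is enabled and the request came from an observer. Unless both hold, hand the request to the next processor.
- Skip throttled requests. If the request needs the leader (a write, a sync, or the creation or closing of a session that is not local), forward it to the leader. A sync is additionally recorded in pendingSyncs.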
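The forwarding decision in the last step follows directly from the switch statement in run. The sketch below restates it as a pure function; the `Op` enum and `forwardToLeader` are hypothetical names, with the enum covering only the op types listed in the switch plus a few reads for contrast.

```java
// Sketch: which requests a follower ships to the leader.
class FollowerForwardSketch {
    enum Op {
        SYNC, CREATE, CREATE2, CREATE_TTL, CREATE_CONTAINER, DELETE, DELETE_CONTAINER,
        SET_DATA, RECONFIG, SET_ACL, MULTI, CHECK, CREATE_SESSION, CLOSE_SESSION,
        GET_DATA, EXISTS, GET_CHILDREN
    }

    static boolean forwardToLeader(Op op, boolean localSession) {
        switch (op) {
            case SYNC:
            case CREATE:
            case CREATE2:
            case CREATE_TTL:
            case CREATE_CONTAINER:
            case DELETE:
            case DELETE_CONTAINER:
            case SET_DATA:
            case RECONFIG:
            case SET_ACL:
            case MULTI:
            case CHECK:
                return true;
            case CREATE_SESSION:
            case CLOSE_SESSION:
                // Local sessions never reach the leader.
                return !localSession;
            default:
                // Reads are answered from the follower's own database.
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardToLeader(Op.SET_DATA, false));
        System.out.println(forwardToLeader(Op.GET_DATA, false));
        System.out.println(forwardToLeader(Op.CLOSE_SESSION, true));
    }
}
```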
CommitProcessor, FinalRequestProcessor and SyncRequestProcessor were analyzed above and are not repeated here.
Now SendAckRequestProcessor:
public void processRequest(Request si) {
if (si.type != OpCode.sync) {
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
try {
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
}
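The logic is simple: after SyncRequestProcessor has written the transaction to the transaction log, the follower sends an ACK to the leader (sync requests are not acknowledged). Once the leader has received ACKs from a quorum, that is, more than half of the voting servers, it sends a COMMIT packet to the followers. The follower then calls CommitProcessor's commit method, and the request is finally handled by FinalRequestProcessor.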
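To make the "more than half" condition concrete, here is a minimal sketch of ACK counting for a single proposal. It assumes simple majority voting; `AckQuorumSketch`, `onAck`, and the server ids are hypothetical names used for illustration, not the leader's actual bookkeeping.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: tracks ACKs for one proposal under a simple-majority assumption.
class AckQuorumSketch {
    private final int votingMembers;
    private final Set<Long> ackedBy = new HashSet<>();
    private boolean committed = false;

    AckQuorumSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    /** Records an ACK from server sid; returns true only for the ACK that completes the quorum. */
    boolean onAck(long sid) {
        ackedBy.add(sid); // a duplicate ACK from the same server is counted once
        if (!committed && ackedBy.size() > votingMembers / 2) {
            committed = true; // this is the point where a COMMIT would be sent
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AckQuorumSketch proposal = new AckQuorumSketch(5);
        for (long sid : new long[] {1, 2, 2, 3, 4}) {
            System.out.println("ACK from " + sid + " completes quorum: " + proposal.onAck(sid));
        }
    }
}
```

With five voting servers, the third distinct ACK is the one that completes the quorum; later ACKs do not trigger a second commit.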
Observer mode:
The processor chain is initialized in ObserverZooKeeperServer's setupRequestProcessors method:
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
/*
* Observer should write to disk, so that the it won't request
* too old txn from the leader which may lead to getting an entire
* snapshot.
*
* However, this may degrade performance as it has to write to disk
* and do periodic snapshot which may double the memory requirements
*/
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
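For requests sent by clients, the request passes through ObserverRequestProcessor, then CommitProcessor, then FinalRequestProcessor, as wired up in setupRequestProcessors.
For requests sent by the master (not necessarily the leader):
If zookeeper.observer.syncEnabled is on (the default), the request is handled by SyncRequestProcessor and CommitProcessor; otherwise it is handled by CommitProcessor only.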
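The effect of the flag can be sketched as follows. This is an illustration only: `ObserverChainSketch` and its methods are hypothetical, and reading the flag directly from a system property with a default of true is an assumption made for the example, not necessarily how ZooKeeper resolves the setting.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: lists which processors see a request sent by the master.
class ObserverChainSketch {
    // Assumption: the flag is read as a system property and defaults to true.
    static boolean syncEnabledFromProperty() {
        return Boolean.parseBoolean(System.getProperty("zookeeper.observer.syncEnabled", "true"));
    }

    static List<String> processorsForMasterRequest(boolean syncEnabled) {
        List<String> chain = new ArrayList<>();
        if (syncEnabled) {
            chain.add("SyncRequestProcessor"); // writes the transaction log to disk
        }
        chain.add("CommitProcessor"); // always involved
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(processorsForMasterRequest(syncEnabledFromProperty()));
    }
}
```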
Let's look at how ObserverRequestProcessor works:
public void processRequest(Request request) {
if (!finished) {
Request upgradeRequest = null;
try {
upgradeRequest = zks.checkUpgradeSession(request);
} catch (KeeperException ke) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(ke.code().intValue()));
}
request.setException(ke);
LOG.info("Error creating upgrade request", ke);
} catch (IOException ie) {
LOG.error("Unexpected error in upgrade", ie);
}
if (upgradeRequest != null) {
queuedRequests.add(upgradeRequest);
}
queuedRequests.add(request);
}
}
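- Check whether the request belongs to a local session that needs to be upgraded to a global session; if so, enqueue a createSession request first.
- Enqueue the request itself. So either one request is enqueued, or two (the extra one being the createSession request).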
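The ordering guarantee in the two steps above can be shown with a minimal sketch. Requests are modeled as plain strings, and the `needsUpgrade` flag is a hypothetical stand-in for checkUpgradeSession returning a non-null upgrade request; neither is part of ZooKeeper's API.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: requests are plain strings instead of Request objects.
class UpgradeEnqueueSketch {
    final LinkedBlockingQueue<String> queuedRequests = new LinkedBlockingQueue<>();

    // needsUpgrade stands in for "checkUpgradeSession returned an upgrade request".
    void processRequest(String request, boolean needsUpgrade) {
        if (needsUpgrade) {
            // The session-creation request goes in first, ahead of the request that triggered it.
            queuedRequests.add("createSession(for " + request + ")");
        }
        queuedRequests.add(request);
    }

    public static void main(String[] args) throws InterruptedException {
        UpgradeEnqueueSketch processor = new UpgradeEnqueueSketch();
        processor.processRequest("r1", false);
        processor.processRequest("r2", true);
        while (!processor.queuedRequests.isEmpty()) {
            System.out.println(processor.queuedRequests.take());
        }
    }
}
```

Because the queue is FIFO, the run loop sees the createSession request before the request that required the upgrade.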
Next, let's look at the run method:
public void run() {
try {
while (!finished) {
ServerMetrics.getMetrics().LEARNER_REQUEST_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK, 'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// Screen quorum requests against ACLs first
if (!zks.authWriteRequest(request)) {
continue;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
if (request.isThrottled()) {
continue;
}
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this Observer has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getObserver().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getObserver().request(request);
break;
case OpCode.createSession:
case OpCode.closeSession:
// Don't forward local sessions to the leader.
if (!request.isLocalSession()) {
zks.getObserver().request(request);
}
break;
}
}
} catch (RuntimeException e) { // spotbugs require explicit catch of RuntimeException
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("ObserverRequestProcessor exited loop!");
}
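- Take the first request from the head of the queue.
- Screen the request against ACLs (authWriteRequest); if the check fails, skip the request.
- Hand the request to the next processor.
- If the request is not throttled and is a write request (or a sync, or a createSession/closeSession that does not belong to a local session), forward it to the master.
The other processors were analyzed earlier. The notable difference between an observer and a follower is that a follower takes part in the two-phase commit (phase one: receive the proposal and write it to the transaction log; phase two: commit the proposal), whereas an observer sees only a single phase. After the master completes the write, it sends an INFORM packet to the observer. The observer decides, based on zookeeper.observer.syncEnabled, whether to write the transaction log to disk, then calls CommitProcessor's commit method, and the request is finally handled by FinalRequestProcessor.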