In the previous startup-analysis post we mentioned that ServerCnxnFactory is responsible for accepting client requests. ServerCnxnFactory has two implementations, NIOServerCnxnFactory and NettyServerCnxnFactory. Since the wire protocol is independent of the implementation, we only analyze NettyServerCnxnFactory here.
The TCP server is started with the following code:
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
cnxnFactory.startup(zkServer);
ServerCnxnFactory.createFactory instantiates and returns the ServerCnxnFactory implementation named by the zookeeper.serverCnxnFactory JVM property. cnxnFactory.configure then sets the factory's listen address, the maximum number of client connections, the listen backlog, and whether connections are secure (SSL).
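For example, to pick the Netty implementation, the property has to be set before createFactory runs; both forms below are equivalent (property name and class as shipped in the ZooKeeper distribution):
// As a JVM flag:
//   -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
// Or programmatically, before ServerCnxnFactory.createFactory() is called:
System.setProperty("zookeeper.serverCnxnFactory",
        "org.apache.zookeeper.server.NettyServerCnxnFactory");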
Now step into the NettyServerCnxnFactory constructor:
NettyServerCnxnFactory() {
    x509Util = new ClientX509Util();
    boolean usePortUnification = Boolean.getBoolean(PORT_UNIFICATION_KEY);
    LOG.info("{}={}", PORT_UNIFICATION_KEY, usePortUnification);
    if (usePortUnification) {
        try {
            QuorumPeerConfig.configureSSLAuth();
        } catch (QuorumPeerConfig.ConfigException e) {
            LOG.error("unable to set up SslAuthProvider, turning off client port unification", e);
            usePortUnification = false;
        }
    }
    this.shouldUsePortUnification = usePortUnification;
    this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL);
    LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled);
    setOutstandingHandshakeLimit(Integer.getInteger(OUTSTANDING_HANDSHAKE_LIMIT, -1));
    EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
    EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
        .channel(NettyUtils.nioOrEpollServerSocketChannel())
        // parent channel options
        .option(ChannelOption.SO_REUSEADDR, true)
        // child channels options
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_LINGER, -1)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (advancedFlowControlEnabled) {
                    pipeline.addLast(readIssuedTrackingHandler);
                }
                if (secure) {
                    initSSL(pipeline, false);
                } else if (shouldUsePortUnification) {
                    initSSL(pipeline, true);
                }
                pipeline.addLast("servercnxnfactory", channelHandler);
            }
        });
    this.bootstrap = configureBootstrapAllocator(bootstrap);
    this.bootstrap.validate();
}
The main steps are:
- Read the zookeeper.client.portUnification JVM property. If it is true and secure is false, the client decides whether the connection uses SSL.
- Read the zookeeper.netty.advancedFlowControl.enabled JVM property. If it is true, advanced flow control is enabled: Netty's autoRead is disabled, and after each read completes the application itself decides whether to keep reading (a minimal sketch of this mode follows the pipeline code below).
- Initialize the Netty server. The key code is:
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (advancedFlowControlEnabled) {
            pipeline.addLast(readIssuedTrackingHandler);
        }
        if (secure) {
            initSSL(pipeline, false);
        } else if (shouldUsePortUnification) {
            initSSL(pipeline, true);
        }
        pipeline.addLast("servercnxnfactory", channelHandler); // channelHandler is an instance of CnxnChannelHandler
    }
});
readIssuedTrackingHandler tracks, in advanced flow control mode, how many read() calls have been issued after a read completes. initSSL adds the SSL-related handlers. Either way, channelHandler is added last, which means all request data ends up being handled by channelHandler.
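To make the flow-control idea concrete, here is a minimal, hypothetical Netty handler (not ZooKeeper's actual readIssuedTrackingHandler): with autoRead disabled, the server itself decides when to request the next read.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// Hypothetical sketch: explicit reads instead of Netty's autoRead.
public class ManualReadHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().config().setAutoRead(false); // the application now drives reads
        ctx.read(); // request the first chunk of data
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // ... process msg here ...
        } finally {
            ReferenceCountUtil.release(msg);
            ctx.read(); // only ask for more data once this chunk is handled
        }
    }
}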
Next, look at ServerCnxnFactory's startup method:
public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
    startup(zkServer, true);
}
And then NettyServerCnxnFactory's startup method:
public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}
We will not analyze ZooKeeperServer's own startup methods for now; this post covers only the protocol layer. Continue with NettyServerCnxnFactory's start method:
public void start() {
    if (listenBacklog != -1) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    LOG.info("binding to port {}", localAddress);
    parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
    // Port changes after bind() if the original port was 0, update
    // localAddress to get the real port.
    localAddress = (InetSocketAddress) parentChannel.localAddress();
    LOG.info("bound to port {}", getLocalPort());
}
The main logic here is to bind the listen port and start accepting client connections.
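As the comment in start() notes, binding port 0 lets the OS pick an ephemeral port, so the bound address must be re-read after bind(). A standalone sketch of the same pattern using plain JDK sockets (not ZooKeeper code):
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class EphemeralPortDemo {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress(0)); // port 0: the OS assigns a free port
            // The real port is only known after bind(), just like
            // NettyServerCnxnFactory re-reads parentChannel.localAddress().
            System.out.println("bound to port " + server.getLocalPort());
        }
    }
}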
As noted earlier, requests are ultimately handled by channelHandler. Let's start with CnxnChannelHandler's channelActive method, which is invoked once the TCP connection is established.
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Channel active {}", ctx.channel());
    }
    final Channel channel = ctx.channel();
    if (limitTotalNumberOfCnxns()) {
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        channel.close();
        return;
    }
    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
    if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
        channel.close();
        return;
    }
    NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
    ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
    // Check the zkServer assigned to the cnxn is still running,
    // close it before starting the heavy TLS handshake
    if (!cnxn.isZKServerRunning()) {
        LOG.warn("Zookeeper server is not running, close the connection before starting the TLS handshake");
        ServerMetrics.getMetrics().CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING.add(1);
        channel.close();
        return;
    }
    if (handshakeThrottlingEnabled) {
        // Favor to check and throttling even in dual mode which
        // accepts both secure and insecure connections, since
        // it's more efficient than throttling when we know it's
        // a secure connection in DualModeSslHandler.
        //
        // From benchmark, this reduced around 15% reconnect time.
        int outstandingHandshakesNum = outstandingHandshake.addAndGet(1);
        if (outstandingHandshakesNum > outstandingHandshakeLimit) {
            outstandingHandshake.addAndGet(-1);
            channel.close();
            ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED.add(1);
        } else {
            cnxn.setHandshakeState(HandshakeState.STARTED);
        }
    }
    if (secure) {
        SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
        Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
        handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
    } else if (!shouldUsePortUnification) {
        allChannels.add(ctx.channel());
        addCnxn(cnxn);
    }
}
The main steps are:
- Check whether the total connection limit has been exceeded; if so, close the connection immediately.
- Check whether the per-address connection limit has been exceeded; if so, close the connection immediately (a simplified sketch of the per-IP accounting follows this list).
- Instantiate a NettyServerCnxn and attach it to the channel as an attribute.
- Check whether the ZooKeeperServer has finished starting; if not, close the connection immediately.
- SSL-handshake-related code; not essential here, so we skip it for now.
- Add all plaintext connections, and all channels whose SSL handshake has completed, to allChannels; add the cnxn to cnxns; and record per-client-address connection counts in ipMap.
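The per-address limit check relies on per-IP bookkeeping (getClientCnxnCount/addCnxn and ipMap in the real code). A simplified, hypothetical sketch of that accounting:
import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-client-address connection accounting.
public class PerIpLimiter {
    private final int maxClientCnxns;
    private final Map<InetAddress, Set<Object>> ipMap = new ConcurrentHashMap<>();

    public PerIpLimiter(int maxClientCnxns) {
        this.maxClientCnxns = maxClientCnxns;
    }

    // Returns false (reject) when the address already holds too many connections.
    // Note: this simplified check-then-act is racy; it is for illustration only.
    public boolean tryRegister(InetAddress addr, Object cnxn) {
        Set<Object> set = ipMap.computeIfAbsent(addr, a -> ConcurrentHashMap.newKeySet());
        if (maxClientCnxns > 0 && set.size() >= maxClientCnxns) {
            return false; // caller should close the channel, as channelActive does
        }
        set.add(cnxn);
        return true;
    }

    public void unregister(InetAddress addr, Object cnxn) {
        Set<Object> set = ipMap.get(addr);
        if (set != null) {
            set.remove(cnxn);
        }
    }
}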
Next, look at the channelRead method, which is invoked whenever data arrives from the client:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        if (LOG.isTraceEnabled()) {
            LOG.trace("message received called {}", msg);
        }
        try {
            LOG.debug("New message {} from {}", msg, ctx.channel());
            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
            if (cnxn == null) {
                LOG.error("channelRead() on a closed or closing NettyServerCnxn");
            } else {
                cnxn.processMessage((ByteBuf) msg);
            }
        } catch (Exception ex) {
            LOG.error("Unexpected exception in receive", ex);
            throw ex;
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
The logic is simple: it hands the received bytes to cnxn.processMessage.
Next, NettyServerCnxn's processMessage method:
void processMessage(ByteBuf buf) {
    checkIsInEventLoop("processMessage");
    LOG.debug("0x{} queuedBuffer: {}", Long.toHexString(sessionId), queuedBuffer);
    if (LOG.isTraceEnabled()) {
        LOG.trace("0x{} buf {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(buf));
    }
    if (throttled.get()) {
        LOG.debug("Received message while throttled");
        // we are throttled, so we need to queue
        if (queuedBuffer == null) {
            LOG.debug("allocating queue");
            queuedBuffer = channel.alloc().compositeBuffer();
        }
        appendToQueuedBuffer(buf.retainedDuplicate());
        if (LOG.isTraceEnabled()) {
            LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
        }
    } else {
        LOG.debug("not throttled");
        if (queuedBuffer != null) {
            appendToQueuedBuffer(buf.retainedDuplicate());
            processQueuedBuffer();
        } else {
            receiveMessage(buf);
            // Have to check !closingChannel, because an error in
            // receiveMessage() could have led to close() being called.
            if (!closingChannel && buf.isReadable()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Before copy {}", buf);
                }
                if (queuedBuffer == null) {
                    queuedBuffer = channel.alloc().compositeBuffer();
                }
                appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Copy is {}", queuedBuffer);
                    LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
                }
            }
        }
    }
}
The main flow:
- Check whether packet processing is currently throttled. If it is, append the data to queuedBuffer and skip packet parsing.
- If processing is allowed, call processQueuedBuffer or receiveMessage to parse protocol packets; their logic is essentially the same. processQueuedBuffer calls receiveMessage to try to parse packets and then discards whatever part of queuedBuffer has been consumed. queuedBuffer exists to deal with TCP sticky packets and partial packets (a sketch of the idea follows this list); the core parsing logic lives in receiveMessage.
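A minimal, hypothetical sketch of the queuing idea behind queuedBuffer: bytes that do not yet form a complete length-prefixed packet are accumulated in a CompositeByteBuf and re-examined when more data arrives (a simplification; the real code defers framing to receiveMessage):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

// Hypothetical sketch: accumulate partial packets until a full frame is available.
public class PacketAccumulator {
    private final CompositeByteBuf queued = Unpooled.compositeBuffer();

    public void onData(ByteBuf incoming) {
        queued.addComponent(true, incoming.retain()); // keep leftover bytes around
        while (queued.readableBytes() >= 4) {
            int len = queued.getInt(queued.readerIndex()); // peek at the length prefix
            if (queued.readableBytes() < 4 + len) {
                break; // half packet: wait for more data
            }
            queued.skipBytes(4);
            ByteBuf frame = queued.readRetainedSlice(len);
            try {
                handle(frame); // full packet: hand off for processing
            } finally {
                frame.release();
            }
        }
        queued.discardReadComponents(); // drop fully-consumed buffers
    }

    private void handle(ByteBuf frame) { /* placeholder */ }
}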
Next, NettyServerCnxn's receiveMessage method:
private void receiveMessage(ByteBuf message) {
    checkIsInEventLoop("receiveMessage");
    try {
        while (message.isReadable() && !throttled.get()) {
            if (bb != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                    ByteBuffer dat = bb.duplicate();
                    dat.flip();
                    LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                }
                if (bb.remaining() > message.readableBytes()) {
                    int newLimit = bb.position() + message.readableBytes();
                    bb.limit(newLimit);
                }
                message.readBytes(bb);
                bb.limit(bb.capacity());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                    ByteBuffer dat = bb.duplicate();
                    dat.flip();
                    LOG.trace("after readbytes 0x{} bb {}",
                              Long.toHexString(sessionId),
                              ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                }
                if (bb.remaining() == 0) {
                    bb.flip();
                    packetReceived(4 + bb.remaining());
                    ZooKeeperServer zks = this.zkServer;
                    if (zks == null || !zks.isRunning()) {
                        throw new IOException("ZK down");
                    }
                    if (initialized) {
                        // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                        // we could implement zero-copy queueing.
                        zks.processPacket(this, bb);
                    } else {
                        LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                        zks.processConnectRequest(this, bb);
                        initialized = true;
                    }
                    bb = null;
                }
            } else {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
                    ByteBuffer dat = bbLen.duplicate();
                    dat.flip();
                    LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                }
                if (message.readableBytes() < bbLen.remaining()) {
                    bbLen.limit(bbLen.position() + message.readableBytes());
                }
                message.readBytes(bbLen);
                bbLen.limit(bbLen.capacity());
                if (bbLen.remaining() == 0) {
                    bbLen.flip();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
                    }
                    int len = bbLen.getInt();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
                    }
                    bbLen.clear();
                    if (!initialized) {
                        if (checkFourLetterWord(channel, message, len)) {
                            return;
                        }
                    }
                    if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                        throw new IOException("Len error " + len);
                    }
                    // checkRequestSize will throw IOException if request is rejected
                    zkServer.checkRequestSizeWhenReceivingMessage(len);
                    bb = ByteBuffer.allocate(len);
                }
            }
        }
    } catch (IOException e) {
        LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
        close(DisconnectReason.IO_EXCEPTION);
    } catch (ClientCnxnLimitException e) {
        // Common case exception, print at debug level
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
        close(DisconnectReason.CLIENT_RATE_LIMIT);
    }
}
The main flow:
- Check whether bb (the buffer that accumulates a complete packet body) is null. If it is null, the header (the packet length, or a four-letter command) has not been fully read yet; if it is non-null, the header has been read but the body is still incomplete.
- Read the header if it is still incomplete.
- Call checkFourLetterWord to decide from the header whether this is a four-letter command. Four-letter commands carry no body (except stmk, which takes an 8-byte argument). A four-letter command is handled directly and needs no session initialization (no handshake; conversely, once the handshake has happened, four-letter commands can no longer be executed).
- If it is not a four-letter command, read the body into bb; the first packet goes to zks.processConnectRequest, which performs the handshake, after which initialized is set to true.
- Once the connection is initialized, subsequent packets are handled by zks.processPacket (a framing sketch follows this list).
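The framing that receiveMessage implements is simply a 4-byte big-endian length prefix followed by the packet body. A standalone sketch of the same framing over a plain InputStream (MAX_BUFFER is an assumed stand-in for BinaryInputArchive.maxBuffer):
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of ZooKeeper-style length-prefixed framing.
public class FrameReader {
    private static final int MAX_BUFFER = 1024 * 1024; // assumed upper bound

    public static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        int len = din.readInt(); // 4-byte big-endian length header
        if (len < 0 || len > MAX_BUFFER) {
            throw new IOException("Len error " + len); // same check as receiveMessage
        }
        byte[] body = new byte[len];
        din.readFully(body); // body: exactly len bytes
        return body;
    }
}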
A four-letter command consists of nothing but a four-byte header. These commands are mainly used to inspect the current state of the ZooKeeper service and related information, and the server closes the connection after responding to one.
The four-letter command table (in org.apache.zookeeper.server.command.FourLetterCommands) is built like this:
static {
    cmd2String.put(confCmd, "conf");
    cmd2String.put(consCmd, "cons");
    cmd2String.put(crstCmd, "crst");
    cmd2String.put(dirsCmd, "dirs");
    cmd2String.put(dumpCmd, "dump");
    cmd2String.put(enviCmd, "envi");
    cmd2String.put(getTraceMaskCmd, "gtmk");
    cmd2String.put(ruokCmd, "ruok");
    cmd2String.put(setTraceMaskCmd, "stmk");
    cmd2String.put(srstCmd, "srst");
    cmd2String.put(srvrCmd, "srvr");
    cmd2String.put(statCmd, "stat");
    cmd2String.put(wchcCmd, "wchc");
    cmd2String.put(wchpCmd, "wchp");
    cmd2String.put(wchsCmd, "wchs");
    cmd2String.put(mntrCmd, "mntr");
    cmd2String.put(isroCmd, "isro");
    cmd2String.put(telnetCloseCmd, "telnet close");
    cmd2String.put(hashCmd, "hash");
}
A brief description of each:
Command | Description |
---|---|
conf | Prints details about the server's configuration. |
cons | Lists detailed connection/session statistics for all clients connected to this server. |
crst | Resets the connection/session statistics for all clients connected to this server. |
dirs | Shows the total size of the data and log directories. |
dump | Lists the outstanding sessions and ephemeral nodes. In cluster mode this command only works on the leader. |
envi | Prints details about the server's environment. |
gtmk | Gets the current trace mask. |
ruok | Tests whether the server is running in a non-error state; a healthy server replies with the string imok. |
stmk | Sets the trace mask. |
srst | Resets the server's statistics. |
srvr | Lists full details for the server. |
stat | Lists brief details for the server and its connected clients. |
wchc | Lists detailed information on the server's watches, grouped by session. |
wchp | Lists detailed information on the server's watches, grouped by path. |
wchs | Lists brief (summary) information on the server's watches. |
mntr | Outputs a list of variables that can be used to monitor the server. |
isro | Tests whether the server is running in read-only mode. |
0xfff4fffd (telnet close) | Telnet sends this sequence before closing a connection; since it decodes to a negative number, this entry exists only to avoid an error on such disconnects. |
hash | Returns the recent digest log. |
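Four-letter commands can be exercised with a bare TCP socket. The sketch below sends ruok and prints the reply; it assumes a server on localhost:2181, and note that recent ZooKeeper versions only answer commands whitelisted via the 4lw.commands.whitelist property:
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class RuokDemo {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 2181)) {
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes("US-ASCII")); // the 4-byte command header
            out.flush();
            InputStream in = socket.getInputStream();
            byte[] reply = new byte[4];
            int n = in.read(reply); // a healthy server answers "imok" and closes
            System.out.println(new String(reply, 0, Math.max(n, 0), "US-ASCII"));
        }
    }
}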
Skipping the handshake-related flow, let's look at ZooKeeperServer's processPacket method:
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");
    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);
    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to.
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        if (authReturn == KeeperException.Code.OK) {
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        processSasl(incomingBuffer, cnxn, h);
    } else {
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
}
The main steps:
- First, read the request header from the packet body read earlier. The header has two fields, xid and type, each a four-byte integer (a parsing sketch follows this list).
- Increment the outstanding-request counter and check whether the server has too many unfinished requests backlogged; if so, stop receiving data from the client.
- If this is an auth request, perform the authentication and return the result.
- If this is a SASL request, perform SASL authentication and return the result.
- Otherwise, if SASL authentication is required but this connection has not completed it, close the session.
- Finally, submit the request to the request queue (a large request is rejected when too many large requests are already backlogged).
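The request header is just two big-endian 32-bit integers, xid and type. A sketch that parses a fabricated header with plain JDK I/O, equivalent to what RequestHeader.deserialize does through jute:
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class RequestHeaderDemo {
    public static void main(String[] args) throws IOException {
        // A fabricated 8-byte header: xid = 1, type = 4 (OpCode.getData).
        byte[] header = {0, 0, 0, 1, 0, 0, 0, 4};
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(header));
        int xid = in.readInt();  // request id, echoed back in the ReplyHeader
        int type = in.readInt(); // operation code, see OpCode below
        System.out.println("xid=" + xid + " type=" + type);
    }
}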
We will not analyze the handling of each request type here (later posts may return to it). Let's look at the structure of a Request instead. The OpCode interface under org.apache.zookeeper.ZooDefs holds all of the operation codes:
public interface OpCode {
    int notification = 0;
    int create = 1;
    int delete = 2;
    int exists = 3;
    int getData = 4;
    int setData = 5;
    int getACL = 6;
    int setACL = 7;
    int getChildren = 8;
    int sync = 9;
    int ping = 11;
    int getChildren2 = 12;
    int check = 13;
    int multi = 14;
    int create2 = 15;
    int reconfig = 16;
    int checkWatches = 17;
    int removeWatches = 18;
    int createContainer = 19;
    int deleteContainer = 20;
    int createTTL = 21;
    int multiRead = 22;
    int auth = 100;
    int setWatches = 101;
    int sasl = 102;
    int getEphemerals = 103;
    int getAllChildrenNumber = 104;
    int setWatches2 = 105;
    int addWatch = 106;
    int createSession = -10;
    int closeSession = -11;
    int error = -1;
}
Combining these with the request entity classes under org.apache.zookeeper.proto, we can summarize all requests as follows (N/A means there is only a header, with no request or response body):
Opcode | Purpose | Parameters | Request entity | Response entity |
---|---|---|---|---|
notification | N/A | N/A | N/A | N/A |
create | Create a node | path,data,acl,flags | CreateRequest | CreateResponse |
delete | Delete a node | path,version | DeleteRequest | N/A |
exists | Check whether a node exists; optionally watch the node for changes | path,watch | ExistsRequest | ExistsResponse |
getData | Get a node's data; optionally watch the node for changes | path,watch | GetDataRequest | GetDataResponse |
setData | Set a node's data | path,data,version | SetDataRequest | SetDataResponse |
getACL | Get a node's access control list | path | GetACLRequest | GetACLResponse |
setACL | Set a node's access control list | path,acl,version | SetACLRequest | SetACLResponse |
getChildren | Get a node's children; optionally watch the node for child changes | path,watch | GetChildrenRequest | GetChildrenResponse |
sync | Cluster mode: sync the data for the given path with the leader | path | SyncRequest | SyncResponse |
ping | Heartbeat | N/A | N/A | N/A |
getChildren2 | Like getChildren, but also returns the node's full Stat | path,watch | GetChildren2Request | GetChildren2Response |
check | Check that version is the latest; used together with multi | path,version | CheckVersionRequest | SetDataResponse |
multi | Execute several operations in one transaction | N/A | MultiOperationRecord | MultiResponse |
create2 | Like create, but also returns the created node's full Stat | path,data,acl,flags | CreateRequest | Create2Response |
reconfig | Cluster mode: dynamically reconfigure the ensemble membership | joiningServers,leavingServers,newMembers,curConfigId | ReconfigRequest | GetDataResponse |
checkWatches | Check whether this connection watches the node at path; type is one of any change, child changes, or data changes | path,type | CheckWatchesRequest | N/A |
removeWatches | Stop watching the node at path | path,type | RemoveWatchesRequest | N/A |
createContainer | Create a container node | path,data,acl,flags | CreateRequest | Create2Response |
deleteContainer | Delete a container node | path | DeleteRequest | N/A |
createTTL | Create a node with a TTL | path,data,acl,flags,ttl | CreateTTLRequest | Create2Response |
multiRead | Read several nodes' data or children in one request | N/A | MultiOperationRecord | MultiResponse |
auth | Authentication packet | type,scheme,auth | AuthPacket | N/A |
setWatches | Re-register watches | relativeZxid,dataWatches,existWatches,childWatches | SetWatches | N/A |
sasl | SASL authentication | token | GetSASLRequest | SetSASLResponse |
getEphemerals | Get the ephemeral nodes created by this session whose paths start with prefixPath | prefixPath | GetEphemeralsRequest | GetEphemeralsResponse |
getAllChildrenNumber | Get the number of children under path | path | GetAllChildrenNumberRequest | GetAllChildrenNumberResponse |
setWatches2 | Like setWatches, with additional persistent watch types | relativeZxid,dataWatches,existWatches,childWatches,persistentWatches,persistentRecursiveWatches | SetWatches2 | N/A |
addWatch | Add a watch to a node | path,mode | AddWatchRequest | ErrorResponse |
createSession | Create a session | protocolVersion,lastZxidSeen,timeOut,sessionId,passwd | ConnectRequest | ConnectResponse |
closeSession | Close the session | N/A | N/A | N/A |
error | Used with multi to indicate a failed operation | N/A | N/A | N/A |
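To see how an entity from the table maps onto bytes, here is a small sketch that serializes a getData request with ZooKeeper's jute classes (assumes the zookeeper jar on the classpath; the 4-byte length prefix would still need to be prepended before sending):
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.RequestHeader;

public class SerializeDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        // Request header: xid + type (8 bytes total).
        new RequestHeader(1, OpCode.getData).serialize(boa, "header");
        // Request body: path (length-prefixed string) + watch flag.
        new GetDataRequest("/foo", false).serialize(boa, "request");
        System.out.println("payload length = " + baos.size());
    }
}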
ZooKeeper's client/server protocol is not HTTP; it is a custom protocol over TCP. Compared with HTTP this is more efficient, and watches are easier to implement (a long-lived, full-duplex connection); the downside is that whenever a command's parameters change, a new command has to be introduced.
Finally, let's summarize the packet layouts of this custom TCP protocol.
Request packets (client to server, excluding four-letter commands):
Length header | Request header | Request body |
---|---|---|
4 bytes | 8 bytes | length header value - 8 |
Response packets (server to client):
Length header | Reply header | Response body |
---|---|---|
4 bytes | 16 bytes | length header value - 16 |
Sets, arrays, and vectors are encoded as:
Size | Elements 1..n |
---|---|
4 bytes | n × element size |
Four-letter commands (client to server):
Command code | Argument |
---|---|
4 bytes | 0 or 8 bytes |
Four-letter command responses (server to client):
Response content |
---|
Variable length |
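Putting the layout together, a sketch that builds a complete request packet by hand. It frames a ping (OpCode 11, empty body), so the length header is 8, covering just the 8-byte request header; -2 is the xid the client uses for pings:
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PingPacketDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(8);   // length header: bytes that follow (request header only)
        out.writeInt(-2);  // xid: the client uses -2 for pings
        out.writeInt(11);  // type: OpCode.ping
        byte[] packet = baos.toByteArray(); // 12 bytes on the wire
        System.out.println("packet length = " + packet.length);
    }
}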