Watcher的基本流程
ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher
客户端注册watcher有3种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理
|
ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){
public void processor(WatchedEvent event){
System.out.println(“event.type”);
}
});
zookeeper.create(“/mic”,”0”.getByte(),ZooDefs.Ids. OPEN_ACL_UNSAFE,CreateModel. PERSISTENT); //创建节点
zookeeper.exists(“/mic”,true); //注册监听
zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改节点的值触发监听
|
ZooKeeper API的初始化过程
|
ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){
public void processor(WatchedEvent event){
System.out.println(“event.type”);
}
});
|
在创建一个 ZooKeeper 客户端对象实例时,我们通过new Watcher()向构造方法中传入一个默认的 Watcher, 这个 Watcher 将作为整个 ZooKeeper会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中;代码如下
|
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
- " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher; --在这里将watcher设置到ZKWatchManager
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;
--初始化了ClientCnxn,并且调用cnxn.start()方法
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
|
ClientCnxn:是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,
1. SendThread :负责客户端和服务器端的数据通信, 也包括事件信息的传输
2. EventThread : 主要在客户端回调注册的Watchers进行通知处理
ClientCnxn初始化
|
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket); --初始化sendThread
eventThread = new EventThread(); --初始化eventThread
this.clientConfig=zooKeeper.getClientConfig();
}
public void start() { --启动两个线程
sendThread.start();
eventThread.start();
}
|
客户端通过exists注册监听
|
zookeeper.exists(“/mic”,true); //注册监听
|
通过exists方法来注册监听,代码如下
|
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath); //构建ExistWatchRegistration
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists); //设置操作类型为exists
ExistsRequest request = new ExistsRequest(); // 构造ExistsRequest
request.setPath(serverPath);
request.setWatch(watcher != null); //是否注册监听
SetDataResponse response = new SetDataResponse(); //设置服务端响应的接收类
//将封装的RequestHeader、ExistsRequest、SetDataResponse、WatchRegistration添加到发送队列
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
//返回exists得到的结果(Stat信息)
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
|
cnxn.submitRequest
|
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
//将消息添加到队列,并构造一个Packet传输对象
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
synchronized (packet) {
while (!packet.finished) { //在数据包没有处理完成之前,一直阻塞
packet.wait();
}
}
return r;
}
|
|
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
//将相关传输对象转化成Packet
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet); //添加到outgoingQueue
}
}
sendThread.getClientCnxnSocket().packetAdded();//此处是多路复用机制,唤醒Selector,告诉他有数据包添加过来了
return packet;
}
|
在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。在 ClientCnxn 中 WatchRegistration 也会被封装到 Pakcet 中,然后由 SendThread 线程调用queuePacket方法把 Packet 放入发送队列中等待客户端发送,这又是一个异步过程,分布式系统采用异步通信是一个非常常见的手段
SendThread的发送过程
在初始化连接的时候,zookeeper初始化了两个线程并且启动了。接下来我们来分析SendThread的发送过程,因为是一个线程,所以启动的时候会调用SendThread.run方法
|
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {// 如果没有连接:发起连接
// don't re-establish connection if we are closing
if (closing) {
break;
}
startConnect(); //发起连接
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) { //如果是连接状态,则处理sasl的认证授权
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//to,表示客户端距离timeout还剩多少时间,准备发起ping连接
if (to <= 0) {//表示已经超时了。
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
clientCnxnSocket.getIdleRecv()
"ms"
" for sessionid 0x"
Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//计算下一次ping请求的时间
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing(); //发送ping请求
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
调用clientCnxnSocket,发起传输
其中 pendingQueue是一个用来存放已经发送、等待回应的Packet队列,
clientCnxnSocket默认使用ClientCnxnSocketNIO(ps:还记得在哪里初始化吗?在实例化zookeeper的时候)
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
Long.toHexString(getSessionId())
" : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
Long.toHexString(getSessionId())
" for server "
clientCnxnSocket.getRemoteSocketAddress()
", unexpected error"
RETRY_CONN_MSG, e);
}
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
- Long.toHexString(getSessionId()));
}
|
client 和 server的网络交互
|
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
try {
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
//判断outgoingQueue是否存在待发送的数据包,不存在则直接返回
if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
return;
}
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the patck to notify of failure in conLossPacket().
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) { //异常流程,channel关闭了,讲当前的packet添加到addBack中
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x"
Long.toHexString(sessionId)
" is lost");
}
if (head != null) { //如果当前存在需要发送的数据包,则调用doWrite方法,pendingQueue表示处于已经发送过等待响应的packet队列
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}
|
DoWrite方法
|
private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
updateNow();
while (true) {
if (p != WakeupPacket.getInstance()) {
if ((p.requestHeader != null) && //判断请求头以及判断当前请求类型不是ping或者auth操作
(p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
(p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid()); //设置xid,这个xid用来区分请求类型
synchronized (pendingQueue) {
pendingQueue.add(p); //将当前的packet添加到pendingQueue队列中
}
}
sendPkt(p); //将数据包发送出去
}
if (outgoingQueue.isEmpty()) {
break;
}
p = outgoingQueue.remove();
}
}
|
sendPkt
|
private void sendPkt(Packet p) {
// Assuming the packet will be sent out successfully. Because if it fails,
// the channel will close and clean up queues.
p.createBB(); //序列化请求数据
updateLastSend(); //更新最后一次发送updateLastSend
sentCount++; //更新发送次数
channel.write(ChannelBuffers.wrappedBuffer(p.bb)); //通过nio channel发送字节缓存到服务端
}
|
createBB
|
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header"); //序列化header头(requestHeader)
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request"); //序列化request(request)
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
|
从createBB方法中,我们看到在底层实际的网络传输序列化中,zookeeper只会讲requestHeader和request两个属性进行序列化,即只有这两个会被序列化到底层字节数组中去进行网络传输,不会将watchRegistration相关的信息进行网络传输。
总结
用户调用exists注册监听以后,会做几个事情
讲请求数据封装为packet,添加到outgoingQueue
SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端
通过clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); 其中ClientCnxnSocket只zookeeper客户端和服务端的连接通信的封装,有两个具体的实现类ClientCnxnSocketNetty和ClientCnxnSocketNIO;具体使用哪一个类来实现发送,是在初始化过程是在实例化Zookeeper的时候设置的,代码如下
|
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
|
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = getClientConfig().getProperty(
ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
Constructor<?> clientCxnConstructor =
Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
- clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
|
- 基于第3步,最终会在ClientCnxnSocketNetty方法中执行sendPkt将请求的数据包发送到服务端
对Java技术,架构技术感兴趣的同学,欢迎加QQ群619881427,一起学习,相互讨论。
群内已经有小伙伴将知识体系整理好(源码,笔记,PPT,学习视频),欢迎加群免费领取。
分享给喜欢Java,喜欢编程,有梦想成为架构师的程序员们,希望能够帮助到你们。
不是Java程序员也没关系,帮忙转发给更多朋友!谢谢。
分享一个小技巧点击阅读原文也可以轻松获取到学习资料哦!!