Zookeeper-watcher机制源码分析(一)

Watcher的基本流程

ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher

客户端注册watcher有3种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理

|

ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){

public void processor(WatchedEvent event){

System.out.println(“event.type”);

}

});

zookeeper.create(“/mic”,”0”.getByte(),ZooDefs.Ids. OPEN_ACL_UNSAFE,CreateModel. PERSISTENT); //创建节点

zookeeper.exists(“/mic”,true); //注册监听

zookeeper.setData(“/mic”, “1”.getByte(),-1) ; //修改节点的值触发监听

|

ZooKeeper API的初始化过程

|

ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){

public void processor(WatchedEvent event){

System.out.println(“event.type”);

}

});

|

在创建一个 ZooKeeper 客户端对象实例时,我们通过new Watcher()向构造方法中传入一个默认的 Watcher, 这个 Watcher 将作为整个 ZooKeeper会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中;代码如下

|

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,

boolean canBeReadOnly, HostProvider aHostProvider,

ZKClientConfig clientConfig) throws IOException {

LOG.info("Initiating client connection, connectString=" + connectString

  • " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

if (clientConfig == null) {

clientConfig = new ZKClientConfig();

}

this.clientConfig = clientConfig;

watchManager = defaultWatchManager();

watchManager.defaultWatcher = watcher; --在这里将watcher设置到ZKWatchManager

ConnectStringParser connectStringParser = new ConnectStringParser(

connectString);

hostProvider = aHostProvider;

--初始化了ClientCnxn,并且调用cnxn.start()方法

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),

hostProvider, sessionTimeout, this, watchManager,

getClientCnxnSocket(), canBeReadOnly);

cnxn.start();

}

|

ClientCnxn:是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,

1. SendThread :负责客户端和服务器端的数据通信, 也包括事件信息的传输

2. EventThread : 主要在客户端回调注册的Watchers进行通知处理

ClientCnxn初始化

|

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,

ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,

long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {

this.zooKeeper = zooKeeper;

this.watcher = watcher;

this.sessionId = sessionId;

this.sessionPasswd = sessionPasswd;

this.sessionTimeout = sessionTimeout;

this.hostProvider = hostProvider;

this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size();

readTimeout = sessionTimeout * 2 / 3;

readOnly = canBeReadOnly;

sendThread = new SendThread(clientCnxnSocket); --初始化sendThread

eventThread = new EventThread(); --初始化eventThread

this.clientConfig=zooKeeper.getClientConfig();

}

public void start() { --启动两个线程

sendThread.start();

eventThread.start();

}

|

客户端通过exists注册监听

|

zookeeper.exists(“/mic”,true); //注册监听

|

通过exists方法来注册监听,代码如下

|

public Stat exists(final String path, Watcher watcher)

throws KeeperException, InterruptedException

{

final String clientPath = path;

PathUtils.validatePath(clientPath);

// the watch contains the un-chroot path

WatchRegistration wcb = null;

if (watcher != null) {

wcb = new ExistsWatchRegistration(watcher, clientPath); //构建ExistWatchRegistration

}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();

h.setType(ZooDefs.OpCode.exists); //设置操作类型为exists

ExistsRequest request = new ExistsRequest(); // 构造ExistsRequest

request.setPath(serverPath);

request.setWatch(watcher != null); //是否注册监听

SetDataResponse response = new SetDataResponse(); //设置服务端响应的接收类

//将封装的RequestHeader、ExistsRequest、SetDataResponse、WatchRegistration添加到发送队列

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

if (r.getErr() != 0) {

if (r.getErr() == KeeperException.Code.NONODE.intValue()) {

return null;

}

throw KeeperException.create(KeeperException.Code.get(r.getErr()),

clientPath);

}

//返回exists得到的结果(Stat信息)

return response.getStat().getCzxid() == -1 ? null : response.getStat();

}

|

cnxn.submitRequest

|

public ReplyHeader submitRequest(RequestHeader h, Record request,

Record response, WatchRegistration watchRegistration,

WatchDeregistration watchDeregistration)

throws InterruptedException {

ReplyHeader r = new ReplyHeader();

//将消息添加到队列,并构造一个Packet传输对象

Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);

synchronized (packet) {

while (!packet.finished) { //在数据包没有处理完成之前,一直阻塞

packet.wait();

}

}

return r;

}

|

|

public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,

Record response, AsyncCallback cb, String clientPath,

String serverPath, Object ctx, WatchRegistration watchRegistration,

WatchDeregistration watchDeregistration) {

//将相关传输对象转化成Packet

Packet packet = null;

packet = new Packet(h, r, request, response, watchRegistration);

packet.cb = cb;

packet.ctx = ctx;

packet.clientPath = clientPath;

packet.serverPath = serverPath;

packet.watchDeregistration = watchDeregistration;

synchronized (state) {

if (!state.isAlive() || closing) {

conLossPacket(packet);

} else {

if (h.getType() == OpCode.closeSession) {

closing = true;

}

outgoingQueue.add(packet); //添加到outgoingQueue

}

}

sendThread.getClientCnxnSocket().packetAdded();//此处是多路复用机制,唤醒Selector,告诉他有数据包添加过来了

return packet;

}

|

在 ZooKeeper 中,Packet 是一个最小的通信协议单元,即数据包。Pakcet 用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。在 ClientCnxn 中 WatchRegistration 也会被封装到 Pakcet 中,然后由 SendThread 线程调用queuePacket方法把 Packet 放入发送队列中等待客户端发送,这又是一个异步过程,分布式系统采用异步通信是一个非常常见的手段

SendThread的发送过程

在初始化连接的时候,zookeeper初始化了两个线程并且启动了。接下来我们来分析SendThread的发送过程,因为是一个线程,所以启动的时候会调用SendThread.run方法

|

public void run() {

clientCnxnSocket.introduce(this, sessionId, outgoingQueue);

clientCnxnSocket.updateNow();

clientCnxnSocket.updateLastSendAndHeard();

int to;

long lastPingRwServer = Time.currentElapsedTime();

final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds

while (state.isAlive()) {

try {

if (!clientCnxnSocket.isConnected()) {// 如果没有连接:发起连接

// don't re-establish connection if we are closing

if (closing) {

break;

}

startConnect(); //发起连接

clientCnxnSocket.updateLastSendAndHeard();

}

if (state.isConnected()) { //如果是连接状态,则处理sasl的认证授权

// determine whether we need to send an AuthFailed event.

if (zooKeeperSaslClient != null) {

boolean sendAuthEvent = false;

if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {

try {

zooKeeperSaslClient.initialize(ClientCnxn.this);

} catch (SaslException e) {

LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);

state = States.AUTH_FAILED;

sendAuthEvent = true;

}

}

KeeperState authState = zooKeeperSaslClient.getKeeperState();

if (authState != null) {

if (authState == KeeperState.AuthFailed) {

// An authentication error occurred during authentication with the Zookeeper Server.

state = States.AUTH_FAILED;

sendAuthEvent = true;

} else {

if (authState == KeeperState.SaslAuthenticated) {

sendAuthEvent = true;

}

}

}

if (sendAuthEvent == true) {

eventThread.queueEvent(new WatchedEvent(

Watcher.Event.EventType.None,

authState,null));

}

}

to = readTimeout - clientCnxnSocket.getIdleRecv();

} else {

to = connectTimeout - clientCnxnSocket.getIdleRecv();

}

//to,表示客户端距离timeout还剩多少时间,准备发起ping连接

if (to <= 0) {//表示已经超时了。

String warnInfo;

warnInfo = "Client session timed out, have not heard from server in "

  • clientCnxnSocket.getIdleRecv()

  • "ms"

  • " for sessionid 0x"

  • Long.toHexString(sessionId);

LOG.warn(warnInfo);

throw new SessionTimeoutException(warnInfo);

}

if (state.isConnected()) {

//计算下一次ping请求的时间

int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -

((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);

//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL

if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {

sendPing(); //发送ping请求

clientCnxnSocket.updateLastSend();

} else {

if (timeToNextPing < to) {

to = timeToNextPing;

}

}

}

// If we are in read-only mode, seek for read/write server

if (state == States.CONNECTEDREADONLY) {

long now = Time.currentElapsedTime();

int idlePingRwServer = (int) (now - lastPingRwServer);

if (idlePingRwServer >= pingRwTimeout) {

lastPingRwServer = now;

idlePingRwServer = 0;

pingRwTimeout =

Math.min(2*pingRwTimeout, maxPingRwTimeout);

pingRwServer();

}

to = Math.min(to, pingRwTimeout - idlePingRwServer);

}

调用clientCnxnSocket,发起传输

其中 pendingQueue是一个用来存放已经发送、等待回应的Packet队列,

clientCnxnSocket默认使用ClientCnxnSocketNIO(ps:还记得在哪里初始化吗?在实例化zookeeper的时候)

clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);

} catch (Throwable e) {

if (closing) {

if (LOG.isDebugEnabled()) {

// closing so this is expected

LOG.debug("An exception was thrown while closing send thread for session 0x"

  • Long.toHexString(getSessionId())

  • " : " + e.getMessage());

}

break;

} else {

// this is ugly, you have a better way speak up

if (e instanceof SessionExpiredException) {

LOG.info(e.getMessage() + ", closing socket connection");

} else if (e instanceof SessionTimeoutException) {

LOG.info(e.getMessage() + RETRY_CONN_MSG);

} else if (e instanceof EndOfStreamException) {

LOG.info(e.getMessage() + RETRY_CONN_MSG);

} else if (e instanceof RWServerFoundException) {

LOG.info(e.getMessage());

} else {

LOG.warn(

"Session 0x"

  • Long.toHexString(getSessionId())

  • " for server "

  • clientCnxnSocket.getRemoteSocketAddress()

  • ", unexpected error"

  • RETRY_CONN_MSG, e);

}

// At this point, there might still be new packets appended to outgoingQueue.

// they will be handled in next connection or cleared up if closed.

cleanup();

if (state.isAlive()) {

eventThread.queueEvent(new WatchedEvent(

Event.EventType.None,

Event.KeeperState.Disconnected,

null));

}

clientCnxnSocket.updateNow();

clientCnxnSocket.updateLastSendAndHeard();

}

}

}

synchronized (state) {

// When it comes to this point, it guarantees that later queued

// packet to outgoingQueue will be notified of death.

cleanup();

}

clientCnxnSocket.close();

if (state.isAlive()) {

eventThread.queueEvent(new WatchedEvent(Event.EventType.None,

Event.KeeperState.Disconnected, null));

}

ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),

"SendThread exited loop for session: 0x"

  • Long.toHexString(getSessionId()));

}

|

client 和 server的网络交互

|

@Override

void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {

try {

if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {

return;

}

Packet head = null;

if (needSasl.get()) {

if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {

return;

}

} else {

//判断outgoingQueue是否存在待发送的数据包,不存在则直接返回

if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {

return;

}

}

// check if being waken up on closing.

if (!sendThread.getZkState().isAlive()) {

// adding back the patck to notify of failure in conLossPacket().

addBack(head);

return;

}

// channel disconnection happened

if (disconnected.get()) { //异常流程,channel关闭了,讲当前的packet添加到addBack中

addBack(head);

throw new EndOfStreamException("channel for sessionid 0x"

  • Long.toHexString(sessionId)

  • " is lost");

}

if (head != null) { //如果当前存在需要发送的数据包,则调用doWrite方法,pendingQueue表示处于已经发送过等待响应的packet队列

doWrite(pendingQueue, head, cnxn);

}

} finally {

updateNow();

}

}

|

DoWrite方法

|

private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {

updateNow();

while (true) {

if (p != WakeupPacket.getInstance()) {

if ((p.requestHeader != null) && //判断请求头以及判断当前请求类型不是ping或者auth操作

(p.requestHeader.getType() != ZooDefs.OpCode.ping) &&

(p.requestHeader.getType() != ZooDefs.OpCode.auth)) {

p.requestHeader.setXid(cnxn.getXid()); //设置xid,这个xid用来区分请求类型

synchronized (pendingQueue) {

pendingQueue.add(p); //将当前的packet添加到pendingQueue队列中

}

}

sendPkt(p); //将数据包发送出去

}

if (outgoingQueue.isEmpty()) {

break;

}

p = outgoingQueue.remove();

}

}

|

sendPkt

|

private void sendPkt(Packet p) {

// Assuming the packet will be sent out successfully. Because if it fails,

// the channel will close and clean up queues.

p.createBB(); //序列化请求数据

updateLastSend(); //更新最后一次发送updateLastSend

sentCount++; //更新发送次数

channel.write(ChannelBuffers.wrappedBuffer(p.bb)); //通过nio channel发送字节缓存到服务端

}

|

createBB

|

public void createBB() {

try {

ByteArrayOutputStream baos = new ByteArrayOutputStream();

BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);

boa.writeInt(-1, "len"); // We'll fill this in later

if (requestHeader != null) {

requestHeader.serialize(boa, "header"); //序列化header头(requestHeader)

}

if (request instanceof ConnectRequest) {

request.serialize(boa, "connect");

// append "am-I-allowed-to-be-readonly" flag

boa.writeBool(readOnly, "readOnly");

} else if (request != null) {

request.serialize(boa, "request"); //序列化request(request)

}

baos.close();

this.bb = ByteBuffer.wrap(baos.toByteArray());

this.bb.putInt(this.bb.capacity() - 4);

this.bb.rewind();

} catch (IOException e) {

LOG.warn("Ignoring unexpected exception", e);

}

}

|

从createBB方法中,我们看到在底层实际的网络传输序列化中,zookeeper只会讲requestHeader和request两个属性进行序列化,即只有这两个会被序列化到底层字节数组中去进行网络传输,不会将watchRegistration相关的信息进行网络传输。

总结

用户调用exists注册监听以后,会做几个事情

  1. 讲请求数据封装为packet,添加到outgoingQueue

  2. SendThread这个线程会执行数据发送操作,主要是将outgoingQueue队列中的数据发送到服务端

  3. 通过clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); 其中ClientCnxnSocket只zookeeper客户端和服务端的连接通信的封装,有两个具体的实现类ClientCnxnSocketNetty和ClientCnxnSocketNIO;具体使用哪一个类来实现发送,是在初始化过程是在实例化Zookeeper的时候设置的,代码如下

|

cnxn = new ClientCnxn(connectStringParser.getChrootPath(),

hostProvider, sessionTimeout, this, watchManager,

getClientCnxnSocket(), canBeReadOnly);

|

private ClientCnxnSocket getClientCnxnSocket() throws IOException {

String clientCnxnSocketName = getClientConfig().getProperty(

ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);

if (clientCnxnSocketName == null) {

clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();

}

try {

Constructor<?> clientCxnConstructor =

Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);

ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());

return clientCxnSocket;

} catch (Exception e) {

IOException ioe = new IOException("Couldn't instantiate "

  • clientCnxnSocketName);

ioe.initCause(e);

throw ioe;

}

}

|

  1. 基于第3步,最终会在ClientCnxnSocketNetty方法中执行sendPkt将请求的数据包发送到服务端

对Java技术,架构技术感兴趣的同学,欢迎加QQ群619881427,一起学习,相互讨论。

群内已经有小伙伴将知识体系整理好(源码,笔记,PPT,学习视频),欢迎加群免费领取。

分享给喜欢Java,喜欢编程,有梦想成为架构师的程序员们,希望能够帮助到你们。

不是Java程序员也没关系,帮忙转发给更多朋友!谢谢。

分享一个小技巧点击阅读原文也可以轻松获取到学习资料哦!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,560评论 18 399
  • 《独自叩问》的作者高全喜老师在后记中说,本书的评论文章基本上写于二十年前。当时他虽然未直接参与美术创作,但与美术界...
    大雨时行阅读 918评论 0 3
  • 作者/佚名 世界上最遥远的距离, 不是生与死 而是我就站在你面前, 你却不知道我爱你。。。 世界上最遥远的距离, ...
    相逢一笑307阅读 311评论 0 0
  • 《弗洛伊德及其后继者》读后感之二 我是谁? 这是一个只有身为人才会问的问题。因为只有人才会有自我的意识。 亚当和夏...
    DrDan阅读 1,341评论 0 2