Zookeeper全解析--内部工作机制-源码解析

来源: https://www.cnblogs.com/f1194361820/p/5519227.html
https://www.cnblogs.com/duanxz/p/3783266.html

zookeeper结构图

1

创建ZooKeeper对象时,应会创建一个ClientCnxn(代表了客户端连接对象)。与此同时启动了两个线程:SendThread、EventThread。两个队列:outgoingQueue和pendingQueue。

模块:

我们可以认为ZK的Client由三个主要模块组成:Zookeeper, WatcherManager, ClientCnxn

Zookeeper是ZK Client端的真正接口,用户可以操作的最主要的类,当用户创建一个Zookeeper实例以后,几乎所有的操作都被这个实例包办了,用户不用关心怎么连接到Server,Watcher什么时候被触发等等令人伤神的问题。

WatcherManager,顾名思义,它是用来管理Watcher的,Watcher是ZK的一大特色功能,允许多个Client对一个或多个ZNode进行监控,当ZNode有变化时能够通知到监控这个ZNode的各个Client。我们把一个ZK Client简单看成一个Zookeeper实例,那么这个实例内部的WatcherManager就管理了ZK Client绑定的所有Watcher。

ClientCnxn是管理所有网络IO的模块,所有和ZK Server交互的信息和数据都经过这个模块,包括给ZK Server发送Request,从ZK Server接受Response,以及从ZK Server接受Watcher Event。ClientCnxn完全管理了网络,从外部看来网络操作是透明的。

线程:

每当我们创建一个Zookeeper实例的时候,会有两个线程被创建:SendThread和EventThread。所以当我们使用ZK Client端的时候应该尽量只创建一个Zookeeper实例并反复使用。大量的创建销毁Zookeeper实例不仅会反复的创建和销毁线程,而且会在Server端创建大量的Session。

SendThread是真正处理网络IO的线程,所有通过网络发送和接受的数据包都在这个线程中处理。这个线程的主体是一个while循环:

while (zooKeeper.state.isAlive()) {
        try {
            if (sockKey == null) {
            // don’t re-establish connection if we are closing
                if (closing) {
                    break;
                }
                startConnect();
                lastSend = now;
                lastHeard = now;
            }
            … ….
            selector.select(to);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            // Everything below and until we get back to the select is
            // non blocking, so time is effectively a constant. That is
            // Why we just have to do this once, here
            now = System.currentTimeMillis();
            for (SelectionKey k : selected) {
                … …
                if (doIO()) {
                    lastHeard = now;
                }
                … …
            }
        }
        catch() {
            … …
        }
    }

这里用了java的nio功能,当selector侦测到事件发生的时候就会触发一次循环,主要的操作会在doIO()里面完成:

boolean doIO() throws InterruptedException, IOException {
        boolean packetReceived = false;
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException(“Socket is null!”);
        }
        if (sockKey.isReadable()) {
            … …
        }
         
        if (sockKey.isWritable()) {
        … …
        }

        if (outgoingQueue.isEmpty()) {
            disableWrite();
        } else {
            enableWrite();
        }
        return packetReceived;
    }

这个过程大概是这样的:

  1. 如果有数据可读,则读取数据包,如果数据包是先前发出去的Request的Response,那么这个数据包一定在Pending Queue里面。将它从Pending Queue里面移走,并将此信息添加到Waiting Event Queue 里面,如果数据包是一个Watcher Event,将此信息添加到Waiting Event Queue里面。

  2. 如果OutgoingQueue里面有数据需要发送,则发送数据包并把数据包从Outgoing Queue移至Pending Queue,意思是数据我已经发出去了,但还要等待Server端的回复,所以这个请求现在是Pending 的状态。

另外一个线程EventThread是用来处理Event的。前面提到SendThread从Server收到数据的时候会把一些信息添加到Event Thread里面,比如Finish Event和Watcher Event。EventThread就是专门用来处理这些Event的,收到Finish Event的时候会把相对应的Package置成Finish状态,这样等待结果的Client函数就能得以返回。收到Watcher Event的时候会联系WatcherManager找到相对应的Watcher,从WatcherManager里面移除这个Watcher(因为每个Watcher只会被通知一次) 并回调Watcher的process函数。所以所有Watcher的process函数是运行在EventThread里面的。

保持连接:

到目前为止应该已经大概介绍了ZK Client端的大致结构和处理流程。还剩下一个问题就是当网络出问题时ZK Client是如何处理的。其实这个过程并不复杂,大概是执行以下步骤:

  1. 网络发生故障,网络操作抛出的异常被捕获。

  2. 确认网络操作失败,清除当前与Server相关的网络资源,包括Socket等等。

  3. 在Server列表中逐个尝试链接Server。

这个过程从外界看来是透明的,外界并不会觉察到ZK Client已经悄悄地更换了一个连接的Server。

注意点, SendThread 处理流程

版本 zk3.5.5


public void run() {
....
// don't re-establish connection if we are closing
// determine whether we need to send an AuthFailed event.
// An authentication error occurred during authentication with the Zookeeper Server.
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); // 主要是这里,这里会发送
...
}

clientCnxnSocket 是一个借口, 有两个实现类ClientCnxnSocketNIO 和 ClientCnxnSocketNetty

如何选择NIO的,(注意这里默认选择NIO)
ClientCnxnSocketNIO .doTransport()

void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);   // nio的selector 选择器选择
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue,
                    sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        selected.clear();
    }

ClientCnxnSocketNetty,

ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
It's responsible for connecting to server, reading/writing network traffic and
being a layer between network data and higher level packets.
这是负责为联机阅读/写作到服务器,和网络交流数据包。

@Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

同步调用:

同步调用,就是客户端成功发送请求后,才继续执行。例如:zk.create(path,data,acl,createMode);

这行代码会发起一个同步调用。一个线程A执行这个create时,会创建一个表示create动作的packet,放到数据发送队列outgoingQueue。之后当线程A就开始等待,直到SendThread线程从outgoingQueue队列取出该packet,并将其成功发送(已收到服务端的回应为准)。然后线程A才继续执行。

异步调用:

异步调用,就是客户端不会管请求是否发送成功,都会继续执行。例如:zk.create(path,data,acl,createMode,stringCallback);

这行代码会发起一个异步调用。一个线程A执行这个create时,会创建一个表示create动作的packet,放到数据发送队列outgoingQueue。线程A接着就去执行下一行代码了,而不会去管数据packet是否由SendThread线程发送到服务端了。

SendThread的职责:

1:创建一个长连接,用于会话保持

通过周期性的发送ping packet到当前连接的ZooKeeper服务器实例。这个过程,我们通常称为心跳。每当客户端与服务端的连接断开后,会自动重新连接到下一个服务器。如果断开的是最后与一个服务器的连接,那么会重新连接到第一个服务器。

2:使用这个长连接与服务器通信

1)发送客户调用

不断的从outgoingQueue取出packet发给服务端。当发送的是Client的同步调用的packet,则在发送packet后,立即通知客户端同步调用线程继续执行。当发送的是Client的异步调用,则会将packet发给服务端,并保存到pendingQueue。当从服务端发回响应后,生成一个packet 完成事件交给EventThread,由Event执行CallBack调用。

2)处理服务端响应

对服务端响应反序列化后,根据响应分类进行处理如下:

  • Ping的响应:不做处理

  • 认证失败的响应:创建认证失败的WatchedEvent,并将event交给EventThread处理。

  • 服务端的数据变更通知:生成相应的数据变更WatchedEvent,并将event交给EventThread处理。

  • 服务端对调用的回应:不论是同步调用还是异步调用,服务端都会给出回应。收到此类回应后,先是将watcher放到watcherManager中。然后对同步、异步做后续处理。

     如果是同步调用,则通知发起调用的线程继续处理。
     如果是异步调用,则将该packet交由EventThread来处理。例如对create、delete、exists、getData、getChildren方法调用的响应。
    

EventThread做了什么事呢?

从上述描述中,也可以看出EventThread用于对接收到的packet或者event进行处理:

  • 如果是event,则从WatcherManager中取出相应的Watcher进行处理。
  • 如果是packet,则执行相关联的AsyncCallback。

通过源码的阅读,知道在使用ZooKeeper客户端时要注意以下两点:

  • 在进行Watcher回调时,会从WatchManager取出与相关path关联的多个Watcher(此时WatchManager中就不会再有这个path相关的Watcher了),然后串行的调用这多个Watcher#process方法。所以在编程时,会根据业务的需要,有可能会反复注册Watcher。
  • 另外因为多个Watcher的调用是串行的,所以不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。

好了,对于ZK Client的介绍大概就这么多了,希望这样的介绍对于大家学习和使用Zookeeper有一些帮助。对于文章中没有介绍或者没有说清楚的地方需要进一步查看源码来解决。

PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”或者“点赞”一下,就此谢过!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容