3.ZooKeeper集群启动过程

基础

配置文件
  1. zk的配置文件中可以配置三个端口
    1. clientPort=2181 这个是配置服务端用来接收客户端连接的端口。
    2. server.1=127.0.0.1:2888:3888
      1. 配置服务器1的ip地址
      2. 2888端口为集群peer之间的通信端口,除了选举以外都用这个端口
      3. 3888端口为选举端口,只有选举的时候使用。

1. 数据加载阶段

  1. 集群版的入口在QuorumPeerMain.main():
    1. 读取并解析配置文件。
    2. 构建并启动对事务日志文件和快照文件的定期删除清理任务。清理策略一般是保留最近的3个文件。
    3. 集群版会调用runFromConfig
  2. runFromConfig():
    1. 创建QuorumPeer并赋值,QuorumPeer extends ZooKeeperThread,是一个线程任务
    2. QuorumPeer复写了Thread.start()方法,方法内部逻辑为:
      1. 加载磁盘快照和提交日志,生成或者说是恢复内存DataTree
      2. 初始化并启动接收client端连接的线程,监听clientPort。处理连接默认使用NIOServerCnxnFactory(内部使用jdk原生nio),也可以选择使用NettyServerCnxnFactory(netty版)。
      3. 这里提前说下,对于client连接,zk使用的是nio,对于集群peer之间内部连接,zk使用的jdk bio。这个用的很好,很适合各自的情况,可以想下。
      4. 启动jetty web服务器
      5. 开始leader选举
      6. 在main主线程调用quorumPeer.join(),等待quorumPeer执行完毕。

2. Leader选举开始

  1. 初始服务器状态为LOOKING,此时每个peer都会创建投票,写入自己的sid,最大zxid,currentEpoch,表示选自己做leader,投给自己。
  2. 根据选举类型创建选举算法,以前的都过时了,现在就剩下FastLeaderElection这个算法了。
  3. 创建新的QuorumCnxManager,替换旧的qcmRef,将新的qcm传入FastLeaderElection线程,启动线程,开始选举。
    1. 先说下QuorumCnxManager:
      1. 根据配置的集群地址和选举端口,每个peer都会主动去连接及群众的其他服务器,然后根据sid判断,只保留sid大的连接小的这个方向的连接,销毁掉sid小的连接大的这个方向的连接。这样每个peer都有且只有一个与其他服务器的连接,该链接为长连接,使用jdk bio实现。
      2. QuorumCnxManager就是选举连接管理器,将收到的投票数据放到QuorumCnxManager.recvQueue中。
      3. 而选举过后正常服务时,使用的是服务端口,是所有follower主动连接leader,只保留这个方向的连接,不管sid大小。
  4. FastLeaderElection线程的算法逻辑
    1. 每个peer都会把自己的投票转发给集群中的其他服务器,放到sendqueue中,有独立的发送线程发送。
    2. 独立的接收线程会不断的从QuorumCnxManager.recvQueue中拿数据,解析,转为投票的结构体,放到FastLeaderElection.recvqueue中。
    3. 只要当前peer的状态时LOOKING,就会不断的从FastLeaderElection.recvqueue中取投票数据,先判断是否有效,然后查看远程服务器的状态,如果也是LOOKING,则:
      1. 将远程服务器的投票数据和当前服务器的投票数据进行对比,返回是否应该采用远程的投票(即当前服务器投票的服务器数据较老)
        1. 远程服务器的epoch更大,则使用远程的投票。
        2. epoch一样大,远程的zxid更大,则使用远程的投票。
        3. epoch、zxid一样大,远程的serverid更大,则使用远程的投票。总之一定会比较出来到底用哪个投票的服务器,不会有模棱两可的结果。
      2. 如果使用远程服务器的投票,则清空投票箱,重新投。
    4. 如果远程服务器的状态是FOLLOWING或者LEADING:
      1. 可以确定远程服务器已经知道了leader是谁,它投过来的票记录的就是leader,我们使用这个投票的leader就可以了。
    5. 投票结束,会调用QuorumPeer.setPeerState更新当前peer的状态。

3. Leader选举结束后

Leader选举结束后的逻辑主要集中在Leader,LearnerHandler,Follower,Learner这几个类中。

  1. 选举结束后,FastLeaderElection算法逻辑中,会调用QuorumPeer.setPeerState(...)方法,将当前peer的最新角色通过synchronized同步锁,通知给QuorumPeer.run()方法里的while循环,这样当前peer就知道了自己的角色,走新的角色逻辑。
  2. 假如当前peer是Leader
    1. leader首先会启动一个独立的LearnerCnxAcceptor线程,这个线程主要逻辑就是,使用bio的ServerSocket,监听通讯端口,accept每个Follower主动发来的连接,为每一个连接创建一个LearnerHandler线程,将socket交给该线程处理。
    2. 转3.1
    3. leader会阻塞等待所有Follower发来消息,知道确认最新的epoch是多少
    4. 然后将最新的epoch发给所有follower,阻塞等待Follower对最新epoch的ack。
    5. 转3.4
    6. leader在收到Follower对epoch的ack包后,判断超过半数都接受,那么该epoch就可用。
    7. 根据这次Follower发来的消息,leader与自己的数据做比较,
      1. 如果发现某个Follower的最大zxid与自己相同,就发DIFF包,表示不需要同步。
      2. 其他情况发不同的包,如TRUNC,SNAP等,指明Follower与leader如何同步数据。
    8. 转3.7
    9. 不会有对同步数据的ack,直接再发一个NEWLEADER包给follower,包括了sid,zxid,仲裁器等信息。然后阻塞等待follower对NEWLEADER包的ack响应达到大多数。转3.8
    10. 大多数follower响应后,leader会发送UPTODATE包,表示数据已经都同步好了,集群服务的准备工作都完成了,可以对client提供服务了。转3.9.
  3. 假如当前peer是Learner中的Follower:
    1. Follower会根据选举时的投票和配置信息,重新获取leader的地址和通信端口,主动连接leader,建立与leader之间的长连接。
    2. 发送第一个包,是将自己的sid,最大zxid等信息发给leader。
    3. 转上面步骤2.3,2.4
    4. Follower在收到leader发来的最新的epoch后,与自己当前的epoch比较,
      1. 如果leader的epoch最大,就用这个epoch作为ack响应的epoch。
      2. 如果自己用的比leader的大,表示有问题,用-1作为ack响应的epoch。
    5. Follower会将sid,使用的epoch等信息作为epoch的ack包发给leader。
    6. 转2.6
    7. follower收到同步方式和需要同步的数据后,会完成数据同步,完成同步后,不会有对同步结果的ack包发给leader,就是接着等着读取leader的数据,转2.9
    8. 收到leader发来的NEWLEADER包,使用包里的仲裁器和epoch等数据作为自己的数据。发送ack包给leader。转2.10
    9. 因为在同步数据阶段,会用while循环读取leader发来的数据,使用的bio,无消息的时候,流本身就有阻塞的作用。正是使用此处流的阻塞,来保证在learner启动初始化阶段,只要没有收到leader发来的UPTODATE包,就一直循环读取或者阻塞等待,不会对外提供服务,处理client的请求。收到UPTODATE包后跳出while循环,解除阻塞。进入读取和处理client数据的while循环中。

4. 正式对client提供服务

  1. leader:
    1. leader线程会不断的循环,检测每个LearnerHandler线程是否存活,定时的ping每一个learner,判断与Learner的ping是否超时等,以此来判断某个learner是否存活,再根据这个判断是否还能够达成多数follower的有效仲裁,如果不能有效仲裁了,就退出循环,重新选举。
  2. follower:
    1. follower
  3. 处理器都是继承自Thred,本身是一个线程,说说通用的处理器流程:
    1. CommitProcessor:
      1. 对服务器来说,只会启动一个CommitProcessor线程。
      2. 在线程第一次运行时,因为queuedRequests和committedRequests都为空,所以线程阻塞wait。
      3. 然后上一个处理器调用CommitProcessor.processRequest()提交请求到queuedRequests队列中。
      4. 因为此时是第一个请求,无待commit请求,所以会调用notifyAll唤醒阻塞的所有线程。
      5. 线程唤醒后,因为queuedRequests不为空,所以跳出while循环,向下走。此时下一个while循环为true,因为没有待commit或者正在commit的请求,而且从queuedRequests拉数据不为空。
      6. 判断这个请求是不是需要被commit的请求,即写请求。
        1. 如果是读请求,更新正在处理请求的计数numRequestsProcessing+1,根据请求的sessionId对线程数组取余,拿到执行线程,封装成任务,提交给该线程,线程内操作就是将该读请求提交给下一个执行器。即调用nextProcessor.processRequest(request);一般就是走到FinalRequestProcessor,会根据路径取对应内存DataTree的node节点,将数据封装成响应,写回给客户端。回到CommitProcessor后会执行finally块,里面会将currentlyCommitting置为null,将正在处理请求的计数numRequestsProcessing-1。相当于该numRequestsProcessing最大值为1。
        2. 如果是写请求,则set为待commit请求,赋值给nextPending变量。因为有了待commit请求,所以再走while循环条件不成立,跳出while,向下走。
      7. 因为committedRequests为空,所以if条件不成立,走回while循环,继续阻塞wait。
      8. 假如后来再有一个写请求入队queuedRequests。根据上面说的逻辑,会赋值给nextPending变量。
      9. 假如leader对写的请求投票结束,确定要commit该数据,会给Follower发送COMMIT,给Observer发送INFORM命令。处理器链会调用CommitProcessor.commit,将待commit请求入队committedRequests。调用notifyAll唤醒阻塞线程。
      10. 此时因为committedRequests不为空,并且也没有正在commit的请求,所以跳出while,向下走。
      11. 从committedRequests出队该已确定要被提交的请求,set为正在commit的请求,即赋值给currentlyCommitting。然后将待commit变量置为null,即nextPending为null。
      12. 更新正在处理请求的计数numRequestsProcessing+1,根据请求的sessionId对线程数组取余,拿到执行线程,封装成任务,提交给该线程,线程内操作就是将该读请求提交给下一个执行器。即调用nextProcessor.processRequest(request);一般就是走到FinalRequestProcessor。FinalRequestProcessor会将写请求的数据,写到内存DataTree上,写成功响应给client。
      13. 回到CommitProcessor后会执行finally块,里面会将currentlyCommitting置为null,将正在处理请求的计数numRequestsProcessing-1。相当于该numRequestsProcessing最大值为1。
      14. 假如在有一个写请求处于待commit状态,且CommitProcessor线程处于wait状态,此时有一个读请求或者写请求被上一个处理器塞到了queuedRequests中,因为CommitProcessor线程阻塞等待leader的提交命令来唤醒,所以在没有leader的commit命令之前,都是阻塞的,不会从queuedRequests队列中去拿后面的读或者写请求。相当于没有写的情况下,可以同时有多个读,只要有写的情况下,只能有一个写。很类似于读写锁。
1. **FinalRequestProcessor:**
    1. 对服务器来说,只会启动一个FinalRequestProcessor线程。
    2. 根据request.hdr字段来判断,如果有值证明是写请求,如果为null,证明是读请求。如果是写请求,将结果应用在内存DataTree上,触发事件通知,返回执行结果。
    3. 如果是写请求,还会删除对应的outstandingChanges数据,将请求放到committedLog已提交列表中,方便与follower快速同步数据。
    4. 获取请求中的客户端信息,如果为null,证明请求是leader发来的commit命令等,不是直连的客户端发给自己的写请求,不需要继续执行。
    5. 如果客户端信息不为null,证明是客户端直连该follower发的写请求,根据request.type判断是什么命令,构造响应,写回给客户端。
        1. 比如是getData请求,那么会取路径对应的Node数据,检查用户的读权限,将数据和状态封装成响应写回给请求方。
        2. 如果是create请求,因为上面已经更新过内存DataTree的数据了,此时只需要返回路径和状态给请求方就可以了。

2. **SyncRequestProcessor:**
    1. 对服务器来说,只会启动一个SyncRequestProcessor线程。
    2. SyncRequestProcessor在3种不同情况下使用: 
        1. Leader - 将请求同步到磁盘,并将其转发到AckRequestProcessor,后者将ack发回给自己。 
        2. Follower - 将请求同步到磁盘,并将请求转发到SendAckRequestProcessor,后者将数据包发送到leader。 SendAckRequestProcessor是可刷新的,这使我们能够将推送数据包强制发送到leader。 
        3. Observer - 将提交的请求同步到磁盘(作为INFORM数据包接收)。 它永远不会将确认发送回给leader,因此nextProcessor将为null。 因为它只包含提交的txns,所以这改变了观察者上txnlog的语义。
    3. 该处理器的执行时机和主要作用: 
        1. 对leader来说: 
            1. 对于一个写操作,先经过预处理器,封装好要写的数据,然后提交给下一个处理器提案处理器 
            2. 提案处理器将要写的数据封装到提案请求中,发给所有follower
            3. 然后执行当前同步处理器,将这个写请求写到事务日志磁盘缓冲区,缓冲区对象LinkedList toFlush,大小一般为1000,超过该值会执行flush磁盘操作。 
        2. 对follower来说: 
            1. 接到leader发来的提案请求后,follower基本就是先执行该处理器,即将这个写请求写到事务日志磁盘缓冲区。 
        3. 对leader和follower来说,在写完一定数量的写请求后,会随机触发拍快照操作。
        4. 只要能写完事务缓存,没有异常中断,就表示赞同提案,调下一个处理器发送应答给leader。 相当于只要发了应答,就表示赞同提案,出现异常或者错误、网络问题等导致没有发应答给leader,就表示不赞同。 没有中间态度,而且只要能将事务写到磁盘,就一定要投赞同票。
  1. 处理器都是继承自Thred,不同角色服务器的处理器链:
    1. Observer:
      1. ObserverRequestProcessor
        1. client.getData请求到ObserverRequestProcessor,入队再从队列取出,调用nextProcessor.processRequest,交给下一个处理器,即CommitProcessor处理。
        2. 如果是写请求,将请求放到下一个处理器的队列后,会转发该写请求给leader。
      2. CommitProcessor
      3. FinalRequestProcessor
    2. Follower:
      1. FollowerRequestProcessor
        1. 和ObserverRequestProcessor的逻辑差不多。
      2. CommitProcessor
      3. FinalRequestProcessor
      4. 还有独立的SyncRequestProcessor -> SendAckRequestProcessor
      5. SendAckRequestProcessor:
        1. 向leader发送ACK。
    3. Leader:
      1. LeaderRequestProcessor
        1. 负责执行本地会话升级。 只有直接提交给领导者的请求才能通过此处理器。即客户端直连的请求才会走这个处理器。
      2. PrepRequestProcessor
        1. 首先是单线程,不断从LinkedBlockingQueue中拿请求数据,根据request.type判断属于客户端的哪一种请求。
        2. 如果是新增节点、修改节点等写操作,会调用zks.getNextZxid(),拿到递增的zxid(事务id)。
        3. 然后根据内存树DataTree的现有数据,来计算新的数据path名称,版本号,acl都应该是什么,封装数据,沿着处理器链向下传递。
        4. 注意这里的操作不会对内存树有影响,也就是操作结果不会在内存树生效。只是得到数据应该是什么样,比如创建的节点名称是什么等等。
      3. ProposalRequestProcessor(包含独立运行的线程SyncRequestProcessor->AckRequestProcessor)
        1. 这个处理器将请求分成两种进行处理,
          1. Follower转发过来的"sync"同步请求。
          2. 剩下的其他请求,主要是其他learner server端转过来的写请求,以及client的读请求。
        2. 先说如何处理Follower转发过来的"sync"同步请求:
          1. zk不能保证每个服务实例在每个时间都具有相同的ZooKeeper数据视图。由于网络延迟之类的因素, 一个客户端可能会在另一客户端收到更改通知之前执行更新。考虑两个客户端A和B的情况, 如果客户端A将znode/a的值从0设置为1,然后告诉客户端B读取/a,则客户端B可能读取旧值0,具体取决于连的哪个服务器。 如果客户端A和客户端B读取相同的值很重要,则客户端B应该在执行读取之前从ZooKeeper API方法中调用sync()方法。 sync是使得client当前连接着的ZooKeeper服务器,和ZooKeeper的Leader节点同步(sync)一下数据。 用法一般是同一线程串行执行,先调 zookeeper.sync("关注的路径path","可阻塞的回调对象","回调上下文对象"), 调用完后,会再调用"可阻塞的回调对象.await等阻塞方法",等待Leader和当前Follower数据同步完成,返回响应, 然后就可以调用zookeeper.getData("关注的路径path")了,可以保证在该路径的数据上,获取到和Leader一致的视图。 具体可以参考org.apache.zookeeper.cli.SyncCommand#exec()和org.apache.zookeeper.server.quorum .EphemeralNodeDeletionTest#testEphemeralNodeDeletion()。
          2. 当follower收到到客户端发来的sync请求时,会将这个请求添加到一个pendingSyncs队列里,然后将这个请求发送给leader, 直到收到leader的Leader.SYNC响应消息时,才将这个请求从pendingSyncs队列里移除,并commit这个请求。
          3. 当Leader收到一个sync请求时,如果leader当前没有待commit的决议,那么leader会立即发送一个Leader.SYNC消息给follower。 否则,leader会等到当前最后一个待commit的决议完成后,再发送Leader.SYNC消息给Follower。
          4. 其实这里面有一个隐含的逻辑,leader和follower之间的消息通信,是严格按顺序来发送的(TCP保证), 因此,当follower接收到Leader.SYNC消息时, 说明follower也一定接收到了leader之前(在leader接收到sync请求之前)发送的所有提案或者commit消息。 这样就可以确保follower内存中的数据和leader是同步的了。客户端就能从连接的follower读取到最新的数据了。
        3. 剩下的其他请求,主要是其他learner server端转过来的写请求,以及client的读请求。 交个下一个处理器执行,即CommitProcessor。
        4. AckRequestProcessor:它只是将前一个处理器的请求作为ACK转发给leader。相当于对于写数据,leader自己给自己投赞同票。
      4. CommitProcessor
      5. Leader.ToBeAppliedRequestProcessor
        1. 该请求处理器仅维护toBeApplied列表。将已确定提交,待应用到内存树的请求转发给下一个处理器执行。
      6. FinalRequestProcessor
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,056评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,842评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,938评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,296评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,292评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,413评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,824评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,493评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,686评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,502评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,553评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,281评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,820评论 3 305
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,873评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,109评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,699评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,257评论 2 341

推荐阅读更多精彩内容