前言
本次专题我们要探讨的内容是使用 zookeeper 实现自己的分布式应用程序,相信大家也都了解过 zookeeper,比如我们使用的分布式框架——Dubbo,就是用 zookeeper 实现的注册中心,再比如 Hadoop 用它来实现集群中的高可用,当然,包括耳熟能详的 Kafka 用它来实现生产者的负载均衡等等,而在我们现有的技术体系中,可能对 zookeeper 的了解估计就这么多了,因为我们只是习惯了开源框架的集成,只是知道它可以实现这些功能,而屏蔽了想继续了解它的好奇心,假如我们把 zookeeper 的一些特性加入到我们现有的系统中,用它也能解决了分布式系统中的一些问题,那该多酷啊?好,接下来,我们就一步一步的来探讨。
Zookeeper 简介
我们先看官网上对它的介绍:
A Distributed Coordination Service for Distributed Applications.
ZooKeeper is a distributed, open-source coordination service for distributed applications. It exposes a simple set of primitives that distributed applications can build upon to implement higher level services for synchronization, configuration maintenance, and groups and naming. It is designed to be easy to program to, and uses a data model styled after the familiar directory tree structure of file systems. It runs in Java and has bindings for both Java and C.
Coordination services are notoriously hard to get right. They are especially prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications the responsibility of implementing coordination services from scratch.
译文:
ZooKeeper 是一个为分布式应用提供的分布式、开源的协调服务。它公开了一组简单的原语,分布式应用程序可以根据这些原语来实现用于同步、配置维护以及组和命名的高级服务。它被设计为易于编程,并使用了文件系统中常见的目录树结构样式的数据模型。它在 Java 中运行,并且有针对 Java 和 C 的绑定。 协调服务是出了名的难搞。它们特别容易出现竞态条件和死锁等错误。
相信大家看了官网上的介绍,也有了对它的一个简单认识,我们摘出来比较重要的词句,比如同步、配置维护,命名,这些词是什么意思?意思是说在分布式系统中,我们可以用它实现同步的功能,也可以实现配置信息的维护,和命名服务,然后就是它的数据呈现格式是目录树,就像一个个的树状结构一样,当然我们先对它有一个大致了解,一下部分我们会详细探讨它的核心知识点与代码实例。
Zookeeper 的核心理念
要理解 zookeeper 的核心,记住三个词就可以了,一致、有头、数据树,什么意思?我们逐个做分析。
解决分布式系统数据一致性问题
zookeeper 集群中,所有的机器数据都会保持一致,从客户端来看,这些集群所有的数据都是一致的,从其中一台删除或修改一条数据,其他剩余机器的数据也会被删除或修改。
有头
只要是集群的机器有一半以上的能运行,永远都会有一个 leader,所有客户端的连接和数据操作都会先连接到 leader 那里(写和更新数据),然后 leader 再去转发到其他的 follower 机器。
数据树
zookeeper 存储的数据结构类型,就像一颗树,每一个节点都会绑定一个数据(data),所以叫做数据树。
角色分析
在一个完整的 zookeeper 集中,永远都会有 n 个 follower,也就是跟随者,永远都会有一个 leader,也就是领导者。
集群的搭建步骤与核心理念的验证
我们的安装环境是 linux,因为 zookeeper 的数量最好是奇数倍,所以我们要准备 3 台 linux 环境的虚拟机,分别部署 zookeeper。
1. 下载地址:
2. 下载完毕后在其中一台虚拟机中通过 tar -xvf
解压到 /usr/local 目录中。
3. 然后把解压后的 zookeeper 文件价分别 copy 到其他两台机器的相同目录中。
4. 进入 /usr/local/apache-zookeeper-3.5.5-bin/conf 通过 cp zoo_sample.cfg zoo.cfg
命令复制 zoo_sample.cfg 并修改文件名称为 zoo.cfg 此文件就是为 zookeeper 的配置文件。
5. 通过 vim zoo.cfg
命令编辑此文件,会看到如下内容,我们简单分析一下:
-
tickTime=2000
:是一个计时单位,两秒钟一个计时单位。 -
initLimit=10
:启动的时候 leader 与 foolwer 之间连接最大的心跳数,有多少个计时单位,10 个就是 10 乘以 2000,也就是 20 秒,如果超过了这个时间则连接失败,如果在设定的时间段内,半数以上的跟随者未能完成同步,领导者便会宣布放弃领导地位,进行另一次的领导选举。如果 zk 集群环境数量确实很大,同步数据的时间会变长,因此这种情况下可以适当调大该参数。默认为 10。 -
syncLimit=5
:follower 服务器与 leader 服务器之间 请求和应答之间能容忍的最多心跳数,有多少个计时单位,默认为 5*2=10 秒。 -
dataDir=/usr/local/zookeeperData
:zookeeper 在硬盘上存储数据和命名 zk 服务器 id 的地方。 -
clientport=2181
:客户端端口是 2181,也就是程序连接 zk 向外暴漏的端口是 2181。
集群配置:
server.1=192.168.56.102:2888:3888
server.2=192.168.56.103:2888:3888
server.3=192.168.56.104:2888:3888
代表的意思:
- Server.1 对应的是 0 号机,对应服务器的 id 就是 1
- Server.2 对应的是 1 号机,对应服务器的 id 就是 2
- Server.3 对应的是 2 号机,对应服务器的 id 就是 3
192.168.56.101:2888:3888 的含义是:
- 192.168.56.102 服务器的 ip 地址
- 2888 代表 zk 服务器集群之间的通信端口号
- 3888 代表 zk 集群之间进行 leader 选举的端口号
-
:wq
保存退出,到这一步 zookeeper 配置信息已经配置完毕,然后把 zoo.cfg 分别 copy 到其他的两台机器中。
7. 然后更改各个虚拟机的 myid:目录对应 zoo.cfg 中的 dataDir 目录,然后在 3 台虚拟机的 /usr/local/zookeeperData 目录下面分别创建 myid 文件,然后在 3 台机器中 分别执行 vim myid
命令,然后在第 1 台(192.168.56.102)虚拟机中设置为 1,第 2 台(192.168.56.103)设置为 2,第 3 台(192.168.56.104)设置为 3,然后存盘退出。
8. 分别进入 3 台虚拟机的 /usr/local/apache-zookeeper-3.5.5-bin/bin 目录,然后执行 ./zkServer.sh start
命令进行启动 zookeeper 服务,然后执行 ./zkServer.sh status
查看 zookeeper 启动的状态,如果信息如下则启动成功:
仔细观察 mode 的值,第一台、第二台服务器的 mode 值分别为 follower 代表是跟随者机器,第三台 mode 的值为 leader,代表是领导者机器,正应了我们上面提到的,zookeeper 集群中,只有一台是 leader 服务器,其他全部为 follower 服务器。
操作验证
我们验证一下它的数据一致性问题:
进入到其中一台 zookeeper 服务的 bin 目录,执行 ./zkCl.sh
命令进入到客户端操作,然后执行 create /myZkCom 'myZkCom'
创建一个 myZkCom 节点,值也为 myZkCom,然后 在其他两台 zookeeper 机器中进入客户端,分别执行 get /myZkCom
执行结果如下:
三个 zookeeper 服务器都有 myZkCom 节点,说明是在 zookeeper 集群中,所有的数据都是保持一致的,接下来我们来探讨它的节点类型和常用命令。
节点类型与常用命令
zookeeper 的节点类型分为持久化节点、持久化顺序节点、 临时节点和临时顺序节点这四大类,我们分别来看一下它们的含义。
持久化节点
节点创建后会被持久化,客户端与 zookeeper 断开连接后,该节点依旧存在,只有主动调用 delete 方法的时候才可以删除它,创建该类型节点的命令为 create /myNode " "
,一旦创建完,就持久化到 zookeeper 磁盘上上面了,哪怕是 zookeeper 服务重启也会存在。
持久化顺序节点
除了该节点是持久化之外,节点名称还进行了顺序编号,它的执行命令为 create -s /jin
创建节点结果是 /jin0000000062,如果再次执行 create -s /jin 那么节点名称为 /jin0000000063。
临时节点
节点创建后在创建者超时连接或失去连接的时候,节点会被删除,操作命令为create -e /hao
。
临时顺序节点
除了该节点是临时节点之外,它的节点名称也进行了顺序编号,节点创建的命令为 create -s -e /hao
,四个命令只是 zookeeper 的一部分,我们继续来看一下其他常用的命令。
常用命令操作
我们接下来探讨它常用的一些命令,建议小伙伴们 进入客户端操作一下这些命令,废话不多说,我们直接进入正题。
查看所有目录:
ls /
查看 service10 的下面节点:
ls /service10
查看 service11 节点的数据:
get /service11
创建 merryyou 节点,节点的内容为 merryyou:
create /merryyou merryyou
获得 merryyou 节点内容:
get /merryyou
创建临时节点 ( 断开重连之后,临时节点自动消失):
create -e /merryyou/temp merryyou
根据版本号更新 dataVersion 乐观锁,再次执行 set /merryyou test-merryyou 1
命令时,会出现 version No is not valid : /merryyou
:
set /merryyou test-merryyou 1 set
删除节点:
delete /merryyou/sec000000000
ACL 权限控制
zookeeper 的节点有 5 种操作权限:CREATE、READ、WRITE、DELETE、ADMIN
也就是 增、删、改、查、管理权限,这 5 种权限简写为 crwda(即:每个单词的首字符缩写)。
注:这 5 种权限中,delete 是指对子节点的删除权限,其它 4 种权限指对自身节点的操作权限。
身份的认证有 4 种方式:
- world:默认方式,相当于全世界都能访问
- auth:代表已经认证通过的用户(cli 中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户)
- digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
- ip:使用 Ip 地址认证
- 使用[scheme:id:permissions]来表示 acl 权限
getAcl:获取某个节点的 acl 权限信息
#获取节点权限信息默认为 world:cdrwa 任何人都可以访问
getAcl /merryyou
#设置节点权限 crwa 不允许删除
setAcl /merryyou world:anyone:crwa
#设置节点的权限信息为 rda
setAcl /merryyou world:anyone:rda
这些命令在网上也能找的到,当然大家也不用刻意去背,忘记了知道在哪查就可以了。
读写流程
我们列举了很多命令操作,它的读写流程什么样的呢,比如执行了 create /myTree ""
创建 myTree 节点的命令,在 zookeeper 集群中是怎么操作的呢?我先画一个流程图,然后再进行详细分析。
流程图讲解
我们简单分析一下创建节点的流程图:
1. client 向 Zookeeper 的 server1 发送一个写请求,客户端写数据到 server1 上。
2. 如果 server1 不是 Leader,那么 server1 会把接收到的写请求转发给 Leader;然后 Leader 会将写请求转发给每一个 server。
3. server1 和 server2 负责写数据,并且两个 follower 的写入数据是一致的,保存相同的数据副本。
- server1 和 server2 写数据成功后,通知 Leader;
- 当 leader 收到集群半数以上的节点写成功的消息后,说明该写操作执行成功。
温馨提示:这里是 3 台服务器,只要 2 台 Follower 服务器写成功就 ok。因为 client 访问的是 server1,所以 leader 会告知 server1 集群中数据写成功。
4. 被访问的 server1 进一步通知 client 数据写成功,这时,客户端就知道整个写操作成功了。
5. 如果是读操作,就很简单,访问哪一个 server,哪一个 server 直接就会给客户端返回结果。
ZAB 协议
ZAB 协议是为 ZooKeeper 专门设计的一种支持崩溃恢复的一致性协议。基于该协议,ZooKeeper 实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性,ZAB 协议分为两种模式:
- 崩溃恢复模式:当服务启动或 leader 服务器崩溃退出与重启,会进入崩溃恢复模式,然后选举 leader 服务器,当 leader 被选举出来后,且集群中有过半的机器完成与 leader 服务器的状态同步,就会退出恢复模式。
- 消息广播模式:当集群中有过半的 follower 完成 与 leader 的状态同步,就进入消息广播模式。当有新的 server 加入到 zookeeper 服务中,会以恢复模式启动,找到 leader 服务器,完成状态同步,然后一起参与到消息广播模式。
leader 节点挂掉之后会发生什么
我们直接还是做实验,实验如下步骤:
我们通过./zkServer.sh stop 关闭 leader 节点。
-
然后在其他的两个节点分别执行 ./zkServer.sh status 命令 查看其状态信息,操作结果如下:
请看,leader 服务关闭之后,第二台 zookeeper 自动升级为了 leader,第一台 zookeeper 则是为 follower,有小伙伴可能有疑问了,为什么不是第一台是 leader 而是第二台成为了 leader?大家可以把这个问题先放一放,到下面我们再继续探讨,我们接下来学习用 java 程序怎么使用 zookeeper。
第一个 Zookeeper 程序
好,废话不多说,我们直接上代码:
首先我们引入 zookeeper 的 maven 依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
HelloZk.java
package com.zk.run;
import com.common.ZkConnect;
public class HelloZk {
public static void main(String[] args) throws Exception {
ZkConnect connection =new ZkConnect();
connection.connect();//连接 zookeeper 服务
connection.createPersistentNode("/HelloMyZKxxo","888");//创建一个路径名 HelloMyZKxxo 的持久化节点
String zkData = connection.getData("/HelloMyZKxxo");//获取 HelloMyZKxxo 节点数据
System.out.println(zkData);
connection.deleteNode("/HelloMyZKxxo");//删除 HelloMyZKxxo
}
}
我们创建了一个 HelloZk 的运行主类,实例化一个 ZkConnect 对象,该对象有一个连接 zookeeper 的 connect 方法,然后通过调用 connection.createPersistentNode()方法,创建一个持久化节点,然后通过调用 connection.getData()方法获取该节点的数据,最后删除该节点,这是一个非常简单的创建节点、获取节点内容和删除节点的操作,接下来我们重点来看一下 ZkConnect 类:
package com.common;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/**
* @Classname ZkConnect
* @Description TODO
* @Date 2019/8/26 11:32
* @Created by youDaily
*/
public class ZkConnect implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static final String ADDRESS = "192.168.56.102:2181,192.168.56.103:2181,192.168.56.104:2181";
private ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
System.out.println("receive the event:" + event);
if (Event.KeeperState.SyncConnected == event.getState()) {
countDownLatch.countDown();
}
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void connect(){
try {
zooKeeper = new ZooKeeper(ADDRESS, 5000, this);
countDownLatch.await();
System.out.println("已连接!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建持久化节点
* @param path
* @param data
* @return
* @throws Exception
*/
public String createPersistentNode(String path,String data) throws Exception{
return this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
*创建临时节点
* @param path
* @param data
* @return
* @throws Exception
*/
public String createEphemeralNode(String path,String data) throws Exception{
return this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
/**
* 获取节点数据
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getData(String path) throws KeeperException, InterruptedException {
byte [] b = this.zooKeeper.getData(path, new Watcher(){
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
System.out.println("节点路径:"+event.getPath()+"已被删除……");
}
}
},null);
return new String(b);
}
/**
* 删除节点
* @param path
* @throws KeeperException
* @throws InterruptedException
*/
public void deleteNode(String path) throws KeeperException, InterruptedException {
this.zooKeeper.delete(path,-1);
}
}
此类是操作 zookeeper 的核心类,我们具体来分析一下这些代码的含义:
首先该类实现了 Watch 接口,此接口是 zookeeper指定事件处理程序类必须实现的公共接口,也就是说当客户端连接 zookeeper 服务、增删改查节点时,通过向客户端注册 Watcher, 然后以回调的方式触发 Watcher 实现类的 process 方法。
我们实例化了一个 countDownLatch 门闩属性,因为连接 zookeeper 是一个异步操作,所以用它来堵塞客户端对 zookeeper 的连接操作,当连接成功后调用 countDownLatch.countDown()释放当前线程的堵塞。
然后我们声明了连接 zookeeper 集群的 ip 地址,因为是集群所有是三个 ip 地址,并通过逗号隔开。声明一个 zooKeeper 属性,用它来创建、删除节点等操作。
接下来我们看 connect() 方法,此方法的作用是连接 zookeeper 服务的,通过实例化 zookeeper 对象,然后把 ip 地址传进去,再把 watch,也就是当前的对象传进去,意思是说连接 zookeeper 时,注册了 watch,一旦连接成功或者失败会触发 watch 接口实现类的 process 方法,因为 ZkConnect 本身就是一个 Wacher,所以直接传 this,上面也提到过,连接 zookeeper 服务本身是一个异步操作,所以我们采用 countDownLatch 进行堵塞,当连接成功时会触发 process 方法,当此方法的参数 WatchedEvent 为 Event.KeeperState.SyncConnected 已连接时,则执行 countDownLatch.countDown();
取消堵塞。
我们接下来看 createPersistentNode 方法,此方法是用来创建持久化节点的,我们重点看创建节点的方法:
zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
此方法第一个参数是节点的路径名称,第二个参数是节点数据,第三个参数是创建节点的权限,ZooDefs.Ids.OPEN_ACL_UNSAFE 代表如何连接的客户端都可以操作该节点,最后一个参数是节点的类型,我们通过 CreateMode.PERSISTENT 属性设置为持久化节点,当然也可以设置其他的三种数据类型。
然后我们来看 createEphemeralNode 方法,此方法和 createPersistentNode 类似,唯一的区别是此节点是临时节点,客户端一旦失去连接后,节点则自动删除。
getData 方法的作用为获取节点数据,此方法只有一个 path 参数,也就是说通过路径就可以获取数据,并且在此方法上面注册了 watcher,当获取该节点的数据发生变化时,则会触发 watch 实现类的 process 回调函数,我们为了简单测试,只是判断了当删除该节点的时候打印一句话。
deleteNode 方法的作用顾名思义是用来删除节点的,第二个参数是版本号,当为-1 时则不使用版本号删除。
测试
我们运行 HelloZk 类,看看是否可以达到所预期的结果:
ok,没有问题,当节点删除的时候,也进入到了 watch 类中的回调函数,相信大家都已经掌握了这个小例子,接下来我们将进入项目实战,请大家该倒水的倒水,该上厕所的上厕所,因为重点要来了!
Zookeeper 实战——编写自己的分布式应用程序
我们将探讨三个实例做为 zookeeper 的进阶学习,相信大家理解了这三个例子后更能体会到 zookeeper 的强大,并能在真实的项目中,实现自己的分布式应用程序,好,废话不多说,我们一起来开始探讨。
实现注册中心
注册中心在分布式通信系统的作用极其的重要,比如 Dubbo、SpringCloud 都有自己的注册中心,在没有注册中心的时候,consumer 获取 provider 的服务信息(ip 地址和端口号)时,要不就是写死在 consumer 端,要不就是保存在数据库中,这样做会有三个弊端:
- 第一,假如 provider 端的 ip 地址或者端口号改变了,consumer 端也需要开发人员手动修改,很麻烦。
- 第二,如果 provider 的服务非常的多,对于 consumer 端来说,服务信息会变得非常不好管理,如果 provider 服务出现问题,consumer 端会无感知,导致 provider 那边出现问题了也不知道。
- 第三,如果 provider 增加服务器,如果想通过负载均衡访问 provider 集群,就需要增加 nginx 才可以,如果 provider 服务非常的多,需要把所有的服务列表写在 nginx 中,如果 provider 端的 ip 地址和端口号有改动,又得修改 nginx 配置文件,同样很麻烦,那么 zookeeper 是怎么解决以上问题的?我们就根据下面的流程图一步一步的分析。
发布与订阅服务流程图
上面的流程图是注册中心的第一大步,provider 发布服务和 consumer 订阅服务,总共是四个步骤,我们来分析一下他们发布与订阅过程:
- 首先 provider1 和 provider2 会连接 zookeeper 服务器集群。
- 第二步,它们分别把自己的 ip 地址和端口号在/myServer/1.0.1 节点下创建子节点,分别是/myServer/1.0.1/127.0.0.1:8080 和/myServer/1.0.1/127.0.0.1:8081。
- 第三步,consumer 开始连接 zookeeper 集群。
- 第四步,consumer 获取 /myServer/1.0.1 节点下的所有子节点。
- 最后一步,返回给所有的服务器列表给 consumer 端,并缓存到 consumer 本地。
这就是发布与订阅的过程,有小伙伴可能存在疑问,节点 /myServer/1.0.1 是什么意思?大概解释一下,首先 myServer 是 provider 发布服务的实例名称,比如订单服务提供方与支付服务提供方的实例名是不一样的,不同的实例名会区分不同的服务提供者,1.0.1 是服务提供方的版本号,有版本号的好处是可以令 consumer 端有更多的服务选择,比如 provider 端有服务更新,需要 consumer 端修改一些代码,但是 consumer 端还不能立刻完成,那么有了版本号就可以让 consumer 调用老服务,如果代码更新了,再根据版最新的版本号调用新服务即可。
服务提供方增加服务器的流程图
随着用户的增多,并发量也一直在增加,provider 端避免不了增加服务器以此来分担来自 consumer 访问的压力,我还是沿着发布与订阅的结果之上用流程图进行分析。
- 一开始 consumer 端一直在监听/myServer/1.0.1 下的子节点。
- 当服务提供方新增一个 provider3 节点时,该节点会往 zookeeper 集群中的 /myServer/1.0.1 目录下 创建一个新的子节点 /myServer/1.0.1/127.0.0.1:8082,然后 zookeeper 会及时的通知 consumer 端(consumer 也有可能是一个集群)/myServer/1.0.1 下的子节点产生了变化,然后 consumer 拿到最新的服务列表,最后更新本地缓存的服务信息,这是服务提供方新增或者减少机器时对 consumer 端的感知流程,我们一开始也提到过,provider 服务器的增加是负载并发压力的,那么它是如何进行负载的呢?我们继续往下聊。
负载均衡流程图
从注册中心获取 provider 的服务信息一般是多条,也就是说 provider 是一个集群,那么就可以通过负载均衡保持服务提供方的稳定性,我们还是以一个流程图分析:
我们简单来分析一下 consumer 与 provider 端之间是怎么实现负载均衡的:
- 沿着上一个流程分析,consumer 端会从注册中心获取 provider 的服务列表并缓存到本地。
- 然后 consumer 端可以通过服务列表实现负载均衡算法,比如随机获取一个 provider 的 ip 和端口号或者挨着排顺序的轮询访问 provider 服务。
- 负载均衡算法是在 consumer 端实现的,要实现什么样的负载均衡算法是 consumer 端说了算。
关于注册中心的流程图已经分析完了,但是我们还是要结合应用程序再更深一层的探讨。
代码实现服务注册与发现
终于到了我们的代码环节了,废话不多说,我们直接上注册中心的代码:
Provider1.java
package com.provider;
import com.registCenterCommon.RegistCenterProvider;
import java.net.Socket;
import java.net.ServerSocket;
/**
* 服务提供方
*/
public class Provider1 {
public static boolean isRunning = true;
/**
* 服务名称
*/
private static String serviceName = "myServer";
/**
* 端口号
*/
private static int port = 8080;
/**
* ip
*/
private static String ip = "127.0.0.1";
/**
* 版本
*/
private static String version = "1.0.1";
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(port);
//把当前的服务信息注册到注册中心中
RegistCenterProvider registCenterProvider = new RegistCenterProvider(ip,version,port,serviceName);
registCenterProvider.register();
while(isRunning){
Socket socket = serverSocket.accept();
System.out.println("当前连接的服务版本、ip 和端口号为:/"+version+"/"+ip+":"+port);
}
serverSocket.close();
}
}
此类是用来模拟服务提供方,专门为 consumer 端提供服务,当然还有 Provider2、Provider3,因为除了端口号不一样之外,其他的代码都一样,所以 Provider2 和 Provider3 的代码就不一一的贴了,这三个类一起启动,用来模拟服务提供方的一个集群,然后我们分析一下 Provider1 的代码:
- 首先定义了四个属性,服务名称、ip、port 和版本号,这四个属性代表 Provider 的服务信息,他们的作用就不再赘述了,相信大家都能想的到。
- 在 main 函数中,通过实例化一个 ServerSocket 对象提供网络服务,监听的端口号为 8080,ip 地址为 127.0.0.1。
- 然后重点来了,我们实例化一个 RegistCenterProvider 对象代表 Provider 的注册中心,并把四个属性传进去,然后通过调用该对象的 register()方法实现注册功能,只要调用了该方法就能把 provider 端的服务名称、ip、port 和版本号注册到了 zookeeper 服务器中去,RegistCenterProvider 类我们稍后会详细分析,现在只需要知道它是服务提供者的注册中心就行了。
- 注册完毕后,在 while 循环中一直接收 consumer 的网络连接,因为我们只是模拟 provider 服务,所以只是实现接收 consumer 端的连接,并且打印当前连接的服务信息,没有做其他的读写操作,ok,provider 的代码就这么简单,具体的步骤就是启动的时候把对外提供服务的服务名称 ip、port 和版本号注册到注册中心里面去,然后通过 ServerSocekt 一直对外提供网络连接服务,如果有 consumer 连接到了当前的网络,则打印服务信息,然后我们接下来分析 RegistCenterProvider 类,看它如何实现注册中心功能的。
RegistCenterProvider.java
package com.registCenterCommon;
import com.common.ZkConnect;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 服务提供端注册中心
*/
public class RegistCenterProvider {
/**
* ip 地址
*/
private String ip;
/**
* 端口号
*/
private String version;
/**
* 端口号
*/
private int port;
/**
* 服务名称
*/
private String serviceName;
public RegistCenterProvider(String ip, String version, int port, String serviceName) {
this.ip = ip;
this.version = version;
this.port = port;
this.serviceName = serviceName;
}
/**
* 服务提供方注册
* @throws Exception
*/
public void register() throws Exception {
ZkConnect zk = new ZkConnect();
zk.connect();
ZooKeeper zooKeeper = zk.getZooKeeper();
if(zooKeeper.exists("/"+serviceName,false)==null){
zk.createPersistentNode("/"+serviceName,"");
}
Stat stat = zooKeeper.exists("/"+serviceName+"/"+version,false);
if(stat == null){
zk.createPersistentNode("/"+serviceName+"/"+version,"");
}
zk.createEphemeralNode("/"+serviceName+"/"+version+"/"+ip+":"+port,"");
System.out.println("服务提供方注册成功,注册信息为:/"+serviceName+"/"+version+"/"+ip+":"+port);
}
}
RegistCenterProvider 类实现了服务提供者的注册中心功能,我们来具体分析一下:
四个属性不用再赘述。
我们具体分析一下 register()方法,首先通过实例化 ZkConnect 对象,然后通过调用它的 connect() 方法连接到 zookeeper 集群,并通过获取 getZooKeeper() 方法拿到 ZooKeeper 对象。
接着 通过 zooKeeper.exists("/"+serviceName,false) 判断当前的节点是否存在,此方法有两个参数,第一个是路径名称,第二个为是否监听此路径,把当前的 serviceName 传进去,并设置不监听此节点,如果不存在,则调用 createPersistentNode() 方法创建以服务名称为根路径的持久化节点,为什么是持久化节点 ?首先因为它下面要有子节点,如果为临时节点,则当创建子节点的时候会失败,然后就是它读的频率要很高,而写的频率会很低。
然后会接着判断 /serviceMame/version 在 zookeeper 中是否存在,如果不存在,则一样创建持久化节点。
最后一步,根据 /serviceName/version/ip:port 路径创建临时节点,为什么是临时节点?因为当前的服务可能会宕机,如果宕机就需要删除该节点,而临时节点正好提供了这个特性,客户端与 zookeeper 服务端断开连接之后,/serviceName/version/ip:port 节点会被删除,当然删除之后,也会给 consumer 端通知此节点已被删除,然后我们来看一下 consumer 端的程序。
Consumer.java
package com.consumer;
import java.net.Socket;
import com.registCenterCommon.Connect;
import com.registCenterCommon.RegistCenterConsumer;
import java.util.List;
import java.util.Random;
/**
* @Classname Consumer
*/
public class Consumer {
private static String serviceName = "myServer";
private static String version = "1.0.1";
public static void main(String[] args) throws Exception {
RegistCenterConsumer registCenterConsumer = new RegistCenterConsumer(serviceName,version);
List<Connect> services = registCenterConsumer.pullServiceList();
for(int i = 0;i<20;i++){
int randomIndex = new Random().nextInt(services.size());
Connect connect = services.get(randomIndex);
Socket socket = new Socket(connect.getIp(),connect.getPort());
System.out.println(connect+"连接成功!!!");
}
Thread.sleep(40000);
System.out.println("重新访问…………………………");
System.out.println("最新列表信息为:"+services);
for(int i = 0;i<20;i++){
int randomIndex = new Random().nextInt(services.size());
Connect connect = services.get(randomIndex);
Socket socket = new Socket(connect.getIp(),connect.getPort());
System.out.println(connect+"连接成功!!!");
}
}
}
我们来具体分析一下 Consumer 端的代码:
- serviceName、version 两个属性想必大家也不会陌生,不再赘述。
- 实例化 RegistCenterConsumer 类 并把 serviceName 和 version 属性传进去,RegistCenterConsumer 就是服务调用方连接注册中心的核心类,我们稍后会详细讲解这个类的具体实现。
- 接着调用 RegistCenterConsumer 对象的 pullServiceList() 方法,此方法是获取 provider 端已经注册到注册中心的服务列表数据的泛型是一个 Connect 类,此类是我们自己封装的连接类,主要是封装 ip 和端口号的,只要获取了服务列表,我们就能通过实现自己的负载均衡访问 Provider 端的服务了。
- 然后我们写一个了循环进行测试,并随机获取服务列表中的 Connect 对象,然后通过实例化 Socekt 对象远程连接 Provider 服务,而为了测试当 Provider 增加和删除服务时,访问的服务连接是否也会跟着改变,所以我们在下面又进行了一次循环,和第一个循环一模一样,接下来我们看一下 RegistCenterConsumer 类的代码。
RegistCenterConsumer.java
package com.registCenterCommon;
import com.common.ZkConnect;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class RegistCenterConsumer {
/**
* 端口号
*/
private String version;
/**
* 服务名称
*/
private String serviceName;
/**
* zk 对象
*/
private ZooKeeper zooKeeper;
/**
* 服务列表容器
*/
private final List<Connect> serviceList = new CopyOnWriteArrayList();
public RegistCenterConsumer(String serviceName,String version){
this.serviceName = serviceName;
this.version = version;
}
/**
* 服务提供方注册
* @throws Exception
*/
public List<Connect> pullServiceList() throws Exception {
ZkConnect zk = new ZkConnect();
zk.connect();
zooKeeper = zk.getZooKeeper();
List<String> serverList = this.getServerList("/"+serviceName+"/"+version);
serviceList.addAll(this.getConnectByString(serverList));
return serviceList;
}
/**
* 根据服务列表获取连接对象列表
* @param list
* @return
*/
private List<Connect> getConnectByString(List<String> list){
List<Connect> connectList = new ArrayList<>();
for(String str : list){
String ip = str.substring(0,str.indexOf(":"));
String port = str.substring(str.indexOf(":")+1,str.length());
connectList.add(new Connect(ip,Integer.parseInt(port)));
}
return connectList;
}
/**
* 功能描述: <br>
* 〈获取集群的服务列表〉
* @Param: [path]
* @Return: java.util.List<java.lang.String>
*/
private List<String> getServerList(String path) {
try {
return zooKeeper.getChildren(path, new serverListWatch());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
class serverListWatch implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
System.out.println("服务列表节点数据产生变化~~~~~~");
serviceList.clear();
serviceList.addAll(getConnectByString(getServerList(watchedEvent.getPath())));
System.out.println("最新服务器列表:"+serviceList);
}
}
}
}
RegistCenterConsumer 类的作用是实现服务调用方注册中心的,废话不多说,咱们直接进入分析阶段:
- 首先我们声明了四个属性,version、serviceName 和 zooKeeper 不用多说,第四个属性 serviceList 是存放 服务信息列表的,因为会涉及到线程读写安全问题,所以采用了 CopyOnWriteArrayList。
- 然后我们看一下 pullServiceList() 方法,此方法相信大家还记得,它是获取服务信息的主方法,方法的内部,第一步还是同样的先连接 zookeeper 服务,然后获取 zookeeper 对象,并把该对象赋给当前类的 zooKeeper 属性,这样我们操作 zookeeper 节点就方便很多了。
- 接着通过调用当前类的 getServerList("/"+serviceName+"/"+version); 方法获取服务列表,此方法我们稍后会进一步详细分析,该方法返回值是 List<String> 所以需要给它转换成 List<Connect>,那么调用 getConnectByString 方法就能实现,getConnectByString 方法很简单就是把 List<String> 循环然后拆分 String,并且重新封装 Connet 对象添加到 List 容器中,最后把结果值全部加到 serviceList 属性中并返回。
- 我们重点看一下 getServerList 方法,此方法直接返回了 zooKeeper.getChildren(path, new serverListWatch());通过 zookeeper 获取 /serviceName/version 目录下面的所有子节点,并在此基础上注册了一个监听类 serverListWatch,注册 Watch 的目的是当 /serviceName/version 下的子节点如果发生改变时,则会回调我们的客户端程序,然后更新服务列表数据。
- 我们看一下 serverListWatch 内部类,此类实现了 Watcher 接口,并实现 process 方法,什么时候会进入此方法呢?只要 /serviceName/version 的子节点发生改变时就会执行它,在方法内部,我们加了一个判断,当子节点发生改变时才会进入到处理逻辑中,如果成立,则调用 serviceList.clear();方法 清空服务列表,然后以同样的方式 serviceList.addAll(getConnectByString(getServerList(watchedEvent.getPath()))); 把最新的服务列表添加到 serviceList 容器中,这里面有一个细节就是 监听/serviceName/version 子节点 Watcher 需要重新注册,这样才能当子节点发生改变时重新回调 serverListWatch 类中 process 方法。
- 这就是注册中心的所有程序代码了,当然还有一个 Connet 类,主要封装 ip 和 port 属性的,此类非常简单所以没有占用篇幅,接下来我们做一下测试。
测试
测试流程如下:
- 保证 zookeeper 集群可用。
- 先启动 Provider1、Provider2、Provider3。
- 然后启动 Consumer。
运行结果如下:
Provider1
Provider2
Provider3
Consumer
大家请看,Consumer 端启动之后可以随机的访问 Provider 三个服务,从而达到了负载均衡的效果,当把 Provider1 服务停掉之后,会发生什么呢?我们也做一下实验,结果如下:
我们把 Provider1 服务停掉之后,Consumer 端就会有了一个获取最新服务列表的回调,然后下一次再进行访问 Provider 服务集群的时候,Provider1 将不会被访问的到,这就是 zookeeper 实现注册中心的整个内容,当然此功能还需要大量的优化和完善的点,由于篇幅有限只能和大家一起分享注册中心的核心内容,感兴趣的小伙伴可以进一步优化。
接下来我们来探讨另一个实战案例——用 zookeeper 实现 HA 架构。
实现 HA(高可用),让你的服务器永不宕机
zookeeper 还有一个特别重要的场景,就是对分布式系统实现 HA,比如 Hadoop 就是采用 zookeeper 来实现高可用的,而在我们的分布式应用程序中,该怎么实现 HA 呢?接下来我们就一步一步的解密它吧。