Zookeeper客户端Curator基本API
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。
Curator包含了几个包:
- curator-framework:对zookeeper的底层api的一些封装
- curator-client:提供一些客户端的操作,例如重试策略等
- curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
Curator的基本Api
创建会话
- 使用静态工厂方法创建客户端
一个例子如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
"localhost:2181",
5000,
3000,
retryPolicy);
newClient静态工厂方法包含四个主要参数:
参数名 | 说明 |
---|---|
connectionString | 服务器列表,格式host1:port1,host2:port2,... |
retryPolicy | 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口 |
sessionTimeoutMs | 会话超时时间,单位毫秒,默认60000ms |
connectionTimeoutMs | 连接创建超时时间,单位毫秒,默认60000ms |
- 使用Fluent风格的Api创建会话
核心参数变为流式设置,一个列子如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
- 创建包含隔离命名空间的会话
为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
启动客户端
当创建会话成功,得到client的实例然后可以直接调用其start( )方法:
client.start();
数据节点操作
创建数据节点
Zookeeper的节点创建模式:
- PERSISTENT:持久化
- PERSISTENT_SEQUENTIAL:持久化并且带序列号
- EPHEMERAL:临时
- EPHEMERAL_SEQUENTIAL:临时并且带序列号
创建一个节点,初始内容为空:
client.create().forPath("path");
注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容:
client.create().forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),内容为空:
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
创建一个节点,指定创建模式(临时节点),附带初始化内容:
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点:
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。
删除数据节点
删除一个节点
client.delete().forPath("path");
注意,此方法只能删除叶子节点,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点:
client.delete().deletingChildrenIfNeeded().forPath("path");
删除一个节点,强制指定版本进行删除:
client.delete().withVersion(10086).forPath("path");
删除一个节点,强制保证删除:
client.delete().guaranteed().forPath("path");
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
注意:上面的多个流式接口是可以自由组合的,例如:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
读取数据节点数据
读取一个节点的数据内容
client.getData().forPath("path");
注意,此方法返的返回值是byte[ ];
读取一个节点的数据内容,同时获取到该节点的stat:
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
更新数据节点数据
更新一个节点的数据内容:
client.setData().forPath("path","data".getBytes());
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新:
client.setData().withVersion(10086).forPath("path","data".getBytes());
检查节点是否存在
client.checkExists().forPath("path");
注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作。
获取某个节点的所有子节点路径
client.getChildren().forPath("path");
注意:该方法的返回值为List<String>
,获得ZNode的子节点Path列表。
事务
CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
异步接口
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应码和节点的详细信息。
CuratorEventType
事件类型 | 对应CuratorFramework实例的方法 |
---|---|
CREATE | #create() |
DELETE | #delete() |
EXISTS | #checkExists() |
GET_DATA | #getData() |
SET_DATA | #setData() |
CHILDREN | #getChildren() |
SYNC | #sync(String,Object) |
GET_ACL | #getACL() |
SET_ACL | #setACL() |
WATCHED | #Watcher(Watcher) |
CLOSING | #close() |
响应码(#getResultCode())
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
一个异步创建节点的例子如下:
client.create().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(curatorEvent.getType() + " " + curatorEvent.getResultCode());
}
}, Executors.newFixedThreadPool(2))
.forPath("/mbm", "123".getBytes());
注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。