基于zookeeper创建分布式锁。
原理是创建 CreateMode.EPHEMERAL_SEQUENTIAL的节点。
zookeeper每次创建EPHEMERAL_SEQUENTIAL性质的节点时,都会在节点后面自动添加累加序号。这样当多个人同时去创建的时候,我们选取创建出来的节点序号时最小的那个作为主创建者,把其他人都标记为失败。
例如多个用户申请创建"/testlockpath/testlockname"的节点,生成的最终节点名:
可能是:"/testlockpath/testlockname0000000001",
也可能是:"/testlockpath/testlockname0000000002",
或者是:"/testlockpath/testlockname0000000003",
.。。
这种属性的节点还有一个特性就是如果client端断开连接后,节点会自动删除(当然如果客户端没有断开,节点是不会自动删除的)。这个属性太重要了,否则无法保证节点能够自动删除。
在下面例子中,定义了三个相关函数:
- lock(): 死锁,直到锁成功;即如果有其他人拥有锁,那么等待重试。
- tryLock(); 如果锁成功返回true,如果锁失败则返回false,不必等待。
- unlock():释放锁,对于lock()后的锁必须unlock();同样对于tryLock()成功后,也必须unlock锁;而对于tryLock()失败的场景,则不需要unlock。
package zkclient;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
public class ZKLock {
private static ZooKeeper zookeeper = null;
private static String endpoints = "localhost:2181";
private static String lockPath = "/testlockpath";
private static String lockName = "testlockname";
private static String lockFull = "";
public static void main(String args[]) {
CountDownLatch connectionLatch = new CountDownLatch(1);
try {
zookeeper = new ZooKeeper(endpoints, 2000, new Watcher() {
@Override
public void process(WatchedEvent we) {
// TODO Auto-generated method stub
if (we.getState() == KeeperState.SyncConnected) {
connectionLatch.countDown();
}
}
});
connectionLatch.await();
if (zookeeper.exists(lockPath, false) == null) {
zookeeper.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
System.out.println("locking");
if (tryLock()) {
System.out.println("lock success");
System.out.println("do_working");
Thread.sleep(10 * 1000);
System.out.println("unlock");
unlock();
System.out.println("unlock success");
} else {
System.out.println("locking failure");
}
// lock();
// System.out.println("locking success");
// Thread.sleep(10 * 1000);
// System.out.println("unlock");
// unlock();
zookeeper.close();
} catch (IOException | InterruptedException | KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void lock() throws KeeperException, InterruptedException {
lockFull = zookeeper.create(lockPath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
List<String> nodes = zookeeper.getChildren(lockPath, false);
Collections.sort(nodes);
if (lockFull.endsWith(nodes.get(0))) {
return;
}
Thread.sleep(100);
}
}
public static boolean tryLock() throws KeeperException, InterruptedException {
lockFull = zookeeper.create(lockPath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> nodes = zookeeper.getChildren(lockPath, false);
Collections.sort(nodes);
if (lockFull.endsWith(nodes.get(0))) {
return true;
}
else {
unlock();
return false;
}
}
public static void unlock() throws InterruptedException, KeeperException {
zookeeper.delete(lockFull, -1);
lockFull = null;
}
}