目录
分布式锁案例
Zookeeper 是一个分布式的协调服务,它能够提供一系列的基础服务,比如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理等。其中,分布式锁是 Zookeeper 应用场景中的一个重要功能,可以用来解决分布式系统中多进程之间的互斥访问问题。
下面是一个简单的使用原生 Zookeeper 实现分布式锁的步骤概述:
1. 创建锁节点
首先,在 Zookeeper 中创建一个持久化的父节点(例如/distributed-lock
),用于保存所有的锁节点。
2. 获取锁
当一个客户端想要获取锁时,需要执行以下操作:
- 在
/distributed-lock
下创建一个临时有序节点,例如/distributed-lock/lock-0000000001
。 - 列出父节点的所有子节点,并根据序号进行排序。
- 检查创建的子节点是否是最小的一个,如果是,则获取锁成功;如果不是,则找到序号比当前节点小的最近的那个节点,并对该节点添加监听器。
3. 释放锁
当客户端不再需要锁时,只需要删除之前创建的临时节点即可释放锁。
4. 锁自动释放
由于锁节点是临时节点,所以当客户端与 Zookeeper 服务器的会话结束时,锁也会自动释放。
示例代码
这里给出一个简化版的 Java 代码示例,假设已经有一个 ZooKeeper
客户端实例 zooKeeper
:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLockExample {
private static final String LOCK_PATH = "/distributed-lock";
private static final String CLIENT_ID = "Client1";
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("Received event: " + event);
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(lockNodePath)) {
latch.countDown();
}
}
});
// 创建锁路径
Stat stat = zooKeeper.exists(LOCK_PATH, false);
if (stat == null) {
zooKeeper.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 尝试获取锁
String lockNodePath = zooKeeper.create(LOCK_PATH + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zooKeeper.getChildren(LOCK_PATH, true);
Collections.sort(children);
boolean isLocked = false;
for (String child : children) {
if (lockNodePath.endsWith(child)) {
if (children.indexOf(child) == 0) {
isLocked = true;
break;
} else {
String prevChild = children.get(children.indexOf(child) - 1);
String prevNodePath = LOCK_PATH + "/" + prevChild;
CountDownLatch latch = new CountDownLatch(1);
zooKeeper.exists(prevNodePath, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});
latch.await();
isLocked = true;
break;
}
}
}
if (isLocked) {
System.out.println(CLIENT_ID + " acquired the lock");
// 执行需要锁定的操作
Thread.sleep(5000); // 模拟长时间运行的任务
} else {
System.out.println(CLIENT_ID + " failed to acquire the lock");
}
// 释放锁
zooKeeper.delete(lockNodePath, -1);
zooKeeper.close();
}
这个示例中,我们使用了 ZooKeeper 的客户端 API 来创建锁节点,并通过监听前一个节点的状态来等待锁的释放。
分布式锁测试
为了测试分布式锁的有效性,你需要设置多个客户端同时尝试获取锁,并观察它们的行为是否符合预期。下面是一个简单的测试方案:
测试环境准备
- 启动 Zookeeper 服务:确保有一个可用的 Zookeeper 服务器。
- 编写客户端程序:根据上一个回答中提供的示例代码,你可以编写多个客户端程序,每个程序都试图获取相同的锁。
测试步骤
- 启动多个客户端:同时启动多个客户端程序,每个客户端都尝试获取同一个锁。
- 检查锁的获取情况:观察哪些客户端获得了锁,哪些客户端没有获得锁,并且等待其他客户端释放锁。
- 检查锁的释放情况:当持有锁的客户端完成任务后,它应该释放锁,然后观察下一个等待的客户端是否能够成功获取锁。
- 重复上述过程:多次执行上述步骤,以确保分布式锁机制的稳定性和可靠性。
示例代码
基于上一个回答中的示例代码,你可以创建两个或更多的客户端来模拟并发获取锁的情况。这里是一个简单的示例,展示如何编写两个客户端程序来测试分布式锁。
Client1.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Client1 implements Runnable {
private final ZooKeeper zooKeeper;
private final String lockPath;
public Client1(ZooKeeper zooKeeper, String lockPath) throws InterruptedException, KeeperException {
this.zooKeeper = zooKeeper;
this.lockPath = lockPath;
}
@Override
public void run() {
try {
// 创建锁路径
Stat stat = zooKeeper.exists(lockPath, false);
if (stat == null) {
zooKeeper.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 尝试获取锁
String lockNodePath = zooKeeper.create(lockPath + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zooKeeper.getChildren(lockPath, true);
Collections.sort(children);
boolean isLocked = false;
for (String child : children) {
if (lockNodePath.endsWith(child)) {
if (children.indexOf(child) == 0) {
isLocked = true;
break;
} else {
String prevChild = children.get(children.indexOf(child) - 1);
String prevNodePath = lockPath + "/" + prevChild;
CountDownLatch latch = new CountDownLatch(1);
zooKeeper.exists(prevNodePath, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});
latch.await();
isLocked = true;
break;
}
}
}
if (isLocked) {
System.out.println("Client1 acquired the lock");
// 执行需要锁定的操作
Thread.sleep(5000); // 模拟长时间运行的任务
} else {
System.out.println("Client1 failed to acquire the lock");
}
// 释放锁
zooKeeper.delete(lockNodePath, -1);
zooKeeper.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, event -> {
System.out.println("Received event: " + event);
});
Client1 client = new Client1(zooKeeper, "/distributed-lock");
Thread thread = new Thread(client);
thread.start();
}
}
Client2.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Client2 implements Runnable {
private final ZooKeeper zooKeeper;
private final String lockPath;
public Client2(ZooKeeper zooKeeper, String lockPath) throws InterruptedException, KeeperException {
this.zooKeeper = zooKeeper;
this.lockPath = lockPath;
}
@Override
public void run() {
try {
// 创建锁路径
Stat stat = zooKeeper.exists(lockPath, false);
if (stat == null) {
zooKeeper.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
// 尝试获取锁
String lockNodePath = zooKeeper.create(lockPath + "/lock-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zooKeeper.getChildren(lockPath, true);
Collections.sort(children);
boolean isLocked = false;
for (String child : children) {
if (lockNodePath.endsWith(child)) {
if (children.indexOf(child) == 0) {
isLocked = true;
break;
} else {
String prevChild = children.get(children.indexOf(child) - 1);
String prevNodePath = lockPath + "/" + prevChild;
CountDownLatch latch = new CountDownLatch(1);
zooKeeper.exists(prevNodePath, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
latch.countDown();
}
});
latch.await();
isLocked = true;
break;
}
}
}
if (isLocked) {
System.out.println("Client2 acquired the lock");
// 执行需要锁定的操作
Thread.sleep(5000); // 模拟长时间运行的任务
} else {
System.out.println("Client2 failed to acquire the lock");
}
// 释放锁
zooKeeper.delete(lockNodePath, -1);
zooKeeper.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, event -> {
System.out.println("Received event: " + event);
});
Client2 client = new Client2(zooKeeper, "/distributed-lock");
Thread thread = new Thread(client);
thread.start();
}
}
运行测试
- 启动 Zookeeper 服务器。
- 分别运行
Client1
和Client2
程序。 - 观察输出结果,确认只有一个客户端能够成功获取锁,并且在执行完任务后正确释放锁。
Curator 框架实现分布式锁案例
Curator 是一个 Apache ZooKeeper 的高级 Java 客户端库,它提供了许多简化 ZooKeeper 使用的工具类和框架。Curator 提供了一个非常方便的方式来实现分布式锁,其中包括了 InterProcessMutex 类,这是一个高级的分布式锁实现。
下面是一个使用 Curator 框架实现的分布式锁示例:
步骤 1: 添加依赖
首先,你需要在你的项目中添加 Curator 的依赖。如果你使用 Maven,可以在 pom.xml
文件中添加如下依赖:
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies
步骤 2: 创建 CuratorFramework 实例
接下来,我们需要创建一个 CuratorFramework 实例,这是 Curator 的主要入口点。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDistributedLockExample {
private static final String CONNECT_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT_MS = 5000;
private static final int CONNECTION_TIMEOUT_MS = 3000;
private CuratorFramework client;
public CuratorDistributedLockExample() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(CONNECT_STRING)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.retryPolicy(retryPolicy)
.namespace("locks") // 可选,指定命名空间
.build();
client.start();
}
public void close() throws Exception {
if (client != null) {
client.close();
}
}
}
步骤 3: 实现分布式锁
现在我们可以使用 Curator 提供的 InterProcessMutex
类来实现分布式锁。
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
public class CuratorDistributedLockExample extends CuratorDistributedLockExample {
private InterProcessMutex lock;
private final String lockPath = "/distributed-lock";
public CuratorDistributedLockExample() {
super();
lock = new InterProcessMutex(client, lockPath);
}
public void acquireLock() throws Exception {
System.out.println("Client is trying to acquire the lock...");
lock.acquire();
System.out.println("Client acquired the lock.");
}
public void releaseLock() throws Exception {
lock.release();
System.out.println("Client released the lock.");
}
public void doWorkWithLock() throws Exception {
acquireLock();
try {
System.out.println("Doing work with the lock...");
Thread.sleep(5000); // 模拟执行一些耗时操作
} finally {
releaseLock();
}
}
public static void main(String[] args) throws Exception {
CuratorDistributedLockExample example = new CuratorDistributedLockExample();
example.doWorkWithLock();
example.close();
}
}
说明
InterProcessMutex
:这是 Curator 提供的一个分布式锁实现。acquireLock
方法:尝试获取锁。releaseLock
方法:释放锁。doWorkWithLock
方法:在这个方法中,我们尝试获取锁,执行一些操作,然后释放锁。
运行示例
你可以运行上面的 main
方法来测试分布式锁的功能。你也可以创建多个 CuratorDistributedLockExample
实例来模拟并发获取锁的情况。
注意事项
- 确保你的 ZooKeeper 服务器正在运行。
- 如果你使用的是不同的 ZooKeeper 配置(如连接字符串),请相应地修改
CONNECT_STRING
和其他配置参数。 - 如果你想测试多个客户端同时尝试获取锁的情况,可以在不同的 JVM 或线程中启动多个
CuratorDistributedLockExample
实例。
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » 大数据技术之Zookeeper实现分布式锁(5)
发表评论 取消回复