Zookeeper(五)环境搭建与Curator使用
一 环境搭建
1.1 单机环境搭建
- 必要环境:JDK
- 下载地址:https://zookeeper.apache.org/
- 历史版本:https://archive.apache.org/dist/zookeeper/
- 我这里是本地环境,说名一下,无脑解压一下,放在本地环境目录
- 复制配置文件一份zoo_sample未zoo
- 修改配置文件
- 启动
- 查看
1.2 可视化工具ZooKeeper Assistant
- 查看状况
1.3 集群环境搭建
- 这里我是伪集群环境搭建,注意集群环境只能是奇数
- 这里我模拟三套服务器环境,一个主节点,两个从节点,主要是配置文件的变化
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=D:\\Tools\\Zookeeper\\ServerA\\data
dataLogDir=D:\\Tools\\Zookeeper\\ServerA\\log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=localhost:2886:3886
server.2=localhost:2887:3887
server.3=localhost:2888:3888
server.A=B:C:D;其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。
- myid 建立,依次写入1,2,3,id被称为Server ID,用来标识该机器在集群中的机器序号。同时,在每台ZooKeeper机器上,我们都需要在数据目录(即dataDir参数指定的那个目录)下创建一个 myid文件,该文件只有一行内容,并且是一个数字,即对应于每台机器的Server ID数字。
- 后面的B,C一样的,一键启动脚本
@echo off
start cmd /k "cd /d D:\Tools\Zookeeper\ServerA\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerA...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerB\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerB...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerC\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerC...
echo All ZooKeeper servers have been started.
- 查看启动日志可以看到主从节点情况
二 常用命令
1.1 命令行语法
命令行语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls path | 使用ls命令来查看当前znode的子节点[可监听], -w 监听子节点变化, -s 附加次级信息 |
create | 普通创建, -s 含有序列, -e 临时(重启或超时消失) |
get path | 获得节点的值[可监听] -w 监听节点内容变化, -s 附加次级信息 |
set | 设置节点的具体值 |
stat | 查看节点的状态 |
delete | 删除节点 |
deleteall | 递归删除节点 |
1.2 数据节点信息
[zk: bigdata01:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
- cZxid: 创建的事务zxid 每次修改Zookeeper状态都会产生一个zookeeper事务ID, 事务ID是Zookeeper中所有修改总的次序. 每次修改都有唯一的zxid, 如果zxid1小于zxid2, 那么zxid1在zxid2之前发生.
- ctime: znode被创建的毫秒数(从1970开始)
- mzxid: znode最后更新的事务zxid
- mtime: znode最后修改的毫秒数(从1970开始)
- pZxid: znode最后更新的子节点zxid
- cversion: znode子节点变化号, znode子节点修改次数
- dataversion:znode 数据变化号
- aclVersion:znode 访问控制列表的变化号
- ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
- dataLength:znode 的数据长度
- numChildren:znode 子节点数量
1.3 节点类型
这个命令就需要自己去练了
三 CuratorAPI使用
3.1 依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
3.1 创建会话
使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端
package com.shu;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.util.ArrayList;
import java.util.List;
/**
* @author 31380
* @description
* @create 2024/3/16 18:39
*/
public class CuratorUtils {
/**
* 创建连接
*
* @param connectionString 连接地址
* @param sessionTimeout 会话超时时间
* @param connectionTimeout 连接超时时间
* @return
*/
public static CuratorFramework createCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout) {
return CuratorFrameworkFactory.builder()
.connectString(connectionString)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.connectionTimeoutMs(connectionTimeout)
.build();
}
/**
* 创建连接
*
* @param connectionString 连接地址
* @param sessionTimeout 会话超时时间
* @param connectionTimeout 连接超时时间
* @param retryPolicy 重试策略
* @return
*/
public static CuratorFramework createCuratorFrameworkWithRetry(String connectionString,
int sessionTimeout,
int connectionTimeout,
RetryPolicy retryPolicy
) {
return CuratorFrameworkFactory.builder()
.connectString(connectionString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.build();
}
/**
* 创建一个隔离的命名空间
*/
public static CuratorFramework createNamespaceCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout, String namespace) {
return CuratorFrameworkFactory.builder()
.connectString(connectionString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.namespace(namespace)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
}
/**
* ZooDefs.Perms.READ:读权限
* ZooDefs.Perms.WRITE:写权限
* ZooDefs.Perms.CREATE:创建子节点权限
* ZooDefs.Perms.DELETE:删除权限
* ZooDefs.Perms.ADMIN:管理权限
* ZooDefs.Perms.ALL:所有权限
* 以下是一些常用的身份验证方案:
* Ids.ANYONE_ID_UNSAFE:表示任何人都可以访问
* Ids.AUTH_IDS:表示使用已验证的用户身份
* Ids.OPEN_ACL_UNSAFE:表示开放的ACL,任何人都可以访问
* ACL acl = new ACL(ZooDefs.Perms.READ, new Id("myUser", "myPassword"));
* @return
*/
public static List<ACL> getAclList() {
ArrayList<ACL> acls = new ArrayList<>();
// 权限设置
ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);
// 添加权限
acls.add(acl);
return acls;
}
}
3.2 基本使用增删改查
- 新增
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import java.util.List;
/**
* @author 31380
* @description
* @create 2024/3/16 18:43
*/
public class CuratorCreatTest {
/**
* 总结:curator创建节点方法
* 1.创建节点,如果节点已经存在则抛出异常 create().forPath()
* 2.withMode():节点类型: CreateMode.EPHEMERAL 临时节点,CreateMode.PERSISTENT 永久节点
* 3.递归创建节点 creatingParentsIfNeeded()
* 4.查询所有子节点 getChildren().forPath()
* 5.删除节点 delete().forPath()
* 6.判断节点是否存在 checkExists().forPath()
* 7.关闭连接 close()
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
// 创建节点,如果节点已经存在则抛出异常
try {
curatorClint.create().forPath("/test");
} catch (Exception e) {
System.out.println("创建节点失败!"+e.getMessage());
}
// 删除节点
try {
curatorClint.delete().forPath("/test");
System.out.println("删除节点成功!");
} catch (Exception e) {
System.out.println("删除节点失败!"+e.getMessage());
}
/**
* 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点.
* 持久节点(PERSISTENT):创建后永久存在,除非主动删除。
*/
// 临时节点,当会话结束后,节点自动删除
curatorClint.create().withMode(CreateMode.EPHEMERAL)
.forPath("/secondPath", "hello,word".getBytes());
System.out.println("临时节点:"+new String(curatorClint.getData().forPath("/secondPath")));
// 永久节点
curatorClint.create().withMode(CreateMode.PERSISTENT)
.forPath("/thirdPath", "hello,word".getBytes());
System.out.println("永久节点:"+new String(curatorClint.getData().forPath("/thirdPath")));
// 递归创建节点
curatorClint.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/parent/child", "hello,word".getBytes());
System.out.println("递归创建节点:"+new String(curatorClint.getData().forPath("/parent/child")));
// 查询所有子节点
List<String> list= curatorClint.getChildren().forPath("/");
System.out.println(list);
// 关闭连接
curatorClint.close();
}
}
- 读取
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import java.util.Date;
/**
* @author 31380
* @description 读取节点数据
* @create 2024/3/17 11:28
*/
public class CuratorReadTest {
/**
* 总结:
* 1. 读取单个节点数据:curatorClint.getData().forPath("/base/test")
* 2. 读取多个节点数据:curatorClint.getChildren().forPath("/test").forEach(System.out::println)
* 3. 读取节点数据并获取 stat:curatorClint.getData().storingStatIn(stat).forPath("/base/test")
* 4:Stat:节点状态,包含节点的版本、数据长度、子节点数量、创建时间、修改时间、最近一次修改的事务 ID、数据版本、ACL 版本、临时节点
* @param args
*/
public static void main(String[] args) {
// 地址
String connectString = "127.0.0.1:2181"; // 确保连接字符串正确
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
// 读取单个节点数据
try {
byte[] bytes = curatorClint.getData().forPath("/base/test");
System.out.println(new String(bytes));
System.out.println("读取节点数据成功!");
} catch (Exception e) {
System.out.println("读取节点数据失败!"+e.getMessage());
}
// 读取多个节点数据
try {
curatorClint.getChildren().forPath("/test").forEach(System.out::println);
System.out.println("读取多个节点数据成功!");
} catch (Exception e) {
System.out.println("读取多个节点数据失败!"+e.getMessage());
}
try {
Stat stat = new Stat();
byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");
String dataString = new String(data);
System.out.println("节点数据:" + dataString);
System.out.println("节点状态:");
System.out.println(" 节点创建版本:" + stat.getCversion());
System.out.println(" 数据长度:" + stat.getDataLength());
System.out.println(" 子节点数量:" + stat.getNumChildren());
System.out.println(" 创建时间:" + new Date(stat.getCtime()));
System.out.println(" 修改时间:" + new Date(stat.getMtime()));
System.out.println(" 最近一次修改的事务 ID:" + stat.getMzxid());
System.out.println(" 数据版本:" + stat.getVersion());
System.out.println(" ACL 版本:" + stat.getAversion());
System.out.println(" 临时节点:" + stat.getEphemeralOwner());
System.out.println("读取节点数据并获取 stat 成功!");
} catch (Exception e) {
System.out.println("读取节点数据并获取 stat 失败:" + e.getMessage());
}
// 关闭连接
curatorClint.close();
}
}
- 删除
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
/**
* @author 31380
* @description
* @create 2024/3/16 19:25
*/
public class CuratorDeleteTest {
/**
* 总结:
* 1. 删除节点:delete().forPath("/test")
* 2. 如果存在子节点,删除子节点:delete().deletingChildrenIfNeeded().forPath("/parent")
* 3. 递归删除节点:delete().deletingChildrenIfNeeded().forPath("/secondPath")
* 4. 判断节点是否存在:checkExists().forPath("/secondPath")
* 5. 关闭连接:close()
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
// 删除节点
try {
curatorClint.delete().forPath("/test");
System.out.println("删除节点成功!");
} catch (Exception e) {
System.out.println("删除节点失败!"+e.getMessage());
}
// 如果存在子节点,删除子节点
try {
curatorClint.delete().deletingChildrenIfNeeded().forPath("/parent");
System.out.println("删除节点成功!");
} catch (Exception e) {
System.out.println("删除节点失败!"+e.getMessage());
}
// 递归删除节点
curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");
// 判断节点是否存在
if (curatorClint.checkExists().forPath("/secondPath") == null) {
System.out.println("节点不存在!");
} else {
System.out.println("节点存在!");
}
// 关闭连接
curatorClint.close();
}
}
- 修改
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
/**
* @author 31380
* @description
* @create 2024/3/17 11:35
*/
public class CuratorUpdateTest {
/**
* 总计
* 1. 更新节点:setData().forPath("/test", "hello,word".getBytes())
* 2. 指定版本更新节点:setData().withVersion(1).forPath("/test", "hello,word".getBytes())
* @param args
*/
public static void main(String[] args) throws Exception {
// 地址
String connectString = "127.0.0.1:2181";
//创建节点
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
// 更新节点
try {
curatorClint.setData().forPath("/base/test", "hello,word1111".getBytes());
// 获取节点数据
byte[] bytes = curatorClint.getData().forPath("/base/test");
System.out.println(new String(bytes));
System.out.println("更新节点成功!");
} catch (Exception e) {
System.out.println("更新节点失败!"+e.getMessage());
}
// 先获取节点的版本号
Stat stat = new Stat();
byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");
String dataString = new String(data);
System.out.println("节点数据:" + dataString);
System.out.println("节点状态:");
System.out.println(" 数据版本:" + stat.getVersion());
// 指定版本更新节点:CAS 机制
try {
curatorClint.setData().withVersion(stat.getVersion()).forPath("/base/test", "hello,word2222".getBytes());
// 获取节点数据
byte[] bytes = curatorClint.getData().forPath("/base/test");
System.out.println(new String(bytes));
System.out.println("更新节点成功!");
} catch (Exception e) {
System.out.println("更新节点失败!"+e.getMessage());
}
}
}
- 异步创建
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 31380
* @description Curator异步操作
* @create 2024/3/17 11:42
*/
public class CuratorAyncTest {
/**
* 总结:
* 1 异步操作:inBackground()
* 2.创建节点,如果节点已经存在则抛出异常 create().forPath()
* 3.递归创建节点 creatingParentsIfNeeded()
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 地址
String connectString = "127.0.0.1:2181";
//创建节点
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
CountDownLatch cdl = new CountDownLatch(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {
System.out.println("Code:" + event.getResultCode());
System.out.println("Type:" + event.getType());
System.out.println("Path:" + event.getPath());
cdl.countDown();
}, executorService).forPath("/test1", "hello,word".getBytes());
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {
System.out.println("Code:" + event.getResultCode());
System.out.println("Type:" + event.getType());
System.out.println("Path:" + event.getPath());
cdl.countDown();
}).forPath("/test2", "hello,word".getBytes());
cdl.await();
executorService.shutdown();
curatorClint.close();
}
/**
* 事件类型
* CREATE, // 创建
* DELETE, // 删除
* EXISTS, // 存在
* GET_DATA, // 获取数据
* SET_DATA, // 设置数据
* CHILDREN, // 子节点
* SYNC, // 同步
* GET_ACL, // 获取ACL
* SET_ACL, // 设置ACL
* TRANSACTION, // 事务
* GET_CONFIG, // 获取配置
* RECONFIG, // 重新配置
* WATCHED, // 监听
* REMOVE_WATCHES, // 移除监听
* CLOSING; // 关闭
* @param args
*/
/**
* 响应码
* OK(0), // OK
* CONNECTIONLOSS(-4), // 连接丢失
* MARSHALLINGERROR(-7), // 编组错误
* UNIMPLEMENTED(-9), // 未实现
* OPERATIONTIMEOUT(-10), // 操作超时
* BADARGUMENTS(-8), // 错误参数
* APIERROR(-100), // API错误
* NONODE(-101), // 无节点·
*/
}
- 不同的顺序节点
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
/**
* @author 31380
* @description
* @create 2024/3/17 11:03
*/
public class CuratorSEQCreat {
/**
* 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号。
* 持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会有序列号。
* @param args
*/
public static void main(String[] args) {
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
// 创建一个持久顺序节点A-1,A-2,A-3
try {
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());
System.out.println("创建节点成功!");
} catch (Exception e) {
System.out.println("创建节点失败!"+e.getMessage());
}
// 创建一个临时顺序节点B-1,B-2,B-3
try {
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());
curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());
System.out.println("创建节点成功!");
} catch (Exception e) {
System.out.println("创建节点失败!"+e.getMessage());
}
// 关闭连接
curatorClint.close();
}
}
- 事务
package com.shu.base;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import java.util.Collection;
/**
* @author 31380
* @description TODO
* @create 2024/3/16 19:28
*/
public class CuratorTransactionTest {
public static void main(String[] args) throws Exception {
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
Collection<CuratorTransactionResult> commit = curatorClint.inTransaction()
.create().forPath("/xiao", "456".getBytes()).and()
.setData().forPath("/xiao", "123".getBytes()).and()
.commit();
for (CuratorTransactionResult result : commit) {
System.out.println(result.getForPath() + "--->" + result.getType());
}
curatorClint.close();
}
}
3.3 ACL权限控制
package com.shu.acl;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
/**
* @author 31380
* @description
* @create 2024/3/16 19:56
*/
public class CuratorAclTest {
public static void main(String[] args) {
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
// 创建节点,ACL为ip:
try {
curatorClint.create().withACL(CuratorUtils.getAclList()).forPath("/test");
System.out.println("创建节点成功!");
} catch (Exception e) {
System.out.println("创建节点失败!"+e.getMessage());
}
}
}
/**
* @description
* @author 31380
* @create 2024/3/17 11:12
* Schema 代表权限控制模式,分别为:
* ● World 任何人
* ● Auth 不需要ID
* ● Digest 用户名和密码方式的认证
* ● IP Address IP地址方式的认证
* perms(权限),ZooKeeper支持如下权限
* ● CREATE: 创建子节点
* ● READ: 获取子节点与自身节点的数据信息
* ● WRITE:在Znode节点上写数据
* ● DELETE:删除子节点
* ● ADMIN:设置ACL权限
* ————————————————
*/
package com.shu.acl;
3.4 分布式锁
package com.shu.lock;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;
/**
* @author 31380
* @description 分布式锁
* @create 2024/3/17 13:12
*/
public class LockTest {
/**
* 分布式锁:InterProcessMutex
* 1: 获取锁,acquire()
* 2: 释放锁,release()
* 3: 创建 InterProcessMutex 对象
* 4: 调用 acquire() 方法获取锁
* 5: 业务操作
* 6: 调用 release() 方法释放锁
* @param args
*/
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(10);
String connectString = "127.0.0.1:2181";
String lockPath = "/lock";
CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
curatorFramework.start();
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
latch.await();
lock.acquire();
System.out.println(Thread.currentThread().getName() + "获取到锁");
// 模拟业务操作,生成订单号
Thread.sleep(1000);
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(System.currentTimeMillis());
System.out.println("生成的订单号:" + orderNo);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "Thread-" + i).start();
latch.countDown();
}
}
}
3.5 分布式计数器
package com.shu.lock;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
/**
* @author 31380
* @description 分布式计数器
* @create 2024/3/17 13:20
*/
public class RecipeDisAtomicIntTest {
/**
* 分布式计数器:DistributedAtomicInteger
* 1、创建DistributedAtomicInteger对象
* 2、调用add方法
* 3、获取当前值
*
* @param args
*/
public static void main(String[] args) {
String connectString = "127.0.0.1:2181";
String connectString2 = "127.0.0.1:2182";
String connectString3 = "127.0.0.1:2183";
CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
curatorFramework.start();
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(curatorFramework, "/atomic", null);
try {
AtomicValue<Integer> added = atomicInteger.add(1);
System.out.println("1Result: " + added.succeeded());
// 获取当前值
System.out.println("2Result: " + added.postValue());
} catch (Exception e) {
throw new RuntimeException(e);
}
// 客户端2
CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);
curatorFramework2.start();
DistributedAtomicInteger atomicInteger2 = new DistributedAtomicInteger(curatorFramework2, "/atomic", null);
try {
AtomicValue<Integer> added = atomicInteger2.add(1);
System.out.println("2Result: " + added.succeeded());
// 获取当前值
System.out.println("2Result: " + added.postValue());
} catch (Exception e) {
throw new RuntimeException(e);
}
// 客户端3
CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);
curatorFramework3.start();
DistributedAtomicInteger atomicInteger3 = new DistributedAtomicInteger(curatorFramework3, "/atomic", null);
try {
AtomicValue<Integer> added = atomicInteger3.add(1);
System.out.println("3Result: " + added.succeeded());
// 获取当前值
System.out.println("3Result: " + added.postValue());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
3.6 分布式Barrier
package com.shu.lock;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
/**
* @author 31380
* @description
* 分布式Barrier:分布式 Barrier 是一种常见的同步原语,用于在分布式系统中协调多个进程或线程的执行顺序。
* 它可以用来实现诸如等待直到所有参与者都准备好,然后一起执行某项任务,或者等待直到某些条件达成后再继续执行的场景。
* @create 2024/3/17 13:27
*/
public class CycliBarrierTest {
static DistributedBarrier barrier;
public static void main(String[] args) {
String connectString = "127.0.0.1:2181";
String path="/barrier";
CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
curatorFramework.start();
// 等待所有的线程到达barrier 10个线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
barrier = new DistributedBarrier(curatorFramework, path);
System.out.println(Thread.currentThread().getName() + "号barrier设置");
barrier.setBarrier();
barrier.waitOnBarrier();
System.out.println("启动...");
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
try {
Thread.sleep(2000);
barrier.removeBarrier();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.7 主从节点选举
package com.shu.master;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
/**
* @author 31380
* @description 主节点选举
* @create 2024/3/17 13:03
*/
public class MasterSelectTest {
/**
* 主节点选举:LeaderSelector
* 1、创建LeaderSelector对象
* 2、调用start方法
* 3、添加监听器
* 4、关闭连接
* @param args
*/
public static void main(String[] args) {
// 地址
String connectString = "127.0.0.1:2181";
String connectString2 = "127.0.0.1:2182";
String connectString3 = "127.0.0.1:2183";
// 创建并连接 CuratorFramework 实例
CuratorFramework curatorFramework1 = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);
CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);
CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);
curatorFramework1.start();
curatorFramework2.start();
curatorFramework3.start();
// 第一个节点
LeaderSelector leaderSelector1 = new LeaderSelector(curatorFramework1, "/master1", new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("节点1成为master节点");
Thread.sleep(10000);
System.out.println("节点1完成master操作,释放master权利");
}
});
leaderSelector1.autoRequeue();
leaderSelector1.start();
// 第二个节点
LeaderSelector leaderSelector2 = new LeaderSelector(curatorFramework2, "/master1", new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("节点2成为master节点");
Thread.sleep(10000);
System.out.println("节点2完成master操作,释放master权利");
}
});
leaderSelector2.autoRequeue();
leaderSelector2.start();
// 第三个节点
LeaderSelector leaderSelector3 = new LeaderSelector(curatorFramework3, "/master1", new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("节点3成为master节点");
Thread.sleep(10000);
System.out.println("节点3完成master操作,释放master权利");
}
});
leaderSelector3.autoRequeue();
leaderSelector3.start();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.8 NodeCache监听
package com.shu.watch;
import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
/**
* @author 31380
* @description
* @create 2024/3/16 19:38
*/
public class CuratorNodeCacheTest {
/**
* NodeCache:监听节点的新增、修改操作
* 1、创建NodeCache对象
* 2、调用start方法
* 3、添加监听器
* 4、关闭连接
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
String path = "/test";
CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);
System.out.println("连接成功!");
curatorClint.start();
final NodeCache nodeCache = new NodeCache(curatorClint, path);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("监听事件触发");
System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));
}
});
curatorClint.setData().forPath(path,"456".getBytes());
curatorClint.setData().forPath(path,"789".getBytes());
curatorClint.setData().forPath(path,"123".getBytes());
curatorClint.setData().forPath(path,"222".getBytes());
curatorClint.setData().forPath(path,"333".getBytes());
curatorClint.setData().forPath(path,"444".getBytes());
Thread.sleep(15000);
}
}
3.9 PathChildrenCache监听
package com.shu.watch;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* @author 31380
* @description
* @create 2024/3/16 19:43
*/
public class CuratorPathChildrenCacheTest {
/**
* PathChildrenCache:监听子节点的新增、修改、删除操作
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorFramework client = getClient();
String parentPath = "/p1";
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,parentPath,false);
/* * StartMode:初始化方式
* POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化
* */
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("事件类型:" + event.getType() + ";操作节点:" + event.getData().getPath());
switch(event.getType()){
case CHILD_ADDED:
System.out.println("新增子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("更新子节点:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
default:
break;
}
}
});
String path = "/p1/c1";
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000); // 此处需留意,如果没有现成睡眠则无法触发监听事件
client.delete().forPath(path);
Thread.sleep(15000);
}
private static CuratorFramework getClient(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(3000)
.namespace("demo")
.build();
client.start();
return client;
}
}
3.10 TreeCache监听
package com.shu.watch;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* @author 31380
* @description
* @create 2024/3/16 19:44
*/
public class CuratorWatcher3 {
private static final String CONNECT_ADDR = "127.0.0.1:2181";
private static final int SESSION_TIMEOUT = 5000;
public static void main(String[] args) throws Exception {
RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(policy).build();
curator.start();
TreeCache treeCache = new TreeCache(curator, "/treeCache");
treeCache.start();
treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
switch (treeCacheEvent.getType()) {
case NODE_ADDED:
System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
+ ",状态:" + treeCacheEvent.getData().getStat());
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
+ ",状态:" + treeCacheEvent.getData().getStat());
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
+ ",状态:" + treeCacheEvent.getData().getStat());
break;
default:
break;
}
});
curator.create().forPath("/treeCache", "123".getBytes());
curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());
curator.setData().forPath("/treeCache", "789".getBytes());
curator.setData().forPath("/treeCache/c1", "910".getBytes());
curator.delete().forPath("/treeCache/c1");
curator.delete().forPath("/treeCache");
Thread.sleep(5000);
curator.close();
}
}
详细介绍参考书籍《从Paxos到Zookeeper:分布式一致性原理与实践》