五、Zookeeper基于API操作Node节点
2021-03-03 09:26
标签:http 最新版 nod value time 开启 down version max 安装zookeeper :linux下安装Zookeeper 3.4.14 zookeeper 分为5个包: org.apache.zookeeper //客户端主要类文件 org.apache.zookeeper.data // org.apache.zookeeper.server org.apache.zookeeper.server.quorum org.apache.zookeeper.server.upgrade 引入依赖: 注意:ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“ CONNECTING的状态。当该会话真正创建完毕后 ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话 节点介绍: /** 五、Zookeeper基于API操作Node节点 标签:http 最新版 nod value time 开启 down version max 原文地址:https://www.cnblogs.com/aGboke/p/12991755.html
建立会话
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateSession implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
countDownLatch.await();
System.out.println("客户端与服务端会话真正建立了");
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
countDownLatch.countDown();
}
}
}
创建节点
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何人
* AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateNote implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\
Thread.sleep(Integer.MAX_VALUE);
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 创建节点
try {
createNoteSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
创建节点的方法
*/
private static void createNoteSync() throws KeeperException, InterruptedException {
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何人
* AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/
// 持久节点
String note_persistent = zooKeeper.create("/lg-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 临时节点
String note_ephemeral = zooKeeper.create("/lg-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 持久顺序节点
String note_persistent_sequential = zooKeeper.create("/lg-persistent_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建的持久节点" + note_persistent);
System.out.println("创建的临时节点" + note_ephemeral);
System.out.println("创建的持久顺序节点" + note_persistent_sequential);
}
}
读取节点:
public class GetNoteData implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\
Thread.sleep(Integer.MAX_VALUE);
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
/*
子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知
要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
*/
if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
List
删除节点
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class DeleteNote implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\
Thread.sleep(Integer.MAX_VALUE);
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 删除节点
try {
deleteNoteSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
删除节点的方法
*/
private void deleteNoteSync() throws KeeperException, InterruptedException {
/*
zooKeeper.exists(path,watch) :判断节点是否存在
zookeeper.delete(path,version) : 删除节点
*/
Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(stat == null ? "该节点不存在":"该节点存在");
if(stat != null){
zooKeeper.delete("/lg-persistent/c1",-1);
}
Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(stat2 == null ? "该节点不存在":"该节点存在");
}
}
更新节点
/*
更新数据节点内容的方法
*/
private void updateNoteSync() throws KeeperException, InterruptedException {
/*
path:路径
data:要修改的内容 byte[]
version:为-1,表示对最新版本的数据进行修改
zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/lg-persistent", false, null);
System.out.println("修改前的值:" + new String(data));
//修改/lg-persistent 的数据 stat: 状态信息对象
Stat stat = zooKeeper.setData("/lg-persistent", "客户端修改了节点数据".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg-persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
}