Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用、ZKClient、Curator)
2021-07-12 12:08
标签:ice fixed 网络通 out watcher receive style xid dede 准备工作: 首先新建一个maven项目ZK-Demo,然后在pom.xml里面引入zk的依赖 输出结果: CONNECTING 输出结果: Receive watched event:WatchedEvent state:SyncConnected type:None path:null 输出结果: 123 输出结果: c1 输出结果: Child: 0, path: /zk-test, ctx: ok, children: [c1], stat: 4294967369,4294967369,1535536716381,1535536716381,0,1,0,0,3,1,4294967370 输出结果: Receive watched event:WatchedEvent state:SyncConnected type:None path:null ZKClient的优点: 1)可以递归创建。在zookeeper命令行和zookeeper的原生API里面得先创建父节点才能创建子节点 准备工作: 首先新建一个maven项目ZK-Demo,然后在pom.xml里面引入ZKClient的依赖 创建成功: 输出结果: 123 输出结果: /zk-client的子发生变化: [] curator是连接ZK应用最广泛的工具 原因如下: 1)zk应用场景(分布式锁,Master选举等等),curator包含了这些场景。 准备工作: 首先新建一个maven项目ZK-Demo,然后在pom.xml里面引入curator的依赖 这里介绍一种算法:Backoff退避算法 有这样一种场景,有多个请求,如果网络出现阻塞,每1分钟重试一次。 创建成功: 输出结果: event code: 0, type: CREATE 输出结果: Current data: 0 输出结果: update--current data: test123 Curator事件监听: NodeCache:节点处理监听(会使用缓存)。回调接口NodeCacheListener PathChildrenCache:子节点缓存,处理子节点变化。回调接口PathChildrenCacheListener TreeCache:NodeCache和PathChildrenCache的结合体。回调接口TreeCacheListener Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用、ZKClient、Curator) 标签:ice fixed 网络通 out watcher receive style xid dede 原文地址:https://www.cnblogs.com/leeSmall/p/9576437.html
一、Zookeeper原生API如何进行调用
1. 连接zk并监听事件
package com.study.demo.zk;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
//连接zk并监听事件
public class ZKDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
public static void main(String[] args) throws IOException {
ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDemo());
System.out.println(zk.getState());
try {
cdl.await();
} catch (Exception e) {
System.out.println("ZK Session established.");
}
}
//监听到事件时进行处理
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if (KeeperState.SyncConnected == event.getState()) {
cdl.countDown();
}
}
}
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
2. 创建znode并监听事件
package com.study.demo.zk;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
 * Creates two znodes under the same path prefix once the session is connected:
 * one EPHEMERAL and one EPHEMERAL_SEQUENTIAL.
 */
public class ZKOperateDemo implements Watcher {
    // Opened by the watcher when the client reports SyncConnected.
    private static final CountDownLatch connectedSignal = new CountDownLatch(1);

    /**
     * Connects, waits for the session, then creates both nodes and prints their real paths.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper client = new ZooKeeper("192.168.152.130:2181", 5000, new ZKOperateDemo());
        connectedSignal.await();
        createAndReport(client, "123", CreateMode.EPHEMERAL);
        createAndReport(client, "456", CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    // Synchronously creates /zk-test- with the given payload and mode, then prints the path returned.
    private static void createAndReport(ZooKeeper client, String payload, CreateMode mode)
            throws KeeperException, InterruptedException {
        String createdPath = client.create("/zk-test-", payload.getBytes(), Ids.OPEN_ACL_UNSAFE, mode);
        System.out.println("Success create path: " + createdPath);
    }

    /**
     * Logs every event and releases the latch on SyncConnected.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (event.getState() != KeeperState.SyncConnected) {
            return;
        }
        connectedSignal.countDown();
    }
}
Success create path: /zk-test-
Success create path: /zk-test-0000000011
3. 改变znode数据并监听事件
package com.study.demo.zk;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
//改变znode数据并监听事件
public class ZKDataDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDataDemo());
cdl.await();
zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zk.getData("/zk-test", true, stat)));
zk.getData("/zk-test", true, stat);
System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
zk.setData("/zk-test", "123".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
//监听到事件时进行处理
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
cdl.countDown();
} else if (event.getType() == EventType.NodeDataChanged) {
try {
System.out.println(new String(zk.getData(event.getPath(), true, stat)));
System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
} catch (Exception e) {
}
}
}
}
}
4294967354, 4294967354, 0
123
4294967354, 4294967355, 1
4. 改变子节点并监听事件
package com.study.demo.zk;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
//改变子节点并监听事件
public class ZKChildrenDemo implements Watcher {
private static final CountDownLatch cdl = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenDemo());
cdl.await();
zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
List
Child: [c1, c2]
5. 异步调用并完成回调
package com.study.demo.zk;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
//异步调用并完成回调
class ChildrenCallback implements AsyncCallback.Children2Callback {
public void processResult(int rc, String path, Object ctx, List
6. 连接后创建回调
package com.study.demo.zk;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
 * Callback for asynchronous create calls; prints the result code, the requested
 * path, the caller-supplied context and the real node name.
 */
class IStringCallback implements AsyncCallback.StringCallback {
    /**
     * @param rc   result code of the create call
     * @param path path passed to the create call
     * @param ctx  context object passed to the create call
     * @param name name reported for the created node
     */
    public void processResult(int rc, String path, Object ctx, String name) {
        StringBuilder line = new StringBuilder("create path result: [");
        line.append(rc).append(", ").append(path);
        line.append(",").append(ctx);
        line.append(", real path name: ").append(name);
        System.out.println(line.toString());
    }
}
/**
 * Issues three asynchronous create requests after the session is connected;
 * results are reported through IStringCallback.
 */
public class ZKAsyncDemo implements Watcher {
    // Opened by the watcher when the client reports SyncConnected.
    private static final CountDownLatch connectedSignal = new CountDownLatch(1);

    /**
     * Connects, waits for the session, submits the create requests and then sleeps
     * so the callbacks have time to run.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper client = new ZooKeeper("192.168.152.130:2181", 5000, new ZKAsyncDemo());
        connectedSignal.await();
        submitCreate(client, "123", CreateMode.EPHEMERAL);
        // Same path and mode as the first request.
        submitCreate(client, "456", CreateMode.EPHEMERAL);
        submitCreate(client, "789", CreateMode.EPHEMERAL_SEQUENTIAL);
        Thread.sleep(Integer.MAX_VALUE);
    }

    // Sends one asynchronous create for /zk-test- with a fresh callback and context object.
    private static void submitCreate(ZooKeeper client, String payload, CreateMode mode) {
        client.create("/zk-test-", payload.getBytes(), Ids.OPEN_ACL_UNSAFE, mode,
                new IStringCallback(), new String("I am context"));
    }

    /**
     * Logs every event and releases the latch on SyncConnected.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (event.getState() != KeeperState.SyncConnected) {
            return;
        }
        connectedSignal.countDown();
    }
}
create path result: [0, /zk-test-,I am context, real path name: /zk-test-
create path result: [-110, /zk-test-,I am context, real path name: null
create path result: [0, /zk-test-,I am context, real path name: /zk-test-0000000016
二、ZKClient
2)可以递归删除。在zookeeper命令行和zookeeper的原生API里面得先删除子节点才能删除父节点
3)避免不存在的异常
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
1. ZkClient递归创建持久节点
package com.study.demo.client;
import org.I0Itec.zkclient.ZkClient;
/**
 * Demonstrates recursive creation of a persistent node with ZkClient.
 *
 * @author leeSmall
 * @date 2018-09-02
 */
public class CreateNodeDemo {
    /**
     * Creates /zk-client/c1, letting ZkClient create the missing parent first.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.152.130:2181", 5000);
        try {
            String path = "/zk-client/c1";
            // true: create missing parents (here /zk-client) before the node itself.
            client.createPersistent(path, true);
        } finally {
            // Release the connection; the original never closed the client.
            client.close();
        }
    }
}
2. ZkClient获取数据并监听事件
package com.study.demo.client;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
/**
 * Demonstrates reading node data with ZkClient and listening for data changes
 * and deletion on the same node.
 *
 * @author leeSmall
 * @date 2018-09-02
 */
public class GetDataDemo {
    /**
     * Creates an ephemeral node, subscribes a data listener, then writes to and
     * deletes the node so both listener methods are triggered.
     *
     * @param args unused
     * @throws InterruptedException if a sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        final String nodePath = "/zk-client";
        ZkClient zkClient = new ZkClient("192.168.152.130:2181", 5000);

        // Ephemeral node holding the initial value.
        zkClient.createEphemeral(nodePath, "123");

        // Listener reporting data changes and deletion of the node.
        IZkDataListener reporter = new IZkDataListener() {
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(dataPath + " changed: " + data);
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath + " deleted");
            }
        };
        zkClient.subscribeDataChanges(nodePath, reporter);

        Object initialValue = zkClient.readData(nodePath);
        System.out.println(initialValue.toString());

        zkClient.writeData(nodePath, "456");
        Thread.sleep(1000);
        zkClient.delete(nodePath);

        // Stay alive so the listener output can be observed.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
/zk-client changed: 456
/zk-client deleted
3. ZkClient获取子节点数据并监听事件
package com.study.demo.client;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
/**
*
* @Description: ZkClient获取子节点数据
* @author leeSmall
* @date 2018年9月2日
*
*/
public class GetChildrenDemo {
public static void main(String[] args) throws InterruptedException {
String path = "/zk-client";
ZkClient client = new ZkClient("192.168.152.130:2181", 5000);
//注册子节点数据改变的事件
client.subscribeChildChanges(path, new IZkChildListener() {
//子节点数据改变事件
public void handleChildChange(String parentPath, List
[]
/zk-client的子发生变化: [c1]
/zk-client的子发生变化: []
/zk-client的子发生变化: null
三、Curator
2)对于应用场景中可能出现的极端情况,curator都已考虑并做了处理。
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
1. curator创建连接session
package com.study.demo.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
*
* @Description: curator创建连接session
* @author leeSmall
* @date 2018年9月2日
*
*/
public class CreateSessionDemo {
public static void main(String[] args) throws InterruptedException {
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
.sessionTimeoutMs(5000).retryPolicy(policy).build();
client.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
20:25 request1(block)
20:26 request2(block)
20:27 request3(block)
当网络恢复通畅的时候,积压的请求会一起发送
20:28 request4(通顺)request2、3、4
那么前面的请求就没有意义了,所以就有了退避算法,按照指数间隔重试,比如第一次1分钟,第二次2分钟......随着时间的推移,重试间隔越来越长。
2. curator递归创建持久节点
package com.study.demo.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
 * Demonstrates recursive creation of a persistent node with Curator.
 *
 * @author leeSmall
 * @date 2018-09-02
 */
public class CreateNodeDemo {
    /**
     * Creates /zk-curator/c1 with data "test", creating missing parents as needed.
     *
     * @param args unused
     * @throws Exception if the create call fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        try {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath(path, "test".getBytes());
        } finally {
            // Release the connection; the original never closed the client.
            client.close();
        }
    }
}
3. curator异步创建临时节点
package com.study.demo.curator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
*
* @Description: curator异步创建临时节点
* @author leeSmall
* @date 2018年9月2日
*
*/
public class CreateNodeAsyncDemo {
static CountDownLatch cdl = new CountDownLatch(2);
static ExecutorService es = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
String path = "/zk-curator";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
//创建临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
//回调事件处理
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
cdl.countDown();
}
}, es).forPath(path, "test".getBytes());
//创建临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
cdl.countDown();
}
}).forPath(path, "test".getBytes());
cdl.await();
es.shutdown();
}
}
event code: -110, type: CREATE
4. curator更新节点数据
package com.study.demo.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Demonstrates a version-checked data update with Curator.
 *
 * @author leeSmall
 * @date 2018-09-02
 */
public class UpdateDataDemo {
    /**
     * Creates an ephemeral node, reads its stat, then updates the data using the
     * version that was read and prints the resulting version.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        try {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path, "test".getBytes());
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath(path);
            // Despite the labels, both lines print the node's version number, not its data.
            System.out.println("Current data: " + stat.getVersion());
            System.out.println("Update data: "
                    + client.setData().withVersion(stat.getVersion()).forPath(path, "some".getBytes()).getVersion());
        } finally {
            // Release the connection; the original never closed the client.
            client.close();
        }
    }
}
Update data: 1
5. curator删除节点数据
package com.study.demo.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Demonstrates a version-checked node delete with Curator.
 *
 * @author leeSmall
 * @date 2018-09-02
 */
public class DeleteNodeDemo {
    /**
     * Creates an ephemeral node, reads its stat, then deletes it using the version read.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String path = "/zk-curator/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        try {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path, "test".getBytes());
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath(path);
            client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
        } finally {
            // Release the connection; the original never closed the client.
            client.close();
        }
    }
}
6. curator事件监听
package com.study.demo.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
*
* @Description: curator事件监听
* @author leeSmall
* @date 2018年9月2日
*
*/
public class NodeCacheDemo {
public static void main(String[] args) throws Exception {
String path = "/zk-curator/nodecache";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes());
final NodeCache nc = new NodeCache(client, path, false);
nc.start();
//通过回调函数监听事件
nc.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("update--current data: " + new String(nc.getCurrentData().getData()));
}
});
client.setData().forPath(path, "test123".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(5000);
nc.close();
}
}
上一篇:C#-运算符(四)
文章标题:Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用、ZKClient、Curator)
文章链接:http://soscw.com/index.php/essay/104158.html