zookeeper api和zkclient api使用
2021-07-04 04:31
标签:命令行 des https amp github catch style imp f11
摘要:本文演示 ZooKeeper 原生 API 与 zkclient API 的使用。zkclient 是 GitHub 上一个开源的 ZooKeeper 客户端,在原生 ZooKeeper API 接口上进行了包装。
原文地址:https://www.cnblogs.com/LDDXFS/p/9864486.html
zookeeper api
CreateSession 连接zookeeper
package lddxfs.zkstudy.zkdemo.test001;
import lddxfs.zkstudy.zkdemo.Constant;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/20
*/
public class CreateSession {
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new MyWatcher());
//Zookeeper是API提供的1个类,我们连接zk集群,进行相应的znode操作,都是通过ZooKeeper的实例进行,这个实例就是zk client,和命令行客户端是同样的角色
//Zookeeper实例的创建需要传递3个参数
//connectString 代表要连接zk集群服务,通过逗号分隔
Thread.sleep(Integer.MAX_VALUE);
}
static class MyWatcher implements Watcher {
public void process(WatchedEvent event) {
// 这个方法只会调用一次,在这个session建立完成调用
if (event.getState() == Event.KeeperState.SyncConnected) {
//连接建立事件的处理
System.out.println("Event:" + event);
System.out.println("=========Client Connected to zookeeper==========");
}
}
}
}
CreateNode 创建znode
package lddxfs.zkstudy.zkdemo.test002;
import lddxfs.zkstudy.zkdemo.Constant;
import org.apache.zookeeper.*;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/20
*/
public class CreateNode implements Watcher {
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new CreateNode());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
//Client段处理连接建立事件,处理动作为添加1个永久节点
if(event.getState()==Event.KeeperState.SyncConnected){
//创建znode节点
try {
createNodeSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//create node, synchronized
private void createNodeSync() throws KeeperException, InterruptedException {
System.out.println("Create node with Sync mode");
String path =zk.create("/node_by_java","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("New Node added: "+path);
}
}
GetChildrenSync 获取子节点不注册watch
package lddxfs.zkstudy.zkdemo.test003;
import lddxfs.zkstudy.zkdemo.Constant;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/20
*/
public class GetChildrenSync implements Watcher {
private static ZooKeeper zk;
/** Opens the session with a GetChildrenSync watcher, then blocks the main thread. */
public static void main(String[] args) throws Exception {
    final Watcher connectionWatcher = new GetChildrenSync();
    zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, connectionWatcher);
    // Park the main thread so the process stays alive for watcher callbacks.
    Thread.sleep(Integer.MAX_VALUE);
}
/**
 * Queries the child list of "/" only once the connection has been established
 * (event state SyncConnected); any other event is ignored.
 */
public void process(WatchedEvent event) {
    if (event.getState() != Event.KeeperState.SyncConnected) {
        return;
    }
    // Query the list of child nodes.
    try {
        getChildranSync();
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }
}
//get children ,synchronized
private void getChildranSync() throws KeeperException, InterruptedException {
System.out.println("Get Childran in sync mode");
//false ,不关注子节点列表的变更事件(不注册watcher)
List
GetChildrenSync 获取子节点注册watch
package lddxfs.zkstudy.zkdemo.test004;
import lddxfs.zkstudy.zkdemo.Constant;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/20
*/
public class GetChildrenSync implements Watcher {
private static ZooKeeper zk;
/** Opens the session with a GetChildrenSync watcher, then blocks the main thread. */
public static void main(String[] args) throws IOException, InterruptedException {
    final Watcher childrenWatcher = new GetChildrenSync();
    zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, childrenWatcher);
    // Park the main thread so the process stays alive for watcher callbacks.
    Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
//子节点列表变化 even的处理
if (event.getType() == Event.EventType.NodeChildrenChanged) {
//再次获取子节点列表
try {
//event.getPath()返回 哪个znode的子节点列表发生了变化
List
DeleteNodeSync 删除znode
package lddxfs.zkstudy.zkdemo.test005;
import lddxfs.zkstudy.zkdemo.Constant;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/20
*/
public class DeleteNodeSync implements Watcher {
private static ZooKeeper zk;
/** Opens the session with a DeleteNodeSync watcher, then blocks the main thread. */
public static void main(String[] args) throws IOException, InterruptedException {
    final Watcher deleteWatcher = new DeleteNodeSync();
    zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, deleteWatcher);
    // Park the main thread so the process stays alive for watcher callbacks.
    Thread.sleep(Integer.MAX_VALUE);
}
/**
 * Deletes {@code /nodeddd} when the event is the plain session event:
 * state SyncConnected, type None and no path. Everything else is ignored.
 */
public void process(WatchedEvent event) {
    final boolean connected = event.getState() == Event.KeeperState.SyncConnected;
    if (!connected) {
        return;
    }
    final boolean sessionEvent = event.getType() == Event.EventType.None && event.getPath() == null;
    if (!sessionEvent) {
        return;
    }
    try {
        deleteNode("/nodeddd");
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }
}
private void deleteNode(String path) throws KeeperException, InterruptedException {
System.out.println("Delete Node in sync mode");
zk.delete(path,-1);
System.out.println("Node delete :"+path);
List
SetDataSync 设置数据
package lddxfs.zkstudy.zkdemo.test006;
import lddxfs.zkstudy.zkdemo.Constant;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/20
*/
public class SetDataSync implements Watcher {
private static ZooKeeper zk;
public static void main(String[] args) throws IOException, InterruptedException {
zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new SetDataSync());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
if (event.getType() == Event.EventType.None && event.getPath() == null) {
try {
String path = zk.create("/testdata", "12345".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat = zk.setData(path, "zhonddd".getBytes(), -1);
System.out.println("");
byte[] datass = zk.getData(path, false, stat);
System.out.println(new String(datass));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
zkclient api
CreateSession 连接zookeeper
package lddxfs.zkstudy.zkclientdemo.test001;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
 * Demo: open a ZooKeeper session through zkclient.
 *
 * zkclient is an open-source ZooKeeper client hosted on GitHub that wraps the
 * native ZooKeeper API and internally implements session-timeout reconnection,
 * repeated Watcher registration and similar features.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class CreateSession {
    public static void main(String[] args) {
        /*
         * 1) Unlike the native ZooKeeper API, creating a session through the zkclient
         *    API takes two timeouts: session timeout and connection timeout.
         * 2) A serializer instance is supplied as well, because data (Java objects)
         *    written when znodes are created later is converted to byte[] by it.
         * 3) Likewise, byte[] data that is read back is converted to Java objects by
         *    the serializer.
         */
        ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer());
        // Release the connection explicitly; the original demo never closed it.
        zkClient.close();
    }
}
CreateNode
package lddxfs.zkstudy.zkclientdemo.test002;
import lddxfs.zkstudy.zkclientdemo.User;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
/**
 * Demo: create a persistent znode through zkclient, storing a User object as
 * the node data.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class CreateNode {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer());
        try {
            User user = new User();
            // The user object is written directly; it is serialized to byte[] automatically.
            String path = zkClient.create("/node_zkclient", user, CreateMode.PERSISTENT);
            System.out.println("Create path is:" + path);
            /*
             * Viewed through the command-line client the data looks like this:
             * [zk: 192.168.10.132:2185(CONNECTED) 3] get /node_zkclient
             * ??sr lddxfs.zkstudy.zkclientdemo.User-U??t?‘LidtLjava/lang/Integer;LnametLjava/lang/String;xppp
             */
        } finally {
            // Close the client even when create fails; the original demo never closed it.
            zkClient.close();
        }
    }
}
WriteData
package lddxfs.zkstudy.zkclientdemo.test003;
import lddxfs.zkstudy.zkclientdemo.User;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
/**
 * Demo: create a znode holding a User object, then overwrite its data with an
 * updated User through zkclient.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class WriteData {
    private static final String PATH = "/node_zkclient2";

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer());
        try {
            User user = new User(1, "zhangsan");
            zkClient.create(PATH, user, CreateMode.PERSISTENT);
            // Mutate the same object and write it back over the node's data.
            user.setId(2);
            user.setName("lisi");
            zkClient.writeData(PATH, user);
        } finally {
            // Close the client even when create or writeData fails; the original demo never closed it.
            zkClient.close();
        }
    }
}
ReadData
package lddxfs.zkstudy.zkclientdemo.test004;
import lddxfs.zkstudy.zkclientdemo.User;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Demo: create a znode holding a User object and read it back, together with
 * the node's Stat, through zkclient.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class ReadData {
    private static final String PATH = "/node_zkclient3";

    public static void main(String[] args) {
        // Third argument is the connection timeout. The original passed SESSION_TIMEOUT
        // twice, unlike every other zkclient demo in this article.
        ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer());
        try {
            User user = new User(1, "wangwu");
            String path = zkClient.create(PATH, user, CreateMode.PERSISTENT);
            System.out.println("Create path is :" + path);
            // readData receives this Stat object along with the path.
            Stat stat = new Stat();
            user = zkClient.readData(PATH, stat);
            if (user != null) {
                System.out.println(user);
                System.out.println(stat);
            }
        } finally {
            // Close the client even when create or readData fails; the original demo never closed it.
            zkClient.close();
        }
    }
}
GetChildren
package lddxfs.zkstudy.zkclientdemo.test005;
import lddxfs.zkstudy.zkclientdemo.User;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.List;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/21
*/
public class GetChildren {
public static void main(String[] args) {
ZkClient zkClient=new ZkClient(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,Constant.CONNECTION_TIMEOUT,new SerializableSerializer());
List
NodeExist
package lddxfs.zkstudy.zkclientdemo.test006;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
 * Demo: check whether a node exists, through zkclient.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class NodeExist {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer());
        try {
            boolean exist = zkClient.exists("/dubbo");
            System.out.println("Node exist status is:" + exist);
            // Sample output from the author's run: Node exist status is:true
        } finally {
            // Close the client once the check is done; the original demo never closed it.
            zkClient.close();
        }
    }
}
订阅子节点列表发生变化
package lddxfs.zkstudy.zkclientdemo.test007;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
 * Demo: subscribe to changes of a node's child list through zkclient.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class SubscribeChildren {
    /**
     * Registers a ZkChildListener on {@code /888} and then blocks so that
     * notifications can be delivered.
     *
     * Sample session recorded by the author, connecting to zk with Cli.sh:
     * <pre>
     * [zk: 192.168.10.132:2185(CONNECTED) 6] create /888 888
     * Created /888
     * [zk: 192.168.10.132:2185(CONNECTED) 7] create /888/999 999
     * Created /888/999
     * [zk: 192.168.10.132:2185(CONNECTED) 8]
     * </pre>
     * Console output:
     * <pre>
     * Parent path is/888
     * Current children:[]
     * Parent path is/888
     * Current children:[999]
     * </pre>
     */
    public static void main(String[] args) throws InterruptedException {
        final SerializableSerializer serializer = new SerializableSerializer();
        final ZkClient client = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, serializer);
        final ZkChildListener childListener = new ZkChildListener();
        client.subscribeChildChanges("/888", childListener);
        // Park the main thread so the process stays alive for listener callbacks.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
package lddxfs.zkstudy.zkclientdemo.test007;
import org.I0Itec.zkclient.IZkChildListener;
import java.util.List;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/21
*/
public class ZkChildListener implements IZkChildListener {
public void handleChildChange(String parentPath, List
订阅数据变化
package lddxfs.zkstudy.zkclientdemo.test008;
import lddxfs.zkstudy.zkdemo.Constant;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
/**
 * Demo: subscribe to data changes of a node through zkclient.
 *
 * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
 * Date:2018/10/21
 */
public class SubscribeData {
    /**
     * Registers a ZkDataListener on {@code /node_zkclient} and then blocks so
     * that notifications can be delivered.
     *
     * Sample session recorded by the author:
     * <pre>
     * [zk: 192.168.10.132:2185(CONNECTED) 10] set /node_zkclient ddd
     * data change dataPath:/node_zkclient
     * data change data:[B@11d2adb4
     * </pre>
     */
    public static void main(String[] args) throws InterruptedException {
        // A different serializer is used here so that data written from the zk
        // command line can be picked up.
        final BytesPushThroughSerializer rawBytes = new BytesPushThroughSerializer();
        final ZkClient client = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, rawBytes);
        final ZkDataListener dataListener = new ZkDataListener();
        client.subscribeDataChanges("/node_zkclient", dataListener);
        // Park the main thread so the process stays alive for listener callbacks.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
package lddxfs.zkstudy.zkclientdemo.test008;
import org.I0Itec.zkclient.IZkDataListener;
/**
* Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/)
* Date:2018/10/21
*/
public class ZkDataListener implements IZkDataListener {
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("data change dataPath:"+dataPath);
System.out.println("data change data:"+data);
}
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("deleted data dataPath:"+dataPath);
}
}
文章标题:zookeeper api和zkclient api使用
文章链接:http://soscw.com/index.php/essay/101570.html