Zookeeper基础教程(四):C#连接使用Zookeeper
2021-03-04 03:25
标签:状态改变 top item omr 节点 color 真实项目 await digest Zookeeper作为分布式的服务框架,虽然是java写的,但是强大的C#也可以连接使用。 C#要连接使用Zookeeper,需要借助第三方插件,而现在主要有两个插件可供使用,分别是ZooKeeperNetEx和Zookeeper.Net ZooKeeperNetEx是从java改过来的,因此里面的一些习惯是java风格的,但是好像有人在提供更新维护,支持最新的Zookeeper特性,而且摆脱了对.net framework的依赖,所以个人推荐使用ZooKeeperNetEx做开发,本文也已介绍ZooKeeperNetEx为主 新建一个控制台项目,在nuget中搜索ZooKeeperNetEx,并安装最新版 在Program的Main方法: 运行后显示结果: 这个简单的例子是使用ZooKeeperNetEx操作的简单例子,下面具体介绍 ZooKeeperNetEx连接Zookeeper只需要实例化ZooKeeper对象即可 实例化过程中至少需要三个参数 连接字符串(connectstring):host:port形式,多个地址之间使用英文逗号隔开 会话超时时间(sessionTimeout):当会话中,Zookeeper超过此时间未响应,则表示会话超时 监听器(watcher):这个连接过程中可以注册一个监听器,当连接过程中出现状态改变时,会通知到监听器 ZooKeeper对象实例化过程中会异步的去连接Zookeeper,所以例子中才有一个while循环来判断状态 而Zookeeper的连接状态有6种: 当应用连接到Zookeeper时,一般都是读取数据,所以主需要只读连接就可以满足的,不过具体还是要看需求。 当在指定的会话时间内未成功连接时,则会导致连接超时,因为这个过程是异步的,所以需要一个监听器来接收。 监听器其实是org.apache.zookeeper.Watcher的一个子类,这个需要开发者去继承实现它的process方法,比如上面的例子中我们就简单的实现 这里仅仅只是简单的输出节点路径、监听事件响应状态和监听事件类型 为什么要有监听器?监听器就类似一个回调,当发生某个事件时,我们的应用可能需要进行相应的处理,如当连接断开时,由于监听器的存在,我们可以让我们的应用程序重新与Zookeeper建立连接。 ZooKeeperNetEx创建znode节点使用的是createAsync异步方法,传入4个参数,分别是 节点路径(path)::创建的节点路径 节点数据(data):节点数据,它是一个字节数组,可以通过编码将字符串转化为字符数组 ACL权限(acl):ACL权限,可以使用已定义好的,也可以使用自定义,如: 自定义方式如: 节点类型(createMode):节点类型有4种,分别是CreateMode类的4个静态字段 createAsync异步方法会返回实际创建的znode路径,貌似没什么用-_-!! 上面这个是ZooKeeperNetEx创建znode节点的方法,而对znode的其他操作的参数就很简单了,这里就不在重述,需要具体操作才能理解,一个简单的介绍如下: 可以比较一下上一节介绍的zkCli对znode节点的操作就很容易理解了。 另外,需要注意的是,existsAsync方法、getDataAsync方法和getChildrenAsync方法可以在指定的znode注册一个监听器,setDataAsync方法却没有这个注册功能,这个是因为Zookeeper注册的监听器只会响应一次,当需要再次响应时,需要重新注册,这时就可以调用existsAsync方法或者getDataAsync方法或者getChildrenAsync方法进行重新注册了! 上一节说到ACL权限不仅可以在创建是给予,在创建后也可以修改,ZookeeperNetEx操作znode的ACL权限使用的方法如下: 说到ACL,自然就会认证存在,ZookeeperNetEx添加认证使用的是addAuthInfo方法 其中scheme就是我们上一节介绍的那几种: auth是认证数据,如果没有则可以是空的字节数组,如: ZookeeperNetEx关闭会话使用的是closeAsync方法,调用这个方法之后,当前连接对象ZooKeeper就不能再访问了 其他常用方法就不介绍了,一般时候基本上也用不上。 简单封装 真实项目中,我们连接Zookeeper多数只是为了创建znode节点,读取数据等等操作,一般不会去设置ACL等权限,甚至连认证都可能不会用到,为了更好使用ZookeeperNetEx,我做了一层简单的封装,用以满足常见的CRUD操作,同时也让它更符合我们.net开发的一些习惯。
Zookeeper.Net好像是是Apache官方提供的,但是5年没更新了,也就是说他依赖于.net framework,因此无法在.net core项目中使用using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AspNetCore.ZookeeperConsole
{
class Program
{
static void Main(string[] args)
{
//Zookeeper连接字符串,采用host:port格式,多个地址之间使用逗号(,)隔开
string connectionString = "192.168.209.133:2181,192.168.209.133:2181,192.168.209.133:2181";
//会话超时时间,单位毫秒
int sessionTimeOut = 10000;
//异步监听
var watcher = new MyWatcher("ConnectWatcher");
//连接
ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeOut, watcher);
Thread.Sleep(1000);//停一秒,等待连接完成
while (zooKeeper.getState() == ZooKeeper.States.CONNECTING)
{
Console.WriteLine("等待连接完成...");
Thread.Sleep(1000);
}
var state = zooKeeper.getState();
if (state != ZooKeeper.States.CONNECTED && state != ZooKeeper.States.CONNECTEDREADONLY)
{
Console.WriteLine("连接失败:" + state);
Console.ReadKey();
return;
}
//创建znode节点
{
var data = Encoding.UTF8.GetBytes("hello world");
List acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;//创建节点时的acl权限,也可以使用下面的自定义权限
//List acl = new List() {
// new ACL((int)ZooDefs.Perms.READ, new Id("ip", "127.0.0.1")),
// new ACL((int)(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE), new Id("auth", "id:pass"))
//};
CreateMode createMode = CreateMode.PERSISTENT;
zooKeeper.createAsync("/mynode", data, acl, createMode).Wait();
Console.WriteLine("完成创建节点");
}
//节点是否存在
{
var exists = zooKeeper.existsAsync("/mynode", new MyWatcher("ExistsWatcher")).GetAwaiter().GetResult();
Console.WriteLine("节点是否存在:" + exists);
}
//获取节点数据
{
var dataResult = zooKeeper.getDataAsync("/mynode", new MyWatcher("GetWatcher")).GetAwaiter().GetResult();
var value = Encoding.UTF8.GetString(dataResult.Data);
Console.WriteLine("完成读取节点:" + value);
}
//设置节点数据
{
var data = Encoding.UTF8.GetBytes("hello world again");
zooKeeper.setDataAsync("/mynode", data);
Console.WriteLine("设置节点数据");
}
//重新获取节点数据
{
var dataResult = zooKeeper.getDataAsync("/mynode", new MyWatcher("GetWatcher")).GetAwaiter().GetResult();
var value = Encoding.UTF8.GetString(dataResult.Data);
Console.WriteLine("重新获取节点数据:" + value);
}
//移除节点
{
zooKeeper.deleteAsync("/mynode").Wait();
Console.WriteLine("移除节点");
}
Console.WriteLine("完成");
Console.ReadKey();
}
}
class MyWatcher : Watcher
{
public string Name { get; private set; }
public MyWatcher(string name)
{
this.Name = name;
}
public override Task process(WatchedEvent @event)
{
var path = @event.getPath();
var state = @event.getState();
Console.WriteLine($"{Name} recieve: Path-{path} State-{@event.getState()} Type-{@event.get_Type()}");
return Task.FromResult(0);
}
}
}
//Zookeeper连接字符串,采用host:port格式,多个地址之间使用逗号(,)隔开
string connectionString = "192.168.209.133:2181,192.168.209.133:2181,192.168.209.133:2181";
//会话超时时间,单位毫秒
int sessionTimeOut = 10000;
//异步监听
var watcher = new MyWatcher("ConnectWatcher");
//连接
ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeOut, watcher);
Thread.Sleep(1000);//停一秒,等待连接完成
while (zooKeeper.getState() == ZooKeeper.States.CONNECTING)
{
Console.WriteLine("等待连接完成...");
Thread.Sleep(1000);
}
//ZooKeeper.States的枚举
CONNECTING = 0, //连接中
CONNECTED = 1, //已连接
CONNECTEDREADONLY = 2, //已连接,但是只能只读访问
CLOSED = 3, //已关闭连接
AUTH_FAILED = 4, //认证失败
NOT_CONNECTED = 5 //未连接 class MyWatcher : Watcher
{
public string Name { get; private set; }
public MyWatcher(string name)
{
this.Name = name;
}
public override Task process(WatchedEvent @event)
{
var path = @event.getPath();
var state = @event.getState();
Console.WriteLine($"{Name} recieve: Path-{path} State-{@event.getState()} Type-{@event.get_Type()}");
return Task.FromResult(0);
}
}
//监听事件响应状态,Watcher.Event.KeeperState的枚举
Expired = -112, //连接超时
Disconnected = 0, //连接断开
SyncConnected = 3, //已同步连接
AuthFailed = 4, //认证失败
ConnectedReadOnly = 5 //只读连接 //监听事件类型,Watcher.Event.EventType的枚举 None = -1, //非节点操作事件
NodeCreated = 1, //创建节点事件
NodeDeleted = 2, //删除节点事件
NodeDataChanged = 3, //节点数据改变
NodeChildrenChanged = 4 //子节点发生改变
//已经定义好的,ZooDefs.Ids的枚举
OPEN_ACL_UNSAFE:完全开放
CREATOR_ALL_ACL:创建该znode的连接拥有所有权限
READ_ACL_UNSAFE:所有的客户端都可读
List acl = new List() {
new ACL((int)ZooDefs.Perms.READ, new Id("ip", "127.0.0.1")),
new ACL((int)(ZooDefs.Perms.READ | ZooDefs.Perms.WRITE), new Id("auth", "id:pass"))
};
PERSISTENT:持久化节点
PERSISTENT_SEQUENTIAL:持久化有序节点
EPHEMERAL:临时节点(连接断开自动删除)
EPHEMERAL_SEQUENTIAL:临时有序节点(连接断开自动删除)
//删除znode节点
public Task deleteAsync(string path, int version = -1);
//指定的znode节点是否存在
public Task
//获取ACL权限
public Task getACLAsync(string path);
//设置ACL权限
public Task
public void addAuthInfo(string scheme, byte[] auth);
world:默认模式,所有客户端都拥有指定的权限。world下只有一个id选项,就是anyone,通常组合写法为world:anyone:[permissons];
auth:只有经过认证的用户才拥有指定的权限。通常组合写法为auth:user:password:[permissons],使用这种模式时,你需要先进行登录,之后采用auth模式设置权限时,user和password都将使用登录的用户名和密码;比如:
digest:只有经过认证的用户才拥有指定的权限。通常组合写法为digest:user:BASE64(SHA1(password)):[permissons],这种形式下的密码必须通过SHA1和BASE64进行双重加密;
ip:限制只有特定IP的客户端才拥有指定的权限。通常组成写法为ip:182.168.0.168:[permissions];
super:代表超级管理员,拥有所有的权限,需要修改Zookeeper启动脚本进行配置。
//world模式认证
zk.addAuthInfo("world",new byte[0]);
//auth模式认证
byte[] auth=Encoding.UTF8.GetBytes("id:pass")
zk.addAuthInfo("auth",new byte[0]);
//digest模式认证
byte[] auth=Encoding.UTF8.GetBytes("加密后的字符串")
zk.addAuthInfo("digest",new byte[0]);
public Task closeAsync();
using org.apache.zookeeper;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;
using System.Text;
using org.apache.zookeeper.data;
using org.apache.utils;
using System.Diagnostics;
namespace AspNetCore.ZookeeperConsole
{
///
文章标题:Zookeeper基础教程(四):C#连接使用Zookeeper
文章链接:http://soscw.com/essay/59802.html