上面一篇文章,我们学会了zookeeper的基本API和命令,这一篇文章里我们用Java客户端来调用zookeeper的API,只是基本调用不会太详细,需要详细的可网上自行看文档。
一、环境准备
我这里的环境是window上的虚拟机IP为192.168.157.6,然后需要关闭防火墙(或者开放2181端口,因为这里只是测试,所以就直接执行命令systemctl stop firewalld关闭);新建一个maven项目,因为这篇案例笔记用了Java的客户端以及封装的更全的客户端zkClient所以pom.xml引入如下依赖:
<!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.2</version></dependency><!-- https://mvnrepository.com/artifact/com.101tec/zkclient --><dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version></dependency>
二、Java客户端测试
这里发现初始化链接的时候超级慢,如果超时时间为10秒以内,基本上都会超时,推测原因为当使用ip创建ZooKeeper对象时,如果host中没有ip到主机名的映射,ZooKeeper创建过程中会调用ZooInetAddress.getHostName()这个方法从网络中获取主机名,这里耗费时间太长所致。所以如果不相等就设置下host进行主机名的映射吧。
package com.suibibk.zookeeper;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.zookeeper.AsyncCallback;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs.Perms;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.ACL;import org.apache.zookeeper.data.Id;import org.apache.zookeeper.data.Stat;import org.junit.Before;import org.junit.Test;/*** zookeeper API测试* @author 小林同学**/public class CURD {ZooKeeper zookeeper;@Beforepublic void initZookeeper() throws IOException {String connectString = "192.168.157.6:2181";int sessionTimeout = 50*1000;zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {//可做其他操作(设置监听或观察者)System.out.println("初始化监听:"+event.getPath());}});}//------------------------------查询-------------------------------///*** 查询数据:没有启动监听,/test2节点数据改变不会促发监听* @throws KeeperException* @throws InterruptedException*/@Testpublic void getData1() throws KeeperException, InterruptedException {String path ="/test2";//不启动监听Boolean watch = false;Stat stat = new Stat();byte[] bytes = zookeeper.getData(path, watch, stat);String result =new String(bytes);System.out.println("result:"+result);System.out.println("stat:"+stat);//暂停线程Thread.sleep(Long.MAX_VALUE);}/*** 查询数据:启动监听,/test2节点数据改变会促发监听,但是指只会监听一次* @throws KeeperException* @throws InterruptedException*/@Testpublic void getData2() throws KeeperException, InterruptedException {String path ="/test2";//启动监听Boolean watch = true;Stat stat = new Stat();byte[] bytes = zookeeper.getData(path, watch, stat);String result =new String(bytes);System.out.println("result:"+result);System.out.println("stat:"+stat);//暂停线程Thread.sleep(Long.MAX_VALUE);}/*** 查询数据:自定义监听,这个也是只能监听一次,有自定义监听器了后就不会触发初始化时候的监听器* @throws KeeperException* @throws InterruptedException*/@Testpublic void getData3() throws KeeperException, InterruptedException {String path ="/test2";//启动监听Stat stat = new Stat();byte[] bytes = zookeeper.getData(path, new Watcher() {public void process(WatchedEvent event) {System.out.println("getData3:"+event);}}, stat);String result =new String(bytes);System.out.println("result:"+result);System.out.println("stat:"+stat);//暂停线程Thread.sleep(Long.MAX_VALUE);}/*** 查询数据:自定义监听,循环监听,有自定义监听器了后就不会触发初始化时候的监听器* @throws KeeperException* @throws InterruptedException*/@Testpublic void getData4() throws KeeperException, InterruptedException {String path ="/test2";//启动监听Stat stat = new Stat();byte[] bytes = zookeeper.getData(path, new Watcher() {public void process(WatchedEvent event) {System.out.println("getData3:"+event);//重新监听try {zookeeper.getData(event.getPath(), this, null);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}}, stat);String result =new String(bytes);System.out.println("result:"+result);System.out.println("stat:"+stat);//暂停线程Thread.sleep(Long.MAX_VALUE);}/*** 查询数据:用DataCallback返回* @throws KeeperException* @throws InterruptedException*/@Testpublic void getData5() throws KeeperException, InterruptedException {String path ="/test2";//启动监听String ctx ="我是外部参数";zookeeper.getData(path, false,new AsyncCallback.DataCallback() {public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {//状态码OK表示成功 对应 0System.out.println("rc:"+rc);//路径System.out.println("path:"+path);//上下文System.out.println("ctx:"+ctx);//数据System.out.println("data:"+new String(data));System.out.println("stat:"+stat);}}, ctx);//暂停线程(因为是异步获取的)Thread.sleep(Long.MAX_VALUE);}/*** 查询子节点* @throws KeeperException* @throws InterruptedException*/@Testpublic void getChildren() throws KeeperException, InterruptedException {String path ="/test2";List<String> paths = zookeeper.getChildren(path, false);for (String p : paths) {System.out.println("path:"+p);}//暂停线程Thread.sleep(Long.MAX_VALUE);}/*** 查询子节点,开启监听,添加自定义监听* @throws KeeperException* @throws InterruptedException*/@Testpublic void getChildren2() throws KeeperException, InterruptedException {String path ="/test2";List<String> paths = zookeeper.getChildren(path, new Watcher() {public void process(WatchedEvent event) {//这里监听打印的是父节点:只有在子节点的增删才会触发System.out.println("event:"+event);}});for (String p : paths) {System.out.println("path:"+p);}//暂停线程Thread.sleep(Long.MAX_VALUE);}//-------------------------------------添加----------------------------///*** 创建节点CreateMode可以选择持久节点、临时节点、持久序号节点、临时序号界节点* @throws InterruptedException* @throws KeeperException*/@Testpublic void create() throws KeeperException, InterruptedException {String path="/test2/suibibk";byte[] data ="https://www.suibibk.com".getBytes();//ACL事务是有Perms中的属性来或的//int READ = 1 << 0; 00000001 1//int WRITE = 1 << 1; 00000010 2//int CREATE = 1 << 2; 00000100 4//int DELETE = 1 << 3; 00001000 8//int ADMIN = 1 << 4; 00010000 16//int ALL = READ | WRITE | CREATE | DELETE | ADMIN;//规律如下,若想要READ和WRITE的权限,则只需要READ|WRITE即可,值在十进制为1+2//1,2,3,6,16这几个数字刚刚号可以凑出1~31来//eg:读写权限int perms = Perms.READ|Perms.WRITE;ACL acl1 = new ACL(perms, new Id("world", "anyone"));ACL acl2 = new ACL(perms, new Id("ip", "127.0.0.1"));ACL acl3 = new ACL(perms, new Id("ip", "192.168.157.1"));List<ACL> acl =new ArrayList<ACL>();acl.add(acl1);acl.add(acl2);acl.add(acl3);zookeeper.create(path, data, acl, CreateMode.PERSISTENT);}/*** 删除节点* @throws InterruptedException* @throws KeeperException*/@Testpublic void delete() throws KeeperException, InterruptedException {String path="/test2/suibibk";//-1表示不考虑当前版本信息zookeeper.delete(path, -1);}/*** 修改节点* @throws InterruptedException* @throws KeeperException*/@Testpublic void update() throws KeeperException, InterruptedException {String path="/test2";//-1表示不考虑当前版本信息zookeeper.setData(path, "炼词小程序".getBytes(), -1);}}
大家看例子应该可以很清楚了
三、zkClient客户端测试
package com.suibibk.zookeeper;import java.io.IOException;import java.util.List;import org.I0Itec.zkclient.IZkChildListener;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import org.apache.zookeeper.CreateMode;import org.junit.Before;import org.junit.Test;/*** ZKClient在原生API接口上进行了包装,同时在内部实现了诸如session超时重连、watcher反复注册等功能,* 使得zookeeper客户端繁琐的细节对开发人员透明。* @author 小林同学*/public class ZkClientTest {ZkClient zkClient;@Beforepublic void initZookeeper() throws IOException {String connectString = "192.168.157.6:2181";zkClient = new ZkClient(connectString, 50*1000);}/*** 创建节点*/@Testpublic void create() {zkClient.create("/test2/suibibk", "https://www.suibibkc.om", CreateMode.PERSISTENT);}/*** 查询节点*/@Testpublic void get() {List<String> strs = zkClient.getChildren("/test2");for (String path : strs) {System.out.println(path);}}/*** 修改节点*/@Testpublic void update() {zkClient.writeData("/test2", "lianci");}/*** 删除节点*/@Testpublic void delete() {zkClient.delete("/test2/suibibk");}/*** 添加监听* @throws InterruptedException*/@Testpublic void subscribe() throws InterruptedException {System.out.println("开始监听");//发现不会触发,由于zkClient创建连接的时候指定了默认的序列化类-new SerializableSerializer(),//所以存储在节点上的值也是序列化后的字节数组,当使用zkCli.sh在控制台set /xxx/xx的值时,存储的是普通的字符串字节数组。//所以当set值时虽然触发了值改变事件,但zkClient无法反序列化这个值。zkClient.subscribeDataChanges("/test2", new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {System.out.println("监听到节点删除:"+dataPath);}public void handleDataChange(String dataPath, Object data) throws Exception {System.out.println("监听到节点:"+dataPath+";数据改变:"+data);}});//这里才会触发zkClient.writeData("/test2", "dog");Thread.sleep(Integer.MAX_VALUE);}/*** 添加监听,监听子节点* @throws InterruptedException*/@Testpublic void subscribeChilds() throws InterruptedException {System.out.println("开始监听");//对父节点添加监听子节点变化。zkClient.subscribeChildChanges("/test2", new IZkChildListener() {public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {// 节点System.out.println(">>>parentPath: " + parentPath);// 子节点System.out.println(">>>currentChilds: " + currentChilds);}});Thread.sleep(Integer.MAX_VALUE);}}
结束!
