Zookeeper 从入门到入土⑥:API
zk 有 Session,没有线程池的概念
1. 原生 API
Zookeeper API 共包含五个包:
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
依赖:
| 1 | <dependency> <groupId>org.apache.zookeeper</groupId> | 
1.1. 建立会话
ZooKeeper 客户端和服务端会话的建⽴是⼀个异步的过程。
所以如果在 new ZooKeeper 后立即结束方法会话不能建立完毕,会话的⽣命周期中处于 CONNECTING 的状态。
当会话真正创建完毕后 ZooKeeper 服务端会向会话对应的客户端发送⼀个事件通知以告知客户端。
| 1 | public class CreateSession { | 
1.2. 创建节点
ZooKeeper#create(String path, byte[] data, List
- path:节点创建的路径
- data:节点创建要保存的数据
-  acl:节点创建的权限信息(4 种类型) - ZooDefs.Ids.ANYONE_ID_UNSAFE:表示任何⼈
- ZooDefs.Ids.AUTH_IDS:此 ID 仅可⽤于设置 ACL。它将被客户机验证的 ID 替换。
- ZooDefs.Ids.OPEN_ACL_UNSAFE:这是⼀个完全开放的 ACL (常⽤)–> world:anyone
- ZooDefs.Ids.CREATOR_ALL_ACL:此 ACL 授予创建者身份验证 ID 的所有权限
 
-  createMode:创建节点的类型(4 种类型) - CreateMode.PERSISTENT:持久节点
- CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
- CreateMode.EPHEMERAL:临时节点
-  CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31public class CreateNote { 
 private static CountDownLatch countDownLatch = new CountDownLatch(1);
 private static ZooKeeper zooKeeper;
 public static void main(String[] args) throws Exception {
 zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
 if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
 countDownLatch.countDown();
 }
 // 调⽤创建节点⽅法
 try {
 createNodeSync();
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 countDownLatch.await();
 }
 private static void createNodeSync() throws Exception {
 String nodePersistent = zooKeeper.create("/lg_persistent", "持久节点内容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 String nodePersistentSequential = zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
 String nodeEpersistent = zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 System.out.println("创建的持久节点是:" + nodePersistent);
 System.out.println("创建的持久顺序节点是:" + nodePersistentSequential);
 System.out.println("创建的临时节点是:" + nodeEpersistent);
 }
 }
 
1.3. 获取节点数据
- ZooKeeper#getData(String path, boolean watch, Stat stat):
- path:获取数据的路径
- watch:是否开启监听。ture 代表使用创建 zk 的那个监听
- stat:节点状态信息,null 则表示获取最新版本的数据
- ZooKeeper#getChildren(String path, boolean watch)
- path:路径
-  watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40public class GetNoteData { 
 private static ZooKeeper zooKeeper;
 public static void main(String[] args) throws Exception {
 zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, watchedEvent -> {
 if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
 try {
 getNoteData();
 getChildren();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 // ⼦节点列表发⽣变化时,服务器会发出 NodeChildrenChanged 通知,但不会把变化情况告诉给客户端
 // 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听
 if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
 // 再次获取节点数据
 try {
 List<String> children = zooKeeper.getChildren(watchedEvent.getPath(), true);
 System.out.println(children);
 } catch (KeeperException | InterruptedException e) {
 e.printStackTrace();
 }
 }
 });
 Thread.sleep(Integer.MAX_VALUE);
 }
 private static void getNoteData() throws Exception {
 byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true, null);
 System.out.println(new String(data, StandardCharsets.UTF_8));
 }
 private static void getChildren() throws KeeperException, InterruptedException {
 List<String> children = zooKeeper.getChildren("/lg_persistent", true);
 System.out.println(children);
 }
 }
1.4. 修改节点数据
Stat ZooKeeper#setData(String path, byte[] data, int version)
- path:路径
- data:要修改的内容
-  version:为 -1,表示对最新版本的数据进⾏修改 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26public class UpdateNote { 
 private static ZooKeeper zooKeeper;
 public static void main(String[] args) throws Exception {
 zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
 try {
 updateNodeSync();
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 Thread.sleep(Integer.MAX_VALUE);
 }
 private static void updateNodeSync() throws Exception {
 byte[] data = zooKeeper.getData("/lg_persistent", false, null);
 System.out.println("修改前的值:"+new String(data));
 // 修改 stat:状态信息对象
 // version: -1 代表最新版本
 Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内容".getBytes(), -1);
 byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
 System.out.println("修改后的值:"+new String(data2));
 }
 }
1.5. 删除节点
- ZooKeeper#exists(String path, boolean watch): 判断节点是否存在
-  ZooKeeper#delete(String path, int version): 删除节点 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24public class DeleteNote { 
 private static ZooKeeper zooKeeper;
 public static void main(String[] args) throws Exception {
 zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
 try {
 deleteNodeSync();
 } catch (Exception e) {
 e.printStackTrace();
 }
 });
 Thread.sleep(Integer.MAX_VALUE);
 }
 private static void deleteNodeSync() throws KeeperException, InterruptedException {
 Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
 System.out.println(exists == null ? "该节点不存在":"该节点存在");
 zooKeeper.delete("/lg_persistent/lg-children",-1);
 Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
 System.out.println(exists2 == null ? "该节点不存在":"该节点存在");
 }
 }
2. Curator
- 【跟着实例学习 ZooKeeper】–队列
- 【跟着实例学习 ZooKeeper】–缓存
- 【跟着实例学习 ZooKeeper】–计数器
- 【跟着实例学习 ZooKeeper】–Curator 扩展库
- 【跟着实例学习 ZooKeeper】–Barrier
- 【跟着实例学习 ZooKeeper】–临时节点
- 【跟着实例学习 ZooKeeper】–Curator 框架应用
- 【跟着实例学习 ZooKeeper】– Leader 选举
- 【跟着实例学习 ZooKeeper】–分布式锁
2.1. 连接
重试策略:
- RetryNTimes: 重试 n 次
- RetryOneTime: 重试 1 次
- RetryForever: 永远重试
-  RetryUntilElapsed: 重试直到超过最大重试时间 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24// 同 RetryNTimes 
 // RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
 // 实际调用 CuratorFrameworkFactory.builder().build();
 // CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.1.110:2181", retryPolicy);
 // 1. 重试 3 次,每次间隔 5s
 RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
 // 2. 只重试 1 次
 RetryPolicy retryPolicy2 = new RetryOneTime(3000);
 // 3. 永远重试,每次间隔 3s
 RetryPolicy retryPolicy3 = new RetryForever(3000);
 // 4. 重试直到超过 2s,每次间隔 3s
 RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
 CuratorFramework client = CuratorFrameworkFactory.builder()
 .connectString("192.168.1.110:2181")
 .sessionTimeoutMs(10000)
 .retryPolicy(retryPolicy)
 .namespace("workspace")
 .build();
 client.start();
2.2. 创建
| 1 | String path = client.create() | 
2.3. 设置节点数据
| 1 | client.setData() | 
2.4. 删除
| 1 | client.delete() | 
2.5. 读取节点数据
| 1 | Stat stat = new Stat(); | 
2.6. 获取子节点
| 1 | List<String> nodeList = client.getChildren().forPath(nodePath); | 
2.7. 检查节点是否存在
| 1 | public Boolean checkExists(String nodePath) { | 
2.8. 监听器
usingWatcher
概述: 监听只会触发一次
| 1 | byte[] data = client.getData() | 
NodeCache
概述: 永久监听 当前路径的节点 的创建、更新、删除
start():
- NodeCache#start()
- NodeCache#start (boolean buildInitial): true 表示初始化时获取 node 的值并保存 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11NodeCache nodeCache = new NodeCache(client, nodePath); 
 // true 表示初始化时获取 node 的值并保存
 nodeCache.start(true);
 if (nodeCache.getCurrentData() != null) {
 System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
 }
 nodeCache.getListenable().addListener(() -> {
 ChildData currentData = nodeCache.getCurrentData();
 System.out.println("节点 "+ currentData.getPath() + " 的数据为:" + new String(currentData.getData()));
 });
PathChildrenCache
概述: 永久监听 当前路径下的子节点 的创建、更新、删除
start():
- NodeCache#start(): 默认 buildInitial 为 false
- NodeCache#start(boolean buildInitial): true 表示初始化时获取 node 的值并保存
- NodeCache#start(StartMode mode):
- StartMode.POST_INITIALIZED_EVENT: 异步初始化,并触发 INITIALIZED 事件
- StartMode.NORMAL: 异步初始化
- StartMode.BUILD_INITIAL_CACHE: 同步初始化
 
事件类型:
- INITIALIZED: 初始化事件
- CHILD_ADDED: 子节点添加事件
- CHILD_REMOVED: 子节点删除事件
-  CHILD_UPDATED:  子节点更新事件 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30// true 表示把节点内容放入 stat 里 
 PathChildrenCache childrenCache = new PathChildrenCache(client, nodePath, true);
 childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
 // 同步初始化后可获取子节点数据
 List<ChildData> childDataList = childrenCache.getCurrentData();
 for (ChildData childData : childDataList) {
 System.out.println(new String(childData.getData()));
 }
 // 监听子节点
 childrenCache.getListenable().addListener((client, event) -> {
 ChildData eventData = event.getData();
 // 初始化事件
 if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
 System.out.println("子节点初始化完成");
 }
 // 子节点添加事件
 else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
 System.out.println("添加子节点 " + eventData.getPath() + ": " + new String(eventData.getData()));
 }
 // 子节点删除事件
 else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
 System.out.println("删除子节点 " + eventData.getPath());
 }
 // 子节点更新事件
 else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
 System.out.println("修改子节点 " + eventData.getPath() + ": " + new String(eventData.getData()));
 }
 });
