Zookeeper 从入门到入土⑥:API

zk 有 Session,没有线程池的概念

1. 原生 API

Zookeeper API 共包含五个包:

  • org.apache.zookeeper
  • org.apache.zookeeper.data
  • org.apache.zookeeper.server
  • org.apache.zookeeper.server.quorum
  • org.apache.zookeeper.server.upgrade

依赖:

1
2
3
4
<dependency> <groupId>org.apache.zookeeper</groupId> 
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>

1.1. 建立会话

ZooKeeper 客户端和服务端会话的建⽴是⼀个异步的过程。
所以如果在 new ZooKeeper 后立即结束方法会话不能建立完毕,会话的⽣命周期中处于 CONNECTING 的状态。
当会话真正创建完毕后 ZooKeeper 服务端会向会话对应的客户端发送⼀个事件通知以告知客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CreateSession {

private static CountDownLatch countDownLatch = new CountDownLatch(1);

public static void main(String[] args) throws InterruptedException, IOException {
/*
客户端可以通过创建⼀个 zk 实例来连接 zk 服务器
- connectString:连接地址:IP:端⼝
- sesssionTimeOut:会话超时时间:单位毫秒
- Wather:监听器(当特定事件触发监听时,zk 会通过 watcher 通知到客户端)
*/
ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4:2181,10.211.55.5:2181", 5000, watchedEvent -> {
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
});
System.out.println(zooKeeper.getState());
countDownLatch.await();
System.out.println("=========Client Connected to zookeeper==========");
}

}

1.2. 创建节点

ZooKeeper#create(String path, byte[] data, List acl, CreateMode createMode):

  • path:节点创建的路径
  • data:节点创建要保存的数据
  • acl:节点创建的权限信息(4 种类型)
    • ZooDefs.Ids.ANYONE_ID_UNSAFE:表示任何⼈
    • ZooDefs.Ids.AUTH_IDS:此 ID 仅可⽤于设置 ACL。它将被客户机验证的 ID 替换。
    • ZooDefs.Ids.OPEN_ACL_UNSAFE:这是⼀个完全开放的 ACL (常⽤)–> world:anyone
    • ZooDefs.Ids.CREATOR_ALL_ACL:此 ACL 授予创建者身份验证 ID 的所有权限
  • createMode:创建节点的类型(4 种类型)
    • CreateMode.PERSISTENT:持久节点
    • CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
    • CreateMode.EPHEMERAL:临时节点
    • CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      public class CreateNote {

      private static CountDownLatch countDownLatch = new CountDownLatch(1);
      private static ZooKeeper zooKeeper;

      public static void main(String[] args) throws Exception {
      zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
      if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
      countDownLatch.countDown();
      }

      // 调⽤创建节点⽅法
      try {
      createNodeSync();
      } catch (Exception e) {
      e.printStackTrace();
      }
      });
      countDownLatch.await();
      }

      private static void createNodeSync() throws Exception {
      String nodePersistent = zooKeeper.create("/lg_persistent", "持久节点内容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      String nodePersistentSequential = zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
      String nodeEpersistent = zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

      System.out.println("创建的持久节点是:" + nodePersistent);
      System.out.println("创建的持久顺序节点是:" + nodePersistentSequential);
      System.out.println("创建的临时节点是:" + nodeEpersistent);
      }
      }

1.3. 获取节点数据

  1. ZooKeeper#getData(String path, boolean watch, Stat stat):
  • path:获取数据的路径
  • watch:是否开启监听。ture 代表使用创建 zk 的那个监听
  • stat:节点状态信息,null 则表示获取最新版本的数据
  1. ZooKeeper#getChildren(String path, boolean watch)
  • path:路径
  • watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class GetNoteData {

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
    zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, watchedEvent -> {
    if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
    try {
    getNoteData();
    getChildren();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    // ⼦节点列表发⽣变化时,服务器会发出 NodeChildrenChanged 通知,但不会把变化情况告诉给客户端
    // 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听
    if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
    // 再次获取节点数据
    try {
    List<String> children = zooKeeper.getChildren(watchedEvent.getPath(), true);
    System.out.println(children);
    } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
    }
    }
    });
    Thread.sleep(Integer.MAX_VALUE);
    }

    private static void getNoteData() throws Exception {
    byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true, null);
    System.out.println(new String(data, StandardCharsets.UTF_8));
    }

    private static void getChildren() throws KeeperException, InterruptedException {
    List<String> children = zooKeeper.getChildren("/lg_persistent", true);
    System.out.println(children);
    }
    }

1.4. 修改节点数据

Stat ZooKeeper#setData(String path, byte[] data, int version)

  • path:路径
  • data:要修改的内容
  • version:为 -1,表示对最新版本的数据进⾏修改
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class UpdateNote {
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
    zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
    try {
    updateNodeSync();
    } catch (Exception e) {
    e.printStackTrace();
    }
    });
    Thread.sleep(Integer.MAX_VALUE);
    }

    private static void updateNodeSync() throws Exception {
    byte[] data = zooKeeper.getData("/lg_persistent", false, null);
    System.out.println("修改前的值:"+new String(data));

    // 修改 stat:状态信息对象
    // version: -1 代表最新版本
    Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内容".getBytes(), -1);

    byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
    System.out.println("修改后的值:"+new String(data2));
    }
    }

1.5. 删除节点

  1. ZooKeeper#exists(String path, boolean watch): 判断节点是否存在
  2. ZooKeeper#delete(String path, int version): 删除节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class DeleteNote {
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
    zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
    try {
    deleteNodeSync();
    } catch (Exception e) {
    e.printStackTrace();
    }
    });
    Thread.sleep(Integer.MAX_VALUE);
    }

    private static void deleteNodeSync() throws KeeperException, InterruptedException {
    Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
    System.out.println(exists == null ? "该节点不存在":"该节点存在");

    zooKeeper.delete("/lg_persistent/lg-children",-1);

    Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
    System.out.println(exists2 == null ? "该节点不存在":"该节点存在");
    }
    }

2. Curator

  1. 【跟着实例学习 ZooKeeper】–队列
  2. 【跟着实例学习 ZooKeeper】–缓存
  3. 【跟着实例学习 ZooKeeper】–计数器
  4. 【跟着实例学习 ZooKeeper】–Curator 扩展库
  5. 【跟着实例学习 ZooKeeper】–Barrier
  6. 【跟着实例学习 ZooKeeper】–临时节点
  7. 【跟着实例学习 ZooKeeper】–Curator 框架应用
  8. 【跟着实例学习 ZooKeeper】– Leader 选举
  9. 【跟着实例学习 ZooKeeper】–分布式锁

2.1. 连接

重试策略:

  • RetryNTimes:          重试 n 次
  • RetryOneTime:        重试 1 次
  • RetryForever:          永远重试
  • RetryUntilElapsed: 重试直到超过最大重试时间
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // 同 RetryNTimes
    // RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
    // 实际调用 CuratorFrameworkFactory.builder().build();
    // CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.1.110:2181", retryPolicy);

    // 1. 重试 3 次,每次间隔 5s
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

    // 2. 只重试 1 次
    RetryPolicy retryPolicy2 = new RetryOneTime(3000);

    // 3. 永远重试,每次间隔 3s
    RetryPolicy retryPolicy3 = new RetryForever(3000);

    // 4. 重试直到超过 2s,每次间隔 3s
    RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

    CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("192.168.1.110:2181")
    .sessionTimeoutMs(10000)
    .retryPolicy(retryPolicy)
    .namespace("workspace")
    .build();
    client.start();

2.2. 创建

1
2
3
4
5
6
7
String path = client.create()
// 1.递归创建节点
//.creatingParentsIfNeeded()
//.withMode(createMode)
// 2.ACL
//.withACL(aclList)
.forPath(nodePath, "data".getBytes());

2.3. 设置节点数据

1
2
3
4
client.setData()
// 可带版本
//.withVersion(version)
.forPath(nodePath, "data".getBytes());

2.4. 删除

1
2
3
4
5
6
7
8
client.delete()
// 1.如果删除失败,那么在后端还是继续会删除,直到成功(强制删除)
//.guaranteed()
// 2.如果有子节点,就删除
//.deletingChildrenIfNeeded()
// 3.指定版本
//.withVersion(1)
.forPath(nodePath);

2.5. 读取节点数据

1
2
3
4
5
6
7
8
9
Stat stat = new Stat();

byte[] data = client.getData()
// 包含状态查询
//.storingStatIn(stat)
.forPath(nodePath);

System.out.println("节点" + nodePath + "的数据为:" + new String(data));
System.out.println("该节点的版本号为:"+ stat.getVersion());

2.6. 获取子节点

1
List<String> nodeList = client.getChildren().forPath(nodePath);

2.7. 检查节点是否存在

1
2
3
public Boolean checkExists(String nodePath) {
return client.checkExists().forPath(nodePath) != null;
}

2.8. 监听器

usingWatcher
概述: 监听只会触发一次

1
2
3
byte[] data = client.getData()
.usingWatcher(event -> System.out.println("触发watcher,节点路径为:" + event.getPath()))
.forPath(nodePath);

NodeCache
概述: 永久监听 当前路径的节点 的创建、更新、删除

start():

  • NodeCache#start()
  • NodeCache#start (boolean buildInitial): true 表示初始化时获取 node 的值并保存
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    NodeCache nodeCache = new NodeCache(client, nodePath);
    // true 表示初始化时获取 node 的值并保存
    nodeCache.start(true);
    if (nodeCache.getCurrentData() != null) {
    System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
    }

    nodeCache.getListenable().addListener(() -> {
    ChildData currentData = nodeCache.getCurrentData();
    System.out.println("节点 "+ currentData.getPath() + " 的数据为:" + new String(currentData.getData()));
    });

PathChildrenCache
概述: 永久监听 当前路径下的子节点 的创建、更新、删除

start():

  • NodeCache#start():                                  默认 buildInitial 为 false
  • NodeCache#start(boolean buildInitial): true 表示初始化时获取 node 的值并保存
  • NodeCache#start(StartMode mode):
    • StartMode.POST_INITIALIZED_EVENT: 异步初始化,并触发 INITIALIZED 事件
    • StartMode.NORMAL:                               异步初始化
    • StartMode.BUILD_INITIAL_CACHE:        同步初始化

事件类型:

  • INITIALIZED:           初始化事件
  • CHILD_ADDED:      子节点添加事件
  • CHILD_REMOVED: 子节点删除事件
  • CHILD_UPDATED:  子节点更新事件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // true 表示把节点内容放入 stat 里
    PathChildrenCache childrenCache = new PathChildrenCache(client, nodePath, true);
    childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

    // 同步初始化后可获取子节点数据
    List<ChildData> childDataList = childrenCache.getCurrentData();
    for (ChildData childData : childDataList) {
    System.out.println(new String(childData.getData()));
    }

    // 监听子节点
    childrenCache.getListenable().addListener((client, event) -> {
    ChildData eventData = event.getData();
    // 初始化事件
    if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
    System.out.println("子节点初始化完成");
    }
    // 子节点添加事件
    else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
    System.out.println("添加子节点 " + eventData.getPath() + ": " + new String(eventData.getData()));
    }
    // 子节点删除事件
    else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
    System.out.println("删除子节点 " + eventData.getPath());
    }
    // 子节点更新事件
    else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
    System.out.println("修改子节点 " + eventData.getPath() + ": " + new String(eventData.getData()));
    }
    });