Zookeeper 从入门到入土⑥:API
zk 有 Session,没有线程池的概念
1. 原生 API
Zookeeper API 共包含五个包:
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
依赖:
1 | <dependency> <groupId>org.apache.zookeeper</groupId> |
1.1. 建立会话
ZooKeeper 客户端和服务端会话的建⽴是⼀个异步的过程。
所以如果在 new ZooKeeper
后立即结束方法会话不能建立完毕,会话的⽣命周期中处于 CONNECTING 的状态。
当会话真正创建完毕后 ZooKeeper 服务端会向会话对应的客户端发送⼀个事件通知以告知客户端。
1 | public class CreateSession { |
1.2. 创建节点
ZooKeeper#create(String path, byte[] data, List
- path:节点创建的路径
- data:节点创建要保存的数据
- acl:节点创建的权限信息(4 种类型)
- ZooDefs.Ids.ANYONE_ID_UNSAFE:表示任何⼈
- ZooDefs.Ids.AUTH_IDS:此 ID 仅可⽤于设置 ACL。它将被客户机验证的 ID 替换。
- ZooDefs.Ids.OPEN_ACL_UNSAFE:这是⼀个完全开放的 ACL (常⽤)–> world:anyone
- ZooDefs.Ids.CREATOR_ALL_ACL:此 ACL 授予创建者身份验证 ID 的所有权限
- createMode:创建节点的类型(4 种类型)
- CreateMode.PERSISTENT:持久节点
- CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
- CreateMode.EPHEMERAL:临时节点
- CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class CreateNote {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
// 调⽤创建节点⽅法
try {
createNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
});
countDownLatch.await();
}
private static void createNodeSync() throws Exception {
String nodePersistent = zooKeeper.create("/lg_persistent", "持久节点内容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String nodePersistentSequential = zooKeeper.create("/lg_persistent_sequential", "持久节点内容".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
String nodeEpersistent = zooKeeper.create("/lg_ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建的持久节点是:" + nodePersistent);
System.out.println("创建的持久顺序节点是:" + nodePersistentSequential);
System.out.println("创建的临时节点是:" + nodeEpersistent);
}
}
1.3. 获取节点数据
- ZooKeeper#getData(String path, boolean watch, Stat stat):
- path:获取数据的路径
- watch:是否开启监听。ture 代表使用创建 zk 的那个监听
- stat:节点状态信息,null 则表示获取最新版本的数据
- ZooKeeper#getChildren(String path, boolean watch)
- path:路径
- watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class GetNoteData {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, watchedEvent -> {
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
try {
getNoteData();
getChildren();
} catch (Exception e) {
e.printStackTrace();
}
}
// ⼦节点列表发⽣变化时,服务器会发出 NodeChildrenChanged 通知,但不会把变化情况告诉给客户端
// 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
// 再次获取节点数据
try {
List<String> children = zooKeeper.getChildren(watchedEvent.getPath(), true);
System.out.println(children);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static void getNoteData() throws Exception {
byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true, null);
System.out.println(new String(data, StandardCharsets.UTF_8));
}
private static void getChildren() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/lg_persistent", true);
System.out.println(children);
}
}
1.4. 修改节点数据
Stat ZooKeeper#setData(String path, byte[] data, int version)
- path:路径
- data:要修改的内容
- version:为 -1,表示对最新版本的数据进⾏修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class UpdateNote {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
try {
updateNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static void updateNodeSync() throws Exception {
byte[] data = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改前的值:"+new String(data));
// 修改 stat:状态信息对象
// version: -1 代表最新版本
Stat stat = zooKeeper.setData("/lg_persistent", "客户端修改内容".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改后的值:"+new String(data2));
}
}
1.5. 删除节点
- ZooKeeper#exists(String path, boolean watch): 判断节点是否存在
- ZooKeeper#delete(String path, int version): 删除节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class DeleteNote {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, watchedEvent -> {
try {
deleteNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread.sleep(Integer.MAX_VALUE);
}
private static void deleteNodeSync() throws KeeperException, InterruptedException {
Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
System.out.println(exists == null ? "该节点不存在":"该节点存在");
zooKeeper.delete("/lg_persistent/lg-children",-1);
Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
System.out.println(exists2 == null ? "该节点不存在":"该节点存在");
}
}
2. Curator
- 【跟着实例学习 ZooKeeper】–队列
- 【跟着实例学习 ZooKeeper】–缓存
- 【跟着实例学习 ZooKeeper】–计数器
- 【跟着实例学习 ZooKeeper】–Curator 扩展库
- 【跟着实例学习 ZooKeeper】–Barrier
- 【跟着实例学习 ZooKeeper】–临时节点
- 【跟着实例学习 ZooKeeper】–Curator 框架应用
- 【跟着实例学习 ZooKeeper】– Leader 选举
- 【跟着实例学习 ZooKeeper】–分布式锁
2.1. 连接
重试策略:
- RetryNTimes: 重试 n 次
- RetryOneTime: 重试 1 次
- RetryForever: 永远重试
- RetryUntilElapsed: 重试直到超过最大重试时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// 同 RetryNTimes
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
// 实际调用 CuratorFrameworkFactory.builder().build();
// CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.1.110:2181", retryPolicy);
// 1. 重试 3 次,每次间隔 5s
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
// 2. 只重试 1 次
RetryPolicy retryPolicy2 = new RetryOneTime(3000);
// 3. 永远重试,每次间隔 3s
RetryPolicy retryPolicy3 = new RetryForever(3000);
// 4. 重试直到超过 2s,每次间隔 3s
RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.1.110:2181")
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.namespace("workspace")
.build();
client.start();
2.2. 创建
1 | String path = client.create() |
2.3. 设置节点数据
1 | client.setData() |
2.4. 删除
1 | client.delete() |
2.5. 读取节点数据
1 | Stat stat = new Stat(); |
2.6. 获取子节点
1 | List<String> nodeList = client.getChildren().forPath(nodePath); |
2.7. 检查节点是否存在
1 | public Boolean checkExists(String nodePath) { |
2.8. 监听器
usingWatcher
概述: 监听只会触发一次
1 | byte[] data = client.getData() |
NodeCache
概述: 永久监听 当前路径的节点 的创建、更新、删除
start():
- NodeCache#start()
- NodeCache#start (boolean buildInitial): true 表示初始化时获取 node 的值并保存
1
2
3
4
5
6
7
8
9
10
11NodeCache nodeCache = new NodeCache(client, nodePath);
// true 表示初始化时获取 node 的值并保存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
}
nodeCache.getListenable().addListener(() -> {
ChildData currentData = nodeCache.getCurrentData();
System.out.println("节点 "+ currentData.getPath() + " 的数据为:" + new String(currentData.getData()));
});
PathChildrenCache
概述: 永久监听 当前路径下的子节点 的创建、更新、删除
start():
- NodeCache#start(): 默认 buildInitial 为 false
- NodeCache#start(boolean buildInitial): true 表示初始化时获取 node 的值并保存
- NodeCache#start(StartMode mode):
- StartMode.POST_INITIALIZED_EVENT: 异步初始化,并触发 INITIALIZED 事件
- StartMode.NORMAL: 异步初始化
- StartMode.BUILD_INITIAL_CACHE: 同步初始化
事件类型:
- INITIALIZED: 初始化事件
- CHILD_ADDED: 子节点添加事件
- CHILD_REMOVED: 子节点删除事件
- CHILD_UPDATED: 子节点更新事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30// true 表示把节点内容放入 stat 里
PathChildrenCache childrenCache = new PathChildrenCache(client, nodePath, true);
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
// 同步初始化后可获取子节点数据
List<ChildData> childDataList = childrenCache.getCurrentData();
for (ChildData childData : childDataList) {
System.out.println(new String(childData.getData()));
}
// 监听子节点
childrenCache.getListenable().addListener((client, event) -> {
ChildData eventData = event.getData();
// 初始化事件
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
System.out.println("子节点初始化完成");
}
// 子节点添加事件
else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
System.out.println("添加子节点 " + eventData.getPath() + ": " + new String(eventData.getData()));
}
// 子节点删除事件
else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
System.out.println("删除子节点 " + eventData.getPath());
}
// 子节点更新事件
else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
System.out.println("修改子节点 " + eventData.getPath() + ": " + new String(eventData.getData()));
}
});