分布式框架Zookeeper api的使用介绍
前言
ZooKeeper api共包含五个包,分别为:
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
其中org.apache.zookeeper,包含Zookeeper类,他是我们编程时最常⽤的类文件。这个类是Zookeeper客户端的主要类文件。如果要使用Zookeeper服务,应⽤程序⾸先必须创建⼀个Zookeeper实例,这时就需要使用此类。⼀旦客户端和Zookeeper服务端建立起了连接,Zookeeper系统将会给本次连接会话分配⼀个ID值,并且客户端将会周期性的向服务器端发送心跳来维持会话连接。只要连接有效,客户端就可以使⽤Zookeeper API来做相应处理了
导入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
建立会话
package com.laGou.api;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateSession implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
// 计数工具类 CountDownLatch : 不让main方法结束,让线程处于等待阻塞
countDownLatch.await();
System.out.println("客户端与服务端会话真正建立了");
}
// 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上的等待阻塞,⾄此,会话创建完毕
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
if(watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 解除主程序在CountDownLatch上的等待阻塞
countDownLatch.countDown();
}
}
}
注意,ZooKeeper 客户端和服务端会话的建立是⼀个异步的过程,也就是说在程序中,构造⽅法会在处理完客户端初始化工作后立即返回,在⼤多数情况下,此时并没有真正建立好⼀个可用的会话,在会话的生命周期中处于“CONNECTING”的状态。当该会话真正创建完毕后ZooKeeper服务端会向会话对应的客户端发送⼀个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。
创建节点
package com.lagou.api;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
// 创建节点的方法
private static void createNoteSync() throws InterruptedException, KeeperException {
// 持久节点
String note_persistent = zooKeeper.create("/lg-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 临时节点
String note_ephemeral = zooKeeper.create("/lg-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 持久顺序节点
String note_sequential = zooKeeper.create("/lg-sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建的持久节点: " + note_persistent);
System.out.println("创建的临时节点: " + note_ephemeral);
System.out.println("创建的持久顺序节点: " + note_sequential);
}
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 创建节点
try {
createNoteSync();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
获取节点数据
package com.lagou.api;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.List;
public class GetNoteData implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.nodeChildrenChanged) {
List<String> children = null;
try {
children = zooKeeper.getChildren("/lg-persistent", true);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(children);
}
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 获取节点数据的方法
try {
getNoteData();
// 获取节点的子节点列表方法
getChildren();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void getNoteData() throws Exception {
byte[] data = zooKeeper.getData("/lg-persistent", false, null);
System.out.println(new String(data));
}
public static void getChildren() throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren("/lg-persistent", true);
System.out.println(children);
}
}
修改节点数据
package com.lagou.api;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class UpdateNoteData implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new UpdateNoteData());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 更新数据节点内容的方法
try {
updateNoteSync();
} catch (InterruptedException | KeeperException e) {
throw new RuntimeException(e);
}
}
}
private void updateNoteSync() throws InterruptedException, KeeperException {
byte[] data = zooKeeper.getData("/lg-persistent", false, null);
System.out.println("修改前的值:" + new String(data));
// 修改 /lg-persistent 的数据 stat: 状态信息对象
Stat stat = zooKeeper.setData("/lg-persistent", "客户端修改了节点数据".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg-persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
}
}
删除节点
package com.lagou.api;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class DeleteNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
System.out.println(zooKeeper.getState());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 删除节点
try {
deleteNoteSync();
} catch (InterruptedException | KeeperException e) {
throw new RuntimeException(e);
}
}
}
private void deleteNoteSync() throws InterruptedException, KeeperException {
Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(stat == null ? "该节点不存在" : "该节点存在");
if (stat != null) {
zooKeeper.delete("/lg-persistent/c1", -1);
}
Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
System.out.println(stat2 == null ? "该节点不存在" : "该节点存在");
}
}
到此这篇关于分布式框架Zookeeper api的使用介绍的文章就介绍到这了,更多相关Zookeeper api内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
相关文章