Zookeeper

image-20231011142155575



安装

#上传

#在opt创建zooKeeper目录
mkdir zooKeeper
#创建zookeeper数据目录
mkdir zkdata

#解压 /opt/zooKeeper
tar -zxvf apache-zookeeper-3.9.0-bin.tar.gz

#进入到conf目录
cd conf

#拷贝 -- 因为 zoo_sample 这个配置文件不能生效,名字要为 zoo
cp zoo_sample.cfg zoo.cfg

#编辑zoo.cfg 更改数据目录位置和更改端口
vim zoo.cfg
dataDir=/opt/zooKeeper/zkdata
# admin.serverPort 默认占8080端口
admin.serverPort=8882






数据模型

  • 默认持久化

  • 临时节点: -e

  • 顺序节点:-s

image-20231011142831459






服务端命令

【Linux】

#############  服务端   bin 目录 #############  
#启动 -- 成功:Starting zookeeper ... STARTED
./zkServer.sh start

#重启
./zkServer.sh restart

#关闭
./zkServer.sh stop

#查看状态
./zkServer.sh status


【Windows】

  • 启动 直接打开zkServer.cmd




客户端命令

image-20231011144815993

image-20231011145725964


#连接客户端  【使用bin目录下的 zkCli.sh ,本机不用写服务端口】
./zkCli.sh -server localhost:2181

#退出
quit

#查看节点 跟进目录在/后加目录
ls /
ls /dubbo

#创建节点 可携带数据 create /node/data
create /app1
create /app2 chen

#获取节点数据 get /node
get /app2

#更改节点数据 set /node data
set /app2/peng

#删除节点 delete /node
delete /app2
#删除全部节点 delete /node
deleteall /app2


  • 创建 临时、顺序 节点
#临时 create -e /node/node..
create -e /app1

#临时 create -s /node/node..
create -s /app2





JavaAPI - Curator



依赖

<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>


连接

/**
* @param connectstring 连接字符串。zk5 erver地址端口口"47.115.222.113:2181,47.115.222.114:2181”
* @param sessionTimeoutMs 会话超时时间单位ms
* @param connectionTimeoutMs 连接超时时间单位ms
* @param retryPolicy 重试策略
* @Param namespace 默认当前为根目录,增删查改在这目录之上
*/
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);

//第一种连接:
CuratorFramework client = CuratorFrameworkFactory.newClient("47.115.222.113:2181", 60*1000, 15*1000, retryPolicy);
//开启连接
client.start();

//第二种方式:链式编程
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("47.115.222.113:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("chen")
.build();
client.start();


创建节点

//1.创建节点  --  不设置数据,默认当前ip作为存储数据
String s = client.create().forPath("/app1");
System.out.println(s);

//2.节点带数据
String app2 = client.create().forPath("/app2", "hello".getBytes());
System.out.println(app2);

//3.设置节点类型
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","临时".getBytes());

//4.创建多级节点
client.create().creatingParentsIfNeeded().forPath("/app4/c1","多级节点".getBytes());


查询

//查询节点数据
byte[] bytes = client.getData().forPath("/app4/c1");

//查询子节点 ls
List<String> list = client.getChildren().forPath("/");

//查询节点信息 ls -s
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");


修改

//修改
client.setData().forPath("/app1","app1111".getBytes());


//根据节点信息的版本号进行修改
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion();
client.setData().withVersion(version).forPath("/app1","乐观锁锁原理".getBytes());


删除

//删除
client.delete().forPath("/app1");

//删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");

//必须删除成功 -- 防止网络抖动
client.delete().guaranteed().forPath("/app2");

//回调 删除成功执行指定方法
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("删除成功后的回调");
System.out.println(event);
}
}).forPath("/app3");




Watch事件监听

image-20231011161224882



NodeCache

  • 感知自己发生变化

//1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");

//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("发生变化");
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("新数据"+new String(data));
}
});

//3.开启监听 (加载缓存数据)
nodeCache.start(true);

while (true){}


PathChildrenCache

  • 只感知子节点变化
// PathChildrenCache : 监听指定的节点的子节点

//1.PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2",true);

//2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("字节点发生变化");
System.out.println(event);

//监听字节点的数据变更,获取新数据
//1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否更新
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = event.getData().getData();
System.out.println("新数据"+new String(data));
}
}
});

//3.开启
pathChildrenCache.start();

while (true){}


TreeCache

// TreeCache : 监听指定的节点和其子节点

//1.PathChildrenCache
TreeCache treeCache = new TreeCache(client, "/app2");

//2.绑定监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("发生变化");

}
});

//3.开启
treeCache.start();

while (true){}





分布式锁

image-20231011170322442


image-20231011170848174



单机情况下:多线程可以使用同步代码块或者锁来解决并发问题,但集群不可以,不止一个Jvm

//在方法体中定义分布式锁
private InterProcessLock lock;
public Ticket12306(){

//客户端对象
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("47.115.222.113:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
client.start();

//参数:(客户端对象,节点路径)
lock = new InterProcessMutex(client,"/lock");
}



lock.acquire(3, TimeUnit.SECONDS); //获取锁(等待时间)

ock.release();//释放锁





集群

  • 1.按编号
  • 2.最近操作

image-20231011195718023

image-20231011212428493