Zookeeper

安装
mkdir zooKeeper
mkdir zkdata
tar -zxvf apache-zookeeper-3.9.0-bin.tar.gz
cd conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg dataDir=/opt/zooKeeper/zkdata
admin.serverPort=8882
|
数据模型

服务端命令
【Linux】
./zkServer.sh start
./zkServer.sh restart
./zkServer.sh stop
./zkServer.sh status
|
【Windows】
客户端命令


#连接客户端 【使用bin目录下的 zkCli.sh ,本机不用写服务端口】 ./zkCli.sh -server localhost:2181
#退出 quit
#查看节点 跟进目录在/后加目录 ls / ls /dubbo
#创建节点 可携带数据 create /node/data create /app1 create /app2 chen
#获取节点数据 get /node get /app2
#更改节点数据 set /node data set /app2/peng
#删除节点 delete /node delete /app2 #删除全部节点 delete /node deleteall /app2
|
create -e /app1
create -s /app2
|
JavaAPI - Curator
依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency>
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
|
连接
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
CuratorFramework client = CuratorFrameworkFactory.newClient("47.115.222.113:2181", 60*1000, 15*1000, retryPolicy);
client.start();
CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("47.115.222.113:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .namespace("chen") .build(); client.start();
|
创建节点
String s = client.create().forPath("/app1"); System.out.println(s);
String app2 = client.create().forPath("/app2", "hello".getBytes()); System.out.println(app2);
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","临时".getBytes());
client.create().creatingParentsIfNeeded().forPath("/app4/c1","多级节点".getBytes());
|
查询
byte[] bytes = client.getData().forPath("/app4/c1");
List<String> list = client.getChildren().forPath("/");
Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app1");
|
修改
client.setData().forPath("/app1","app1111".getBytes());
Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app1"); int version = stat.getVersion(); client.setData().withVersion(version).forPath("/app1","乐观锁锁原理".getBytes());
|
删除
client.delete().forPath("/app1");
client.delete().deletingChildrenIfNeeded().forPath("/app4");
client.delete().guaranteed().forPath("/app2");
client.delete().guaranteed().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("删除成功后的回调"); System.out.println(event); } }).forPath("/app3");
|
Watch事件监听

NodeCache
NodeCache nodeCache = new NodeCache(client,"/app1");
nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("发生变化"); byte[] data = nodeCache.getCurrentData().getData(); System.out.println("新数据"+new String(data)); } });
nodeCache.start(true);
while (true){}
|
PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2",true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("字节点发生变化"); System.out.println(event);
PathChildrenCacheEvent.Type type = event.getType(); if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ byte[] data = event.getData().getData(); System.out.println("新数据"+new String(data)); } } });
pathChildrenCache.start();
while (true){}
|
TreeCache
TreeCache treeCache = new TreeCache(client, "/app2");
treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("发生变化");
} });
treeCache.start();
while (true){}
|
分布式锁


单机情况下:多线程可以使用同步代码块或者锁来解决并发问题,但集群不可以,不止一个Jvm
private InterProcessLock lock; public Ticket12306(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("47.115.222.113:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); client.start();
lock = new InterProcessMutex(client,"/lock"); }
lock.acquire(3, TimeUnit.SECONDS);
ock.release();
|
集群

