IDEA环境
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
服务器动态上下线监听案例
先在集群创建/servers节点
create /servers "servers"
服务端向Zookeeper注册代码
import org.apache.zookeeper.*;
public class Server {
private String connectString;
private int sessionTimeOut;
private ZooKeeper zooKeeper;
private String parentNode = "/servers";
//创建客户端连接
public void getConnect() throws Exception{
connectString = "node01:2181,node02:22181,node03:2181";
sessionTimeOut = 10000;
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
//注册服务器
public void registerServer(String hostname) throws Exception {
String create = zooKeeper.create(parentNode +"/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online " + create);
}
//业务功能
public void business(String hostname) throws Exception {
System.out.println(hostname + " is working...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
//获取连接
Server server = new Server();
server.getConnect();
//注册服务信息
server.registerServer(args[0]);
//启动业务功能
server.business(args[0]);
}
}
客户端代码
import org.apache.zookeeper.*;
import java.util.ArrayList;
import java.util.List;
public class Client {
private String connectString;
private int sessionTimeOut;
private ZooKeeper zooKeeper;
private String parentNode = "/servers";
//创建客户端连接
public void getConnect() throws Exception{
connectString = "node01:2181,node02:22181,node03:2181";
sessionTimeOut = 10000;
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//获取服务器列表信息
public void getServerList() throws Exception {
//获取服务器子节点信息,并对父节点进行监听
List<String> children = zooKeeper.getChildren(parentNode, true);
//存储服务器信息列表
ArrayList<String> servers = new ArrayList<String>();
//遍历所有节点,获取节点中的主机名称
for (String child : children) {
byte[] data = zooKeeper.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
//打印服务器列表信息
System.out.println(servers);
}
//业务功能
public void business() throws Exception {
System.out.println("client is working...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
//获取zk连接
Client client = new Client();
client.getConnect();
//获取servers的子节点信息,从中获取服务器列表
client.getServerList();
//业务进程启动
client.business();
}
}
测试
启动客户端
在node01上zk客户端的/servers目录上创建临时序列化节点,观察控制台变化
create -e -s /servers/node02 "node02" create -e -s /servers/node03 "node03" delete /servers/node020000000000
启动服务端
在IDEA运行配置中加入参数 node02
运行main方法,查看控制台
选举机制
节点信息说明
SID
:服务器ID,作为服务器的唯一标识,不能重复ZXID
:事务ID,记录了修改的顺序,如果zxid1小于zxid2,那么zxid1在zxid2之前发生Epoch
:每个Leader任期的代号,初始为0,每投一次票这个数就会增加
第一次启动
- 假设有5台机器,myid分别为1,2,...,5
- 服务器1启动,投自己一票,状态为LOOKING
- 服务器2启动,投自己一票,状态为LOOKING
- 服务器1与服务器2交换选票信息
- 由于服务器2的myid大于1的myid,1把票投给2
- 此时服务器1 0票,服务器2 2票
- 服务器3启动,按上述逻辑,服务器1,2 0票,服务器3 3票
- 由于服务器3票数过半,状态变为LEADER,1,2状态改为FLLOWING
- 服务器4启动,由于已经有LEADER了,所以服务器4状态变为FLLOWING
- 服务器5和服务器4一样
非第一次启动
- 如果存在LEADER,只需要和LEADER建立连接同步状态即可
- 如果集群中不存在LEADER,则按照以下规则进行选举
- EPOCH大的直接胜出
- EPOCH相同,ZXID大的胜出
- ZXID也相同,SID(或myid,两个数值一样)大的胜出
监听器原理
- 首先要有一个main()线程
- main线程中创建Zookeeper客户端,这时会创建两个线程
- 一个负责通信(connect),一个负责监听(listener)
- connect线程将注册的监听事件发送给Zookeeper
- 将注册的监听事件添加到Zookeeper的监听器列表中
- Zookeeper监听到数据或路径有变化,就会向listener线程发送消息
- listener内部重写process方法来响应变化
写数据流程
- 假设集群有三台机器,follower1,follower2,leader
- Client向follower1发送一个写的请求
- follower1会进一步把请求转发给leader
- leader收到请求后,将请求广播给各个follower
- 各个follower收到请求后会将请求加入写队列,并向leader发送成功信息
- 当leader收到半数以上的成功信息,说明写操作可以执行
- leader会向各个follower发送提交信息
- 各个follower收到消息后会将队列里的写请求执行,此时写成功
集群安装多少台zk合适
经验
10台服务器:3台zk
20台服务器:5台zk
100台服务器:11台zk
200台服务器:11台zk
为什么不装更多zk
- zk多,可以提高可靠性,但过多的zk会提高通信延迟
为什么要装奇数台
- 3台时,2台坏了就不能用了
- 4台时,2台坏了就不能用了
- 5台时,3台坏了就不能用了
- 6台时,3台坏了就不能用了
- ......
- 可以发现2n和2n-1的容忍度是一样的,没有必要多加一台zk