37 Zookeeper

lix_uan
• 阅读 1127

Zookeeper入门

Zookeeper工作机制

37 Zookeeper

Zookeeper特点

37 Zookeeper

  • 一个领导者(Leader),多个跟随者(Follower)组成的集群
  • 集群中只要有半数以上的节点存活,Zookeeper集群就能正常服务
  • 每个Server保存一份相同数据的副本,Client无论连接到哪个Server,数据都是一致的
  • 来自同一个Client的更新请求按其发送顺序依次执行
  • 数据更新原子性,一次数据更新要么成功,要么失败
  • 实时性,在一定时间范围内,Client能读到最新数据

数据结构

37 Zookeeper

  • 整体上可以看作一棵树
  • 每个节点称作一个ZNode,每个ZNode默认能够存储1MB的数据
  • 每个ZNode都可以通过其路径唯一标识

Zookeeper应用场景

统一命名服务

37 Zookeeper

统一配置管理

37 Zookeeper

统一集群管理

37 Zookeeper

服务器动态上下线

37 Zookeeper

软负载均衡

37 Zookeeper

Zookeeper安装

集群规划

  • node01,node02,node03都部署Zookeeper

解压安装

tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/
xsync zookeeper-3.5.7/

配置服务编号

# 在/opt/module/zookeeper-3.5.7/这个目录下创建zkData
mkdir -p zkData

# 在/opt/module/zookeeper-3.5.7/zkData目录下创建一个myid的文件
touch myid

# 编辑myid文件,添加对应的编号(1,2,3)

xsync zookeeper-3.5.7/

配置zoo.cfg文件

# 重命名/opt/module/zookeeper-3.5.7/conf这个目录下的zoo_sample.cfg为zoo.cfg
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg

# 修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.5.7/zkData

# 增加如下配置
# 2888是Follower与Leader服务器交换信息的端口
# 3888是Leader挂了后用来执行选举时服务器互相通信的端口

#######################cluster##########################
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

# 同步zoo.cfg配置文件
xsync zoo.cfg

分别启动Zookeeper

bin/zkServer.sh start
# 查看启动状态
bin/zkServer.sh status

客户端命令行操作

# 启动客户端
bin/zkCli.sh

# 查看当前znode中所包含的内容
ls /

# 查看当前节点详细数据
ls -s /

# 分别创建两个普通节点
create /sanguo "diaochan"
create /sanguo/shuguo "liubei"

# 获得节点的值
get /sanguo
get -s /sanguo

# 创建临时节点,临时节点退出客户端后会自动删除
create -e /sanguo/wuguo "zhouyu"

# 创建带序号的节点
create -s /sanguo/weiguo "caocao"

# 修改节点的数据
set /sanguo/weiguo "caopi"

# 节点的值变化监听,注册监听/sanguo节点数据变化
get -w /sanguo

# 节点的子节点变化监听(路径变化)
ls -w /sanguo

# 删除节点
delete /sanguo/jin

# 递归删除节点
deleteall /sanguo/shuguo

# 查看节点状态
stat /sanguo

API应用

添加pom.xml文件

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
</dependencies>

src/main/resources目录下新建log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

初始化ZooKeeper客户端

public class Zookeeper {

    private String connectString;
    private int sessionTimeout;
    private ZooKeeper zkClient;

    @Before   //获取客户端对象
public void init() throws IOException {

        connectString = "node01:2181,node02:2181,node03:2181";
        sessionTimeout = 10000;

       //参数解读 1集群连接字符串  2连接超时时间 单位:毫秒  3当前客户端默认的监控器
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }

    @After //关闭客户端对象
    public void close() throws InterruptedException {
        zkClient.close();
    }
}

获取子节点列表,不监听

@Test
public void ls() throws IOException, KeeperException, InterruptedException {
    //用客户端对象做各种操作
    List<String> children = zkClient.getChildren("/", false);
    System.out.println(children);
}

获取子节点列表,并监听

@Test
public void lsAndWatch() throws KeeperException, InterruptedException {
    List<String> children = zkClient.getChildren("/lixuan", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event);
        }
    });
System.out.println(children);

    //因为设置了监听,所以当前线程不能结束
    Thread.sleep(Long.MAX_VALUE);
}

创建子节点

@Test
public void create() throws KeeperException, InterruptedException {
//参数解读 1节点路径  2节点存储的数据  
//3节点的权限(使用Ids选个OPEN即可) 4节点类型 短暂 持久 短暂带序号 持久带序号
     String path = zkClient.create("/lixuan", "lixuan".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    //创建临时节点
//String path = zkClient.create("/atguigu2", "shanguigu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

System.out.println(path);

    //创建临时节点的话,需要线程阻塞
    //Thread.sleep(10000);
}

判断Znode是否存在

@Test
public void exist() throws Exception {

    Stat stat = zkClient.exists("/atguigu", false);

    System.out.println(stat == null ? "not exist" : "exist");
}

获取子节点的数据,不监听

@Test
public void get() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/lixuan", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }

    byte[] data = zkClient.getData("/lixuan", false, stat);
    System.out.println(new String(data));
}

获取子节点数据,并监听

@Test
public void getAndWatch() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/atguigu", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }

    byte[] data = zkClient.getData("/atguigu", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println(event);
        }
    }, stat);
    System.out.println(new String(data));
    //线程阻塞
    Thread.sleep(Long.MAX_VALUE);
}

设置节点的值

@Test
public void set() throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists("/lixuan", false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }
    //参数解读 1节点路径 2节点的值 3版本号
    zkClient.setData("/lixuan", "lixuan".getBytes(), stat.getVersion());
}

递归删除非空节点

//封装一个方法,方便递归调用
public void deleteAll(String path, ZooKeeper zk) throws KeeperException, InterruptedException {
    //判断节点是否存在
    Stat stat = zkClient.exists(path, false);
    if (stat == null) {
        System.out.println("节点不存在...");
        return;
    }
    //先获取当前传入节点下的所有子节点
    List<String> children = zk.getChildren(path, false);
    if (children.isEmpty()) {
        //说明传入的节点没有子节点,可以直接删除
        zk.delete(path, stat.getVersion());
    } else {
        //如果传入的节点有子节点,循环所有子节点
        for (String child : children) {
            //删除子节点,但是不知道子节点下面还有没有子节点,所以递归调用
            deleteAll(path + "/" + child, zk);  
        }
        //删除完所有子节点以后,记得删除传入的节点
        zk.delete(path, stat.getVersion());
    }
}
//测试deleteAll
@Test
public void testDeleteAll() throws KeeperException, InterruptedException {
    deleteAll("/lixuan",zkClient);
}

Zookeeper内部原理

节点类型

  • 持久(Persistent)
  • 短暂(Ephemeral)
  • 说明
    • 创建znode时设置顺序标识,znode名称后会附加一个值,顺序时一个单调递增计数器,由父节点维护
    • 顺序号可以用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

Stat结构体

  • czxid 创建节点的事务zxid:
    • 每次修改都有唯一的zxid,zxid小说明事件更早发生
  • dataversion 数据变化号
  • dataLength 数据的长度
  • numChildren 子节点数量

监听器原理

37 Zookeeper

选举机制

  • 集群中半数以上的机器存活,集群可用。所以Zookeeper适合安装奇数台服务器

37 Zookeeper

  • 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING

  • 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的ID比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING

  • 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING

  • 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING

  • 服务器5启动,同4一样当小弟

写数据流程

37 Zookeeper

点赞
收藏
评论区
推荐文章

暂无数据

lix_uan
lix_uan
Lv1
学无止境,即刻前行
文章
7
粉丝
7
获赞
0