ZooKeeper
1. 简介
ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。
ZooKeeper通过其简单的架构和API解决了这个问题。ZooKeeper允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。
ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。
2. 概念知识
层次命名空间
下图描述了用于内存表示的ZooKeeper文件系统的树结构(ZooKeeper的数据保存形式)。ZooKeeper节点称为 znode 。每个znode由一个名称标识,并用路径(/)序列分隔。
每个znode最多可存储1MB的数据。
Znode的类型
Znode被分为持久(persistent)节点,顺序(sequential)节点和临时(ephemeral)节点。
- 持久节点 - 即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
- 临时节点 - 客户端活跃时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。因此,只有临时节点不允许有子节点。如果临时节点被删除,则下一个合适的节点将填充其位置。临时节点在leader选举中起着重要作用。
- 顺序节点 - 顺序节点可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径。例如,如果将具有路径 /myapp 的znode创建为顺序节点,则ZooKeeper会将路径更改为 /myapp0000000001 ,并将下一个序列号设置为0000000002。如果两个顺序节点是同时创建的,那么ZooKeeper不会对每个znode使用相同的数字。顺序节点在锁定和同步中起重要作用。
Watches(监视)
监视是一种简单的机制,使客户端收到关于ZooKeeper集合中的更改的通知。客户端可以在读取特定znode时设置Watches。Watches会向注册的客户端发送任何znode(客户端注册表)更改的通知。
Znode更改是与znode相关的数据的修改或znode的子项中的更改。只触发一次watches。如果客户端想要再次通知,则必须通过另一个读取操作来完成。当连接会话过期时,客户端将与服务器断开连接,相关的watches也将被删除。
ZooKeeper安装
在安装ZooKeeper之前,请确保你的系统是在以下任一操作系统上运行:
任意Linux OS - 支持开发和部署。适合演示应用程序。
Windows OS - 仅支持开发。
Mac OS - 仅支持开发。
ZooKeeper服务器是用Java创建的,它在JVM上运行。你需要使用JDK 6或更高版本。
现在,按照以下步骤在你的机器上安装ZooKeeper框架。
步骤1:验证Java安装
相信你已经在系统上安装了Java环境。现在只需使用以下命令验证它。
$ java -version
如果你在机器上安装了Java,那么可以看到已安装的Java的版本。否则,请按照以下简单步骤安装最新版本的Java。
步骤1.1:下载JDK
通过访问链接下载最新版本的JDK,并下载最新版本的Java。
步骤1.2:提取文件
通常,文件会下载到download文件夹中。验证并使用以下命令提取tar设置。
$ cd /path/to/download/
$ tar -zxvf jdk-8u181-linux-x64.gz
步骤1.3:移动到/usr/local/jdk目录
要使Java对所有用户可用,请将提取的Java内容移动到“/usr/local/jdk"文件夹。
$ sudo mkdir /usr/local/jdk
$ sudo mv jdk1.8.0_181 /usr/local/jdk
步骤1.4:设置路径
要设置路径和JAVA_HOME变量,请将以下命令添加到〜/.bashrc文件中。
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_181
export PATH=$PATH:$JAVA_HOME/bin
现在,将所有更改应用到当前运行的系统中。
$ source ~/.bashrc
步骤1.5
使用步骤1中说明的验证命令(java -version)验证Java安装。
步骤2:ZooKeeper框架安装
步骤2.1:下载ZooKeeper
要在你的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper。
http://zookeeper.apache.org/releases.html
到目前为止,最新版本的ZooKeeper是3.4.12(ZooKeeper-3.4.12.tar.gz)。
步骤2.2:提取tar文件
使用以下命令提取tar文件
$ cd /path/to/download/
$ tar -zxvf zookeeper-3.4.12.tar.gz
$ cd zookeeper-3.4.12
$ mkdir data
步骤2.3:创建配置文件
使用命令 vi conf/zoo.cfg 和所有以下参数设置为起点,打开名为 conf/zoo.cfg 的配置文件。
$ vi conf/zoo.cfg
tickTime = 2000
dataDir = /path/to/zookeeper/data
clientPort = 2181
一旦成功保存配置文件,再次返回终端。你现在可以启动zookeeper服务器。
步骤2.4:启动ZooKeeper服务器
执行以下命令
$ bin/zkServer.sh start
执行此命令后,你将收到以下响应
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.12/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED
步骤2.5:启动CLI
键入以下命令
$ bin/zkCli.sh
键入上述命令后,将连接到ZooKeeper服务器,你应该得到以下响应。
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
停止ZooKeeper服务器
连接服务器并执行所有操作后,可以使用以下命令停止zookeeper服务器。
$ bin/zkServer.sh stop
Kazoo
kazoo是Python连接操作ZooKeeper的客户端库。我们可以通过kazoo来使用ZooKeeper。
1. 安装
pip install kazoo
2. 使用
连接ZooKeeper
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
# 启动连接
zk.start()
# 停止连接
zk.stop()
创建节点
# 创建节点路径,但不能设置节点数据值
zk.ensure_path("/my/favorite")
# 创建节点,并设置节点保存数据,ephemeral表示是否是临时节点,sequence表示是否是顺序节点
zk.create("/my/favorite/node", b"a value", ephemeral=True, sequence=True)
读取节点
# 获取子节点列表
children = zk.get_children("/my/favorite")
# 获取节点数据data 和节点状态stat
data, stat = zk.get("/my/favorite")
设置监视
def my_func(event):
# 检查最新的节点数据
# 当子节点发生变化的时候,调用my_func
children = zk.get_children("/my/favorite/node", watch=my_func)
server端
import threading
from kazoo.client import KazooClient
class ThreadServer(object):
def __init__(self, host, port, handlers):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
self.sock.bind((host, port))
self.handlers = handlers
def serve(self):
"""
开始服务
"""
self.sock.listen(128)
self.register_zk()
print("开始监听")
while True:
conn, addr = self.sock.accept()
print("建立链接%s" % str(addr))
t = threading.Thread(target=self.handle, args=(conn,))
t.start()
def handle(self, client):
stub = ServerStub(client, self.handlers)
try:
while True:
stub.process()
except EOFError:
print("客户端关闭连接")
client.close()
def register_zk(self):
"""
注册到zookeeper
"""
self.zk = KazooClient(hosts='127.0.0.1:2181')
self.zk.start()
self.zk.ensure_path('/rpc') # 创建根节点
value = json.dumps({'host': self.host, 'port': self.port)
# 创建服务子节点
self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)
client端
from services import ThreadServer
from services import InvalidOperation
import sys
class Handlers:
@staticmethod
def divide(num1, num2=1):
"""
除法
:param num1:
:param num2:
:return:
"""
if num2 == 0:
raise InvalidOperation()
val = num1 / num2
return val
if __name__ == '__main__':
if len(sys.argv) < 3:
print("usage:python server.py [host] [port]")
exit(1)
host = sys.argv[1]
port = sys.argv[2]
server = ThreadServer(host, int(port), Handlers)
server.serve()
server改写
import random
import time
class DistributedChannel(object):
def __init__(self):
self._zk = KazooClient(hosts='127.0.0.1:2181')
self._zk.start()
self._get_servers()
def _get_servers(self, event=None):
"""
从zookeeper获取服务器地址信息列表
"""
servers = self._zk.get_children('/rpc', watch=self._get_servers)
print(servers)
self._servers = []
for server in servers:
data = self._zk.get('/rpc/' + server)[0]
addr = json.loads(data)
self._servers.append(addr)
def _get_server(self):
"""
随机选出一个可用的服务器
"""
return random.choice(self._servers)
def get_connection(self):
"""
提供一个可用的tcp连接
"""
while True:
server = self._get_server()
print(server)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server['host'], server['port']))
except ConnectionRefusedError:
time.sleep(1)
continue
else:
break
return sock
client改写
from services import ClientStub
from services import DistributedChannel
from services import InvalidOperation
import time
channel = DistributedChannel()
for i in range(50):
try:
stub = ClientStub(channel)
val = stub.divide(i)
except InvalidOperation as e:
print(e.message)
else:
print(val)
time.sleep(1)
测试完整代码
import json
from kazoo.client import KazooClient
zk = KazooClient(hosts="192.168.218.136:2181")
# 启动连接
zk.start()
# 创建节点路径,但不能设置节点数据值
zk.ensure_path("/rpc")
addr1 = {"host": "127.0.0.1", "port": 8001}
addr1_str = json.dumps(addr1)
# 创建节点,并设置节点保存数据,ephemeral表示是否是临时节点,sequence表示是否是顺序节点
zk.create("/rpc/server", addr1_str.encode(), ephemeral=True, sequence=True)
addr2 = {"host": "127.0.0.1", "port": 8002}
addr2_str = json.dumps(addr2)
zk.create("/rpc/server", addr2_str.encode(), ephemeral=True, sequence=True)
# 获取子节点列表
children = zk.get_children("/rpc")
# 获取节点数据data 和节点状态stat
for i in zk.get_children("/rpc"):
print(zk.get("/rpc/"+i)[0])
def on_change(event):
print(event)
# 设置监视, 该监视只触发一次
servers = zk.get_children("/rpc", watch=on_change)
# 停止连接
zk.stop()
service.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Wjy
import struct
from io import BytesIO
import socket
import threading
import json
import random
import time
from kazoo.client import KazooClient
class InvalidOperation(Exception):
def __init__(self, message=None):
self.message = message or "invalid operation"
class MethodProtocol(object):
"""
解读方法名
"""
def __init__(self, connection):
self.conn = connection
def _read_all(self, size):
"""
帮助我们读取二进制数据
:param size: 想要读取的二进制数据大小
:return: 二进制数据 bytes
"""
# self.conn
# 读取二进制数据
# socket.recv(4) => ?4
# BytesIO.read
if isinstance(self.conn, BytesIO):
buff = self.conn.read(size)
return buff
else:
# socket
have = 0
buff = b""
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客户端socket关闭了
raise EOFError()
return buff
def get_method_name(self):
"""
提供方法名
:return: str 方法名
"""
# 读取字符串长度
buff = self._read_all(4)
length = struct.unpack("!I", buff)[0]
# 读取字符串
buff = self._read_all(length)
name = buff.decode()
return name
class DivideProtocol(object):
"""
divide过程消息协议转换工具
"""
def args_encode(self, num1, num2=1):
"""
将原始的调用请求参数转换打包成二进制消息数据
:param num1: int
:param num2: int
:return: bytes 二进制消息shuju
"""
name = "divide"
# 处理方法的名字 字符串
# 处理字符串的长度
buff = struct.pack("!I", 6)
# 处理字符
buff += name.encode()
# 处理参数1
# 处理序号
buff2 = struct.pack("!B", 1)
# 处理参数值
buff2 += struct.pack("!i", num1)
# 处理参数2
if num2 != 1:
# 处理序号
buff2 += struct.pack("!B", 2)
# 处理参数值
buff2 += struct.pack("!i", num2)
# 处理消息长度,边界固定
length = len(buff2)
buff += struct.pack("!I", length)
buff += buff2
return buff
def _read_all(self, size):
"""
帮助我们读取二进制数据
:param size: 想要读取的二进制数据大小
:return: 二进制数据 bytes
"""
# self.conn
# 读取二进制数据
# socket.recv(4) => ?4
# BytesIO.read
if isinstance(self.conn, BytesIO):
buff = self.conn.read(size)
return buff
else:
# socket
have = 0
buff = b""
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客户端socket关闭了
raise EOFError()
return buff
def args_decode(self, connection):
"""
接受调用请求消息数据并进行解析
:param connection: 连接对象 socket BytesIO
:return: dict 包含了解析之后的参数
"""
param_len_map = {
1: 4,
2: 4,
}
param_fmt_map = {
1: "!i",
2: "!i",
}
param_name_map = {
1: "num1",
2: "num2"
}
# 保存用来返回的参数
# args = {"num1": xxx, "num2": xxx}
args = {
}
self.conn = connection
# 处理方法的名已经提前被处理(稍后实现)
# 处理消息边界
# 读取二进制数据
# socket.recv(4) => ?4
# BytesIO.read
buff = self._read_all(4)
# 将二进制数据转换为python数据类型
length = struct.unpack("!I", buff)[0]
# 已经读取处理的字节数
have = 0
# 处理第一个参数
# 处理参数序号
buff = self._read_all(1)
have += 1
param_seg = struct.unpack("!B", buff)[0]
# 处理参数值
param_len = param_len_map[param_seg]
buff = self._read_all(param_len)
have += param_len
param_fmt = param_fmt_map[param_seg]
param = struct.unpack(param_fmt, buff)[0]
param_name = param_name_map[param_seg]
args[param_name] = param
if have >= length:
return args
# 处理第二个参数
# 处理参数序号
buff = self._read_all(1)
param_seg = struct.unpack("!B", buff)[0]
# 处理参数值
param_len = param_len_map[param_seg]
buff = self._read_all(param_len)
param_fmt = param_fmt_map[param_seg]
param = struct.unpack(param_fmt, buff)[0]
param_name = param_name_map[param_seg]
args[param_name] = param
return args
def result_encode(self, result):
"""
将原始结果数据转换为消息协议二进制数据
:param result: 原始结果数据 float InvalidOperation
:return: bytes 消息协议二进制数据
"""
# 正常
if isinstance(result, float):
# 处理返回值类型
buff = struct.pack("!B", 1)
buff += struct.pack("!f", result)
return buff
# 异常
else:
# 处理返回值类型
buff = struct.pack("!B", 2)
# 处理返回值
length = len(result.message)
# 处理字符串长度
buff += struct.pack("!I", length)
# 处理字符
buff += result.message.encode()
return buff
def result_decode(self, connection):
"""
将返回值消息数据转换为原始返回值
:param connection: socket BytesIO
:return: float InvalidOperation对象
"""
self.conn = connection
# 处理返回值类型
buff = self._read_all(1)
result_type = struct.unpack("!B", buff)[0]
if result_type == 1:
# 正常
# 读取float数量
buff = self._read_all(4)
val = struct.unpack("!f", buff)[0]
return val
else:
# 异常
# 读取字符串的长度
buff = self._read_all(4)
length = struct.unpack("!I", buff)[0]
# 读取字符串
buff = self._read_all(length)
message = buff.decode()
return InvalidOperation(message)
class Channel(object):
"""
用户客户端建立网络连接
"""
def __init__(self, host, port):
"""
:param host: 服务器地址
:param port: 服务器端口号
"""
self.host = host
self.port = port
def get_connection(self):
"""
获取连接对象
:return: 与服务器通讯的socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
return sock
class DistributedChannel(object):
"""
支持分布式的zookeeper的RPC客户端通讯连接工具
"""
def __init__(self):
# 创建kazoo对象,用来跟zookeeper连接,获取信息
self.zk = KazooClient("192.168.218.136:2181")
self.zk.start()
self._servers = []
self._get_servers()
def _get_servers(self, event=None):
"""
从zookeeper中获取所有可用的RPC服务器地址信息
:return:
"""
self._servers = []
# 从zookeeper中获取/rpc节点下所有可用的rpc服务器节点
servers = self.zk.get_children("/rpc", watch=self._get_servers)
# 遍历节点,获取服务器的地址信息
for server in servers:
addr_data = self.zk.get("/rpc/" + server)[0]
addr = json.loads(addr_data)
self._servers.append(addr)
def _get_server(self):
"""
从可用的服务器列表中选出一台服务器
:return:
"""
return random.choice(self._servers)
def get_connection(self):
"""
提供一个具体的与RPC服务器的连接socket
:return:
"""
while True:
addr = self._get_server()
print(addr)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((addr["host"], addr["port"]))
except ConnectionRefusedError:
time.sleep(1)
continue
else:
return sock
class Server(object):
"""
RPC服务器
"""
def __init__(self, host, port, handlers):
self.host = host
self.port = port
# 创建socket的工具对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址
sock.bind((self.host, self.port))
self.sock = sock
self.handlers = handlers
def serve(self):
"""
开启服务器运行,提供RPC服务
:return:
"""
# 开启服务器的监听,等待客户端的连接请求
self.sock.listen(128)
print("服务器开始监听")
# 接收客户端的连接请求
while True:
client_sock, client_addr = self.sock.accept()
print("与客户端%s建立了连接" % str(client_addr))
# 交给ServerStub,完成客户端的具体的RPC调用请求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
stub.process()
except EOFError:
# 表示客户端关闭了连接
print("客户端关闭了连接")
client_sock.close()
class ThreadServer(object):
"""
多线程RPC服务器
"""
def __init__(self, host, port, handlers):
# 创建socket的工具对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置socket
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址
sock.bind((host, port))
self.host = host
self.port = port
self.sock = sock
self.handlers = handlers
def serve(self):
"""
开启服务器运行,提供RPC服务
:return:
"""
# 开启服务器的监听,等待客户端的连接请求
self.sock.listen(128)
print("服务器开始监听")
# 注册到zookeeper
self.regoster_zookeeper()
# 接收客户端的连接请求
while True:
client_sock, client_addr = self.sock.accept()
print("与客户端%s建立了连接" % str(client_addr))
# 创建子线程处理这个客户端
t = threading.Thread(target=self.handle, args=(client_sock,))
# 开启子线程执行
t.start()
def regoster_zookeeper(self):
"""
在zookeeper中心中注册本服务器的地址信息
:return:
"""
# 创建kazoo客户端
zk = KazooClient("192.168.218.136:2181")
# 建立与zookeeper的连接
zk.start()
# 在zookeeper中创建节点保存数据
zk.ensure_path("/rpc")
data = json.dumps({"host": self.host, "port": self.port})
# 在zookeeper上存储服务器地址及端口,设置成临时节点,顺序节点
zk.create("/rpc/server", data.encode(), ephemeral=True, sequence=True)
def handle(self, client_sock):
"""
子线程调用的方法,用来处理一个客户端的请求
:return:
"""
# 交给ServerStub,完成客户端的具体的RPC调用请求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
stub.process()
except EOFError:
# 表示客户端关闭了连接
print("客户端关闭了连接")
client_sock.close()
class ClientStub(object):
"""
用来帮助客户端完成远程过程调用 RPC调用
stub = ClientStub()
stib.divide(200)
"""
def __init__(self, channel):
self.channel = channel
self.conn = self.channel.get_connection()
def divide(self, num1, num2=1):
# 将调用的参数打包成消息协议的数据
proto = DivideProtocol()
args = proto.args_encode(num1, num2)
# 将消息数据通过网络发送给服务器
self.conn.sendall(args)
# 接收服务器返回的返回值消息数据,并进行解析
result = proto.result_decode(self.conn)
# 将结果值(正常float 或 异常InvalidOperation)返回给客户端
if isinstance(result, float):
# 正常
return result
else:
# 异常
raise result
def add(self):
pass
class ServerStub:
"""
帮助服务端完成远程过程调用
"""
def __init__(self, connection, handlers):
"""
:param connection: 与客户端的连接
:param handlers: 真正本地被调用方法(函数 过程)
class Handlers:
@staticmethod
def divide(num1, num2=1):
pass
def add():
pass
"""
self.conn = connection
self.method_proto = MethodProtocol(self.conn)
self.process_map = {
"divide": self._process_divide,
"add": self._process_add,
}
self.handlers = handlers
def process(self):
"""
当服务端接收了一个客户端的连接,建立好连接后,完成远端调用处理
:return:
"""
# 接收消息数据,并解析方法的名字
name = self.method_proto.get_method_name()
# 根据机械获得的方法(过程)名,调用相应的过程协议,接收并解析消息数据
# self.process_map[name]()
_process = self.process_map[name]
_process()
def _process_divide(self):
"""
处理除法过程调用
:return:
"""
# 创建用于除法过程调用参数协议解析的工具
proto = DivideProtocol()
# 解析调用消息参数
args = proto.args_decode(self.conn)
# args = {"num1": xxx, "num2": xxx}
# 进行除法的本地过程调用
# 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端
try:
val = self.handlers.divide(**args)
except InvalidOperation as e:
ret_message = proto.result_encode(e)
else:
ret_message = proto.result_encode(val)
self.conn.sendall(ret_message)
def _process_add(self):
pass
if __name__ == '__main__':
# 狗贼消息数据
proto = DivideProtocol()
# divide(200, 100)
# message = proto.args_encode(200, 100)
# divide(200)
message = proto.args_encode(200)
conn = BytesIO()
conn.write(message)
conn.seek(0)
# 解析消息数据
method = MethodProtocol(conn)
name = method.get_method_name()
print(name)
args = proto.args_decode(conn)
print(args)
client.py
from service import ClientStub
from service import Channel
from service import InvalidOperation
from service import DistributedChannel
import time
# 创建与服务器的连接
# channel = Channel("127.0.0.1", 8000)
channel = DistributedChannel()
# 运行调用
for i in range(50):
try:
# 创建用于RPC调用的工具
stub = ClientStub(channel)
val = stub.divide(i * 100, 50)
except InvalidOperation as e:
print(e.message)
except Exception as e:
print(e)
else:
print(val)
time.sleep(1)
使用.py
from service import InvalidOperation
from service import Server, ThreadServer
import sys
class Handlers:
@staticmethod
def divide(num1, num2=1):
"""
除法
:param num1: int
:param num2: int
:return:
"""
if num2 == 0:
raise InvalidOperation()
val = num1 / num2
return val
if __name__ == '__main__':
# 开启服务器
# _server = Server("127.0.0.1", 8000, Handlers)
# _server.serve()
# 从启动命令中提取服务器运行的ip地址和端口号
host = sys.argv[1]
port = sys.argv[2]
_server = ThreadServer(host, int(port), Handlers)
_server.serve()