python调用zookeeper

隔壁老王
• 阅读 2123

ZooKeeper

1. 简介

ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。

ZooKeeper通过其简单的架构和API解决了这个问题。ZooKeeper允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。

ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。

python调用zookeeper

2. 概念知识

层次命名空间

下图描述了用于内存表示的ZooKeeper文件系统的树结构(ZooKeeper的数据保存形式)。ZooKeeper节点称为 znode 。每个znode由一个名称标识,并用路径(/)序列分隔。

每个znode最多可存储1MB的数据。

python调用zookeeper

Znode的类型

Znode被分为持久(persistent)节点,顺序(sequential)节点和临时(ephemeral)节点。

  • 持久节点 - 即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
  • 临时节点 - 客户端活跃时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。因此,只有临时节点不允许有子节点。如果临时节点被删除,则下一个合适的节点将填充其位置。临时节点在leader选举中起着重要作用。
  • 顺序节点 - 顺序节点可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径。例如,如果将具有路径 /myapp 的znode创建为顺序节点,则ZooKeeper会将路径更改为 /myapp0000000001 ,并将下一个序列号设置为0000000002。如果两个顺序节点是同时创建的,那么ZooKeeper不会对每个znode使用相同的数字。顺序节点在锁定和同步中起重要作用。

Watches(监视)

监视是一种简单的机制,使客户端收到关于ZooKeeper集合中的更改的通知。客户端可以在读取特定znode时设置Watches。Watches会向注册的客户端发送任何znode(客户端注册表)更改的通知。

Znode更改是与znode相关的数据的修改或znode的子项中的更改。只触发一次watches。如果客户端想要再次通知,则必须通过另一个读取操作来完成。当连接会话过期时,客户端将与服务器断开连接,相关的watches也将被删除。

ZooKeeper安装

在安装ZooKeeper之前,请确保你的系统是在以下任一操作系统上运行:

任意Linux OS - 支持开发和部署。适合演示应用程序。

Windows OS - 仅支持开发。

Mac OS - 仅支持开发。

ZooKeeper服务器是用Java创建的,它在JVM上运行。你需要使用JDK 6或更高版本。

现在,按照以下步骤在你的机器上安装ZooKeeper框架。

步骤1:验证Java安装

相信你已经在系统上安装了Java环境。现在只需使用以下命令验证它。

$ java -version

如果你在机器上安装了Java,那么可以看到已安装的Java的版本。否则,请按照以下简单步骤安装最新版本的Java。

步骤1.1:下载JDK

通过访问链接下载最新版本的JDK,并下载最新版本的Java

步骤1.2:提取文件

通常,文件会下载到download文件夹中。验证并使用以下命令提取tar设置。

$ cd /path/to/download/
$ tar -zxvf jdk-8u181-linux-x64.gz  

步骤1.3:移动到/usr/local/jdk目录

要使Java对所有用户可用,请将提取的Java内容移动到“/usr/local/jdk"文件夹。

$ sudo mkdir /usr/local/jdk
$ sudo mv jdk1.8.0_181 /usr/local/jdk

步骤1.4:设置路径

要设置路径和JAVA_HOME变量,请将以下命令添加到〜/.bashrc文件中。

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_181
export PATH=$PATH:$JAVA_HOME/bin

现在,将所有更改应用到当前运行的系统中。

$ source ~/.bashrc

步骤1.5

使用步骤1中说明的验证命令(java -version)验证Java安装。

步骤2:ZooKeeper框架安装

步骤2.1:下载ZooKeeper

要在你的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper。

http://zookeeper.apache.org/releases.html

到目前为止,最新版本的ZooKeeper是3.4.12(ZooKeeper-3.4.12.tar.gz)。

步骤2.2:提取tar文件

使用以下命令提取tar文件

$ cd /path/to/download/
$ tar -zxvf zookeeper-3.4.12.tar.gz
$ cd zookeeper-3.4.12
$ mkdir data

步骤2.3:创建配置文件

使用命令 vi conf/zoo.cfg 和所有以下参数设置为起点,打开名为 conf/zoo.cfg 的配置文件。

$ vi conf/zoo.cfg

tickTime = 2000
dataDir = /path/to/zookeeper/data
clientPort = 2181

一旦成功保存配置文件,再次返回终端。你现在可以启动zookeeper服务器。

步骤2.4:启动ZooKeeper服务器

执行以下命令

$ bin/zkServer.sh start

执行此命令后,你将收到以下响应

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.12/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

步骤2.5:启动CLI

键入以下命令

$ bin/zkCli.sh

键入上述命令后,将连接到ZooKeeper服务器,你应该得到以下响应。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

停止ZooKeeper服务器

连接服务器并执行所有操作后,可以使用以下命令停止zookeeper服务器。

$ bin/zkServer.sh stop

Kazoo

kazoo是Python连接操作ZooKeeper的客户端库。我们可以通过kazoo来使用ZooKeeper。

1. 安装

pip install kazoo

2. 使用

连接ZooKeeper

from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')

# 启动连接
zk.start() 

# 停止连接
zk.stop()

创建节点

# 创建节点路径,但不能设置节点数据值
zk.ensure_path("/my/favorite")

# 创建节点,并设置节点保存数据,ephemeral表示是否是临时节点,sequence表示是否是顺序节点
zk.create("/my/favorite/node", b"a value", ephemeral=True, sequence=True)

读取节点

# 获取子节点列表
children = zk.get_children("/my/favorite")

# 获取节点数据data 和节点状态stat
data, stat = zk.get("/my/favorite")

设置监视

def my_func(event):
    # 检查最新的节点数据

# 当子节点发生变化的时候,调用my_func
children = zk.get_children("/my/favorite/node", watch=my_func)

server端

import threading
from kazoo.client import KazooClient

class ThreadServer(object):
    def __init__(self, host, port, handlers):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
        self.handlers = handlers

    def serve(self):
        """
        开始服务
        """
        self.sock.listen(128)
        self.register_zk()
        print("开始监听")
        while True:
            conn, addr = self.sock.accept()
            print("建立链接%s" % str(addr))
            t = threading.Thread(target=self.handle, args=(conn,))
            t.start()

    def handle(self, client):
        stub = ServerStub(client, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            print("客户端关闭连接")

        client.close()

    def register_zk(self):
        """
        注册到zookeeper
        """
        self.zk = KazooClient(hosts='127.0.0.1:2181')
        self.zk.start()
        self.zk.ensure_path('/rpc')  # 创建根节点
        value = json.dumps({'host': self.host, 'port': self.port)
        # 创建服务子节点
        self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)

client端

from services import ThreadServer
from services import InvalidOperation
import sys


class Handlers:
    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1:
        :param num2:
        :return:
        """
        if num2 == 0:
            raise InvalidOperation()
        val = num1 / num2
        return val


if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("usage:python server.py [host] [port]")
        exit(1)
    host = sys.argv[1]
    port = sys.argv[2]
    server = ThreadServer(host, int(port), Handlers)
    server.serve()

server改写

import random
import time

class DistributedChannel(object):
    def __init__(self):
        self._zk = KazooClient(hosts='127.0.0.1:2181')
        self._zk.start()
        self._get_servers()

    def _get_servers(self, event=None):
        """
        从zookeeper获取服务器地址信息列表
        """
        servers = self._zk.get_children('/rpc', watch=self._get_servers)
        print(servers)
        self._servers = []
        for server in servers:
            data = self._zk.get('/rpc/' + server)[0]
            addr = json.loads(data)
            self._servers.append(addr)

    def _get_server(self):
        """
        随机选出一个可用的服务器
        """
        return random.choice(self._servers)

    def get_connection(self):
        """
        提供一个可用的tcp连接
        """
        while True:
            server = self._get_server()
            print(server)
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((server['host'], server['port']))
            except ConnectionRefusedError:
                time.sleep(1)
                continue
            else:
                break
        return sock

client改写

from services import ClientStub
from services import DistributedChannel
from services import InvalidOperation
import time


channel = DistributedChannel()

for i in range(50):
    try:
        stub = ClientStub(channel)
        val = stub.divide(i)
    except InvalidOperation as e:
        print(e.message)
    else:
        print(val)
    time.sleep(1)

测试完整代码

import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="192.168.218.136:2181")

# 启动连接
zk.start()
# 创建节点路径,但不能设置节点数据值
zk.ensure_path("/rpc")
addr1 = {"host": "127.0.0.1", "port": 8001}
addr1_str = json.dumps(addr1)
# 创建节点,并设置节点保存数据,ephemeral表示是否是临时节点,sequence表示是否是顺序节点
zk.create("/rpc/server", addr1_str.encode(), ephemeral=True, sequence=True)

addr2 = {"host": "127.0.0.1", "port": 8002}
addr2_str = json.dumps(addr2)
zk.create("/rpc/server", addr2_str.encode(), ephemeral=True, sequence=True)


# 获取子节点列表
children = zk.get_children("/rpc")

# 获取节点数据data 和节点状态stat
for i in zk.get_children("/rpc"):
    print(zk.get("/rpc/"+i)[0])


def on_change(event):
    print(event)


# 设置监视, 该监视只触发一次
servers = zk.get_children("/rpc", watch=on_change)

# 停止连接
zk.stop()

service.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Wjy
import struct
from io import BytesIO
import socket
import threading
import json
import random
import time
from kazoo.client import KazooClient


class InvalidOperation(Exception):

    def __init__(self, message=None):
        self.message = message or "invalid operation"


class MethodProtocol(object):
    """
    解读方法名
    """
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        帮助我们读取二进制数据
        :param size: 想要读取的二进制数据大小
        :return: 二进制数据 bytes
        """
        # self.conn
        # 读取二进制数据
        # socket.recv(4) => ?4
        # BytesIO.read
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # socket
            have = 0
            buff = b""
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l

                if l == 0:
                    # 表示客户端socket关闭了
                    raise EOFError()
            return buff

    def get_method_name(self):
        """
        提供方法名
        :return: str 方法名
        """
        # 读取字符串长度
        buff = self._read_all(4)
        length = struct.unpack("!I", buff)[0]

        # 读取字符串
        buff = self._read_all(length)
        name = buff.decode()
        return name


class DivideProtocol(object):
    """
    divide过程消息协议转换工具
    """

    def args_encode(self, num1, num2=1):
        """
        将原始的调用请求参数转换打包成二进制消息数据
        :param num1: int
        :param num2: int
        :return: bytes 二进制消息shuju
        """
        name = "divide"

        # 处理方法的名字 字符串
        # 处理字符串的长度
        buff = struct.pack("!I", 6)
        # 处理字符
        buff += name.encode()

        # 处理参数1
        # 处理序号
        buff2 = struct.pack("!B", 1)
        # 处理参数值
        buff2 += struct.pack("!i", num1)

        # 处理参数2
        if num2 != 1:
            # 处理序号
            buff2 += struct.pack("!B", 2)
            # 处理参数值
            buff2 += struct.pack("!i", num2)

        # 处理消息长度,边界固定
        length = len(buff2)
        buff += struct.pack("!I", length)

        buff += buff2

        return buff

    def _read_all(self, size):
        """
        帮助我们读取二进制数据
        :param size: 想要读取的二进制数据大小
        :return: 二进制数据 bytes
        """
        # self.conn
        # 读取二进制数据
        # socket.recv(4) => ?4
        # BytesIO.read
        if isinstance(self.conn, BytesIO):
            buff = self.conn.read(size)
            return buff
        else:
            # socket
            have = 0
            buff = b""
            while have < size:
                chunk = self.conn.recv(size - have)
                buff += chunk
                l = len(chunk)
                have += l

                if l == 0:
                    # 表示客户端socket关闭了
                    raise EOFError()
            return buff

    def args_decode(self, connection):
        """
        接受调用请求消息数据并进行解析
        :param connection: 连接对象 socket BytesIO
        :return: dict 包含了解析之后的参数
        """
        param_len_map = {
            1: 4,
            2: 4,
        }
        param_fmt_map = {
            1: "!i",
            2: "!i",
        }
        param_name_map = {
            1: "num1",
            2: "num2"
        }

        # 保存用来返回的参数
        # args = {"num1": xxx, "num2": xxx}
        args = {

        }

        self.conn = connection
        # 处理方法的名已经提前被处理(稍后实现)

        # 处理消息边界
        # 读取二进制数据
        # socket.recv(4) => ?4
        # BytesIO.read
        buff = self._read_all(4)
        # 将二进制数据转换为python数据类型
        length = struct.unpack("!I", buff)[0]

        # 已经读取处理的字节数
        have = 0

        # 处理第一个参数
        # 处理参数序号
        buff = self._read_all(1)
        have += 1
        param_seg = struct.unpack("!B", buff)[0]

        # 处理参数值
        param_len = param_len_map[param_seg]
        buff = self._read_all(param_len)
        have += param_len
        param_fmt = param_fmt_map[param_seg]
        param = struct.unpack(param_fmt, buff)[0]

        param_name = param_name_map[param_seg]
        args[param_name] = param

        if have >= length:
            return args

        # 处理第二个参数
        # 处理参数序号
        buff = self._read_all(1)
        param_seg = struct.unpack("!B", buff)[0]

        # 处理参数值
        param_len = param_len_map[param_seg]
        buff = self._read_all(param_len)
        param_fmt = param_fmt_map[param_seg]
        param = struct.unpack(param_fmt, buff)[0]

        param_name = param_name_map[param_seg]
        args[param_name] = param

        return args

    def result_encode(self, result):
        """
        将原始结果数据转换为消息协议二进制数据
        :param result: 原始结果数据 float InvalidOperation
        :return: bytes 消息协议二进制数据
        """
        # 正常
        if isinstance(result, float):
            # 处理返回值类型
            buff = struct.pack("!B", 1)
            buff += struct.pack("!f", result)
            return buff
        # 异常
        else:
            # 处理返回值类型
            buff = struct.pack("!B", 2)
            # 处理返回值
            length = len(result.message)
            # 处理字符串长度
            buff += struct.pack("!I", length)
            # 处理字符
            buff += result.message.encode()
            return buff

    def result_decode(self, connection):
        """
        将返回值消息数据转换为原始返回值
        :param connection: socket BytesIO
        :return: float InvalidOperation对象
        """
        self.conn = connection

        # 处理返回值类型
        buff = self._read_all(1)
        result_type = struct.unpack("!B", buff)[0]

        if result_type == 1:
            # 正常
            # 读取float数量
            buff = self._read_all(4)
            val = struct.unpack("!f", buff)[0]
            return val
        else:
            # 异常
            # 读取字符串的长度
            buff = self._read_all(4)
            length = struct.unpack("!I", buff)[0]

            # 读取字符串
            buff = self._read_all(length)
            message = buff.decode()

            return InvalidOperation(message)


class Channel(object):
    """
    用户客户端建立网络连接
    """
    def __init__(self, host, port):
        """

        :param host: 服务器地址
        :param port: 服务器端口号
        """
        self.host = host
        self.port = port

    def get_connection(self):
        """
        获取连接对象
        :return: 与服务器通讯的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock


class DistributedChannel(object):
    """
    支持分布式的zookeeper的RPC客户端通讯连接工具
    """
    def __init__(self):
        # 创建kazoo对象,用来跟zookeeper连接,获取信息
        self.zk = KazooClient("192.168.218.136:2181")
        self.zk.start()
        self._servers = []
        self._get_servers()

    def _get_servers(self, event=None):
        """
        从zookeeper中获取所有可用的RPC服务器地址信息
        :return:
        """
        self._servers = []
        # 从zookeeper中获取/rpc节点下所有可用的rpc服务器节点
        servers = self.zk.get_children("/rpc", watch=self._get_servers)
        # 遍历节点,获取服务器的地址信息
        for server in servers:
            addr_data = self.zk.get("/rpc/" + server)[0]
            addr = json.loads(addr_data)
            self._servers.append(addr)

    def _get_server(self):
        """
        从可用的服务器列表中选出一台服务器
        :return:
        """
        return random.choice(self._servers)

    def get_connection(self):
        """
        提供一个具体的与RPC服务器的连接socket
        :return:
        """
        while True:
            addr = self._get_server()
            print(addr)
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((addr["host"], addr["port"]))
            except ConnectionRefusedError:
                time.sleep(1)
                continue
            else:
                return sock



class Server(object):
    """
    RPC服务器
    """
    def __init__(self, host, port, handlers):
        self.host = host
        self.port = port
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置socket
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 绑定地址
        sock.bind((self.host, self.port))
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        print("服务器开始监听")

        # 接收客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print("与客户端%s建立了连接" % str(client_addr))

            # 交给ServerStub,完成客户端的具体的RPC调用请求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                        stub.process()
            except EOFError:
                # 表示客户端关闭了连接
                print("客户端关闭了连接")
                client_sock.close()


class ThreadServer(object):
    """
    多线程RPC服务器
    """
    def __init__(self, host, port, handlers):
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置socket
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 绑定地址
        sock.bind((host, port))
        self.host = host
        self.port = port
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        print("服务器开始监听")

        # 注册到zookeeper
        self.regoster_zookeeper()

        # 接收客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print("与客户端%s建立了连接" % str(client_addr))

            # 创建子线程处理这个客户端
            t = threading.Thread(target=self.handle, args=(client_sock,))
            # 开启子线程执行
            t.start()

    def regoster_zookeeper(self):
        """
        在zookeeper中心中注册本服务器的地址信息
        :return:
        """
        # 创建kazoo客户端
        zk = KazooClient("192.168.218.136:2181")
        # 建立与zookeeper的连接
        zk.start()
        # 在zookeeper中创建节点保存数据
        zk.ensure_path("/rpc")
        data = json.dumps({"host": self.host, "port": self.port})
        # 在zookeeper上存储服务器地址及端口,设置成临时节点,顺序节点
        zk.create("/rpc/server", data.encode(), ephemeral=True, sequence=True)

    def handle(self, client_sock):
        """
        子线程调用的方法,用来处理一个客户端的请求
        :return:
        """
        # 交给ServerStub,完成客户端的具体的RPC调用请求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            # 表示客户端关闭了连接
            print("客户端关闭了连接")
            client_sock.close()


class ClientStub(object):
    """
    用来帮助客户端完成远程过程调用 RPC调用

    stub = ClientStub()
    stib.divide(200)
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    def divide(self, num1, num2=1):
        # 将调用的参数打包成消息协议的数据
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)

        # 将消息数据通过网络发送给服务器
        self.conn.sendall(args)

        # 接收服务器返回的返回值消息数据,并进行解析
        result = proto.result_decode(self.conn)

        # 将结果值(正常float 或 异常InvalidOperation)返回给客户端
        if isinstance(result, float):
            # 正常
            return result
        else:
            # 异常
            raise result

    def add(self):
        pass


class ServerStub:
    """
    帮助服务端完成远程过程调用
    """
    def __init__(self, connection, handlers):
        """

        :param connection: 与客户端的连接
        :param handlers: 真正本地被调用方法(函数 过程)
        class Handlers:

            @staticmethod
            def divide(num1, num2=1):
                pass

            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            "divide": self._process_divide,
            "add": self._process_add,
        }
        self.handlers = handlers

    def process(self):
        """
        当服务端接收了一个客户端的连接,建立好连接后,完成远端调用处理
        :return:
        """
        # 接收消息数据,并解析方法的名字
        name = self.method_proto.get_method_name()

        # 根据机械获得的方法(过程)名,调用相应的过程协议,接收并解析消息数据
        # self.process_map[name]()
        _process = self.process_map[name]
        _process()

    def _process_divide(self):
        """
        处理除法过程调用
        :return:
        """
        # 创建用于除法过程调用参数协议解析的工具
        proto = DivideProtocol()
        # 解析调用消息参数
        args = proto.args_decode(self.conn)
        # args = {"num1": xxx, "num2": xxx}

        # 进行除法的本地过程调用
        # 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端
        try:
            val = self.handlers.divide(**args)
        except InvalidOperation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)

        self.conn.sendall(ret_message)

    def _process_add(self):
        pass


if __name__ == '__main__':
    # 狗贼消息数据
    proto = DivideProtocol()
    # divide(200, 100)
    # message = proto.args_encode(200, 100)
    # divide(200)
    message = proto.args_encode(200)
    conn = BytesIO()
    conn.write(message)
    conn.seek(0)

    # 解析消息数据
    method = MethodProtocol(conn)
    name = method.get_method_name()
    print(name)

    args = proto.args_decode(conn)
    print(args)

client.py

from service import ClientStub
from service import Channel
from service import InvalidOperation
from service import DistributedChannel
import time

# 创建与服务器的连接
# channel = Channel("127.0.0.1", 8000)
channel = DistributedChannel()

# 运行调用
for i in range(50):
    try:
        # 创建用于RPC调用的工具
        stub = ClientStub(channel)

        val = stub.divide(i * 100, 50)
    except InvalidOperation as e:
        print(e.message)
    except Exception as e:
        print(e)
    else:
        print(val)

    time.sleep(1)

使用.py

from service import InvalidOperation
from service import Server, ThreadServer
import sys


class Handlers:

    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1: int
        :param num2: int
        :return:
        """
        if num2 == 0:
            raise InvalidOperation()
        val = num1 / num2
        return val


if __name__ == '__main__':
    # 开启服务器
    # _server = Server("127.0.0.1", 8000, Handlers)
    # _server.serve()

    # 从启动命令中提取服务器运行的ip地址和端口号
    host = sys.argv[1]
    port = sys.argv[2]

    _server = ThreadServer(host, int(port), Handlers)
    _server.serve()
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Stella981 Stella981
3年前
Hadoop 2.6.0 HA高可用集群配置详解(二)
Zookeeper集群安装Zookeeper是一个开源分布式协调服务,其独特的LeaderFollower集群结构,很好的解决了分布式单点问题。目前主要用于诸如:统一命名服务、配置管理、锁服务、集群管理等场景。大数据应用中主要使用Zookeeper的集群管理功能。本集群使用zookeeper3.4.5cdh5.7.1版本。首先在Hado
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
4cast
4castpackageloadcsv.KumarAwanish发布:2020122117:43:04.501348作者:KumarAwanish作者邮箱:awanish00@gmail.com首页:
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
可莉 可莉
3年前
2020年最新ZooKeeper面试题(附答案)
2020年最新ZooKeeper面试题1\.ZooKeeper是什么?ZooKeeper是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布
Stella981 Stella981
3年前
2020年最新ZooKeeper面试题(附答案)
2020年最新ZooKeeper面试题1\.ZooKeeper是什么?ZooKeeper是一个开源的分布式协调服务。它是一个为分布式应用提供一致性服务的软件,分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
隔壁老王
隔壁老王
Lv1
千万程序员队伍中的一员。我住隔壁我姓王,同事们亲切得称呼我隔壁老王
文章
20
粉丝
2
获赞
7