使用kazoo改写grpc程序

之前我们用过grpc来实现过远程过程调用,这次我们将Zookeeper加入到我们的实践当中,所以需要改写其中的一部分程序
server:
在实现服务端的时候我们需要在开始服务之前将自己的服务信息注册到Zookeeper去,值得注意的是当我们需要测试断开连接是否会从Zookeeper中取消注册的时候我们最好在关闭的时候使用KazooClient.stop()方法,要不然连接不会瞬间关闭而是有一定的延迟,得出的测试效果也会大大折扣

import req_pb2_grpc as PService
import req_pb2 as Mess
import grpc
from concurrent import futures


# 实现被调用的具体代码
class DemoService(PService.DemoServicer):

    def __init__(self):
        self.city_db = {
            "beijing": [' python ', 'c++', 'go'],
            "shanghai": [' 产品 ', '123', '数学'],
            "wuhan": [' 语文 ', '英语', '数学']
        }
        self.answers = list(range(10))

    def Calculate(self, request, context):
        if request.num1 == 0:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("cannot divide by 0")
            return Mess.Result()

        if request.op == Mess.Work.ADD:
            result = request.num1 + request.num2
        elif request.op == Mess.Work.SUBTRACT:
            result = request.num1 - request.num2
        else:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("invalid op parameter")
            return Mess.Result()

        return Mess.Result(result=result)

        pass

    def GetCommunity(self, request, context):
        city = request.name
        subs = self.city_db.get(city)
        print("*" * 100)
        for sub in subs:
            import time
            # time.sleep(1)
            yield Mess.Subject(name=sub)

    def Accumulate(self, request_iterator, context):
        sum = 0
        for num in request_iterator:
            sum += num.val
            print(sum)
        return Mess.Sum(val=sum)

    def GuessNumber(self, request_iterator, context):
        for request in request_iterator:
            if request.val in self.answers:
                print(request.val)
                yield Mess.Answer(val=request.val, desc='bingo')


from kazoo.client import KazooClient
import json
import sys


class DistributeServer(object):
    def register_zookeeper(self, data):
        """
        注册到zookeeper
        :return:
        """
        # 创建kazoo客户端
        self.zk = KazooClient("127.0.0.1:2181")
        # 建立连接
        self.zk.start()
        # 在zookeeper中创建节点信息
        self.zk.ensure_path("/rpc")
        self.zk.create('/rpc/server', data, ephemeral=True, sequence=True)
        print("成功注册到zookeeper:", data.decode())

    # 开启服务器
    def serve(self, host, port):
        # 创建服务器-对象 多线程服务器
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        # 注册实现的服务方法到服务器对象中
        PService.add_DemoServicer_to_server(DemoService(), server)
        # 设置服务地址
        server.add_insecure_port('{}:{}'.format(host, port))
        # 开启服务
        print('start server')
        # 注册到zookeeper
        addr = {"host": host, "port": port}
        self.register_zookeeper(json.dumps(addr).encode())

        server.start()
        # 关闭服务
        import time

        try:
            time.sleep(1000)
        except KeyboardInterrupt:
            self.zk.stop()
            server.stop(0)
            print("stop server")
        pass


if __name__ == '__main__':
    port = sys.argv[1]
    host = sys.argv[2]
    server = DistributeServer()
    server.serve(port, host)

client:
在客户端需要进行rpc调用的时候首先从Zookeeper中获取当前的正常运行的服务有哪些,然后从中选取一个服务器去进行连接

import grpc
from grpc_demo import req_pb2_grpc, req_pb2


def invoke(stub):
    work = req_pb2.Work()
    work.num1 = 1
    work.num2 = 2
    work.op = req_pb2.Work.ADD
    result = stub.Calculate(work)
    print("result :", result.result)

    work.num1 = 0
    try:
        result = stub.Calculate(work)
    except grpc.RpcError as e:
        print('{}:{}'.format(e.code(), e.details()))


def invoke_get_sub(stub):
    city = req_pb2.City(name="beijing")
    subs = stub.GetCommunity(city)
    for sub in subs:
        print(sub.name)


def generate_delta():
    for i in range(10):
        import random
        import time
        time.sleep(0.5)
        delta = random.randint(1, 100)
        yield req_pb2.Delta(val=delta)


def invoke_sum(stub):
    delta_iterator = generate_delta()
    sum = stub.Accumulate(delta_iterator)
    print(sum.val)


def generate_number():
    for i in range(10):
        import random
        import time
        time.sleep(0.5)
        delta = random.randint(1, 100)
        yield req_pb2.Number(val=delta)


def invoke_guess(stub):
    number_iterator = generate_number()
    answers = stub.GuessNumber(number_iterator)
    for an in answers:
        print("{}:{}".format(an.val, an.desc))


from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
import json


class Client(object):
    def __init__(self):
        self.zk = KazooClient("127.0.0.1:2181")
        self.zk.start()
        self.servers = []
        self.get_servers()

    def get_servers(self, event=None):
        # 有变化的话回调g重新获取server列表
        servers = self.zk.get_children("/rpc", watch=self.get_servers)
        print("="*20)
        print("更新了!!!!!")
        print("="*20)

        if not servers:
            raise KazooException("暂无子节点")
        for server in servers:
            # server_list.append("/rpc/" + json(server_data))
            addr_data, node_state = self.zk.get("/rpc/" + server)
            self.servers.append(addr_data.decode())

    def get_server(self):
        import random
        server = json.loads(random.choice(self.servers))
        return "{}:{}".format(server["host"], server["port"])

    def run(self):
        # 获取分布式的channel 从zookeeper中获取
        ipport = self.get_server()
        print("REQUEST: ", ipport)
        # channel = None
        while True:
            try:
                channel = grpc.insecure_channel(ipport)
            except Exception:
                continue
            else:
                break
        # with grpc.insecure_channel(ipport) as channel:
        # 创建stub对象
        stub = req_pb2_grpc.DemoStub(channel)
        # invoke_get_sub(stub)
        try:
            invoke_get_sub(stub)
            # invoke(stub)
        except Exception as e:
            pass
        # invoke_guess(stub)

        channel.close()


if __name__ == '__main__':
    import time

    ins = Client()
    for i in range(50):
        time.sleep(1)
        ins.run()

测试:
我们这里可以开启多个server分别监听不同的端口,并且让客户端在执行的时候打印出请求的服务是哪一个
然后在客户端运行的过程中,关闭某一个server,这时候我们就可以看到客户端的终端会打印出”更新了“的字样,并且后续的请求都不会再去请求那个已经被我们关闭的服务,代表测试成功。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容