RabbitMQ+GRPC的快速使用(1)

使用背景

之前写的同步发送通知随着业务量的增长,已经不再适用,所以快速实现一个基本的rq队列+grpc的方式来投递通知数据并交给rq的worker去调用grpc的服务。
但是之前调用的地方太多了,所以最好还是以patch的方式去修改

思路

原有的结构大致为图1所示


图1

首先flask调用grpc再由grpc请求微信服务器发送消息,然后由微信响应请求后返回通知结果给grpc,grpc再返回结果给flask最终返回给客户端,所以除非等到grpc返回调用结果,否则将会一直阻塞
现在则为


图2

flask投递消息到队列中去就就结束了,直接返回到客户端,这里就不会阻塞,而是让监听rabbitMQ的worker去执行

这里暂时只创建一个队列去分发所有类型的通知所以message的格式需要固定
{"method":"method_name", "data":{}},客户端调用publish传入对应的参数即可

# client.py
import pika
import pickle


class RabbitClient(object):
    def __init__(self, host="localhost", port=5672, routing_key=None):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=host, port=port))
        self.channel = self.connection.channel()
        self.routing_key = routing_key

    def publish(self, method_name, **kwargs):
        message = self.package(method_name, **kwargs)

        self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=message)

    def package(self, method_name, **kwargs):
        temp = {"method": method_name}
        temp.update(data=kwargs)
        return pickle.dumps(temp)
# 这里是调用的工具module,原来的方式已经注释
from apps import rq


# def sen_message_test(user_id, message):
#     """
#
#     :param user_id:
#     :param message: {"title":"","message":""}
#     :return:
#     """
#     with grpc.insecure_channel("{0}:{1}".format(_HOST, _PORT)) as channel:
#         client = send_server_pb2_grpc.SendServiceStub(channel=channel)
#         response = client.SendMessage(send_server_pb2.SendMessageParam(user_id=user_id, message=json.dumps(message)))
#     print("received: " + str(response))

def sen_message_test(user_id, message):
    rq.publish("sen_message_test", user_id=user_id, message=message)

def debt_remind_test(user_id=None, bill_id=None):
    rq.publish("debt_remind_test", user_id, bill_id)

def repair_remind_test(user_id=None, repair_id=None):
    rq.publish("repair_remind_test", user_id=user_id, repair_id=repair_id)

# 太多了就不全列出来了,总之就是要保证原来的业务逻辑代码还能用
# worker.py
import pika
import pickle


class RabbitServer(object):
    def __init__(self, host="localhost", port=5672, queue=None):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=host, port=port))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=queue, durable=True)

        self.channel.basic_consume(on_message_callback=self.callback, queue=queue, auto_ack=True)
        self.dispatcher = RpcMethodDispatcher()
        self.setup = self.dispatcher.setup

    def callback(self, ch, method, properties, body):
        body = pickle.loads(body)
        print(body)

        func = self.dispatcher.dispatch(body.get("method"))
        if not func:
            return
        try:
            func(**body.get("data"))
        except Exception as e:
            print(e)

    def run(self):
        print("wait")
        self.channel.start_consuming()


class RpcMethodDispatcher(object):
    def __init__(self):
        self.map = []

    def setup(self, name):
        # 和message中的method相互对应类似于@app.route("/"),将所有路由添加过来
        def deco(f):
            self.map.append(MethodMap(name, f))

            def wrapper(*args, **kwargs):
                return f(*args, **kwargs)

            return wrapper

        return deco

    def dispatch(self, name):
        for i in self.map:
            if i.name == name:
                return i.method


class MethodMap(object):
    def __init__(self, name, method):
        self.name = name
        self.method = method


server = RabbitServer(queue="task_queue")

if __name__ == '__main__':
    server.run()

给标题后面加了个(1),我知道这玩意儿很快就会还要修改
可能看到这里就会有同学问了,为啥不new一个thread去执行嘞?


image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,082评论 1 32
  • GRPC是基于protocol buffers3.0协议的. 本文将向您介绍gRPC和protocol buffe...
    二月_春风阅读 17,972评论 2 28
  • __block和__weak修饰符的区别其实是挺明显的:1.__block不管是ARC还是MRC模式下都可以使用,...
    LZM轮回阅读 3,278评论 0 6
  • 一种能飞的汽车,不用任何汽油等柴油。只要有指纹他就能飞起来而且不需要任何轨道,想去哪去。 也可以飞到海里,只需要关...
    赵霞_a476阅读 1,388评论 0 1
  • 鬼神神差那次买了村上春树的《当我谈跑步时,我谈些什么》,加上当时有好朋友迷上了跑步,经常在朋友圈晒各种跑马拉松的趣...
    暴走君萨阅读 451评论 10 5