rabbitmq中文教程python版 - 远程过程调用

源码:https://github.com/ltoddy/rabbitmq-tutorial

远程过程调用(RPC)

(using the Pika Python client)

本章节教程重点介绍的内容

在第二篇教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

但是如果我们需要在远程计算机上运行某个功能并等待结果呢?那么,这是一个不同的事情。
这种模式通常称为远程过程调用(RPC)。

在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。
由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。

客户端界面

为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,
它发送一个RPC请求并阻塞,直到收到答案:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
*有关RPC的说明*

虽然RPC是计算中很常见的模式,但它经常被吹毛求疵。当程序员不知道函数调用是本地的还是
慢速的RPC时会出现这些问题。像这样的混乱导致不可预知的问题,并增加了调试的不必要的复杂性,
而不是我们想要的简化软件。

铭记这一点,请考虑以下建议:

  * 确保显而易见哪个函数调用是本地的,哪个是远程的。
  * 记录您的系统。清楚组件之间的依赖关系。
  * 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?

有疑问时避免RPC。如果可以的话,你应该使用异步管道 - 而不是类似于RPC的阻塞,
其结果被异步推送到下一个计算阶段。

回调队列

一般来说,通过RabbitMQ来执行RPC是很容易的。客户端发送请求消息,服务器回复响应消息。
为了接收响应,客户端需要发送一个“回调”队列地址和请求。让我们试试看:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
消息属性

AMQP 0-9-1协议预定义了一组包含14个属性的消息。大多数属性很少使用,但以下情况除外:

delivery_mode:将消息标记为持久(值为2)或瞬态(任何其他值)。你可能会记得第二篇教程中的这个属性。
content_type:用于描述编码的MIME类型。例如,对于经常使用的JSON编码,将此属性设置为application/json是一种很好的做法。
reply_to:通常用于命名回调队列。
correlation_id:用于将RPC响应与请求关联起来。

相关ID

在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,
但幸运的是有一个更好的方法 - 让我们为每个客户端创建一个回调队列。

这引发了一个新问题,在该队列中收到回复后,不清楚回复属于哪个请求。那是什么时候使用correlation_id属性。
我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到消息时,我们将查看此属性,
并基于此属性,我们将能够将响应与请求进行匹配。如果我们看到未知的correlation_id值,
我们可以放心地丢弃该消息 - 它不属于我们的请求。

您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是抛出错误?
这是由于服务器端可能出现竞争状况。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,
RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。
这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是等幂的。

总结

image

我们的RPC会像这样工作:

  • 当客户端启动时,它创建一个匿名独占回调队列。
  • 对于RPC请求,客户端将发送具有两个属性的消息:reply_to,该消息设置为回调队列和correlation_id,该值设置为每个请求的唯一值。
  • 该请求被发送到rpc_queue队列。
  • RPC worker(又名:服务器)正在等待该队列上的请求。当出现请求时,它执行该作业,并使用reply_to字段中的队列将结果发送回客户端。
  • 客户端在回调队列中等待数据。当出现消息时,它会检查correlation_id属性。如果它匹配来自请求的值,则返回对应用程序的响应。

把它放在一起

rpc_server.py的代码:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_size=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器代码非常简单:

  • (4)像往常一样,我们首先建立连接并声明队列。
  • (11)我们声明我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个版本适用于大数字,它可能是最慢的递归实现)。
  • (20)我们声明了basic_consume的回调,它是RPC服务器的核心。它在收到请求时执行。它完成工作并将响应发回。
  • (34)我们可能想运行多个服务器进程。为了在多台服务器上平均分配负载,我们需要设置prefetch_count设置。

rpc_client.py的代码:

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.corrrelation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客户端代码稍有涉及:

  • (8)我们建立连接,通道并为回复声明独占的“回调”队列。
  • (17)我们订阅'回调'队列,以便我们可以接收RPC响应。
  • (19)对每个响应执行的'on_response'回调函数做了一个非常简单的工作,对于每个响应消息它检查correlation_id是否是我们正在寻找的。如果是这样,它将保存self.response中的响应并打破消费循环。
  • (23)接下来,我们定义我们的主要调用方法 - 它执行实际的RPC请求。
  • (25)在这个方法中,首先我们生成一个唯一的correlation_id数并保存 - 'on_response'回调函数将使用这个值来捕获适当的响应。
  • (29)接下来,我们发布具有两个属性的请求消息:reply_tocorrelation_id
  • (32)在这一点上,我们可以坐下来等待,直到适当的回应到达。
  • (41)最后,我们将回复返回给用户。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 内容来自:RabbitMQ Tutorials Java版 远程过程调用(RPC) 在第二个教程中,我们学会了如何...
    maxwellyue阅读 3,118评论 0 4
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,860评论 2 11
  • 本文所有内容均个人从RabbitMQ官网教程中翻译,若图片文字的引用有任何侵权的地方,联系我,我会立马删除。 Th...
    JobinLi阅读 873评论 0 0
  • 男人问 怎么驾驭女人 女人问 怎么驾驭男人 时间驾驭着男人和女人 考验驾驭着男人和女人 驾驭的褒贬 取决于驾驭的方...
    文采乐阅读 432评论 9 6