Cinder RPC Analysis
[TOC]
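As we know, Cinder's components talk to each other through RPC APIs. For example, when cinder-api creates a volume, it notifies cinder-scheduler over RPC. The scheduler then picks a cinder-volume service and tells it, again over RPC, to create the volume.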
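AMQP Communication Protocol
RPC (Remote Procedure Call) inside OpenStack components uses AMQP (Advanced Message Queuing Protocol) as its communication model, which keeps the components loosely coupled. AMQP is a message-oriented middleware protocol for asynchronous messaging. The AMQP model has four important roles:
- Exchange: forwards each message to the matching Message Queue(s) according to its Routing Key
- Routing Key: what the Exchange uses to decide which Message Queue a message should be sent to
- Publisher: the message sender; it sends messages to an Exchange and specifies a Routing Key so that the right Message Queue receives them
- Consumer: the message receiver; it fetches messages from a Message Queue
The Publisher sends a Message to an Exchange together with a Routing Key. The Exchange routes the Message by its Routing Key and forwards it to the corresponding Message Queue(s). Consumers listening on a Message Queue then read the messages from that queue.
The Routing Key is what the Exchange forwards on, so every message carries a Routing Key that identifies where it may be delivered. Each Message Queue tells the Exchange which Routing Keys it wants to receive by creating a binding. The bindings let the Exchange forward each message to the right queues. Figure 2 shows the AMQP message model.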
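AMQP defines several Exchange types. The three below each implement a different routing algorithm:
- Direct Exchange: point-to-point messaging. A Direct Exchange matches the Routing Key exactly, and only the corresponding Message Queue receives the message.
- Topic Exchange: publish-subscribe (pub-sub) messaging. A Topic Exchange matches the Routing Key against patterns, and every Message Queue whose pattern matches receives the message. Binding keys may use `*` or `#`: `*` matches exactly one word and `#` matches zero or more words. For example, the binding key `*.user.#` matches the routing keys `usd.user` and `eur.user.db`, but not `user.hello`.
- Fanout Exchange: broadcast messaging. A Fanout Exchange forwards the message to every Message Queue bound to it, regardless of the Routing Key.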
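To make the three routing rules concrete, here is a small pure-Python sketch of how an exchange of each type could choose its destination queues. It is an illustration only, not RabbitMQ's implementation. The function names `topic_matches` and `route` and the queue names in the usage are made up for this example.

```python
from typing import Iterable, List, Tuple


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Topic matching: '*' matches exactly one word, '#' zero or more words."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):              # pattern used up: key must be used up too
            return w == len(words)
        token = pattern[p]
        if token == "#":                   # try every number of swallowed words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):                # pattern still needs a word, none left
            return False
        if token == "*" or token == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def route(exchange_type: str, bindings: Iterable[Tuple[str, str]],
          routing_key: str) -> List[str]:
    """Return the queues that get a copy; bindings are (queue, binding_key) pairs."""
    if exchange_type == "direct":          # exact match on the routing key
        hits = {q for q, key in bindings if key == routing_key}
    elif exchange_type == "topic":         # wildcard match on dot-separated words
        hits = {q for q, key in bindings if topic_matches(key, routing_key)}
    elif exchange_type == "fanout":        # routing key ignored: every bound queue
        hits = {q for q, _ in bindings}
    else:
        raise ValueError(f"unsupported exchange type: {exchange_type}")
    return sorted(hits)
```

For example, with `bindings = [("q_users", "*.user.#"), ("q_exact", "usd.user")]`, the call `route("topic", bindings, "eur.user.db")` reaches only `q_users`, while a fanout exchange would deliver to both queues.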
The RPC backends OpenStack currently supports are RabbitMQ, Qpid and ZeroMQ. Their implementations live under `lib\site-packages\oslo_messaging\_drivers`, where each `impl_*.py` implements one backend. Cinder uses RabbitMQ by default.
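Introduction to RabbitMQ
RabbitMQ is a message queue (MQ), which is a way for applications to communicate with each other. An MQ is a typical example of the producer-consumer model: one end keeps writing messages into the queue, while the other end reads or subscribes to the messages in it.
In a project, an MQ is used to take out operations that are time-consuming and do not need an immediate result, and process them asynchronously. This asynchronous processing greatly shortens the server's request-response time and thereby increases the system's throughput.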
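As an in-process analogy (not RabbitMQ itself), the standard library's `queue.Queue` plus a worker thread shows the same producer-consumer split. The producer's `put()` returns at once, and the slow work happens in the consumer. The helper names `producer`, `consumer` and `run` are hypothetical.

```python
import queue
import threading
from typing import List

_STOP = object()  # sentinel telling the consumer to exit


def producer(q: "queue.Queue[object]", messages: List[str]) -> None:
    """Write messages into the queue; put() does not wait for processing."""
    for m in messages:
        q.put(m)
    q.put(_STOP)


def consumer(q: "queue.Queue[object]", results: List[str]) -> None:
    """Read messages until the sentinel arrives; the slow work happens here."""
    while True:
        m = q.get()
        if m is _STOP:
            break
        results.append(f"processed {m}")


def run(messages: List[str]) -> List[str]:
    q: "queue.Queue[object]" = queue.Queue()
    results: List[str] = []
    worker = threading.Thread(target=consumer, args=(q, results))
    worker.start()
    producer(q, messages)   # returns as soon as everything is enqueued
    worker.join()           # wait for the consumer to drain the queue
    return results
```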
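Common commands
- Create a user:
```
rabbitmqctl add_user <username> <password>
```
```
[root@localhost ~]# rabbitmqctl add_user wyue wyue
Creating user "wyue" ...
```
- List all users:
```
rabbitmqctl list_users
```
```
[root@localhost ~]# rabbitmqctl list_users
Listing users ...
guest [administrator]
wyue []
stackrabbit []
```
- Stop the RabbitMQ application (the Erlang node itself keeps running):
```
rabbitmqctl stop_app
```
- Reset the node to its initial state. This deletes users, virtual hosts, queues and persistent messages, and the application must be stopped first:
```
rabbitmqctl reset
```
- Start the application:
```
rabbitmqctl start_app
```
- Set a user's permissions (configure, write, read) on the virtual host `/`:
```
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
```
After creating a user you must set its permissions, otherwise its connections will be refused!
- List all queues:
```
rabbitmqctl list_queues
```
Each output line shows the queue name and the number of messages in it (queue_name | queue_length).
- List all exchanges:
```
rabbitmqctl list_exchanges
```
Each output line shows the exchange name and type (exchange_name | exchange_type). An exchange is identified by its name. If you have declared exchange(name='task', type='topic') and later declare exchange(name='task', type='direct'), the second declaration fails with an error!
- List all consumers:
```
rabbitmqctl list_consumers
```
A consumer only shows up in list_consumers after a program has attached it to a queue. Once the program exits, the consumer no longer listens on the queue, and list_consumers stops showing it.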
Knowing these common commands is enough for now. Python already has a dedicated library for this, kombu, so let's look at what it is.
Kombu
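Kombu is a messaging library for Python. It aims to give the AMQ protocol a dead-simple, high-level interface, to make messaging in Python as easy as possible, and to provide solutions to common messaging problems. See the article "Kombu: A Messaging Library for Python".
Typical message queue workflow
- The client connects to the message queue server and opens a channel.
- The client declares an exchange and sets its properties.
- The client declares a queue and sets its properties.
- The client uses a routing key to bind the exchange to the queue.
- The client publishes messages to the exchange.
- When the exchange receives a message, it routes it according to the message's routing key and the existing bindings, delivering it to one or more queues.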
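The steps above can be walked through against a toy in-memory broker. This is a minimal sketch that supports only direct and fanout exchanges. The class `ToyBroker` and its method names are invented for illustration. They loosely echo AMQP's exchange.declare / queue.declare / queue.bind / basic.publish, but they are not kombu's API. The sketch also shows two behaviours:
- Redeclaring an exchange name with a different type is rejected.
- A message whose routing key matches no binding is simply dropped.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Tuple


class ToyBroker:
    """In-memory stand-in for a broker, supporting direct and fanout exchanges."""

    def __init__(self) -> None:
        self.exchanges: Dict[str, str] = {}                    # name -> type
        self.queues: Dict[str, Deque[str]] = {}
        self.bindings: Dict[str, List[Tuple[str, str]]] = defaultdict(list)

    def exchange_declare(self, name: str, type_: str) -> None:
        existing = self.exchanges.get(name)
        if existing is not None and existing != type_:
            # Same name, different type: the declaration is refused
            raise ValueError(f"exchange {name!r} already declared as {existing!r}")
        self.exchanges[name] = type_

    def queue_declare(self, name: str) -> None:
        self.queues.setdefault(name, deque())

    def queue_bind(self, queue: str, exchange: str, routing_key: str) -> None:
        self.bindings[exchange].append((queue, routing_key))

    def publish(self, exchange: str, routing_key: str, body: str) -> int:
        """Route body into the matching queues; return how many got a copy."""
        type_ = self.exchanges[exchange]
        targets = {q for q, key in self.bindings[exchange]
                   if type_ == "fanout" or key == routing_key}
        for q in targets:
            self.queues[q].append(body)
        return len(targets)          # 0 means the message was dropped

    def get(self, queue: str) -> Optional[str]:
        q = self.queues[queue]
        return q.popleft() if q else None
```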
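Example
Let's write a RabbitMQ round trip. The producer sends 'Hello' through a direct exchange into a RabbitMQ queue, and the consumer then takes it out of the queue.
Notes:
- It is best to upgrade kombu to 4.1.0. I was using 4.0 before, and opening an AMQP channel raised an exception: WinError 10042.
- If you connect as a newly created RabbitMQ user, make sure the user has been granted permissions, otherwise RabbitMQ will refuse the connection.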
entity.py:
```python
from kombu import Exchange, Queue

# Define a direct exchange (the other types are topic and fanout)
task_exchange = Exchange('tasks', type='direct')
# Create a queue with a routing_key and bind it to the exchange
task_queue = Queue('wy_test_queue', task_exchange, routing_key='wy_test1')
```
send.py:
```python
from kombu import Connection
from kombu.messaging import Producer

from entity import task_exchange, task_queue

# Create the connection (closed automatically when the with-block ends)
with Connection('amqp://wyue:wyue@172.24.3.200:5672//') as connection:
    # Open a channel on the connection
    with connection.channel() as channel:
        # Define the message publisher and bind it to the exchange
        producer = Producer(channel, exchange=task_exchange)
        # Publish with the chosen routing_key. declare=[task_queue] makes sure the
        # queue exists and is bound first; otherwise the message could be dropped
        producer.publish('Hello', routing_key='wy_test1', declare=[task_queue])
```
After running send.py, the RabbitMQ queue wy_test_queue holds one message:
```
[root@localhost ~]# rabbitmqctl list_queues|grep wy
wy_test_queue 1
```
recv.py:
```python
from kombu import Connection
from kombu.messaging import Consumer

from entity import task_queue

connection = Connection('amqp://wyue:wyue@172.24.3.200:5672//')


def process_media(body, message):
    """Message callback."""
    print(body)
    # Acknowledge that the message has been received
    message.ack()


if __name__ == '__main__':
    with connection.channel() as channel:
        # Create a consumer for the queue
        consumer = Consumer(channel, task_queue)
        # Register the callback
        consumer.register_callback(process_media)
        consumer.consume()
        # Block and process incoming events forever; the loop only ends when an
        # exception is raised (e.g. the connection drops or Ctrl+C)
        while True:
            connection.drain_events()
```
After running recv.py, the RabbitMQ queue wy_test_queue no longer holds any messages.
OpenStack also wraps the underlying MQ. It supports several MQ backends, RabbitMQ among them.
oslo_messaging
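OpenStack implements RPC calls with a dedicated library. The OpenStack community packaged the RPC functionality as oslo.messaging, a library that OpenStack projects depend on. oslo.messaging essentially wraps the Python RabbitMQ library (kombu). It handles ease of programming, performance, reliability, exception handling and many other concerns, so developers of each project can focus on business code instead of how messages are sent and received. Most of the source code discussed in this article belongs to oslo_messaging.
Cinder RPC Loading
When cinder-api starts, cinder.cmd.api.main calls rpc.init(CONF) to load RPC.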
cinder.rpc.init:
```python
def init(conf):
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    exmods = get_allowed_exmods()
    # Initialize the transport used for RPC
    TRANSPORT = messaging.get_transport(conf,
                                        allowed_remote_exmods=exmods,
                                        aliases=TRANSPORT_ALIASES)
    # Initialize the transport used for RPC notifications
    NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
        conf,
        allowed_remote_exmods=exmods,
        aliases=TRANSPORT_ALIASES)

    # get_notification_transport has loaded oslo_messaging_notifications config
    # group, so we can now check if notifications are actually enabled.
    if utils.notifications_enabled(conf):
        # Set up the serializer
        json_serializer = messaging.JsonPayloadSerializer()
        serializer = RequestContextSerializer(json_serializer)
        NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                      serializer=serializer)
    else:
        NOTIFIER = utils.DO_NOTHING
```
This calls oslo_messaging's get_transport method. get_transport is a factory method: it builds a TRANSPORT object for whichever backend the transport_url in the configuration file specifies.
oslo_messaging.transport.get_transport:
```python
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
    """A factory method for Transport objects.

    This method will construct a Transport object from transport configuration
    gleaned from the user's configuration and, optionally, a transport URL.

    If a transport URL is supplied as a parameter, any transport configuration
    contained in it takes precedence. If no transport URL is supplied, but
    there is a transport URL supplied in the user's configuration then that
    URL will take the place of the URL parameter. In both cases, any
    configuration not supplied in the transport URL may be taken from
    individual configuration parameters in the user's configuration.

    An example transport URL might be::

        rabbit://me:passwd@host:5672/virtual_host

    and can either be passed as a string or a TransportURL object.

    :param conf: the user configuration
    :type conf: cfg.ConfigOpts
    :param url: a transport URL
    :type url: str or TransportURL
    :param allowed_remote_exmods: a list of modules which a client using this
                                  transport will deserialize remote exceptions
                                  from
    :type allowed_remote_exmods: list
    :param aliases: A map of transport alias to transport name
    :type aliases: dict
    """
    allowed_remote_exmods = allowed_remote_exmods or []
    # Register the 'transport_url', 'rpc_backend' and 'control_exchange'
    # options (they are described below)
    conf.register_opts(_transport_opts)

    # Turn the URL (cinder.conf: transport_url = rabbit://stackrabbit:secret@172.24.3.200:5672/)
    # into an oslo_messaging.transport.TransportURL object
    if not isinstance(url, TransportURL):
        url = TransportURL.parse(conf, url, aliases)
    # Printing url gives: <TransportURL transport='rabbit', hosts=[<TransportHost hostname='172.24.2.218', port=5672, username='stackrabbit', password='secret'>]>

    kwargs = dict(default_exchange=conf.control_exchange,
                  allowed_remote_exmods=allowed_remote_exmods)

    try:
        # Load the driver (here: rabbit) from the 'oslo.messaging.drivers' namespace
        mgr = driver.DriverManager('oslo.messaging.drivers',
                                   url.transport.split('+')[0],
                                   invoke_on_load=True,
                                   invoke_args=[conf, url],
                                   invoke_kwds=kwargs)
    except RuntimeError as ex:
        raise DriverLoadFailure(url.transport, ex)

    return Transport(mgr.driver)
```
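Without the configuration handling, the factory comes down to three steps:
1. Parse the URL.
2. Take the scheme, up to any `+`, as the driver name.
3. Instantiate that driver with control_exchange as its default exchange.

The sketch below uses a plain dict in place of stevedore's entry-point lookup and handles only a single host. `parse_transport_url`, `FakeRabbitDriver`, `DRIVERS` and this `get_transport` are hypothetical names, not oslo.messaging's API.

```python
from typing import Callable, Dict, List, NamedTuple, Optional
from urllib.parse import urlsplit


class Host(NamedTuple):
    hostname: Optional[str]
    port: Optional[int]
    username: Optional[str]
    password: Optional[str]


class ParsedURL(NamedTuple):
    transport: str          # URL scheme, e.g. "rabbit"
    hosts: List[Host]
    virtual_host: str


def parse_transport_url(url: str) -> ParsedURL:
    """Single-host parser (the real TransportURL.parse also accepts host lists)."""
    parts = urlsplit(url)
    host = Host(parts.hostname, parts.port, parts.username, parts.password)
    return ParsedURL(parts.scheme, [host], parts.path.lstrip("/"))


class FakeRabbitDriver:
    """Hypothetical driver; a real one would open connections to the broker."""

    def __init__(self, url: ParsedURL, default_exchange: str) -> None:
        self.url = url
        self.default_exchange = default_exchange


# Stand-in for stevedore's lookup in the 'oslo.messaging.drivers' namespace
DRIVERS: Dict[str, Callable[..., object]] = {"rabbit": FakeRabbitDriver}


def get_transport(url: str, control_exchange: str = "openstack") -> object:
    parsed = parse_transport_url(url)
    name = parsed.transport.split("+")[0]   # same driver-name rule as above
    try:
        factory = DRIVERS[name]
    except KeyError:
        raise RuntimeError(f"no messaging driver registered for {name!r}") from None
    # control_exchange becomes the driver's default exchange, as in kwargs above
    return factory(parsed, default_exchange=control_exchange)
```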
transport configuration options
Option | Default | Description |
---|---|---|
transport_url | None | A URL representing the messaging driver to use and its full configuration. |
rpc_backend | rabbit | The messaging driver to use, defaults to rabbit. Other drivers include amqp and zmq. Deprecated and replaced by transport_url. |
control_exchange | openstack | The default exchange under which topics are scoped. May be overridden by an exchange name specified in the transport_url option. |
`serializer = RequestContextSerializer(serializer)` handles message serialization. It converts Cinder messages into a format that can be transmitted over the network.
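The pattern behind `RequestContextSerializer(json_serializer)` is a wrapper. The inner serializer handles the payload, and the outer one also converts the request context to and from a plain dict. Below is a minimal sketch of that idea. `RequestContext`, `JsonSerializer` and `ContextSerializer` here are simplified, hypothetical stand-ins, not the real Cinder or oslo.messaging classes.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class RequestContext:
    """Hypothetical stand-in for cinder.context.RequestContext."""
    user_id: str
    project_id: str
    request_id: str


class JsonSerializer:
    """Payload serializer: turns entities into JSON-safe primitives."""

    def serialize_entity(self, ctxt: Any, entity: Any) -> Any:
        return json.loads(json.dumps(entity))   # round-trip proves it is JSON-safe

    def deserialize_entity(self, ctxt: Any, entity: Any) -> Any:
        return entity


class ContextSerializer:
    """Wraps a payload serializer and adds context (de)serialization."""

    def __init__(self, base: Optional[JsonSerializer]) -> None:
        self._base = base

    def serialize_entity(self, ctxt: Any, entity: Any) -> Any:
        return self._base.serialize_entity(ctxt, entity) if self._base else entity

    def deserialize_entity(self, ctxt: Any, entity: Any) -> Any:
        return self._base.deserialize_entity(ctxt, entity) if self._base else entity

    def serialize_context(self, ctxt: RequestContext) -> Dict[str, Any]:
        return asdict(ctxt)                      # context object -> plain dict

    def deserialize_context(self, data: Dict[str, Any]) -> RequestContext:
        return RequestContext(**data)            # plain dict -> context object
```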
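Summary
- When Cinder starts the cinder-api service, it loads the RPC environment.
- What gets loaded is mainly the RPC transport and serializer.
- The transport is created from the transport_url and control_exchange options. It can be seen as the relay station for messages between Cinder and the RPC backend (such as RabbitMQ).
- The serializer is the serialization tool that converts the messages sent over RPC.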
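Cinder RPC Interfaces
Both the scheduler and the volume service define their own RPC interfaces; let's take the scheduler as an example.
rpcapi.py exposes the RPC API, while manager.py contains the actual business logic of the RPC methods.
cinder.scheduler.rpcapi.SchedulerAPI inherits from cinder.rpc.RPCAPI.
Every RPC API has to set up its target, serializer, client and so on when it is initialized. This `__init__` is defined in cinder.rpc.RPCAPI, and the RPCAPI subclasses simply inherit it:
`cinder.rpc.RPCAPI#__init__`: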
```python
def __init__(self):
    target = messaging.Target(topic=self.TOPIC,
                              version=self.RPC_API_VERSION)
    obj_version_cap = self.determine_obj_version_cap()
    serializer = base.CinderObjectSerializer(obj_version_cap)

    rpc_version_cap = self.determine_rpc_version_cap()
    self.client = get_client(target, version_cap=rpc_version_cap,
                             serializer=serializer)
```
Subclass-specific values such as RPC_API_VERSION and TOPIC are defined by each RPCAPI subclass itself:
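cinder.scheduler.rpcapi.SchedulerAPI:
```python
class SchedulerAPI(rpc.RPCAPI):
    # ... (docstring and methods omitted)
    RPC_API_VERSION = '3.5'
    RPC_DEFAULT_VERSION = '3.0'
    # Resolves to "cinder-scheduler"; see SCHEDULER_BINARY = "cinder-scheduler"
    # defined at cinder/common/constants.py:21
    TOPIC = constants.SCHEDULER_TOPIC
    BINARY = 'cinder-scheduler'
```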
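RPC_API_VERSION is the newest message version this client knows about. The `version_cap` handed to `get_client()` (in Cinder, from `determine_rpc_version_cap()`) limits what the client may actually send, and oslo.messaging enforces it in `_check_version_cap()`. The rule can be sketched as follows. The sketch assumes a simplified form: the major version must match, and the requested minor version must not exceed the cap. The real check also considers an optional third version component. `fits_version_cap` and `_parse` are hypothetical helpers.

```python
from typing import Optional, Tuple


def _parse(version: str) -> Tuple[int, int]:
    """'3.5' -> (3, 5); any third component is ignored in this sketch."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def fits_version_cap(version_cap: Optional[str], requested: str) -> bool:
    """True if a message stamped `requested` may be sent under `version_cap`."""
    if version_cap is None:              # no cap pinned: any version may be sent
        return True
    cap_major, cap_minor = _parse(version_cap)
    req_major, req_minor = _parse(requested)
    return req_major == cap_major and req_minor <= cap_minor
```

Under this rule, a `say_hello` pinned to '3.0' can be sent whenever the cap is anywhere in the 3.x series.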
To define a new RPC API, simply add it to cinder.scheduler.rpcapi.SchedulerAPI. For example, here is a demo RPC API called say_hello:
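```python
def say_hello(self, ctxt):
    version = '3.0'
    # Get a call context pinned to message version 3.0
    cctxt = self.client.prepare(version=version)
    # cast: send the message and return without waiting for a result
    cctxt.cast(ctxt, 'say_hello')
```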
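A cast only has an effect if the server side has a method with the same name. As far as I can tell, the scheduler service's RPC server dispatches each incoming message to the manager method named in it, so this demo also needs a `say_hello` in cinder/scheduler/manager.py. The toy dispatcher below illustrates that lookup. The message layout (`method`, `args`, `version`) only approximates what `_make_message` builds; for simplicity the context travels in the same dict. `SchedulerManagerSketch` and `dispatch` are hypothetical.

```python
from typing import Any, Dict


class SchedulerManagerSketch:
    """Hypothetical server-side counterpart of say_hello (the manager.py side)."""

    def __init__(self) -> None:
        self.greetings = 0

    def say_hello(self, ctxt: Dict[str, Any]) -> str:
        self.greetings += 1
        return f"hello from scheduler, request {ctxt['request_id']}"


def dispatch(endpoint: object, message: Dict[str, Any]) -> Any:
    """Look up message['method'] on the endpoint and call it with the args."""
    name = message["method"]
    if name.startswith("_"):                     # never expose private helpers
        raise AttributeError(f"unsupported method: {name}")
    method = getattr(endpoint, name, None)
    if not callable(method):
        raise AttributeError(f"unsupported method: {name}")
    return method(message["context"], **message.get("args", {}))
```

For a cast, the return value of the manager method is simply discarded.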
self.client comes from cinder.rpc.get_client:
```python
def get_client(target, version_cap=None, serializer=None):
    # An assert states that a condition must be true; if it is false,
    # an AssertionError is raised.
    # Here it fails if TRANSPORT is None. TRANSPORT was already loaded when the
    # cinder-api service started (see above).
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    # Returns an oslo_messaging.rpc.client.RPCClient object
    return messaging.RPCClient(TRANSPORT,
                               target,
                               version_cap=version_cap,
                               serializer=serializer)
```
self.client.prepare(version=version) prepares the context for the RPC call and returns an oslo_messaging.rpc.client._CallContext object.
oslo_messaging.rpc.client._CallContext inherits from oslo_messaging.rpc.client._BaseCallContext. _BaseCallContext provides the two important methods behind RPCClient: call and cast.
oslo_messaging.rpc.client._BaseCallContext#cast:
```python
def cast(self, ctxt, method, **kwargs):
    """Invoke a method and return immediately. See RPCClient.cast()."""
    # Build the request message (the arguments are serialized here)
    msg = self._make_message(ctxt, method, kwargs)
    # Serialize the RPC request context
    msg_ctxt = self.serializer.serialize_context(ctxt)

    # Check that the message version does not exceed the client's version cap
    self._check_version_cap(msg.get('version'))

    try:
        self.transport._send(self.target, msg_ctxt, msg, retry=self.retry)
    except driver_base.TransportDriverError as ex:
        raise ClientSendError(self.target, ex)
```
cast sends the serialized message straight to the target and does not wait for a return value, which makes it an asynchronous call.
oslo_messaging.rpc.client._BaseCallContext#call:
```python
def call(self, ctxt, method, **kwargs):
    """Invoke a method and wait for a reply. See RPCClient.call()."""
    if self.target.fanout:
        raise exceptions.InvalidTarget('A call cannot be used with fanout',
                                       self.target)

    msg = self._make_message(ctxt, method, kwargs)
    msg_ctxt = self.serializer.serialize_context(ctxt)

    timeout = self.timeout
    if self.timeout is None:
        timeout = self.conf.rpc_response_timeout

    self._check_version_cap(msg.get('version'))

    try:
        result = self.transport._send(self.target, msg_ctxt, msg,
                                      wait_for_reply=True, timeout=timeout,
                                      retry=self.retry)
    except driver_base.TransportDriverError as ex:
        raise ClientSendError(self.target, ex)

    return self.serializer.deserialize_entity(ctxt, result)
```
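call sends the RPC message, waits up to the timeout for the response, then deserializes and returns it. If no timeout was set on the call context, rpc_response_timeout from the configuration is used. This is a synchronous call.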
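The difference between the two can be reproduced in-process. In the sketch below, a worker thread stands in for the remote service. `cast()` only enqueues the request. `call()` also passes a private reply queue and blocks on it up to a timeout, re-raising any exception the handler sent back. `InProcessRPC` is a hypothetical class, not part of oslo.messaging.

```python
import queue
import threading
from typing import Any, Callable, Dict


class InProcessRPC:
    """Toy transport: one worker thread serves requests from a request queue."""

    def __init__(self, handlers: Dict[str, Callable[..., Any]]) -> None:
        self._handlers = handlers
        self._requests: "queue.Queue[tuple]" = queue.Queue()
        threading.Thread(target=self._serve, daemon=True).start()

    def _serve(self) -> None:
        while True:
            method, kwargs, reply_q = self._requests.get()
            try:
                result = self._handlers[method](**kwargs)
            except Exception as exc:      # send errors back instead of dying
                result = exc
            if reply_q is not None:       # only call() asked for a reply
                reply_q.put(result)

    def cast(self, method: str, **kwargs: Any) -> None:
        """Fire and forget: enqueue the request and return immediately."""
        self._requests.put((method, kwargs, None))

    def call(self, method: str, timeout: float = 1.0, **kwargs: Any) -> Any:
        """Enqueue the request, then block until a reply arrives or time runs out."""
        reply_q: "queue.Queue[Any]" = queue.Queue(maxsize=1)
        self._requests.put((method, kwargs, reply_q))
        try:
            result = reply_q.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError(f"no reply to {method!r} within {timeout}s") from None
        if isinstance(result, Exception):  # same idea as the driver's _send
            raise result
        return result
```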
Both call and cast end up in self.transport._send, so let's look at that method.
oslo_messaging.transport.Transport#_send
```python
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
          retry=None):
    if not target.topic:
        raise exceptions.InvalidTarget('A topic is required to send',
                                       target)
    return self._driver.send(target, ctxt, message,
                             wait_for_reply=wait_for_reply,
                             timeout=timeout, retry=retry)
```
The actual work is done by the RabbitMQ driver. The call goes to the send method of oslo_messaging._drivers.impl_rabbit.RabbitDriver, which is inherited from oslo_messaging._drivers.amqpdriver.AMQPDriverBase. send in turn calls _send.
oslo_messaging._drivers.amqpdriver.AMQPDriverBase#_send:
```python
def _send(self, target, ctxt, message,
          wait_for_reply=None, timeout=None,
          envelope=True, notify=False, retry=None):

    msg = message

    # With wait_for_reply=True a reply is expected, so extend the msg structure
    if wait_for_reply:
        # Generate a msg_id. uuid.uuid4() is built from (pseudo-)random numbers and
        # .hex renders it in hexadecimal; see the reference
        # "Generating unique IDs with Python's UUID library"
        msg_id = uuid.uuid4().hex
        msg.update({'_msg_id': msg_id})
        # _get_reply_q() returns the name of the queue replies come back on; on
        # first use it also creates a connection that listens for replies (see below)
        msg.update({'_reply_q': self._get_reply_q()})

    # Add a UNIQUE_ID to msg as a unique identifier so duplicates can be detected
    rpc_amqp._add_unique_id(msg)
    unique_id = msg[rpc_amqp.UNIQUE_ID]

    # Merge the ctxt context into msg
    rpc_amqp.pack_context(msg, ctxt)

    # Serialize msg
    if envelope:
        msg = rpc_common.serialize_msg(msg)

    if wait_for_reply:
        # Register msg_id with the reply waiter; msg_id is the key it listens on
        self._waiter.listen(msg_id)
        log_msg = "CALL msg_id: %s " % msg_id
    else:
        log_msg = "CAST unique_id: %s " % unique_id

    try:
        # Get a connection used for sending messages
        with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
            if notify:
                exchange = self._get_exchange(target)
                log_msg += "NOTIFY exchange '%(exchange)s'" \
                           " topic '%(topic)s'" % {
                               'exchange': exchange,
                               'topic': target.topic}
                LOG.debug(log_msg)
                conn.notify_send(exchange, target.topic, msg, retry=retry)
            elif target.fanout:
                log_msg += "FANOUT topic '%(topic)s'" % {
                    'topic': target.topic}
                LOG.debug(log_msg)
                conn.fanout_send(target.topic, msg, retry=retry)
            else:
                topic = target.topic
                exchange = self._get_exchange(target)
                if target.server:
                    topic = '%s.%s' % (target.topic, target.server)
                # e.g. exchange 'openstack' topic 'cinder-scheduler'
                log_msg += "exchange '%(exchange)s'" \
                           " topic '%(topic)s'" % {
                               'exchange': exchange,
                               'topic': topic}
                LOG.debug(log_msg)
                # Declare the topic exchange and publish the message through it
                conn.topic_send(exchange_name=exchange, topic=topic,
                                msg=msg, timeout=timeout, retry=retry)

        if wait_for_reply:
            # oslo_messaging._drivers.amqpdriver.ReplyWaiter#wait: wait for the RPC result
            result = self._waiter.wait(msg_id, timeout)
            # If the result is an exception, raise it
            if isinstance(result, Exception):
                raise result
            return result
    finally:
        if wait_for_reply:
            self._waiter.unlisten(msg_id)


def _get_reply_q(self):
    # self._reply_q_lock is a threading.Lock(); it ensures the reply queue is set up once
    with self._reply_q_lock:
        if self._reply_q is not None:
            return self._reply_q

        reply_q = 'reply_' + uuid.uuid4().hex

        # Create a connection in listen mode. oslo_messaging defines two PURPOSEs
        # for creating a Connection, 'listen' and 'send': 'listen' reads from the
        # socket and 'send' writes to it.
        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

        # Initialize the reply listener
        self._waiter = ReplyWaiter(reply_q, conn,
                                   self._allowed_remote_exmods)

        self._reply_q = reply_q
        self._reply_q_conn = conn

    return self._reply_q
```
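The first half of `_send` only reshapes the message dict. The sketch below mimics that step for a call and for a cast. The names `_msg_id` and `_reply_q` come from the code above. The rest reflect my reading of `rpc_amqp` and `rpc_common.serialize_msg` and should be treated as assumptions:
- the key `_unique_id`
- the `_context_` prefix used to flatten the context
- the `oslo.version`/`oslo.message` envelope

`build_amqp_message` is a hypothetical helper.

```python
import json
import uuid
from typing import Any, Dict, Optional, Tuple


def build_amqp_message(message: Dict[str, Any], ctxt: Dict[str, Any],
                       reply_q: Optional[str]) -> Tuple[Dict[str, str], Optional[str]]:
    """Mimic the message preparation in AMQPDriverBase._send (key names assumed)."""
    msg = dict(message)
    msg_id = None
    if reply_q is not None:                    # call(): the reply needs an address
        msg_id = uuid.uuid4().hex
        msg["_msg_id"] = msg_id
        msg["_reply_q"] = reply_q
    msg["_unique_id"] = uuid.uuid4().hex       # lets the receiver spot duplicates
    for key, value in ctxt.items():            # flatten the context into the message
        msg["_context_%s" % key] = value
    # Assumed envelope layout: version marker plus the JSON-encoded message
    envelope = {"oslo.version": "2.0", "oslo.message": json.dumps(msg)}
    return envelope, msg_id
```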
oslo_messaging._drivers.impl_rabbit.Connection#topic_send
```python
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
    """Send a 'topic' message."""
    # Create a kombu.entity.Exchange object
    exchange = kombu.entity.Exchange(
        name=exchange_name,
        type='topic',
        durable=self.amqp_durable_queues,
        auto_delete=self.amqp_auto_delete)

    # This ends up in oslo_messaging._drivers.impl_rabbit.Connection#_publish,
    # which publishes the message
    self._ensure_publishing(self._publish, exchange, msg,
                            routing_key=topic, timeout=timeout,
                            retry=retry)
```
oslo_messaging._drivers.impl_rabbit.Connection#_publish
```python
def _publish(self, exchange, msg, routing_key=None, timeout=None):
    """Publish a message."""
    # Declare the exchange unless it is passive or already in _declared_exchanges.
    # _declared_exchanges is a set of the names of exchanges that have already been
    # declared, used to avoid needless redeclarations. If the connection is reset,
    # Connection._set_current_channel resets _declared_exchanges as well.
    if not (exchange.passive or exchange.name in self._declared_exchanges):
        exchange(self.channel).declare()
        self._declared_exchanges.add(exchange.name)

    # NOTE(sileht): no need to wait more, caller expects
    # a answer before timeout is reached
    with self._transport_socket_timeout(timeout):
        # Calls kombu.messaging.Producer#publish; not analyzed further here
        self._producer.publish(msg,
                               exchange=exchange,
                               routing_key=routing_key,
                               expiration=timeout,
                               compression=self.kombu_compression)
```
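Besides topic_send, oslo_messaging also defines two other send methods, direct_send and fanout_send. The three correspond to the three AMQP Exchange types: Direct, Topic and Fanout. The code shows that all three work the same way:
1. Define an exchange.
2. Pass the message to oslo_messaging._drivers.impl_rabbit.Connection#_ensure_publishing.
3. Hand it to kombu.messaging.Producer#publish, which publishes it.

Exchanges used by Cinder: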
exchange_name | exchange_type |
---|---|
cinder-volume.localhost.localdomain@NetAppIscsiBackend_fanout | fanout |
cinder-volume.localhost.localdomain@ceph_fanout | fanout |
cinder-volume_fanout | fanout |
cinder-scheduler_fanout | fanout |
cinder-volume.localhost.localdomain@ceph-image_fanout | fanout |
openstack | topic |
oslo_messaging._drivers.impl_rabbit.Connection#direct_send and #fanout_send:
```python
def direct_send(self, msg_id, msg):
    """Send a 'direct' message."""
    exchange = kombu.entity.Exchange(name=msg_id,
                                     type='direct',
                                     durable=False,
                                     auto_delete=True,
                                     passive=True)

    self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
                            exchange, msg, routing_key=msg_id)


def fanout_send(self, topic, msg, retry=None):
    """Send a 'fanout' message."""
    exchange = kombu.entity.Exchange(name='%s_fanout' % topic,
                                     type='fanout',
                                     durable=False,
                                     auto_delete=True)

    self._ensure_publishing(self._publish, exchange, msg, retry=retry)
```
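Side by side, the three send methods determine which exchange and routing key a message ends up on. A small function can summarize this. It is a sketch derived from the code above, with notifications omitted and `openstack` as the default control_exchange. For direct_send, both the exchange name and the routing key are whatever value is passed as `msg_id`. `Route`, `pick_route` and `direct_route` are hypothetical names.

```python
from typing import NamedTuple, Optional


class Route(NamedTuple):
    exchange: str
    exchange_type: str
    routing_key: Optional[str]


def pick_route(topic: str, server: Optional[str] = None, fanout: bool = False,
               control_exchange: str = "openstack") -> Route:
    """Which exchange/routing key a cast or call is published to (sketch)."""
    if fanout:                                      # fanout_send: key is unused
        return Route(f"{topic}_fanout", "fanout", None)
    key = f"{topic}.{server}" if server else topic  # topic_send
    return Route(control_exchange, "topic", key)


def direct_route(name: str) -> Route:
    """direct_send: exchange and routing key are both the given name."""
    return Route(name, "direct", name)
```

This matches the exchange table above: the fanout exchanges are named `<topic>_fanout`, and everything else goes through the `openstack` topic exchange.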
oslo_messaging._drivers.amqpdriver.ReplyWaiter#wait:
```python
def wait(self, msg_id, timeout):
    # NOTE(sileht): for each msg_id we receive two amqp message
    # first one with the payload, a second one to ensure the other
    # have finish to send the payload
    # NOTE(viktors): We are going to remove this behavior in the N
    # release, but we need to keep backward compatibility, so we should
    # support both cases for now.
    timer = rpc_common.DecayingTimer(duration=timeout)
    timer.start()
    final_reply = None
    ending = False
    while not ending:
        timeout = timer.check_return(self._raise_timeout_exception, msg_id)
        try:
            message = self.waiters.get(msg_id, timeout=timeout)
        except moves.queue.Empty:
            self._raise_timeout_exception(msg_id)

        reply, ending = self._process_reply(message)
        if reply is not None:
            # NOTE(viktors): This can be either first _send_reply() with an
            # empty `result` field or a second _send_reply() with
            # ending=True and no `result` field.
            final_reply = reply
    return final_reply
```
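The core of `wait()` is a single deadline shared across possibly several reply messages. It keeps reading until a message flagged `ending` arrives, remembering the last non-empty result. The sketch below reproduces that loop with a per-msg_id `queue.Queue`, and uses `time.monotonic()` in place of `DecayingTimer`. The reply format (`result`/`ending` keys) and the class `ReplyWaiterSketch` are simplifications, not oslo.messaging's real message structure.

```python
import queue
import time
from typing import Any, Dict, Optional


class ReplyWaiterSketch:
    """Per-msg_id reply queues plus a wait loop that shares one overall deadline."""

    def __init__(self) -> None:
        self.waiters: Dict[str, "queue.Queue[Dict[str, Any]]"] = {}

    def listen(self, msg_id: str) -> None:
        self.waiters[msg_id] = queue.Queue()

    def unlisten(self, msg_id: str) -> None:
        self.waiters.pop(msg_id, None)

    def deliver(self, msg_id: str, message: Dict[str, Any]) -> None:
        """Called by the listener thread when a reply for msg_id arrives."""
        if msg_id in self.waiters:
            self.waiters[msg_id].put(message)

    def wait(self, msg_id: str, timeout: float) -> Any:
        deadline = time.monotonic() + timeout   # plays the role of DecayingTimer
        final_reply: Optional[Any] = None
        ending = False
        while not ending:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"timed out waiting for reply to {msg_id}")
            try:
                message = self.waiters[msg_id].get(timeout=remaining)
            except queue.Empty:
                raise TimeoutError(f"timed out waiting for reply to {msg_id}") from None
            reply, ending = message.get("result"), message.get("ending", False)
            if reply is not None:               # keep the last non-empty result
                final_reply = reply
        return final_reply
```

Both the old two-message replies and a single reply with `ending=True` go through the same loop.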
Other
oslo.versionedobjects
The oslo.versionedobjects library provides a generic versioned object model that is RPC-friendly, with inbuilt serialization, field typing, and remotable method calls. It can be used to define a data model within a project independent of external APIs or database schema for the purposes of providing upgrade compatibility across distributed services.