1,摘要
从安装环境,配置入门,到HelloWorld实操,各种类型消息传递的演示代码,原理介绍,答疑解惑,面试题,全面介绍RabbitMQ消息队列。
RabbitMQ集群搭建另外一篇文章介绍。
2,内容
2.1 Ubuntu 16.04 安装 RabbitMQ
参考:
(1)Ubuntu 16.04 RabbitMq 安装与运行(安装篇)【成功】
https://blog.csdn.net/qq_22638399/article/details/81704372
2.1.1 安装 Erlang
由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang,执行命令:
apt-get install erlang-nox # 安装erlang
erl # 查看relang语言版本,成功执行则说明relang安装成功
2.1.2 安装 RabbitMQ 3.5.7
2,添加公钥
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
3.更新软件包
apt-get update
4.安装 RabbitMQ
apt-get install erlang #安装erlang。
apt-get install rabbitmq-server #安装成功自动启动。
通过系统默认源安装得到的rabbitmq的版本是3.5.7。
检查rabbitmq的版本:
rabbitmqctl status |grep rabbit #检查rabbit版本
5.查看 RabbitMq状态
systemctl status rabbitmq-server #Active: active (running) 说明处于运行状态
# service rabbitmq-server status 用service指令也可以查看,同systemctl指令
6.启动、停止、重启
service rabbitmq-server start # 启动
service rabbitmq-server stop # 停止
service rabbitmq-server restart # 重启
执行了上面的步骤,rabbitMq已经安装成功。
2.1.2 安装 RabbitMQ 3.8.4
(1)如果之前安装过RabbitMQ,此处需要卸载掉。
apt-get remove rabbitmq
apt-get purge erlang
apt-get autoremove
Erlang跟RabbitMQ匹配,才能安装得成功。
Erlang Release Series | Apt Repositories that provide it | Notes |
---|---|---|
23.x | Debian packages of Erlang by Team RabbitMQErlang Solutions | Supported starting with 3.8.4. See Erlang compatibility guide. |
(2)Erlang安装最新版本 (20+版本):
配置源
echo "deb http://packages.erlang-solutions.com/ubuntu trusty contrib" | tee -a /etc/apt/sources.list.d/erlang_solutions.list
导入key
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc |apt-key add -
更新和安装
apt-get update
apt-get install erlang-nox
(3)下载RabbitMQ版本。
wget -c https://bintray.com/rabbitmq/debian/download_file?file_path=pool%2Frabbitmq-server%2Frabbitmq-server_3.8.4-1_all.deb
mv download_file?file_path=pool%2Frabbitmq-server%2Frabbitmq-server_3.8.4-1_all.deb rabbitmq-server_3.8.4-1_all.deb
安装RabbitMQ3.8.4软件。
apt get install socat
dpkg -i ./rabbitmq-server_3.8.4-1_all.deb
(4)查看 RabbitMq状态
systemctl status rabbitmq-server #Active: active (running) 说明处于运行状态
# service rabbitmq-server status 用service指令也可以查看,同systemctl指令
(5)启动、停止、重启
service rabbitmq-server start # 启动
service rabbitmq-server stop # 停止
service rabbitmq-server restart # 重启
执行了上面的步骤,rabbitMq已经安装成功。
2.1.3 启用 RabbitMQ web 管理插件
- 启用 web端可视化操作界面,我们还需要配置Management Plugin插件
rabbitmq-plugins enable rabbitmq_management # 启用插件
service rabbitmq-server restart # 重启
注意:RabbitMQ 3.3 及后续版本,guest 只能在服务本机登录。
8.查看用户
rabbitmqctl list_users
9.添加管理用户
rabbitmqctl add_user admin [yourpassword] # 增加普通用户,密码为admin
rabbitmqctl set_user_tags admin administrator # 给普通用户分配管理员角色
10.设置权限,指定允许访问的vhost以及write/read
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin "." "." ".*"
rabbitmqctl list_permissions -p / #查看vhost(/)允许哪些用户访问
-
修改配置
在/etc/rabbitmq 目录下面增加文件
rabbitmq.config
,编辑内容如下,loopback_users表示guest用户只能本地访问,admin 账户能够外网访问。[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, [“guest”]}]}
].
**端口信息:**
1. client端通信口amqp: 5672
2. http管理口15672
3. server间内部通信口25672
4. erlang发现口:4369
想要修改默认端口可修改,修改安装目录下 etc/rabbitmq.config文件。
**配置相关信息:**
=INFO REPORT==== 22-Jan-2021::11:32:41 ===
node : rabbit@JD3
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash : VrYYogHAkySebZfCPZM8DQ==
log : /var/log/rabbitmq/rabbit@JD3.log
sasl log : /var/log/rabbitmq/rabbit@JD3-sasl.log
database dir : /var/lib/rabbitmq/mnesia/rabbit@JD3
-
重启服务器
systemctl restart rabbitmq-server
ok,你可以在你的浏览器上输入:http://服务器Ip:15672/ 来访问你的rabbitmq监控页面。使用刚刚添加的新用户登录。
2.1.4 管理命令参考
命令全集:
Commands:
stop [<pid_file>]
stop_app
start_app
wait <pid_file>
reset
force_reset
rotate_logs <suffix>
join_cluster <clusternode> [--ram]
cluster_status
change_cluster_node_type disc | ram
forget_cluster_node [--offline]
rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...]
update_cluster_nodes clusternode
force_boot
sync_queue queue
cancel_sync_queue queue
purge_queue queue
set_cluster_name name
add_user <username> <password>
delete_user <username>
change_password <username> <newpassword>
clear_password <username>
authenticate_user <username> <password>
set_user_tags <username> <tag> ...
list_users
add_vhost <vhostpath>
delete_vhost <vhostpath>
list_vhosts [<vhostinfoitem> ...]
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
clear_permissions [-p <vhostpath>] <username>
list_permissions [-p <vhostpath>]
list_user_permissions <username>set_parameter [-p <vhostpath>] <component_name> <name> <value>
clear_parameter [-p <vhostpath>] <component_name> <key>
list_parameters [-p <vhostpath>]
set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
clear_policy [-p <vhostpath>] <name>
list_policies [-p <vhostpath>]list_queues [-p <vhostpath>] [<queueinfoitem> ...]
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
list_connections [<connectioninfoitem> ...]
list_channels [<channelinfoitem> ...]
list_consumers [-p <vhostpath>]
status
environment
report
eval <expr>close_connection <connectionpid> <explanation>
trace_on [-p <vhost>]
trace_off [-p <vhost>]
set_vm_memory_high_watermark <fraction>
set_vm_memory_high_watermark absolute <memory_limit_in_bytes>
具体命令信息:
在rabbitmq的内部数据库添加用户;
add_user删除一个用户;
delete_user改变用户密码(也是改变web管理登陆密码);
change_password清除用户的密码,该用户将不能使用密码登陆,但是可以通过SASL登陆如果配置了SASL认证;
clear_password设置用户tags;
set_user_tags …列出用户;
list_users创建一个vhosts;
add_vhost删除一个vhosts;
delete_vhost列出vhosts;
list_vhosts [ …]针对一个vhosts给用户赋予相关权限;
set_permissions [-p ]清除一个用户对vhosts的权限;
clear_permissions [-p ]列出哪些用户可以访问该vhosts;
list_permissions [-p ]-
列出该用户的访问权限;
list_user_permissionsset_parameter [-p ] <component_name>
clear_parameter [-p ] <component_name>
list_parameters [-p ] 获取RabbitMQ的版本号
rabbitmqctl status | grep rabbit
2.2 RabbitMQ配置
参考:https://www.rabbitmq.com/configure.html
2.2.1 修旧配置格式如何区分
RabbitMQ新的版本(V3.7以后)支持类似ini,sysctrl配置文件格式的配置文件。这个文件被命名为rabbitmq.conf。
Configuration File | Format Used | Purpose |
---|---|---|
rabbitmq.conf | New style format (sysctl or ini-like) | Primary configuration file. Should be used for most settings. It is easier for humans to read and machines (deployment tools) to generate. Not every setting can be expressed in this format. |
advanced.config | Classic (Erlang terms) | A limited number of settings that cannot be expressed in the new style configuration format, such as LDAP queries. Only should be used when necessary. |
rabbitmq-env.conf (rabbitmq-env.conf.bat on Windows) | Environment variable pairs | Used to set environment variables relevant to RabbitMQ in one place. |
sysctl 格式举例: rabbitmq.conf
# A new style format snippet. This format is used by rabbitmq.conf files.
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
经典格式举例: rabbitmq.config
%% A classic format snippet, now used by advanced.config files.
[
{rabbit, [{ssl_options, [{cacertfile, "/path/to/ca_certificate.pem"},
{certfile, "/path/to/server_certificate.pem"},
{keyfile, "/path/to/server_key.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, true}]}]}
].
2.2.2 典型配置文件
新格式:
[rabbitmq.conf]
https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/rabbitmq.conf.example
经典格式:
[advanced.config]
https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/advanced.config.example
有些配置设置是不可能或者很难通过sysctl格式进行配置。因此,可以使用Erlang术语配置格式的附加配置文件,该文件通常被命名为advanced.config。它将与rabbitmq.conf中提供的配置合并。
经典格式:
[rabbitmq.config.example]
https://github.com/rabbitmq/rabbitmq-server/blob/v3.7.x/deps/rabbit/docs/rabbitmq.config.example
更多配置说明可参考文章《RabbitMQ系列(六)RabbitMQ的配置》
2.2.3 rabbitmq.config详细配置参数
Key | Documentation |
---|---|
tcp_listeners | 用于监听 AMQP连接的端口列表(无SSL). 可以包含整数 (即"监听所有接口")或者元组如 {"127.0.0.1", 5672} 用于监听一个或多个接口.Default: [5672] listeners.tcp.default = 5672
|
num_tcp_acceptors | 接受TCP侦听器连接的Erlang进程数。Default: 10 num_acceptors.tcp = 10
|
handshake_timeout | AMQP 0-8/0-9/0-9-1 handshake (在 socket连接和SSL 握手之后)的最大时间, 毫秒为单位.Default: 10000 handshake_timeout = 10000
|
ssl_listeners | 如上所述,用于SSL连接。Default: [] |
num_ssl_acceptors | 接受SSL侦听器连接的Erlang进程数。Default: 1 num_acceptors.ssl = 10
|
ssl_options | SSL配置.参考SSL documentation.Default: [] ssl_options = none
|
ssl_handshake_timeout | SSL handshake超时时间,毫秒为单位.Default: 5000 ssl_handshake_timeout = 5000
|
vm_memory_high_watermark | 流程控制触发的内存阀值.相看memory-based flow control 文档.Default: 0.4 vm_memory_high_watermark.relative = 0.6 vm_memory_high_watermark.absolute = 2GB
|
vm_memory_high_watermark_paging_ratio | 高水位限制的分数,当达到阀值时,队列中消息消息会转移到磁盘上以释放内存. 参考memory-based flow control 文档.Default: 0.5 vm_memory_high_watermark_paging_ratio = 0.5
|
disk_free_limit | RabbitMQ存储数据分区的可用磁盘空间限制.当可用空间值低于阀值时,流程控制将被触发.此值可根据RAM的总大小来相对设置 (如.{mem_relative, 1.0}).此值也可以设为整数(单位为bytes)或者使用数字单位(如."50MB").默认情况下,可用磁盘空间必须超过50MB.参考 Disk Alarms 文档.Default: 50000000 disk_free_limit.relative = 3.0 disk_free_limit.absolute = 2GB
|
log_levels | 控制日志的粒度.其值是日志事件类别(category)和日志级别(level)成对的列表.level 可以是 'none' (不记录日志事件), 'error' (只记录错误), 'warning' (只记录错误和警告), 'info' (记录错误,警告和信息), or 'debug' (记录错误,警告,信息以及调试信息).目前定义了4种日志类别. 它们是:channel -针对所有与AMQP channels相关的事件connection - 针对所有与网络连接相关的事件federation - 针对所有与federation相关的事件mirroring -针对所有与 mirrored queues相关的事件Default: [{connection, info}] log.file.level = info
|
frame_max | 与客户端协商的允许最大frame大小. 设置为0表示无限制,但在某些QPid客户端会引发bug. 设置较大的值可以提高吞吐量;设置一个较小的值可能会提高延迟. Default: 131072 |
channel_max | 与客户端协商的允许最大chanel大小. 设置为0表示无限制.该数值越大,则broker使用的内存就越高.Default: 2047channel_max = 2047
|
channel_operation_timeout | Channel 操作超时时间(毫秒为单位) (内部使用,因为消息协议的区别和限制,不暴露给客户端).Default: 5000 channel_operation_timeout = 15000
|
heartbeat | 表示心跳延迟(单位为秒) ,服务器将在connection.tune frame中发送.如果设置为 0, 心跳将被禁用. 客户端可以不用遵循服务器的建议, 查看 AMQP reference 来了解详情. 禁用心跳可以在有大量连接的场景中提高性能,但可能会造成关闭了非活动连接的网络设备上的连接落下.Default: 60 (3.5.5之前的版本是580) heartbeat = 60
|
default_vhost | 当RabbitMQ从头开始创建数据库时创建的虚拟主机. amq.rabbitmq.log交换器会存在于这个虚拟主机中.Default: <<"/">> default_vhost = /
|
default_user | RabbitMQ从头开始创建数据库时,创建的用户名.Default: <<"guest">> default_user = guest
|
default_pass | 默认用户的密码.Default: <<"guest">> default_pass = guest
|
default_user_tags | 默认用户的Tags.Default: [administrator] default_user_tags.administrator = true
|
default_permissions | 创建用户时分配给它的默认Permissions .Default: [<<".">>, <<".">>, <<".*">>] default_permissions.configure = .* default_permissions.read = .* default_permissions.write = .*
|
loopback_users | 只能通过环回接口(即localhost)连接broker的用户列表如果你希望默认的guest用户能远程连接,你必须将其修改为[].Default: [<<"guest">>] ``loopback_users = none |
cluster_nodes | 当节点第一次启动的时候,设置此选项会导致集群动作自动发生. 元组的第一个元素是其它节点想与其建立集群的节点. 第二个元素是节点的类型,要么是disc,要么是ramDefault: {[], disc} cluster_formation.classic_config.nodes.1 = rabbit@hostname1 cluster_formation.classic_config.nodes.2 = rabbit@hostname2
|
server_properties | 连接时向客户端声明的键值对列表Default: [] |
collect_statistics | 统计收集模式。主要与管理插件相关。选项:none (不发出统计事件)coarse (发出每个队列 /每个通道 /每个连接的统计事件)fine (也发出每个消息统计事件)你自已可不用修改此选项.Default: none `` |
collect_statistics_interval | 统计收集时间间隔(毫秒为单位). 主要针对于 management plugin.Default: 5000 collect_statistics_interval = 5000
|
auth_mechanisms | 提供给客户端的SASL authentication mechanisms.Default: ['PLAIN', 'AMQPLAIN'] auth_mechanisms.1 = PLAIN auth_mechanisms.2 = AMQPLAIN
|
auth_backends | 用于 authentication / authorisation backends 的列表. 此列表可包含模块的名称(在模块相同的情况下,将同时用于认证来授权)或像{ModN, ModZ}这样的元组,在这里ModN将用于认证,ModZ将用于授权.在2元组的情况中, ModZ可由列表代替,列表中的所有元素必须通过每个授权的确认,如{ModN, [ModZ1, ModZ2]}.这就允许授权插件进行组合提供额外的安全约束.除rabbit_auth_backend_internal外,其它数据库可以通常 plugins来使用.Default: [rabbit_auth_backend_internal] |
reverse_dns_lookups | 设置为true,可让客户端在连接时让RabbitMQ 执行一个反向DNS查找, 然后通过 rabbitmqctl 和 管理插件来展现信息. Default: false |
delegate_count | 内部集群通信中,委派进程的数目. 在一个有非常多核的机器(集群的一部分)上,你可以增加此值.Default: 16 |
trace_vhosts | tracer内部使用.你不应该修改.Default: [] |
tcp_listen_options | 默认socket选项. 你可能不想修改这个选项.Default:[{backlog, 128}, {nodelay, true}, {exit_on_close, false}] |
hipe_compile | 将此选项设置为true,将会使用HiPE预编译部分RabbitMQ,Erlang的即时编译器. 这可以增加服务器吞吐量,但会增加服务器的启动时间. 你可以看到花费几分钟延迟启动的成本,就可以带来20-50% 更好性能.这些数字与高度依赖于工作负载和硬件.HiPE 支持可能没有编译进你的Erlang安装中.如果没有的话,启用这个选项,并启动RabbitMQ时,会看到警告消息. 例如, Debian / Ubuntu 用户需要安装erlang-base-hipe 包.HiPE并非在所有平台上都可用,尤其是Windows.在 Erlang/OTP 17.5版本之前,HiPE有明显的问题 . 对于HiPE,使用最新的OTP版本是高度推荐的.Default: false |
cluster_partition_handling | 如何处理网络分区.可用模式有:ignorepause_minority{pause_if_all_down, [nodes], ignore | autoheal}where [nodes] is a list of node names (ex: ['rabbit@node1', 'rabbit@node2'])autoheal参考documentation on partitions 来了解更多信息Default: ignore |
cluster_keepalive_interval | 节点向其它节点发送存活消息和频率(毫秒). 注意,这与 net_ticktime是不同的;丢失存活消息不会引起节点掉线Default: 10000 |
queue_index_embed_msgs_below | 消息大小在此之下的会直接内嵌在队列索引中. 在修改此值时,建议你先阅读 persister tuning 文档.Default: 4096 |
msg_store_index_module | 队列索引的实现模块. 在修改此值时,建议你先阅读 persister tuning 文档.Default: rabbit_msg_store_ets_index |
backing_queue_module | 队列内容的实现模块. 你可能不想修改此值.Default: rabbit_variable_queue |
msg_store_file_size_limit | Tunable value for the persister. 你几乎肯定不应该改变此值。Default: 16777216 |
mnesia_table_loading_timeout | 在集群中等待使用Mnesia表可用的超时时间。Default: 30000 |
queue_index_max_ journal_entries | Tunable value for the persister. 你几乎肯定不应该改变此值。Default: 65536 |
queue_master_locator | Queue master 位置策略.可用策略有:<<"min-masters">><<"client-local">><<"random">>查看documentation on queue master location 来了解更多信息.Default: <<"client-local">> |
2.3 RabbitMQ实操入门
参考:
(1)官方入门文档 https://www.rabbitmq.com/getstarted.html
(2)中文译文入门:http://raylei.cn/index.php/archives/48/
(3) rabbitmq官方的六种工作模式 https://blog.csdn.net/qq_33040219/article/details/82383127
(4)RABBITMP的GO消息接口:https://godoc.org/github.com/streadway/amqp
(5)所有的样例代码
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go
2.3.1 HelloWorld 入门
(1)实现原理
我们将实现一个使用RabbitMQ的Hello World示例,其中包含两个小程序,一个用于发送消息的send.go, 一个用于接收消息的receive.go.
在下面的图示中:"P"表示生产者,"C"表示消费者,中间的表格表示队列, 即为RabbitMQ中的消息池。
关于Go语言使用RabbitMQ的API,可参考GO amqp.
我们在这里使用amqp库,因此我们需要先安装GO amqp客户端。
首先,使用go get安装amqp:
go get github.com/streadway/amqp
(2)Sending 发送端代码实现
我们使用send.go称把消息生产者,receive.go成为消费者。send.go连接到RabbitMQ后,发送一条消息后便退出。
首先,需要导入amqp:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
然后,我们编写一个通用的辅助方法,用来检查每一步amqp调用的结果。另外,在Go语言中经常需要使用if语句来检查操作结果,为了避免在代码中到处散落if(err != nil)语句,可以使用下列方法:
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
下面,我们来实现main函数:
1,连接RabbitMQ服务器:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
上面代码会建立一个socket连接,处理一些协议转换及版本对接和登录授权的问题。
func Dial接口说明:
func Dial(url string) (*Connection, error)
Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the handshake deadline to 30 seconds. After handshake, deadlines are cleared.Dial uses the zero value of tls.Config when it encounters an amqps:// scheme. It is equivalent to calling DialTLS(amqp, nil).
2,建立连接之后,我们需要创建一个通道channel,之后我们的大多数API操作都是围绕通道来实现的:
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
func (Connection) Channel接口说明:*
func (c Connection) Channel() (Channel, error)通道打开唯一的,并发的服务通道,用于处于一堆的AMQP消息。当该方法在接收端遇到任何错误时,会导致接收端失效,并且一个新的通道会被打开。
Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened.
3, 最后,我们需要定义一个队列用来存储、转发消息,然后我们的sender只需要将消息发送到这个队列中,就完成了消息的publish操作。队列Queue待名字,区分唯一性。
q, err := ch.QueueDeclare(
"hello", //name
false, //durable
false, //delete when unused
false, //exclusive
false, //no wait
nil, //arguments
)
failOnError(err, "Failed to declare q queue")
body := "Hello"
err = ch.Publish(
"", //exchange
q.Name, // routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
Body : []byte(body),
}
)
failOnError(err, "Failed to publish a message")
func (Channel) QueueDeclare接口:*
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)QueueDeclare声明一个保存和传递消息的队列queue。 如果一个channel不存在一个queque,则Declaring函数创建一个queue,如果channel已经存在了一个queue,该函数用于确认存在queque的参数是否同声明的。
Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing key matching the queue's name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})Delivery Exchange Key Queue
key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct.Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to durable exchanges.
Non-Durable and Auto-Deleted queues will not be redeclared on server restart and will be deleted by the server after a short time when the last consumer is canceled or the last consumer's channel is closed. Queues with this lifetime can also be deleted normally with QueueDelete. These durable queues can only be bound to non-durable exchanges.
Non-Durable and Non-Auto-Deleted queues will remain declared as long as the server is running regardless of how many consumers. This lifetime is useful for temporary topologies that may have long delays between consumer activity. These queues can only be bound to non-durable exchanges.
Durable and Auto-Deleted queues will be restored on server restart, but without active consumers will not survive and be removed. This Lifetime is unlikely to be useful.
Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.
When noWait is true, the queue will assume to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection.
When the error return value is not nil, you can assume the queue could not be declared with these parameters, and the channel will be closed.
func (Channel) Publish接口说明:*
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
Publish sends a Publishing from the client to an exchange on the server.从客户端发布一个消息给服务器端的交换机exchange。
When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is because every declared queue gets an implicit route to the default exchange.
Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.
Publishings can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key, or when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.
This can return an error when the channel, connection or socket is closed. The error or lack of an error does not indicate whether the server has received this publishing.
It is possible for publishing to not reach the broker if the underlying socket is shut down without pending publishing packets being flushed from the kernel buffers. The easy way of making it probable that all publishings reach the server is to always call Connection.Close before terminating your publishing application. The way to ensure that all publishings reach the server is to add a listener to Channel.NotifyPublish and put the channel in confirm mode with Channel.Confirm. Publishing delivery tags and their corresponding confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1.
type Publishing类型说明
type Publishing struct {
// Application or exchange specific fields,
// the headers exchange will inspect this field.
Headers Table// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Priority uint8 // 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id// The application specific payload of the message
Body []byte
}
Publishing captures the client message sent to the server. The fields outside of the Headers table included in this struct mirror the underlying fields in the content frame. They use native types for convenience and efficiency.
定义队列操作具备幂等性,也就是说多次重复定义,相同名称的队列只会创建一个。发送给队列的内容是byte数组,将任意格式数据转换成byte数组是一件很简单的事情,因此对于任何格式的数据,要将它发送到队列中是很容易的。
(3) Receiving 接收端代码实现
以上完成了发送消息的程序,现在来实现从RabbitMQ队列中接收消息的消费者程序。不同于消息发送程序只需要将单一的消息推送至队列后推出,消息接收者需要保持一个监听程序从队列中不断的接收消息。
1,首先,同样的导入包和实现辅助函数:
package main
import {
"fmt"
"log"
"github.com/streadway/amqp"
}
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
2,接着,与生产者一样,打开连接并创建通道,注意这里的参数必须与send中的queue name相一致,这样才能实现发送/接受的配对。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to server")
defer conn.Close();
ch, err := conn.Channel()
failOnError(err, "Failed to connect to channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", //name
false, //durable
false, //delete when usused
false, // exclusive
false, //no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
一般来说,接收消息的程序会先于发送者运行,因此在这里我们先定义一个queue,确保后面发送者连接到这个queue时,当前接收消息程序以运行。
3,接下来,需要RabbitMQ服务器让它将消息分发到我们的消费者程序中,消息转发操作是异步执行的,这里使用goroutine来完成从队列中的读取消息操作:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf("Received a message : %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages, To exit press CTRL+C")
<-forever
func (*Channel) Consume
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
Consume immediately starts delivering queued messages.
Begin receiving on the returned chan Delivery before any other operation on the Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must range over the chan to ensure all deliveries are received. Unreceived deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to call Delivery.Ack after it has successfully processed the delivery. If the consumer is cancelled or the channel or connection is closed any unacknowledged deliveries will be requeued at the end of the same queue.
The consumer is identified by a string that is unique and scoped for all consumers on this channel. If you wish to eventually cancel the consumer, use the same non-empty identifier in Channel.Cancel. An empty string will cause the library to generate a unique identity. The consumer identity will be included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ.
It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.
Optional arguments can be provided that have specific semantics for the queue or server.
Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will be dropped.
When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed.
(4) Running运行
首先,在命令行中先运行消费者:
go run receive.go
当前程序会一直监听RabbitMQ的队列消息,一旦接收到消息后会直接打印出来,使用Ctrl+C可以终止程序;
接着,在另一个命令终端中运行生产者:
go run send.go
可以看到receive.go程序接收到了当前信息:
发送端:
接收端:
如上就是本系列的RabbitMQ的第一个例子。你可以在这里下载完整的send.go, receive.go文件。
2.3.2 工作队列
本篇将实现一个将耗时任务分发到多个消费者程序的工作队列。
工作队列的主要思想是避免对资源密集型任务处理时的等待,而是先将任务压入队列,后期再进行计划处理。我们将任务封装成消息发送给队列,由队列程序按策略分发到所有的在线工作者程序执行。当有多个工作程序同时在线时,多项任务同时被多个不同的工作者处理便成为可能。
在Web应用程序领域,如果需要在一个HTTP短连接中完成一些复杂的耗时任务时,工作队列的思想能大幅提高处理效率而带来了更好的用户体验。
(1)准备
在上一篇介绍中我们发送一个固定的字符串“Hello world"到队列,然后在接收程序中打印出来。这里没有实现诸如图片大小的调整、PDF文件的渲染等真实的复杂任务,而是用特定字符串来表示复杂任务,导致消息处理程序忙碌。任务处理程序通过time.Sleep函数让线程睡眠来模拟复杂度,以一连串的字符"."来表示任务的复杂度,每一个点表示停顿1秒钟,如"Hello..."表示任务耗时3秒钟。
还是在之前例子的send.go文件上进行修改,让程序通过命令行将任意个消息参数传递到队列,姑且将新文件命名为new_task.go:
body := bodyFrom(os.Args)
err = ch.Publish(
"", //exchange
q.Name, //routing key
false, //mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
}
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
同样的,receive.go文件也需要进行修改,根据消息体中"."的个数来模拟任务的耗时长度。该文件的任务还是从队列中取出一个任务并执行,我们姑且称之为work.go:
msgs, err := ch.Consume(
q.Name, //queue
"", //consumer
true, //auto-ack
false, //exclusive
false, //no-local
false, //no-wait
nil, //args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t* time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for message. To exit press CTRL+C")
<-forever
好了,到此为止,我们先来看看上述的模拟是否成功:
先运行worker.go
#shell 1
go run worker.go
然后另起一个终端运行new_task.go
#shell 2
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
接收端的信息:
接收端打印Done输出分别有1秒,2秒,3秒,4秒,5秒的延迟。
(2)轮询调度(Round-robin dispatching)
工作队列的优势是能轻松处理多个积压的任务,如果有一个已经堆满的任务队列待处理,只需添加多个消费者,这些消费者便都能对队列进行消耗。
1,首先,想象一下如果同时运行两个worker.go脚本,当生产者不断发送消息到队列时,会出现什么情况?
我们需要三个终端来运行这个小例子,两个运行worker.go,我们将他们看成两个消费者C1和C2.
#shell 1
go run worker.go
#shell 2
go run worker.go
然后在第三个终端中,发送消息到队列,你可以尝试多次,如下:
#shell 3
go run new_task.go First message..
go run new_task.go Second message....
go run new_task.go Third message......
go run new_task.go Fourth message........
go run new_task.go Fifth message..........
...
这种消息分发方式叫做round-robin(轮询调度).
shell1接收端:(3)消息确认
当处理一个长耗时任务时,任务处理程序(消费者)可能由于某种原因以外崩溃,那么此时会发生什么事情呢?在我们目前的代码中,一旦RabbitMQ将消息发送到消费者时就会将其标记并删除,而不会去关心消费者程序是否执行完毕。因此在这种情形下,如果你关闭了一个正在处理某项任务的消费者时,会导致其正在处理的及已分发给它却还没来得及处理的任务丢失。
然而在很多真实情况下,我们并不希望丢失掉任何一条消息,如订单信息、支付信息等。当某一消费者突然崩溃后,我们希望将其未处理完毕的消息转发到其他消费者进行处理,这种思想有如我们常见的主备设置策略。
RabbitMQ提供消息确认机制来确保每一个消息都不会丢失,其原理是当RabbitMQ接收到一个从消费者发出的表明任务已处理完毕的确认包(ack)后,才其从队列中释放删除。
如果某一个消费者突然崩溃(如通道关闭、连接关闭或TCP连接丢失)而没有发出确认包,RabbitMQ将会认为该消息并没有被完全处理,因此会重新将其加入到队列中。如果在此时还有其他消费者在线,那么当前消息也会很快被分发处理掉,这样即使在某些消费者意外掉线关闭的情况下,我们也能确保所有消息会被丢失。
消息确认没有超时机制,RabbitMQ只会在消费者Down掉之后才进行重新分发,因此即使对于某些耗时很长的任务也不会有影响。
在这个Demo里面,我们将Consume()函数的auto-ack参数设为false,然后当任务处理完毕之后通过d.Ack(false)手动发送一个确认消息.
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
这样,即使我们通过Ctrl+C来关闭某一个正在处理消息的消费者,其消息也不会丢失,RabbitMQ马上就会将当前未确认的消息转发的其他消费者处理。
需要注意的是消息确认包的目的地必须是当前消息的接收通道,如果将确认包发送到其他通道时会引发异常。更多的信息科参考doc guide on confirmations.
Forgotten acknowledgment
忘记对消息进行确认是一个比较常见的错误,这个错误很容易犯,但是后果很严重。
当消费者退出后消息会重发,却永远没有确认删除的包,因此RabbitMQ消息越积越多就会吃掉越来越多的内存,最后可能导致崩溃。
对于这种未确认的消息调试,我们可以使用rabbitmqcrl命令来打印message_unacknowledged的内容:
# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
task_queue 5
hello 0
(4)消息持久化
前面我们讲解了当消费者程序Down掉如何保证消息不丢失。可如果是RabbitMQ崩溃呢?消息还能保证不丢失吗?
当RabbitMQ退出或崩溃时,除非你明确地指定,否则所有的队列和消息都会丢失。要做到消息不丢失需满足两个条件:队列和消息的持久化。
首先,要保证队列不会丢失,可将队列声明为持久化:
q, err := ch.QueueDeclare(
"hello", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
上述代码看起来没有问题,但到目前为止,如果直接就这样运行,那么队列还是无法持久化而导致丢失。这是因为我们之前已经定义了一个名为“Hello”的队列,RabbitMQ不允许创建多个名称相同而参数不同的队列,这个跟函数重载有区别,但这种情况发生时,RabbitMQ会返回错误。既然如此,直接换个名字:task_queue,
q, err := ch.QueueDeclare(
"task_queue", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
注意:durable参数在生产者和消费者程序中都要指定为True。
现在,task_queue队列即使在RabbitMQ重启之后也不会丢失了。接着就需要实现对消息的持久化,这个也很简单,只需要在amqp.Publishing函数中设置一下amqp.Persistent参数即可:
err = ch.Publishing(
"", //exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
}
)
关于消息持久化
将消息设置为Persistent并不能百分百地完全保证消息不会丢失。虽然RabbitMQ知道要将消息写到磁盘,但在RabbitMQ接收到消息和写入磁盘前还是有个时间空档。
因为RabbitMQ并不会对每一个消息都执行fsync(2),因此消息可能只是写入缓存而不是磁盘。
所以Persistent选项并不是完全强一致性的,但在应付我们的简单场景已经足够。如需对消息完全持久化,可参考publisher confirms.
(5)公平分发
有时候队列的轮询调度并不能满足我们的需求,假设有这么一个场景,存在两个消费者程序,所有的单数序列消息都是长耗时任务而双数序列消息则都是简单任务,那么结果将是一个消费者一直处于繁忙状态而另外一个则几乎没有任务被挂起。当RabbitMQ对此情况却是视而不见,仍然根据轮询来分发消息。
导致这种情况发生的根本原因是RabbitMQ是根据消息的入队顺序进行派发,而并不关心在线消费者还有多少未确认的消息,它只是简单的将第N条消息分发到第N个消费者:
为了避免这种情况,我们可以给队列设置预取数(prefect count)为1。它告诉RabbitMQ不要一次性分发超过1个的消息给某一个消费者,换句话说,就是当分发给该消费者的前一个消息还没有收到ack确认时,RabbitMQ将不会再给它派发消息,而是寻找下一个空闲的消费者目标进行分发。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set Qos")
关于队列长度
NOTE:如果所有的消费者都繁忙,队列可能会被消息填满。你需要注意这种情况,要么通过增加消费者来处理,要么改用其他的策略。
(6)整合上面的代码
我们将上面的片段整合起来,那么new_task.go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
Github地址 new_task.go.
worker.go文件如下:
package main
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
Github地址:worker.go.
最后,为了验证上面轮询调度、消息持久化和公平分发的特性,你可以多开几个Shell窗口,发几条长耗时的消息,然后停掉某一些worker或重启RabbitMQ就能观察到与之相符的现象。
(7)总结
本篇介绍了通过消息确认机制和设置预取消息长度的方式来实现一个工作队列,而持久化选项的设置可以保证队列和消息在出现消费者崩溃或RabbitMQ重启的异常情况下都不会丢失。
有关于更多amqp.Channel方法和消息属性的内容,可以参考amqp API reference.
2.3.3 发布/订阅模式
在本篇中,我们尝试将同一个消息发送给多个消费者进行处理,这就是广为人知的发布/订阅模式。
本篇通过搭建一个日志系统来阐述发布/订阅模式,它包含两部分内容:一个用于产生日志消息的程序,另一个用于接收和打印消息。
在这个日志系统中,每一份接收者程序的拷贝都能收到消息,因此我们可以轻易地使用一个程序将日志写入磁盘,而另一个程序直接在屏幕显示。
本质上来说,当系统收到一个日志处理请求时,会把这个消息广播给所有的接收者。
(1)Exchanges
之前的介绍中,我们都是以队列为中介进行消息的发送和接收,现在将完整的介绍一下RabbitMQ的消息模式。
对前述内容做一个简单总结:
- 一个producter(生产者)是指用于发送消息的用户程序;
- 一个queue(队列)是用来存储消息的缓冲区;
- 一个consumer(消费者)是用来接收消息的用户程序;
RabbitMQ消息模式的核心内容是一个producter永远不会将消息直接发送给队列。
Producter甚至都不知道其产生的消息会被分发到哪一个队列,实际上,producter只会将消息发送给exchange.Exchange很好理解,类似于一个中转站,它就是将从producter中接收到的消息转发给与之绑定的队列。
当Exchange接收到消息后,它是如何来确定对消息进行处理的呢?是将消息发送到指定的一个队列,还是广播到所有队列,或者是直接将其忽略?
这一切都由Exchange定义是的类型(type)来控制。
RabbitMQ共有四种exchange类型:direct, topic, headers, fanout.我们这里使用的最后一种fanout,现在就让我们来定义一个type,取名logs:
err = ch.ExchangeDeclare(
"logs", //name
"fanout", //type
"true", //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
这个fanout类型的exchange很简单,顾名思义:它将所接收到的消息广播给所有绑定的队列。这也正是日志系统说要做的工作。
查看exchange
可以通过rabbitmqctl命令来查看所有的exchanges:sodu rabbitmqctl list_exchanges
命令执行后,会列出一些amq.*名称的exchanges,这是系统默认存在的,我们现在还用不到这些。
默认的exchange
你可能会感到奇怪,前面例子并没有提及生产者只能将消息发送给exchange,为什么程序仍能将消息发送给队列?原因在于我们使用了一个默认的exchange,代码中就是Publish函数的参数使用了空字符串"":
来看看之前的publish代码:
> err = ch.Publish(
> "", //exchange
> q.Name, //routing key
> false, //mandatory
> false, //immediate
> amqp.Publishing(
> ContentType: "text/plain",
> Body: []byte(body),
> )
> )
使用一个无命名的或默认的exchange,消息将会根据routing_key所指定的参数进行查找,如果存在就会分发到相应的队列。
既然讲到的exchange,那么我们可以使用前面定义的exchange来代替默认值:
err = ch.ExchangeDeclare(
"logs", //name
"fanout", //type
true, //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare an exchange")
body:= bodyForm(os.Args)
err = ch.Publish(
"logs", //exchange
"", //routing key
false, //mandatory
false, //immediate
amqp.Publishing(
ContentType: "text/plain",
Body: []byte(body),
)
)
(2)临时队列
不出意外,你应该还对前面使用过的两个命名队列(hello和task_queue)有印象,在使用命名队列时必须让生产者和消费者都是用同一个名称的队列,否则消息将无法在两者之间进行传递。
但在这里名字不是关心的重点,因为我们的日志系统需要记录所有的消息,而不是其中的一部分。我们比较关心的是消费者程序接收和处理的消息都应该是未处理过的。
为了确保这一点,我们需要两个条件:
首先,无论何时当消费者连接到Rabbit时我们需要一个新的、空的队列,因此就不会存在之前的消息。我们可以通过创建一个随机名字的队列来实现,而更好的方法是:让服务器自己选择一个随机队列给我们。
再者,当我们的消费者程序断开连接时,这个队列要能自动的删除。
在amqp客户端中,当我们将空字符串指定为队列名字时,将会创建一个非持久化的、带有随机命名的队列:
q, err := ch.QueueDeclare(
"", //name
false, //durable
false, //delete when unused
true, //exclusive
false, //no-wait
nil, //arguments
)
当这个函数返回时,RabbitMQ将创建一个带有随机名字的队列,如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
当这个连接被关闭时,队列将会被删除,因为其被定义为独有的(exclusive)。
(3)绑定
到现在,我们已经创建了一个fanout类型的exchange和一个队列,然而exchange并不知道它要哪个队列是应该被分发消息的。因此,我们需要明确的指定exchange和队列队列之间的关系,这个操作称之为绑定。
err = ch.QueueBind(
q.Name, //queue name
"", //routing key
"logs", //exchange
false,
nil
)
现在,logs exchange中的消息就会被分发到我们的队列。
查看绑定列表
仍然可以通过命令来查看:
rabbitmqctl list_bindings
(4)完整的例子
生产者程序看起来跟之前例子区别不大,最重要的不同就是这里将消息发送给名为logs的exchange,而不是直接发送到默认队列。在发送消息时需要提供一个routingKey,但是在fanout类型的exchange中这个值是被忽略的。
那么,emit_log.go的脚本就是:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
Github地址:emit_log.go
需要注意,我们必须在建立了连接之后才能定义exchange,否则会报错。
如果exchange没有绑定任何一个队列,那么消息将会丢失而没有得到处理,但在这个例子里,这种情况是允许的,如果没有任何一个队列来消费这些消息,那么就直接忽略掉就好。
下面是receive_logs.go的代码:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
Github地址:receive_logs.go
如果你想将日志消息保存到文件,只需在命令终端中执行下面的命令:
go run receive_logs.go > logs_from_rabbit.log
如果想直接打印到屏幕上,在另一个终端中执行:
go run receive_logs.go
当然,发送消息的命令如下:
go run emit_log.go
使用rabbitmqctl list_bindings命令可以查看上面代码所创建的绑定关系,如当运行两个receive_logs.go之后,可能会得到如下的结果:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
其结果是很容易理解的:从logs exchange中的消息会被转发到两个由系统命名的队列中。这也正是我们所期望的。
下一节将会介绍如何对消息进行筛选,不监听所有消息,而是监听其中的一个子集。
2.3.4 路由
在本篇中,将介绍如何对消息进行过滤,从而只处理我们感兴趣的消息。如只把一些严重的错误信息写入磁盘,但对所有类型的消息都打印到屏幕。
(1)Binding
前面例子中我们使用了如下绑定:
err = ch.QueueBind(
q.Name, //queue name
"", //routing key
"logs", //exchange
false,
nil)
绑定是交换器(exchange)和队列之间的关系,我们可以简单的理解为:队列对其绑定的交换器的消息感兴趣。
绑定函数Bind()可以指定routing_key参数,为了避免跟之前的Channel.Publish的参数混淆,我们称之为binding_key。下面就是使用了绑定参数的例子:
err = ch.QueueBind(
q.Name, //queue name
"black", //routing key
"logs", //exchange
false,
nil)
绑定参数的作用取决于exchange的类型,前面例子中使用的fanout类型的exchange是会忽略掉这个值的。
(2)Direct exchange
前面文章中的日志系统会将所有消息广播分发到所有的消费者处理程序。现在我们想要扩展成为根据消息的严重程度来过滤分发,如只将严重的error级别的日志写入磁盘,而不写入info和warn类型的日志消息以节省磁盘空间。
前面使用的fanout exchange,只是对消息一味的广播转发,可扩展性差,无法满足我们的需求。所以,我们使用Direct exchange进行替代。Direct exchange的路由算法很简单:就是将exchange的binding_key和消息的routing_key进行比较,如果完全匹配这说明是需要分发的队列。
如下图配置:
在图中,direct exchange X有两个队列与之绑定。队列Q1的binding_key是orange, 而队列Q2有两个binding_key: black,green.
由此,当发送routing_key为orange的消息时会被路由到Q1,而带有black或green的routing_key的消息则会被分发到Q2,其他类型的消息都会被忽略。
(3)多重绑定
Direct exchange会将消息广播至所有匹配的绑定队列,因此很容易实现对同一个binding_key需要分发到多个队列的情况。如图,带有routing_key的消息会被分发到Q1和Q2两个队列。
(4)发送日志
考虑下如何实现?首先需要使用direct类型的exchange替换掉fanout类型,然后在发送消息是用routing_key来表示日志级别,而接收消息的脚本需要指定接收哪些级别的消息。先来看看如何发送日志:
首先创建exchange:
err = ch.ExchangeDeclare(
"logs_direct", //name
"direct", //type
true, //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
然后准备发送消息:
err = ch.ExchangeDeclare(
"logs_direct", //name
"direct", //type
true, //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", //exchange
severityFrom(os.Args), //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
}
)
为了简单,我们假设消息的级别为"info", "warning", "error"三者中的一个.
(5)订阅
接收消息的程序跟之前的大体相同,只是需要为每一种级别的日志消息新建一个绑定:
q, err := ch.QueueDeclare(
"", //name
false, //durable
false, //delete when unused
true, //exclusive
false, //no-wait
nil, //arguemnts
)
failOnError(err, "Failed to declare queue.")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warnint] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_direct", //exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
(6)整个文件
emit_log_direct.go文件:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open an channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct", //name
"direct", //type
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", // exchange
severityFrom(os.Args), //routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] sent %s", body)
}
func bodyFrom(args []string) string{
var s string
if(len(args) < 3) || os.Args[2] == "" {
s = "hello"
}else{
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if len(args) < 2 || args[1] == "" {
s = "info"
}else {
s = os.Args[1]
}
return s
}
GitHub地址:emit_log.direct.go
receive_logs_direct.go文件:
package main
import(
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct",
"direct",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", //name
false,
false,
true,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]")
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, //queue name
s, //routing key
"logs_direct", //exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, //name
"", //consumer
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To Exit press Ctrl+c")
<-forever
}
GitHub地址:receive_logs.direct.go
(7)运行结果
将"warning"和"error"级别的消息都写入磁盘,只需运行:
go run receive_logs_direct.go warning error>logs_from_rabbit.log
将所有消息都打印到屏幕:
go run receive_logs_direct.go warning error info
而发送消息:
go run emit_log_direct.go error "this is a log message"
2.3.5 主题交换器
如果想让系统不能仅根据日志级别来定义,还能根据发送日志的源信息来订阅。如unix工具syslog,就是根据日志级别(info/warn/crit..)和设备(auth/cron/kern)来进行路由的。这会提供更多的灵活性,如可以做到监听所有来自'cron'和'kern'设备的error信息。
为了实现这种灵活性,需要来学习下另外一个功能更综合的topic类型的交换器(exchange).
(1)Topic exchange
Topic类型的exchange消息的routing key是有一定限制的,必须是一组使用“.”分开的单词。单词可以是任意的,但是一般来说以能准确的表达功能的为佳。如以下的例子都是合法的:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".Routing key可以是任意多个单词组成,但其总长度不能超过255个字节。
Topic exchange的binding key跟之前的没有太大区别,其逻辑跟direct一样,其接收到的消息会分发到所有与其routing key相匹配的绑定队列。以下两个通配符也可作为binding key使用:
* 表示一个单词
# 表示0或多个单词
看例子:
上图中,消息的routing key用三个单词来表示,依次表示速度、颜色、种类,其形式如:"<speed>.<color>.<species>”。
图中有三个绑定:Q1的绑定键是".orange.", Q2的绑定键是"..rabbit"和"lazy.#".
这三个绑定规则可以简单概括为:
Q1对orange颜色的动物感兴趣;
Q2则对所有物种是rabbit的、速度是lazy的所有动物感兴趣;
举例来说明带有不同routing key的消息会被路由到哪个队列:
"quick.orange.rabbit": 分发到Q1和Q2;
"lazy.orange.elephant": 分发到Q1和Q2;
"quick.orange.fox": 分发到Q1;
"lazy.brown.fox": 分发到Q2;
"lazy.pink.rabbit": 分发到Q2,且只会分发一次,虽然它匹配了Q2的两个绑定;
"quick.brown.fox": Q1、Q2都不分发,因为没有与之匹配的绑定规则,将会被忽略;
"orange": 无匹配规则,丢失
"quick.orange.male.rabbit": 无匹配规则,丢失
"lazy.orange.male.rabbit": 分发到Q2,虽然有4个单词,但符合"lazy.#"规则的通配符
Topic exchange
Topic exchange很灵活,也很容易用此实现其他类型的功能.
如果将"#"指定为绑定键,那么就会接收所有的消息,相当于fanout类型的广播;
如果通配符"*","#"均不作为绑定键使用,那么其功能实现就等同于direct类型;
(2)整个文件
在前一篇的基础上实现topic exchange,只需做少许改的即可,这里我们假设routing key由两个单词组成,类似于"<facility>.<severity>".
emit_log_topic.go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open an channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", //name
"topic", //type
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), //routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] sent %s", body)
}
func bodyFrom(args []string) string{
var s string
if(len(args) < 3) || os.Args[2] == "" {
s = "hello"
}else{
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if len(args) < 2 || args[1] == "" {
s = "info"
}else {
s = os.Args[1]
}
return s
}
receive_logs_topic.go:
package main
import(
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic",
"topic",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", //name
false,
false,
true,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name,
s,
"logs_topic",
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
(3)运行
接收所有消息:
go run receive_logs_topic.go "#"
接收来自"kern"设备的消息:
go run receive_logs_topic.go "kern.*"
接收所有以"critical"结尾的消息:
go run receive_logs_topic.go "*.critical"
创建多重绑定:
go run receive_logs_topic.go "kern.*" "*.critical"
发送消息:
go run emit_log_topic.go "kern.critical" "A critical kernal error"
上述假设都是基于两个单词的,你也可以试一下设置其他长度单词的routing key看看会发生什么。
2.3.6 远程过程调用(RPC)
如果需要在一个远程机器上执行一个函数然后等待它的返回结果应该怎样?这个过程称之为远程过程调用(RPC:Remote Procedure Call).
本篇将介绍如何利用RabbitMQ实现一个包含客户端和可扩展服务端的RPC系统,仍然跟之前的一样,利用模拟计算来替代真实的耗时任务,这里使用计算斐波那契数列函数。
(1)回调队列(Callback queue)
RPC系统的模式是客户端发送请求给服务器,服务器接收处理后回复一条响应消息。RabbitMQ对此提供了很好的支持,通过在请求中指定callback的回调队列地址来实现接收服务器的响应消息:
q, err := ch.QueueDeclare(
"", //name
false, //durable
false, //delete when usused
true, //exclusive
false, //nowait
nil, //argments
)
err = ch.Publish(
"", //exchange
"rpc_queue" //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
}
)
Message properties
AMQP 0-9-1协议中共定义了14个消息属性,其中大部分是不常用的,常用的有以下几个:
- persistent: 标记消息是持久化(true)或者临时的(false),该属性在第二篇文章中有介绍;
- content_type: 用来描述mime-type的编码,如JSON类型:application/json;
- reply_to: 用于标记回调队列的名称;
- correlation_id: 用来表示request和response的关联关系;
(2)Correlation Id
RPC server对Client请求的响应同样需要通过消息队列来传递,可以对每一次请求创建一个回调队列,但这种方式效率很低,更好的方式是:对于每一个客户端只创建一个回调队列。
但这样会带来一个问题:回调队列接收到一个response之后,如何确定其对应的request?这就需要 correlataion_id来标识。客户端在request中添加一个唯一的correlation_id,在接收到服务器返回的response时,根据该值来确定与之匹配的request并处理。如果未能找到与之匹配的correlation_id,说明该response并不属于当前client的请求,为了安全起见,将其忽略即可。
我们可能会问:为什么在没有找到与之匹配的correlation_id时是将其忽略而不是失败报错?这是考虑到服务端的竞争条件:假设RPC server在发送response后宕机了,而此时却没能对当前request发出确认消息(ack).如果这种情况出现,该请求还在队列中会被再次派发。因此当前Request会在服务端处理两次,也会给客户端发送两次Response,故而,client要能处理重复的response,而server端对于Request需要实现幂等。
(3)总结
RPC的工作过程如下:
当Client启动时,会创建一个匿名的、独有的回调队列;
对每一个RPC Request,Client都会设置两个参数:用于标识回调队列的reply_to和用于唯一标识的correlation_id;
Request被发送到rpc_queue队列。
RPC服务器等待rpc_queue的消息,一旦消息到达,处理任务后将响应结果消息发送到reply_to指定的队列;
Client等待callback队列的消息,一旦消息到达,查找与correlation_id匹配的request,然后返回给应用程序。
(4)代码
首先看下斐波那契数列(Fibonacci)函数:
斐波那契数列指的是这样一个数列:
这个数列从第3项开始,每一项都等于前两项之和。
func fib(n int) int {
if n== 0 {
return 0
}else if n==1 {
return 1
}else {
return fib(n-1) + fib(n-2)
}
}
这里假设输入都是正整数,且不指望它对大整数有效,因为这个方式可能是效率最差的了。
rpc_server.go文件:
package main
import (
"fmt"
"log"
"strconv"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func fib(n int) int {
if n== 0 {
return 0
}else if n==1 {
return 1
}else {
return fib(n-1) + fib(n-2)
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"rpc_queue", //name
false, //durables
false, //delete when unused
false, //exclusive
false, //no wait
nil, //args
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")
msgs, err := ch.Consume(
q.Name, //queue
"", //exchange
false, // auto-ack
false, //exclusive
false, //no-local
false, //no-wait
nil, //args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to an integer")
log.Printf(" [.] fib(%d)", n)
response := fib(n)
err = ch.Publish(
"", //exchange
d.ReplyTo, //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType : "text/plain",
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
})
failOnError(err, "Failed to publish a message")
d.Ack(false)
}
}()
log.Printf(" [*] Awaiting RPC reqeusts")
<-forever
}
服务端的代码简单明了:
首先建立RabbitMQ的连接、创建通道和定义队列;
其次如果是多服务器进程,可以通过prefetch值得设置实现的负载均衡;
最后通过Channel.Consume监听队列消息,然后通过goroutine来实现对消息的处理和发送response。
rpc_client.go文件:
package main
import (
"fmt"
"log"
"strconv"
"os"
"math/rand"
"strings"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func randomString(l int) string {
bytes := make([]byte, l)
for i:=0; i<l; i++ {
bytes[i] = byte(randInt(65, 90))
}
return string(bytes)
}
func randInt(min int, max int) int {
return min + rand.Intn(max - min)
}
func bodyFrom(args []string) int {
var s string
if(len(args) < 2 || os.Args[1]==""){
s = "30"
}else{
s = strings.Join(args[1:], " ")
}
n, err := strconv.Atoi(s)
failOnError(err, "Failed to convert arg to integer")
return n
}
func fibonacciRPC(n int) (res int, err error) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"", //name
false, //durables
false, //delete when unused
true, //exclusive
false, //no wait
nil, //args
)
failOnError(err, "Failed to declare a queue")
msgs , err := ch.Consume(
q.Name, //queue
"", //consumer
true, //auto-ack
false, //exclusive
false, //no-lock
false, //nowait
nil,
)
failOnError(err, "Faield to register a consumer")
corrId := randomString(32)
err = ch.Publish(
"", //exchange
"rpc_queue", //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
})
failOnError(err, "Failed to publish a message")
for d:= range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
break
}
}
return
}
func main(){
rand.Seed(time.Now().UTC().UnixNano())
n:= bodyFrom(os.Args)
log.Printf(" [x] Requesting fib(%d)", n)
res, err := fibonacciRPC(n)
failOnError(err, "Failed to handle RPC request")
log.Printf(" [.] Got %d", res)
}
(5)运行
首先运行RPC server:
go run rpc_server.go
# => [x] Awaiting RPC requests
客户端计算斐波那契数列:
go run rpc_client.go 30
# => [x] Requesting fib(30)
目前为止设计的RPC系统,不仅仅是能提供RPC服务,还具备其他优点:
* 如果单台RPC服务器性能缓慢,可以很容易的进行扩展,只需在新窗口运行一个rpc_server.go脚本即可;
* 在客户端,RPC模式要求对消息进行一次发送和接收操作,因此只需要一次网络往返即可完成一次RPC请求;
当然,这里的Demo过于简单,并没有考虑实际应用中复杂而重要的诸多问题,如:
* 客户端如何处理服务端掉线的情况?
* 客户端如何处理服务端超时情况?
* 如果服务端故障导致异常,是否需要将异常转发给客户端处理?
* 在对消息处理前未对消息的合法性进行检查,如边界值、类型信息等;
如果你对上述问题有兴趣,期望了解更多的使用知识,请参考Management UI.
2.4 RabbitMQ原理
2.4.1 RabbitMQ基本概念
首先来看看RabbitMQ里的几个重要概念:
- 生产者(Producer):发送消息的应用。
- 消费者(Consumer):接收消息的应用。
- 队列(Queue):存储消息的缓存。
- 消息(Message):由生产者通过RabbitMQ发送给消费者的信息。
- 连接(Connection):连接RabbitMQ和应用服务器的TCP连接。
- 通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。
- 交换机(Exchange):交换机负责从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须到绑定一个交换机。
- 绑定(Binding):绑定是队列和交换机的一个关联连接。
- 路由键(Routing Key):路由键是供交换机查看并根据键来决定如何分发消息到列队的一个键。路由键可以说是消息的目的地址。
生产者(Producer)发送/发布消息到代理->消费者(Consumer)从代理那里接收消息。哪怕生产者和消费者运行在不同的机器上,RabbitMQ也能扮演代理中间件的角色。
当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用交换机(Exchange)来发送。下面的设计图简单展示了这三个主要的组件之间是如何连接起来的。
交换机代理(exchange agent)负责把消息分发到不同的队列里。这样的话,消息就能够从生产者发送到交换机,然后被分发到消息队列里。这就是常见的“发布”方法。
Producer
然后,消息会被消费者从队列里读取并消费,这就是“消费”。
2.4.2 RabbitMQ架构
2.4.3 交换机(Exchange)
消息并不是直接发布到队里里的,而是被生产者发送到一个交换机上。交换机负责把消息发布到不同的队列里。交换机从生产者应用上接收消息,然后根据绑定和路由键将消息发送到对应的队列里。绑定是交换机和队列之间的一个关系连接。
2.4.4 RabbitMQ里的消息流程
生产者(producer)把消息发送给交换机。当你创建交换机的时候,你需要指定类型。交换机的类型接下来会讲到。
交换机(exchange)接收消息并且负责对消息进行路由。根据交换机的类型,消息的多个属性会被使用,例如路由键。
绑定(binding)需要从交换机到队列的这种方式来进行创建。在这个例子里,我们可以看到交换机有到两个不同队列的绑定。交换机根据消息的属性来把消息分发到不同的队列上。
消息(message)消息会一直留在队列里直到被消费。
消费者(consumer)处理消息。
2.4.5 交换机类型的3种类型
直接(Direct):直接交换机通过消息上的路由键直接对消息进行分发。
扇出(Fanout):一个扇出交换机会将消息发送到所有和它进行绑定的队列上。
主题(Topic):这个交换机会将路由键和绑定上的模式进行通配符匹配。
2.4.6 虚拟主机
RabbitMQ是一个多组件系统:Connection、Exchange、Queue、Binding、User Permision、Policies和其他属于虚拟主机(实体的逻辑分组)的组件。
创建虚拟主机
虚拟主机可以通过rabbitmqctl的add_vhost命令进行创建,该命令接收一个必填参数——虚拟主机名称。例如:
> rabbitmqctl add_vhost qa1
更多内容参考《RabbitMQ系列(四) RabbitMQ的虚拟主机》
3.问题及解答
3.1 RabbitMQ启动失败
失败原因信息:
root@JD3:/etc/rabbitmq# systemctl start rabbitmq-server
Job for rabbitmq-server.service failed because the control process exited with error code. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.
使用systemctl status rabbitmq-server.service命令进一步查看问题:
root@JD3:/etc/rabbitmq# systemctl status rabbitmq-server.service
rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: activating (auto-restart) (Result: exit-code) since Fri 2021-01-22 11:26:52 CST; 4s ago
Process: 6389 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
Process: 27911 ExecStartPost=/usr/lib/rabbitmq/bin/rabbitmq-server-wait (code=exited, status=2)
Process: 27910 ExecStart=/usr/sbin/rabbitmq-server (code=exited, status=140)
Main PID: 27910 (code=exited, status=140)
Jan 22 11:26:52 JD3 systemd[1]: Failed to start RabbitMQ Messaging Server.
Jan 22 11:26:52 JD3 systemd[1]: rabbitmq-server.service: Unit entered failed state.
Jan 22 11:26:52 JD3 systemd[1]: rabbitmq-server.service: Failed with result 'exit-code'.
没有看到有效信息。
查看 /var/log/rabbitmq/startup_log的启动日志,发现异常情况:
{"could not start kernel pid",application_controller,"error in config file "/etc/rabbitmq/rabbitmq.config" (1): syntax error before: '['"}
解决方法:
发现自己编辑 /etc/rabbitmq/rabbitmq.config 文件,少了最后的.导致,此字符表示配置结束。修改后就不会报错了。
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, [“guest”]}]}
].
3.2 RabbitMQ的配置文件格式如何选择?
/etc/rabbitmq下,增加rabbitmq.conf文件,删除rabbitmq.config文件。
4, RabbitMQ常见面试题
参考:
(1) 消息队列 RabbitMQ 常见面试题总结! (附答案)-专题 http://www.mianshigee.com/topic/1009ada/
(2) 技术干货:RabbitMQ面试题及答案 http://blog.itpub.net/69902581/viewspace-2673724/
5,参考
(1)Ubuntu 16.04 RabbitMq 安装与运行(安装篇)【成功】
https://blog.csdn.net/qq_22638399/article/details/81704372
(2)RabbitMQ 从入门到精通 (一)
https://www.cnblogs.com/dwlovelife/p/10982735.html
https://www.cnblogs.com/dwlovelife/p/10991371.html
(3)RabbitMQ核心概念以及工作原理
https://www.jianshu.com/p/256c502d09cd
(4)RabbitMQ 中文文档
http://rabbitmq.mr-ping.com/
https://www.rabbitmq.com
(5)RabbitMQ入门详解以及使用【操作】
https://www.cnblogs.com/huangting/p/11989597.html
(6)RabbitMQ 第一个入门实例hello world【goLang版本】
http://rabbitmq.mr-ping.com/tutorials_with_golang/[1]Hello_World.html
(7)RabbitMQ 第一个入门实例hello world【PHP版本】
http://www.apeit.cn/rabbitmq-hello-world-1xvq3
(8)老司机带你入门RabbitMQ消息中间件【MQ对比,一般】
https://www.imooc.com/article/79039
(9)rabbitmq官方的六种工作模式
https://blog.csdn.net/qq_33040219/article/details/82383127
(10)rabbitmq的配置参考
https://www.rabbitmq.com/configure.html
https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example
(11)RABBITMP的GO消息接口
https://godoc.org/github.com/streadway/amqp
(12)攀登者007的RabbitMQ系列 入门
RabbitMQ系列(一)RabbitMQ的架构
https://juejin.cn/post/6844904136941518862
RabbitMQ系列(二)RabbitMQ Server的安装(基于二进制)
https://juejin.cn/post/6844904111826026510
RabbitMQ系列(三)RabbitMQ Server的安装(基于Linux RPM)
https://juejin.cn/post/6844904111545008141
RabbitMQ系列(四) RabbitMQ的虚拟主机
https://juejin.cn/post/6844904057274892296
RabbitMQ系列(五) RabbitMQ的文件和目录位置
https://juejin.cn/post/6844904139063820295
RabbitMQ系列(六)RabbitMQ的配置
https://juejin.cn/post/6844904057845334029
RabbitMQ系列(七)RabbitMQ的参数与策略
https://juejin.cn/post/6844904136949891085
RabbitMQ系列(八)RabbitMQ的日志
https://juejin.cn/post/6844904061829939208