rabbitmq
本文算是实现对入门教程的 java版本翻译吧。本文中演示代码地址
1. 安装
先安装 erlang (安装网上提供的教程安装erlang)
在安装 rabbitmq-server
下载rabbitmq的安装包的时候选择 tar.xz 直接解压就可以了启动/停止
#启动rabbitmq
#进入安装目录的sbin 目录,执行
./rabbitmq-server -detached
#关闭rabbitmq
./rabbitmqctl stop
2.用户权限
guest/guest 现在只能在localhost使用,不能远程使用
需要添加用户和权限
参考文章 RabbitMQ用户角色及权限控制
例如:
####添加用户:
$sudo rabbitmqctl add_user user_admin passwd_admin
#####修改角色为 administrator:
$sudo rabbitmqctl set_user_tags user_admin administrator
上面的操作有了,可能还是为在日志中提示 user_admin在 vhost '/' 权限不够
则执行下面的操作:
######修改权限
$sudo rabbitmqctl set_permissions -p / user_admin '.*' '.*' '.*'
3.rabbitmq 的使用
3.1 工作队列(workqueues) 模式
注意的点:
- 工作队列中的消息被所有的消费者共享。
- rabbitmq在路由消息到消费者的时候使用轮询(round-robin)的方式,找到第n个消费者来消费消息
- ack(ackownledgment)机制确保消息被消费,不出现消息没有消费就从内存中删除的情况
3.1.1 ack机制(Message acknowledgment)
RabbitMQ 支持消息确认,当消费者(Consumer)接受到消息并且处理完成之后,回给RabbitMQ 发送一个确认消息,Rabbitmq这个时候可以随意的删除这个消息
如果消费者die ,但是没有发送确认消息,这个时候RabbitMQ会认为消息没有完全处理,这个时候会这个消息重新放到队列之中,使用轮询的方式分配给其他的消费者消费
RabbitMQ 默认开启了确认机制,使用的时候设置 autoAck = true 来关闭确认机制,设置autoAck = false 则开启
3.1.2 消息持久化(Message durability)
启动消息持久化,需要在申明channel的时候设置持久化属性为true
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
一个确定了是否持久化的队列,不能再修改durable的值
设置队列为持久化队列之后,需要设置下消息的属性,例如设置属性为MessageProperties (实现了 BasicProperties)
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
使用持久化消息并不能完全保证消息的持久化,应为消息可能先保存在缓存,后面保存到硬盘上,不过对于一般的task是完全足够的,如果想要确保完全的持久化,可以结合 publisher confirms 来实现
3.1.3 公平分配(Fair dispatch)
通过设置prefetchCount 的值,控制一个消费者接受的任务数量
int prefetchCount = 1;
channel.basicQos(prefetchCount);
如上面的设置,当一个消费者还有没有处理完的任务的时候,rabbitmq不会分配任务给他
开启管理功能
rabbitmq-plugins enable rabbitmq_management
web管理界面: http://server-name:15672/
3.2 发布订阅(Publish/Subscribe)模式
使用发布订阅模式,一条消息会被发送给多个消费者消费
使用示例说明:
创建一个日志的生产者 EmitLog 来发布日志,使用多个日志接受者ReceiveLogs
创建两个接受者,一个把日志往硬盘上面写,一个把日志打印到显示器。整个过程发布日志
的只有一个。日志被广播到多个接受者
1.1 交换(Exchanges)
在rabbitmq的消息模型中
生产者-->发送消息的一端
队列--> 消息缓冲区
消费者-->消费消息的一端
rabbitmq的核心思想中,生产者通常不直接发送消息给一个队列,事实上,
生产者也不知道把消息发送给哪个队列
实际上,生成这是把消息发送给一个Exchange,这个Exchange一方面从生产者那边接受消息,
一方面推送消息给队列(queue),exchange清楚的知道自己接受的消息要怎么处理——是追加到
一个指定的队列?还是追加到多个队列?还是直接丢弃?具体做法要根据exchange的类型(type)
来定
可以通过下面的指令列出exchange的类型
./rabbitmqctl list_exchanges
exchange的类型:
- direct
- topic
- headers
- fanout
之前的例子中没有使用exchange是因为我们使用了默认的exchange 即使用空字符串"" 来定义的
(默认exchange的消息会被发送到指明的routingKey的队列中)
channel.basicPublish("", "hello", null, message.getBytes());
现在创建一个exchange并发布
channel.exchangeDeclare("logs", "fanout");
channel.basicPublish( "logs", "", null, message.getBytes());
1.2 临时队列(Temporary queues)
使用无参的 queueDeclare() 方法可以创建一个非持久化的、独立的、自动删除的、名称随机的一个
队列。例如:产生一个名为 amq.gen-JzTY20BRgKO-HjmUJj0wLg 的队列
String queueName = channel.queueDeclare().getQueue();
1.3 绑定(Bindings)
之前说过,消息分配到哪个队列是有exchange来处理的,那么要确定消息去哪儿,就需要明确队列和exchange
的关系。这个过程称为binding
channel.queueBind(queueName, "logs", "");
3.3 路由(Routing)
之前的发布订阅示例中,我们使用的是广播消息,所有的接受者都能接受。使用routing之后
能够让接收者只接收消息的一个子集。例如之前的示例中,只有错误级别的日志写到硬盘,所有的
的错误全部打印
1.bindings
在绑定的过程中加上 routingKey,为了同 basic_publish 区分,我们称之为 binding key
channel.queueBind(queueName, EXCHANGE_NAME, "black");
2. Direct exchange
fanout exchange :只适合无脑的广播模式
direct exchange :消息会被分配到与消费的routing key 对应的 binding key的队列上,匹配不上的
消息直接丢弃
如图:direct 的 exchange X 关联了两个队列 Q1、Q2,Q1的 binding key=orange
,Q2的绑定了两个key black 和 green
在这种情况下:
routing key=orange的消息会被 exchange X 传递给 Q1
routing key=black 或者 routing key=green 的消息会被 exchange X 传递给 Q2
3. 多绑定 (Multiple bindings)
如图,对direct exchange 同时使用 binding key =black 绑定Q1、Q2,这个时候
direct的exchange就同之前的fanout的一样了,直接广播消息了
4. 测试
测试时候一个开两个接收者,一个设置级别为 error ,一个设置为 info,error
然后开两个生产者 分别发送 error 级别的消息和info级别的消息
观察接收情况
3.4 主题(Topic)模式
在前面的日志例子中,我们可以通过级别来区分日志,但是我们还想通过日志来源来区分日志
就像unix tool 中的syslog,他能通过 级别(info/warn/error)和 来源(auth/cron/kern)来
路由日志
例如:我们只处理来自“cron” 的 “error”级别的日志,同时打印来自“kern”的所有级别的
日志
要实现这种要,我们就需要使用 topic exchange
1. Topic exchange
topic exchange 的 routing key必须是 一个或者英文单词,中间使用点隔开的方式,
最大长度是255字节
binding key 必须是相同格式,topic exchange 的逻辑和direct 很相似,消息只会分配到匹配的binding key的队列
- *(星) 代表一个单词
- #(哈希)代表零个或者多个单词
例如:
在这个例子中,接收动物消息,消息会被发送到一个拥有三个单词(两个点隔开)的 routing key
格式为 <speed>.<color>.<species>
Q1绑定了 binding key“ *.orange.* ”, Q2 绑定了 “ *.*.rabbit” 和 “lazy.#”
Q1只对颜色为 orange的消息感兴趣
Q2关心所有的rabbit 和 lazy的消息
routing key 为 quick.orange.rabbit既会被发送到Q1 也会被发送到 Q2
lazy.orange.elephant --> Q1,Q2
quick.orange.fox --> Q1
lazy.brown.fox --> Q2
lazy.pink.rabbit --> Q2(有两个binding key 匹配的情况下依然只会被发送一次)
quick.brown.fox --> 匹配不上,直接丢弃
lazy.orange.male.rabbit -->Q2 (匹配上了"lazy.#")
当不使用 * 或者 # 站位符的时候,topic exchange表现的就和direct 是一样的
3.5 RPC
这一节使用 RabbitMQ 建立一个RPC系统:一个客户端和一个可扩展的服务器
本示例中在客户端调用一个call 方法,服务端返回一个Fibonacci 数列的值
关于使用rpc的几点建议:
本地方法和远程方法要定义明确,一目了然
系统加注释,是组件之间的依赖清晰可见
异常处理,当rpc 服务器出现异常的时候,客户端改如何处理