消息服务 - RabbitMQ 基础入门

rabbitmq

RabbitMQ官方入门教程

本文算是实现对入门教程的 java版本翻译吧。本文中演示代码地址

1. 安装

  1. 先安装 erlang (安装网上提供的教程安装erlang)

  2. 在安装 rabbitmq-server
    下载rabbitmq的安装包的时候选择 tar.xz 直接解压就可以了

  3. 启动/停止

#启动rabbitmq
#进入安装目录的sbin 目录,执行
./rabbitmq-server -detached

#关闭rabbitmq
./rabbitmqctl stop


2.用户权限

guest/guest 现在只能在localhost使用,不能远程使用

需要添加用户和权限

参考文章 RabbitMQ用户角色及权限控制

例如:

####添加用户:

$sudo rabbitmqctl add_user  user_admin  passwd_admin  

#####修改角色为 administrator:

$sudo rabbitmqctl set_user_tags user_admin administrator

上面的操作有了,可能还是为在日志中提示 user_admin在 vhost '/' 权限不够

则执行下面的操作:

######修改权限
$sudo rabbitmqctl  set_permissions -p /  user_admin '.*' '.*' '.*'

3.rabbitmq 的使用

3.1 工作队列(workqueues) 模式

注意的点:

  1. 工作队列中的消息被所有的消费者共享。
  2. rabbitmq在路由消息到消费者的时候使用轮询(round-robin)的方式,找到第n个消费者来消费消息
  3. ack(ackownledgment)机制确保消息被消费,不出现消息没有消费就从内存中删除的情况

3.1.1 ack机制(Message acknowledgment)

RabbitMQ 支持消息确认,当消费者(Consumer)接受到消息并且处理完成之后,回给RabbitMQ 发送一个确认消息,Rabbitmq这个时候可以随意的删除这个消息

如果消费者die ,但是没有发送确认消息,这个时候RabbitMQ会认为消息没有完全处理,这个时候会这个消息重新放到队列之中,使用轮询的方式分配给其他的消费者消费

RabbitMQ 默认开启了确认机制,使用的时候设置 autoAck = true 来关闭确认机制,设置autoAck = false 则开启

3.1.2 消息持久化(Message durability)

启动消息持久化,需要在申明channel的时候设置持久化属性为true

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

一个确定了是否持久化的队列,不能再修改durable的值

设置队列为持久化队列之后,需要设置下消息的属性,例如设置属性为MessageProperties (实现了 BasicProperties)

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

使用持久化消息并不能完全保证消息的持久化,应为消息可能先保存在缓存,后面保存到硬盘上,不过对于一般的task是完全足够的,如果想要确保完全的持久化,可以结合 publisher confirms 来实现

3.1.3 公平分配(Fair dispatch)

通过设置prefetchCount 的值,控制一个消费者接受的任务数量

int prefetchCount = 1;
channel.basicQos(prefetchCount);

如上面的设置,当一个消费者还有没有处理完的任务的时候,rabbitmq不会分配任务给他

开启管理功能

rabbitmq-plugins enable rabbitmq_management

web管理界面: http://server-name:15672/


3.2 发布订阅(Publish/Subscribe)模式

使用发布订阅模式,一条消息会被发送给多个消费者消费

使用示例说明:

创建一个日志的生产者 EmitLog 来发布日志,使用多个日志接受者ReceiveLogs
创建两个接受者,一个把日志往硬盘上面写,一个把日志打印到显示器。整个过程发布日志
的只有一个。日志被广播到多个接受者

1.1 交换(Exchanges)

在rabbitmq的消息模型中

生产者-->发送消息的一端

队列--> 消息缓冲区

消费者-->消费消息的一端

rabbitmq的核心思想中,生产者通常不直接发送消息给一个队列,事实上,
生产者也不知道把消息发送给哪个队列

实际上,生成这是把消息发送给一个Exchange,这个Exchange一方面从生产者那边接受消息,
一方面推送消息给队列(queue),exchange清楚的知道自己接受的消息要怎么处理——是追加到
一个指定的队列?还是追加到多个队列?还是直接丢弃?具体做法要根据exchange的类型(type)
来定

可以通过下面的指令列出exchange的类型

./rabbitmqctl list_exchanges

exchange的类型:

  1. direct
  2. topic
  3. headers
  4. fanout

之前的例子中没有使用exchange是因为我们使用了默认的exchange 即使用空字符串"" 来定义的
(默认exchange的消息会被发送到指明的routingKey的队列中)

channel.basicPublish("", "hello", null, message.getBytes());

现在创建一个exchange并发布

channel.exchangeDeclare("logs", "fanout");
channel.basicPublish( "logs", "", null, message.getBytes());

1.2 临时队列(Temporary queues)

使用无参的 queueDeclare() 方法可以创建一个非持久化的、独立的、自动删除的、名称随机的一个
队列。例如:产生一个名为 amq.gen-JzTY20BRgKO-HjmUJj0wLg 的队列

String queueName = channel.queueDeclare().getQueue();

1.3 绑定(Bindings)

之前说过,消息分配到哪个队列是有exchange来处理的,那么要确定消息去哪儿,就需要明确队列和exchange
的关系。这个过程称为binding

channel.queueBind(queueName, "logs", "");

3.3 路由(Routing)

之前的发布订阅示例中,我们使用的是广播消息,所有的接受者都能接受。使用routing之后
能够让接收者只接收消息的一个子集。例如之前的示例中,只有错误级别的日志写到硬盘,所有的
的错误全部打印

1.bindings

在绑定的过程中加上 routingKey,为了同 basic_publish 区分,我们称之为 binding key

channel.queueBind(queueName, EXCHANGE_NAME, "black");

2. Direct exchange

fanout exchange :只适合无脑的广播模式

direct exchange :消息会被分配到与消费的routing key 对应的 binding key的队列上,匹配不上的
消息直接丢弃

image

如图:direct 的 exchange X 关联了两个队列 Q1、Q2,Q1的 binding key=orange
,Q2的绑定了两个key black 和 green

在这种情况下:

routing key=orange的消息会被 exchange X 传递给 Q1

routing key=black 或者 routing key=green 的消息会被 exchange X 传递给 Q2

3. 多绑定 (Multiple bindings)

direct-exchange-multiple

如图,对direct exchange 同时使用 binding key =black 绑定Q1、Q2,这个时候
direct的exchange就同之前的fanout的一样了,直接广播消息了

4. 测试

测试时候一个开两个接收者,一个设置级别为 error ,一个设置为 info,error
然后开两个生产者 分别发送 error 级别的消息和info级别的消息

观察接收情况


3.4 主题(Topic)模式

在前面的日志例子中,我们可以通过级别来区分日志,但是我们还想通过日志来源来区分日志
就像unix tool 中的syslog,他能通过 级别(info/warn/error)和 来源(auth/cron/kern)来
路由日志

例如:我们只处理来自“cron” 的 “error”级别的日志,同时打印来自“kern”的所有级别的
日志

要实现这种要,我们就需要使用 topic exchange

1. Topic exchange

topic exchange 的 routing key必须是 一个或者英文单词,中间使用点隔开的方式,
最大长度是255字节

binding key 必须是相同格式,topic exchange 的逻辑和direct 很相似,消息只会分配到匹配的binding key的队列

  1. *(星) 代表一个单词
  2. #(哈希)代表零个或者多个单词

例如:

topic-example

在这个例子中,接收动物消息,消息会被发送到一个拥有三个单词(两个点隔开)的 routing key
格式为 <speed>.<color>.<species>

Q1绑定了 binding key“ *.orange.* ”, Q2 绑定了 “ *.*.rabbit” 和 “lazy.#”

Q1只对颜色为 orange的消息感兴趣

Q2关心所有的rabbit 和 lazy的消息

routing key 为 quick.orange.rabbit既会被发送到Q1 也会被发送到 Q2
lazy.orange.elephant --> Q1,Q2
quick.orange.fox --> Q1
lazy.brown.fox --> Q2
lazy.pink.rabbit --> Q2(有两个binding key 匹配的情况下依然只会被发送一次)
quick.brown.fox --> 匹配不上,直接丢弃
lazy.orange.male.rabbit -->Q2 (匹配上了"lazy.#")

当不使用 * 或者 # 站位符的时候,topic exchange表现的就和direct 是一样的


3.5 RPC

这一节使用 RabbitMQ 建立一个RPC系统:一个客户端和一个可扩展的服务器

本示例中在客户端调用一个call 方法,服务端返回一个Fibonacci 数列的值

关于使用rpc的几点建议:

  1. 本地方法和远程方法要定义明确,一目了然

  2. 系统加注释,是组件之间的依赖清晰可见

  3. 异常处理,当rpc 服务器出现异常的时候,客户端改如何处理

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,092评论 3 51
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,219评论 0 11
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,979评论 3 41