预备知识
-
- 交换机 及其 3 种类型
- 队列
- 路由键 与 绑定关系
-
消息生命周期
-
创建结构
- 创建交换机
- 创建队列
- 绑定队列到交换机
-
消费消息
- 消费者指定队列名称, 阻塞接受消息
- 收到消息内容, 程序进行相关处理, 处理成功调用 ACK。 处理失败调用NACK。
-
发送消息
- 程序产生消息内容
- 指定交换机和消息路由键调用发送api
- 交换机根据路由键将消息转到队列
- 队列临时存储消息, 将消息发送给消费者
-
消息的状态
- ready:消息在队列中等待被消费
- unack: 消息已经派发给消费者但未收到消费者成功或失败的回复
- dead:
- 消息被消费者回复reject或unack并且指定requeue(重新入队)为false。
- 设置了超时时间的队列,达到指定时间后仍然未被消费者取走消费
- 自动重新入队:unack状态下, 消费者程序崩溃(tcp连接中断),消息会自动重新入队, 等待后续被消费者再次取走
-
实现思路
可利用的特性
- dlx 死信交换机 声明队列时, 可以指定一个交换机, 当消息变为dead状态时, 该消息会被转发到这个交换机, 这个交换机就叫死信交换机
- 队列超时。声明队列时指定超时时间
尝试方案一
- 过程
- 正常创建业务交换机 BusinessExchange
- 创建业务消费队列 BusinessQueue 并绑定到BusinessExchange, 并且指定dlx为BusinessExchange
- 程序收到消息时检查重试次数, 超过指定次数直接丢弃(不处理,直接返回ACK)
- 效果
消息变为死信后回到队列尾部,等待下次重试, 可以实现重试功能 - 缺陷
消息处理失败通常时由于某些服务异常导致, 服务异常通常会是持续的一小段时间。 如果队列消息过少, 或者消息消费很快, 那么一条异常消息将会再短时间内被消费多次,达到设定的重试上限。所以当消息变为死信时,最好能延迟重新入队 - 但是
rabbit mq 不支持延迟消息
解决方案
参见代码注释
- ExchangeBusiness 中的 QueueBusiness 的消费者主动调用 nack 标识失败, 指定requeue为false
- 消息变为死信, 自动路由到指定死信 ExchangeWait
- ExchangeWait 被一个没有消费者的队列(QueueWait)绑定, 并设置 30s 超时
- 消息在 QueueWait 中超时后再次变为死信. 被路由到 ExchangeRetry
- QueueBusiness 同时绑定 ExchangeBusiness ExchangeRetry, 消息得以再次消费
这里需要创建 QueueBusiness, ExchangeWait, QueueWait, ExchangeRetry, 并返回 QueueBusiness
package mq_topic
import (
"fmt"
"github.com/pkg/errors"
"github.com/streadway/amqp"
"sync"
"time"
)
// 消息队列自动重试
// 1. ExchangeBusiness 中的 QueueBusiness 的消费者主动调用 nack 标识失败, 指定requeue为false
// 2. 消息变为死信, 自动路由到指定死信 ExchangeWait
// 3. ExchangeWait 被一个没有消费者的队列(QueueWait)绑定, 并设置 30s 超时
// 4. 消息在 QueueWait 中超时后再次变为死信. 被路由到 ExchangeRetry
// 5. QueueBusiness 同时绑定 ExchangeBusiness ExchangeRetry, 消息得以再次消费
// 这里需要创建 QueueBusiness, ExchangeWait, QueueWait, ExchangeRetry, 并返回 QueueBusiness
func DeclareRetryQueue(queueBusiness string, durable, autoDelete, exclusive, noWait bool, args amqp.Table, ch *amqp.Channel, waitDuration time.Duration) (q amqp.Queue, err error) {
exchangeWait := fmt.Sprintf("%s-exchange-wait", queueBusiness)
exchangeRetry := fmt.Sprintf("%s-exchange-retry", queueBusiness)
queueWait := fmt.Sprintf("%s-queue-wait", queueBusiness)
// 创建业务队列, 指定死信 exchange
if args == nil {
args = amqp.Table{}
}
args["x-dead-letter-exchange"] = exchangeWait
q, err = ch.QueueDeclare(queueBusiness, durable, autoDelete, exclusive, noWait, args)
if err != nil {
err = errors.Wrap(err, "QueueDeclare queueBusiness error")
return
}
// 创建 exchangeWait
err = ch.ExchangeDeclare(exchangeWait, "fanout", true, false,
false, false, nil)
if err != nil {
err = errors.Wrap(err, "ExchangeDeclare exchangeWait error")
return
}
// 创建 exchangeRetry
err = ch.ExchangeDeclare(exchangeRetry, "fanout", true, false,
false, false, nil)
if err != nil {
err = errors.Wrap(err, "ExchangeDeclare exchangeRetry error")
return
}
// 创建 queueWait 指定死信和超时
_, err = ch.QueueDeclare(queueWait, true, false, false, false,
amqp.Table{"x-dead-letter-exchange": exchangeRetry, "x-message-ttl": int(waitDuration / time.Millisecond)})
if err != nil {
err = errors.Wrap(err, "ExchangeDeclare exchangeRetry error")
return
}
// queueWait 绑定 exchangeWait
err = ch.QueueBind(queueWait, "", exchangeWait, false, nil)
if err != nil {
err = errors.Wrap(err, "QueueBind queueWait exchangeWait error")
return
}
// queueBusiness 绑定到 exchangeRetry
err = ch.QueueBind(queueBusiness, "", exchangeRetry, false, nil)
if err != nil {
err = errors.Wrap(err, "QueueBind queueBusiness exchangeRetry error")
return
}
return
}
// 获取消息重试次数
func GetMessageRetryCount(msg amqp.Delivery) uint {
xDeath, ok := msg.Headers["x-death"]
if !ok {
return 0
}
arr := xDeath.([]interface{})
if len(arr) == 0 {
return 0
}
return uint(arr[0].(amqp.Table)["count"].(int64))
}