代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下
1.confirm 消息确认机制
消息的确认是指生产者投递消息后,如果 Broker 接收到消息,则会给生产者一个应答。生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障。其流程图如下所示
实现 confirm 确认消息
第一步:在 channel 上开启确认模式:channel.confirmSelect()
第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!
代码实现:
生产端
/**
* confirm机制生产端
*/
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
log.info("生产已启动,并发送了:{}", msg);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, null, msg.getBytes());
//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override //deliveryTag表示消息的唯一标签,
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("-------ack!-----------");
log.info("deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
@Override //失败时进入这里
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("-------no ack!-----------");
log.info("deliveryTag: {}, multiple: {}", deliveryTag, multiple);
}
});
}
}
消费端代码无需什么修改
/**
* confirm机制消费端
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_confirm_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_REG = "confirm.#";
public static final String ROUTING_KEY = "confirm.abc";
public static final String QUEUE_NAME = "test_confirm_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true); //true表示持久化
//是否持久化,独占模式,自动删除
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_REG);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 是否自动签收 autoAck
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
log.info("消费端已启动");
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消费端: {}", msg);
}
}
}
启动消费端
再启动生产端,注意看日志打印,收到了 confirm
自此,confirm 机制的用法接受完毕。
2. return 消息机制
return Listener 用于处理一些不可路由的消息!
生产者通过指定一个 exchange 和 routingkey 把消息送达到某个队列中去,然后消费者监听队列,进行消费处理。但是在某些情况下,如果我们在发送消息时,当前的 exchange 不存在或者指定的 routingkey 路由不到,这个时候如果要监听这种不可达的消息,就要使用 return Listener。流程图如下所示
实现 return 消息机制
在基础 API 中有一个关键的配置项 Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,则 broke r端自动删除该消息。
代码实现
生产端代码:
/**
* Return返回消息 生产端
*/
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("---------handle return----------");
log.info("响应码replyCode: {}", replyCode);
log.info("文本信息replyText: {}", replyText);
log.info("exchange: {}", exchange);
log.info("routingKey: {}", routingKey);
log.info("properties: {}", properties);
log.info("body: {}" ,new String(body));
}
});
/**
* 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,
* 如果为false,则broker端自动删除该消息。
*/
log.info("生产端{}发送:{}", Consumer.ROUTING_KEY, msg + Consumer.ROUTING_KEY);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + Consumer.ROUTING_KEY).getBytes());
log.info("生产端{}发送:{}", Consumer.ROUTINGKEY_ERROR, msg + Consumer.ROUTINGKEY_ERROR);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTINGKEY_ERROR, true, null, (msg + Consumer.ROUTINGKEY_ERROR).getBytes());
log.info("生产端{}发送:{}", Consumer.ROUTINGKEY_ERROR2, msg + Consumer.ROUTINGKEY_ERROR2);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTINGKEY_ERROR2, false, null, (msg + Consumer.ROUTINGKEY_ERROR2).getBytes());
}
}
消费端代码和以前的一样,无需修改什么
/**
* Return返回消息 消费端
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_return_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_REG = "return.#";
public static final String ROUTING_KEY = "return.save";
public static final String ROUTINGKEY_ERROR = "abc.true";
public static final String ROUTINGKEY_ERROR2 = "abc.false";
public static final String QUEUE_NAME = "test_return_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_REG);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
log.info("消费端启动成功");
while(true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消费者: {}", msg);
}
}
}
先启动消费端,然后上 rabbitMQ 的管控台查看对应的 exchange 和 queue 是否创建,绑定成功。
再启动生产端,注意看打印的日志
监听器会接收到路由不可达的消息,然后进行后续处理,前提是消费端的 Mandatory 为 true,可以修改为 false,再试试看能不能接收到,我这里就不演示了。