RabbitMQ笔记十六:消息确认之二(Consumer Acknowledgements)

消费确认(comsumer acknowledgements)

broker与消费者之间的消息确认称为comsumer acknowledgements,comsumer acknowledgements机制用于解决消费者与Rabbitmq服务器之间消息可靠传输,它是在消费端消费成功之后通知broker消费端消费消息成功从而broker删除这个消息。

RabbitMQ Java Client 实现消息确认

自动确认

zhihao.miao.order队列中发送一条消息

web管控台查看

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        /**
         * basicConsume方法的第二个参数是boolean类型,true表示消息一旦投递出去就自动确认,而false表示需要自己手动去确认
         * 自动确认有丢消息的可能,因为如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
         * 设置了false,表示需要人为手动的去确定消息,只有消费者将消息消费成功之后给与broker人为确定才进行消息确认
         * 这边也有个问题就是如果由于程序员自己的代码的原因造成人为的抛出异常,人工确认那么消息就会一直重新入队列,一直重发?
         */

        String consumerTag = channel.basicConsume("zhihao.miao.order",true,new SimpleConsumer(channel));
        System.out.println(consumerTag);

        TimeUnit.SECONDS.sleep(30);

        channel.close();
        connection.close();
    }
}

消费具体逻辑

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(consumerTag);
        System.out.println("-----收到消息了---------------");
        System.out.println("消息属性为:"+properties);
        System.out.println("消息内容为:"+new String(body));
        try
        {
            int i = 1/0;
            System.out.println(i);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

控制台打印:

amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
java.lang.ArithmeticException: / by zero
-----收到消息了---------------
    at com.zhihao.test.day04.SimpleConsumer.handleDelivery(SimpleConsumer.java:29)
amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
消息属性为:#contentHeader<basic>(content-type=json, content-encoding=null, headers={}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
消息内容为:{"orderId":"abba05db-050e-4b1a-97f1-c469b23ca27b","createTime":"2017-10-22T21:02:41.861","price":100.0}
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

此时可以看到消费端抛出了异常,但是我们发现这条消息也已经消费掉了,此时如果消费端消费逻辑使用spring进行管理的话消费端业务逻辑会进行回滚,这也就造成了实际意义的消息丢失。

web管控台

手动确认

自动确认会造成实际意义上的消息丢失。

将basicConsume方法的第二个参数改为false,表示人工的进行消息确认,如果消费者正在监听队列,那么此时消息进入Unacked,而如果消费者停掉服务,那么消息的状态又变成Ready了。这个机制表明了消息必须是ack确认之后才会在server中删除掉。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //手动确认
        String consumerTag = channel.basicConsume("zhihao.miao.order",false,new SimpleConsumer(channel));
        System.out.println(consumerTag);

        TimeUnit.SECONDS.sleep(30);

        channel.close();
        connection.close();
    }
}

消费具体逻辑

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println(consumerTag);
        System.out.println("-----收到消息了--------------");

        System.out.println("消息属性为:"+properties);
        System.out.println("消息内容为:"+new String(body));

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
        System.out.println("消息消费成功");
    }
}
此时消息已经发送给消费者,但是消费者还没有进行手动确认

发送一个header中包含error属性的消息,

发送一个header中包含error属性的消息

改造消费逻辑

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println(consumerTag);
        System.out.println("-----收到消息了--------------");

        if(properties.getHeaders().get("error") != null){
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
          
            System.out.println("nack");
            this.getChannel().basicNack(envelope.getDeliveryTag(),false,true);

            return;
        }
        System.out.println("消息属性为:"+properties);
        System.out.println("消息内容为:"+new String(body));


        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
        System.out.println("消息消费成功");
    }
}

控制台打印,说明该消息一直重新入队列然后一直重新消费

amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack

消费端也可以拒绝消息,

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println(consumerTag);
        System.out.println("-----收到消息了--------------");

        if(properties.getHeaders().get("error") != null){
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
           
            //这个api也支持拒绝消息消费,第二个参数表示是否重新入队列
            this.getChannel().basicReject(envelope.getDeliveryTag(),false);
            System.out.println("消息无法消费,拒绝消息");
            return;
        }
        System.out.println("消息属性为:"+properties);
        System.out.println("消息内容为:"+new String(body));


        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
        System.out.println("消息消费成功");
    }
}

控制台打印,因为设置了不重新入队列,所以不再重新发消息了:

amq.ctag-kiy_49AkC3f4qRkqCMujrw
amq.ctag-kiy_49AkC3f4qRkqCMujrw
-----收到消息了--------------
消息无法消费,拒绝消息

总结
消费端的消息确认分为二个步骤,

  • 在channel.basicConsume指定为手动确认。
  • 具体根据业务逻辑来进行判断什么是ack什么时候nack(又分为要不要重新requeue)

这边有个问题就是nack时候或者reject时候重新入队列如果业务端因为代码逻辑问题一直重发怎样去设置一个次数值?
我的设想就是设置一个重新发送的递增值,这个值与消息id对应,去处理解决它。或者在redis或者memcache等其他保存方式然后记录这个重发次数。
How do I set a number of retry attempts in RabbitMQ?

Spring AMQP消费端实现消息确认

自动确认

配置类

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");
        /**
         * 自动确认涉及到一个问题就是如果在消息消息的时候抛出异常,消息处理失败,但是因为自动确认而server将该消息删除了。
         * NONE表示自动确认
         */
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        container.setMessageListener((MessageListener) message -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));

            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //相当于自己的一些消费逻辑抛错误
            throw new NullPointerException("consumer fail");

        });
        return container;
    }
}

应用启动类

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        TimeUnit.SECONDS.sleep(100);
        System.out.println("message container startup");

        context.close();
    }
}

控制台打印:

====接收到消息=====
{"orderId":"d232eea5-35ae-4534-80f4-cfb31f49178f","createTime":"2017-10-22T22:11:34.239","price":100.0}
十月 22, 2017 10:11:58 下午 org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
警告: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception

Web控制台上显示消息消费确认也成功。问题还是自动确认会造成事实上的消息丢失。

手动确认

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));
            TimeUnit.SECONDS.sleep(10);
            if(message.getMessageProperties().getHeaders().get("error") == null){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("消息已经确认");
            }else {
                //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("消息拒绝");
            }

        });
        return container;
    }
}

总结

AcknowledgeMode.NONE:自动确认,等效于autoAck=true
AcknowledgeMode.MANUAL:手动确认,等效于autoAck=false,此时如果要实现ack和nack回执的话,使用ChannelAwareMessageListener监听器处理。

AcknowledgeMode.AUTO的使用

我们发现AcknowledgeMode除了AcknowledgeMode.NONEAcknowledgeMode.MANUAL常量值之外还有一个AcknowledgeMode.AUTO的常量。

配置类

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener((MessageListener) (message) -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));
            //抛出NullPointerException异常则重新入队列
            //throw new NullPointerException("消息消费失败");
            //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
            //throw new AmqpRejectAndDontRequeueException("消息消费失败");
            //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
            throw new ImmediateAcknowledgeAmqpException("消息消费失败");

        });
        return container;
    }
}

应用启动类

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        TimeUnit.SECONDS.sleep(100);
        System.out.println("message container startup");

        context.close();
    }
}

AcknowledgeMode.AUTO 根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

  • 如果消息成功被消费(成功的意思就是在消费的过程中没有抛出异常),则自动确认。

1)当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false(不重新入队列)
2)当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
3)其他的异常,则消息会被拒绝,且requeue=true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过setDefaultRequeueRejected(默认是true)去设置,

源码分析

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainerdoReceiveAndExecute方法,

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR

    Channel channel = consumer.getChannel();

    for (int i = 0; i < this.txSize; i++) {

        logger.trace("Waiting for message from consumer.");
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            break;
        }
        try {
           //具体的逻辑,具体执行Listener
            executeListener(channel, message);
        }
        //当ImmediateAcknowledgeAmqpException异常的时候打印日志然后直接break
        catch (ImmediateAcknowledgeAmqpException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("User requested ack for failed delivery: "
                        + message.getMessageProperties().getDeliveryTag());
            }
            break;
        }
        catch (Throwable ex) { //NOSONAR
            if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery: "
                            + message.getMessageProperties().getDeliveryTag());
                }
                break;
            }
            if (this.transactionManager != null) {
                if (this.transactionAttribute.rollbackOn(ex)) {
                    RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
                            .getResource(getConnectionFactory());
                    if (resourceHolder != null) {
                        consumer.clearDeliveryTags();
                    }
                    else {
                        /*
                         * If we don't actually have a transaction, we have to roll back
                         * manually. See prepareHolderForRollback().
                         */
                        consumer.rollbackOnExceptionIfNecessary(ex);
                    }
                    throw ex; // encompassing transaction will handle the rollback.
                }
                else {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("No rollback for " + ex);
                    }
                    break;
                }
            }
            else {
               //进入这边
                consumer.rollbackOnExceptionIfNecessary(ex);
                throw ex;
            }
        }
    }

    return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));

}

进入rollbackOnExceptionIfNecessary方法

public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {

  //当ack机制为AUTO的时候
    boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
    try {
        if (this.transactional) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initiating transaction rollback on application exception: " + ex);
            }
            RabbitUtils.rollbackIfNecessary(this.channel);
        }
        if (ackRequired) {
           //是否入队列,shouldRequeue就是具体的入队列和不入队列的判断
            boolean shouldRequeue = RabbitUtils.shouldRequeue(this.defaultRequeuRejected, ex, logger);
            for (Long deliveryTag : this.deliveryTags) {
                // With newer RabbitMQ brokers could use basicNack here...
                //执行拒绝策略
                this.channel.basicReject(deliveryTag, shouldRequeue);
            }
            if (this.transactional) {
                // Need to commit the reject (=nack)
                RabbitUtils.commitIfNecessary(this.channel);
            }
        }
    }
    catch (Exception e) {
        logger.error("Application exception overridden by rollback exception", ex);
        throw e;
    }
    finally {
        this.deliveryTags.clear();
    }
}

是否入队列的判断(shouldRequeue

public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
    boolean shouldRequeue = defaultRequeueRejected ||
            throwable instanceof MessageRejectedWhileStoppingException;
    Throwable t = throwable;
    while (shouldRequeue && t != null) {
       //如果抛出的异常是AmqpRejectAndDontRequeueException的时候,不入队列
        if (t instanceof AmqpRejectAndDontRequeueException) {
            shouldRequeue = false;
        }
        t = t.getCause();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
    }
    return shouldRequeue;
}

container.setDefaultRequeueRejected(false);,那么消息就不会重新入队列,只会拒绝一次。

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("zhihao.miao.order");
    //自动确认涉及到一个问题就是如果在消息消息的时候抛出异常,消息处理失败,但是因为自动确认而server将该消息删除了。
    //NONE表示自动确认
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDefaultRequeueRejected(false);
    container.setMessageListener((MessageListener) (message) -> {
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        throw new NullPointerException("消息消费失败");
        //throw new AmqpRejectAndDontRequeueException("消息消费失败");
        //throw new ImmediateAcknowledgeAmqpException("消息消费失败");

    });
    return container;
}

使用@RabbitListener注解监听队列

设置确认模式是通过在容器中设置RabbitListenerContainerFactory实例的setAcknowledgeMode方法来设定。

配置:

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //默认的确认模式是AcknowledgeMode.AUTO
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

处理器:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

@Component
public class MessageHandler {

    @RabbitListener(queues ="zhihao.miao.order")
    public void handleMessage(byte[] bytes, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("====消费消息===handleMessage");
        System.out.println(new String(bytes));
        channel.basicAck(tag,false);
    }
}

应用启动类:

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@EnableRabbit
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(3000);
        context.close();
    }
}

可靠消息总结

实际使用mq的实例,每段时间定期的给经常订早餐的推送短信(上新品)。
登录短信(也是使用消息中间件)
下单的时候,使用消息中间件发送到配送系统(消息不能丢失)。

做到消息不能丢失,我们就要实现可靠消息,做到这一点,我们要做到下面二点:

一:持久化
1: exchange要持久化
2: queue要持久化
3: message要持久化
二:消息确认
1: 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
2:生产者和Server(broker)之间的消息确认。
3: 消费者和Server(broker)之间的消息确认。

对于重要的消息,要结合本地的消息表才能上生产。

参考资料Consumer Acknowledgements and Publisher Confirms

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,738评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,377评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,774评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,032评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,015评论 5 361
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,239评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,724评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,374评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,508评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,410评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,457评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,132评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,733评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,804评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,022评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,515评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,116评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,813评论 8 167
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,340评论 2 34
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,420评论 0 34
  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,445评论 0 12