小白一枚,第一次写博客,请大家多多包涵,有什么错误,请大家不吝指教。
- 这篇将类型为 direct和topic ,direct和topic 差不多,只是一个是精准匹配,一个是模糊匹配,可以使用通配符。所以我放在一起了。
同样先配置pom.xml 和 application.properties,跟上篇一样配置 ,这里就不写了
- 同样配置config
package com.study.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 上午9:54:37
* @version: 1.0
*/
@Configuration
public class TDRabbitConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
public static final String TEXCHANGE = "topicexchange";// 消息交换机的名字
public static final String DEXCHANGE = "directexchange";// 消息交换机的名字
public static final String TQUEUE = "t_topic"; // topic 队列名
public static final String DQUEUE = "d_topic"; // direct 队列名
public static final String TROUTING_KEY = "topic.*"; // routing key
public static final String DROUTING_KEY = "direct.routing"; // routing key
/**
* @date:2018/8/29
* @className:ConnectionFactory
* @author:nice
* @description: 创建连接工厂
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.host, this.port);
connectionFactory.setUsername(this.username);
connectionFactory.setPassword(this.password);
connectionFactory.setVirtualHost(this.virtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
/**
*
* @author: nice
* @time: 2018年8月29日 上午11:24:58
* @Description: 定义交换机类型为topic
* @return: FanoutExchange
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TDRabbitConfig.TEXCHANGE);
}
/**
*
* @author: nice
* @time: 2018年8月29日 下午2:02:30
* @Description: 定义交换机类型为direct
* @return: DirectExchange
*/
@Bean
public DirectExchange directExchage() {
return new DirectExchange(TDRabbitConfig.DEXCHANGE);
}
/**
*
* @author: nice
* @time: 2018年8月29日 上午9:45:59
* @Description: 定义与topic类型的交换机绑定的队列名字
* @return: Queue
*/
@Bean
public Queue queueTopic() {
Queue queue = new Queue(TDRabbitConfig.TQUEUE, true, false, false);
return queue;
}
/**
*
* @author: nice
* @time: 2018年8月29日 下午2:04:27
* @Description: 定义与direct类型的交换机绑定的队列名字
* @return: Queue
*/
@Bean
public Queue queueDirect() {
Queue queue = new Queue(TDRabbitConfig.DQUEUE, true, false, false);
return queue;
}
/**
*
* @author: nice
* @time: 2018年8月29日 下午2:06:22
* @Description: 为topic类型的exchange与queue,binding也就是reoutingkey 进行绑定;
* @return: Binding
*/
@Bean
public Binding bindingTopicExchange() {
return BindingBuilder.bind(queueTopic()).to(topicExchange()).with(TDRabbitConfig.TROUTING_KEY);
}
/**
*
* @author: nice
* @time: 2018年8月29日 下午2:10:22
* @Description: 为direct类型的exchange与queue,binding也就是reoutingkey 进行绑定;
* @return: Binding
*/
@Bean
public Binding bindingDirectExchange() {
return BindingBuilder.bind(queueDirect()).to(directExchage()).with(TDRabbitConfig.DROUTING_KEY);
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
/**
*
* @author: nice
* @time: 2018年8月29日 下午2:28:56
* @Description: 监听队列配置的containner工厂
* @return: SimpleRabbitListenerContainerFactory SimpleMessageListenerContainer
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
- queue与消费者进行绑定,下面两种方法都可以,上面在(一)中介绍的那种直接在config中写死的也可以,但是有局限性如果是微服务,生产者和消费者在不同服务上,就不好用了。当然下面这两种方法是支持的。
- 与类型为topic的exchange进行绑定的queue与之对应的消费者配置
package com.study.rabbitConsumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.study.config.TDRabbitConfig;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 下午2:23:58
* @version: 1.0
*/
@Component
@RabbitListener(queues = TDRabbitConfig.TQUEUE)
public class TopicConsumer {
@RabbitHandler
public void process(String s) {
System.out.println("TopicConsumer---->" + s);
}
}
- 与类型为direct的exchange进行绑定的queue与之对应的消费者配置
package com.study.rabbitConsumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 下午2:24:10
* @version: 1.0
*/
@Component
public class DirectConsumer {
@RabbitListener(queues="d_topic")
public void consumer(String s){
System.out.println("DirectConsumer---->"+s);
}
}
- 生产者的配置,其中exchange类型为 topic的,发送消息时的routing key 可以是topic.* ,*匹配所有;如果是#,则只能匹配一个单词。携带的消息可以是任意类型的,要注意的是,这边传递的是什么类型,那么对应的消费者接受的类型也要一样,不然接收不了;
package com.study.ExcessController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.study.config.FanoutRabbitConfig;
import com.study.config.TDRabbitConfig;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 上午10:14:41
* @version: 1.0
*/
@RestController
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/fanout")
public String fanout() {
rabbitTemplate.convertAndSend(FanoutRabbitConfig.F_EXCHANGE,FanoutRabbitConfig.ROUTINGKEY1,"fanoutssss");
return "fanout";
}
@RequestMapping("/direct")
public String direct() {
rabbitTemplate.convertAndSend(TDRabbitConfig.DEXCHANGE,TDRabbitConfig.DROUTING_KEY,"direct");
return "direct";
}
@RequestMapping("/topic")
public String topic() {
rabbitTemplate.convertAndSend(TDRabbitConfig.TEXCHANGE,"topic.message","topic");
return "topic";
}
}
4. 在浏览器中输入:
http://localhost:8080/direct
http://localhost:8080/topic
如果在控制台中有相应的输出,则代表配置成功;