私聊我做毕设或者实验课题。
1.设计数据库
设计product表,用来记录商品的总数量
设计record表,用来记录消费者的id
2.业务的实现
1.导入相关依赖
<!--整合rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.相关文件的配置
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://112.124.17.134:3306/rabbitMQ?serverTimezone=GMT%2B8
username: root
password: 123456
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
rabbitmq:
host: localhost
username: guest
password: guest
publisher-confirms: true # 开启Rabbitmq发送消息确认机制,发送消息到队列并触发回调方法
publisher-returns: true
listener:
simple:
concurrency: 10 #消费者数量
max-concurrency: 10 #最大消费者数量
prefetch: 1 #限流(消费者每次从队列获取的消息数量)
auto-startup: true #启动时自动启动容器
acknowledge-mode: manual #开启ACK手动确认模式
mybatis-plus:
mapper-locations: classpath:xz/mapper/xml/*.xml
3.代码的实现
1.RabbitConfig类的实现
主要用于生成队列和交换机并进行绑定,这里将消息的转化为json输出,我注释掉了,因为后面我发送消息并不是以string的形式发送,而是自定义一个消息实体类messageHandler类用来发送消息
@Component
public class RabbitConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);
public static final String DIRECT_QUEUE ="DIRECT_QUEUE" ;
public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";
public static final String DIRECT_KEY = "DIRECT_ROUTING_KEY";
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public AmqpTemplate amqpTemplate() {
/**
* 定义消息转换实例 ,转化成 JSON传输
*
* @return Jackson2JsonMessageConverter
*/
//rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//rabbitTemplate.setEncoding("UTF-8");
// 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
/**
* 消息发送到交换器Exchange后触发回调。
* 使用该功能需要开启确认,spring-boot中配置如下:
* spring.rabbitmq.publisher-confirms = true
*/
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean b, String s)-> {
if (b) {
LOGGER.info("消息已确认 cause:{}",correlationData.getId());
} else {
LOGGER.info("消息未确认 cause:{}", s);
}
});
/**
* 通过实现ReturnCallback接口,
* 如果消息从交换器发送到对应队列失败时触发
* 比如根据发送消息时指定的routingKey找不到队列时会触发
* 使用该功能需要开启确认,spring-boot中配置如下:
* spring.rabbitmq.publisher-returns = true
*/
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)-> {
LOGGER.error("消息被退回:{}", message);
LOGGER.error("消息使用的交换机:{}", exchange);
LOGGER.error("消息使用的路由键:{}", routingKey);
LOGGER.error("描述:{}", replyText);
});
return rabbitTemplate;
}
/**
* 声明Direct交换机 支持持久化.
*
* @return the exchange
*/
@Bean("directExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
}
/**
* 声明一个队列 支持持久化.
*
* @return the queue
*/
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE).build();
}
/**
* 通过绑定键 将指定队列绑定到一个指定的交换机 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding directBinding(@Qualifier("directQueue") Queue queue,
@Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DIRECT_KEY).noargs();
}
}
2.发送消息
写一个controller类用来实现商品的抢购
private int userId=0;
//开始抢单
@RequestMapping("/begin")
@ResponseBody
public void begin(){
userId++;
this.send(new MessageHander(true,userId));
}
public String send(MessageHander message){
//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
CorrelationData correlationData=new CorrelationData(Integer.toString(message.getUserId()));
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_KEY, message,correlationData);
return "发送消息成功";
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageHander implements Serializable {
private boolean flag;
private int userId;
}
3.接受消息
@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
@Autowired
RabbitController controller;
/**
* @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
* @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,
* 根据 MessageConverter 转换后的参数类型
*
*
* 通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
*/
@RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE}) //指定监听的队列名
public void receiver(MessageHander messageHander, @Headers Channel channel, Message message) throws IOException {
log.info("用户{}开始抢单", messageHander.getUserId());
try {
//处理消息
controller.robbingProduct(messageHander.getUserId());
// 确认消息已经消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝当前消息,并把消息返回原队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
public void robbingProduct(Integer userId) {
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("productNO","123321NO");
Product product = productService.getOne(queryWrapper);
if (product != null && product.getTotal() > 0) {
int i = productService.updateProduct("123321NO");
if(i>0){
//插入记录
productService.insertProductRecord(new Record(null,"123321NO", userId));
//发送短信
LOGGER.info("用户{}抢单成功", userId);
}else {
LOGGER.error("用户{}抢单失败", userId);
}
} else {
LOGGER.error("用户{}抢单失败", userId);
}
}