背景描述
一个公司有100个门店,每个门店产生的订单都进入消息队列,Consumer 读取并处理。
如果其中某个门店的订单量特别大,这就会造成资源独占,Consumer 一直在处理这个门店的订单,其他门店的订单就得延迟较长时间才能被处理。
解决思路
给 Topic 设置100个 MessageQueue,把每个门店的订单写入一个 MessageQueue,consumer 默认是采取循环的方式逐个读取Topic中的每个MessageQueue,这样,即使某个店的订单量很大,也是这个店对应的 MessageQueue 消息量增大,不会造成其他店等待时间变长。
示例代码
public void selectorProducer() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
for (int i = 0; i < 20; i++) {
int orderId = i % 10;
Message message = new Message("TopicTest", "push",
("发送消息----" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}