什么是rocketmq
RocketMQ 是阿里巴巴开源的消息队列中间件。具有下列特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 亿级消息堆积能力
- 事务消息
“严格的消息顺序” 是指在需要的情况下,可以使 producer 发送的消息被 consumer 顺序的接收; “丰富的消息拉取模式” 是指可以选择 pull 或 push 两种消息消费模式(但是其实都是 consumer 主动从broker 拉取消息);“订阅者水平扩展能力” 是指可以多个 consumer 同时 subscribe 同一个队列时,根据 consumer 是否在同一个 consumer group 来决定消息是交给所有 consumer 消费还是选择某个 consumer 消费,可以实现 consumer 侧的负载均衡;“亿级消息堆积能力” 是指 broker 接收到的消息后会将其存在文件中,所以可以做到存储大量消息,并供不同消费者重复消费。“事务消息” 是指可以用来实现最终一致性的分布式事务。
rocketmq的组成部分
上图是一个典型的 RocketMQ 网络拓扑图,有以下组成部分:
- producer
- consumer
- Name server
- Broker
Broker 又分为 master 和 slave,master 可以进行消息的读写,slave 同步 master 接收的消息,只能用来进行消息的读取。其中:
(1) producer 为消息的生产者,为了提高写消息的效率,同时防止单点,可以部署多个 master broker,producer 可以向不同的 broker 写入数据。
(2)consumer 为消息的消费者,有集群模式和广播模式两种消费方式,还可以设置 consumer group。在集群模式下,同一条消息只会被同一个 consumer group 中的一个消费者消费,不同 consumer group 的 consumer 可以消费同一条消息;而广播模式则是多个 consumer 都会消费到同一条消息。
(3)Name server 用来管理 broker 以及 broker 上的 topic,可以接收 Broker 的注册、注销请求,让 producer 查询 topic 下的所有 BrokerQueue,put 消息,Consumer 获取 topic 下所有的 BrokerQueue,get 消息
(4) Broker 又分为 master 和 slave,master 可以进行消息的读写,slave 同步 master 接收的消息,只能用来进行消息的读取。一个 Master 可以有多个 Slave,但一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有的 NameServer。
需要注意的是 producer 和 consumer 在生产和消费消息时,都需要指定消息的 topic,当 topic 匹配时,consumer 才会消费到 producer 发送的消息,除此之外, producer 在发送消息时还可以指定消息的 tag,consumer 也可以指定自己关注哪些 tag,这样就可以对消息的消费进行更加细粒度的控制 。
broker 中同一个 topic 又可以分为不同的 queue,consumer 在集群模式下消费时,同一个 topic 下不同的 queue 会被 分配给同一个 consumer group 中不同的 consumer,实现接收端的负载均衡,同时也为顺序消息的实现提供了基础。
在同一个broker上,所有 topic 的所有 queue 的消息,存放在一个文件里面,并且,为不同的 queue 生成了不同的 ConsumeQueue,这样, consumer 就可以指定 topic、消息发送时间等信息,从 ConsumeQueue 中读取消息在 commit log 中的偏移,然后再去 commit log 中读取消息:
rocketmq环境搭建与基本使用
安装
搭建 RocketMQ 环境需要下列条件:
- 64bit JDK 1.7+;
- Maven 3.2.x
先从 github 获取 RocketMQ 的源码:
git clone https://github.com/apache/incubator-rocketmq.git
然后进入源码目录进行编译:
mvn clean package install -Prelease-all assembly:assembly -U
需要注意的是,在 Mac os x 上,有些测试无法通过,加入 -DskipTests 即可,不影响使用。在 linux 和 windows 上都没有这个问题。
然后就可以进入 target/apache-rocketmq-all/
,准备运行 name server 和 broker了。
单 broker 测试
运行 name server:
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
如果看到日志中出现: The Name Server boot success...,说明 name server 就启动成功了。
运行 broker:
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
看到 The broker[%s, 192.168.0.133:10911] boot success... 这样的日志,就算启动成功了。
然后运行 consumer,代码如下:
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.0.133:9876");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume. * represent this consumer will consume all sub tags
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
再运行 producer:
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.133:9876");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Launch the instance.
*/
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
就可以看到 consumer 打印出接收到的消息了:
...
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=749, sysFlag=0, bornTimestamp=1492238283708, bornHost=/192.168.0.103:50436, storeTimestamp=1492238278824, storeHost=/192.168.0.104:10911, msgId=C0A8006800002A9F0000000000091409, commitLogOffset=594953, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1492238283710, UNIQ_KEY=C0A80067C1B018B4AAC248A9BDBC03E3, WAIT=true, TAGS=TagA}, body=18]]]
...
RocketMQ 集群
只使用单个 Broker 单个 Name Server 的话,无法保证服务的高可用,所以一般会选择启动多个 NameServer,多个 Master 以及 多个 slave。可以配置的选项主要有:(1)接收到消息写入文件后刷盘是异步还是同步,同步刷盘会导致磁盘 IO 增多从而运行效率下降,同时由于有若干 slave 备份消息,一般不建议使用同步刷盘;(2)master slave 之间复制消息使用同步还是异步方式,同步方式的情况下 producer 写入消息后,当消息从 master 复制到 slave 成功后才返回,而异步情况下 master 处理好了消息就直接返回了。在 incubator-rocketmq/target/apache-rocketmq-all/conf 目录下,有一些示例配置:2m-2s-async、2m-2s-sync、2m-noslave 分别对应不同的配置示例,这里就配置 2m-noslave。
有2台服务器 192.168.0.133 以及 192.168.0.104,我们先在两台服务器上分别启动 name server。
然后使用
nohup bash mqbroker -c ../conf/2m-noslave/broker-a.properties -n '192.168.0.133:9876;192.168.0.104:9876' &
nohup bash mqbroker -c ../conf/2m-noslave/broker-b.properties -n '192.168.0.133:9876;192.168.0.104:9876' &
分别启动不同的 Broker。这里需要注意的是 Broker 的配置项和 org.apache.rocketmq.common.BrokerConfig 类的成员变量一一对应,如果有定制化的,直接看看 BrokerConfig 中有什么选项就好了。
查看 Name Server 的日志,可以看到两个 Broker 分别在两个 Name Server 上注册成功。在 consumer 和 producer 中,也记得使用下面的代码来设置 Name Server:
consumer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
producer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
顺序消息
顺序消息指消息被消费的顺序和 producer 发送消息的顺序严格一致。RocketMQ 要实现顺序消息有 2 个要求:
- Producer 保证发送消息到同一个队列;
- consumer 保证同一个队列同时只有一个 consumer 在消费。
具体实现上,Producer 需要使用 MessageQueueSelector
根据业务需求使用某个参数,比如订单号,将关联的数据发送到同一个队列去。
Consumer 需要使用 MessageListenerOrderly
,它将会定时的向 Broker 申请锁住某些特定的队列,Broker 的RebalanceLockManager 里的 ConcurrentHashMap mqLockTable 记录着队列与 consumer client 的对应关系,consumer 可以尝试对队列加锁,并获取自己当前持有哪些队列的锁:
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
对于 consumer,除了知道自己持有哪些队列的锁,可以对这些队列进行消费外,还需要保证同一时间只有一个线程会消费同一个队列,所以在本地维护了一个变量,其类型为:
public class MessageQueueLock {
private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
对于每一个队列,都有一个 objLock,在消费时对该 objLock 使用 synchronizd 加锁,保证同一时间只有一个线程在消费该队列。
对于每个正在处理中的队列,用一个 ProcessQueue
维护其状态,并在内部使用一个 TreeMap 记录所有本地获取到且未消费的消息,key 为消息的 offset,value 为消息,方便按消息的 offset 获取消息:
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
为了实现消费失败时暂停消费,还再读取消息进行处理时将消息放到一个暂存队列里:
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
这样,就可以在处理失败时将消息从 msgTreeMapTemp 放回 msgTreeMap 中,在成功时候增加消息消费的 offset 了:
public void rollback() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.putAll(this.msgTreeMapTemp);
this.msgTreeMapTemp.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("rollback exception", e);
}
}
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
this.msgTreeMapTemp.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
在处理完消息后,会根据处理结果进行一些后序动作,包括增加消费的 offset,并更新 offset 到 Broker 等,这样就不会每次队列重启都重新消费之前的数据了:
public boolean processConsumeResult(//
final List<MessageExt> msgs, //
final ConsumeOrderlyStatus status, //
final ConsumeOrderlyContext context, //
final ConsumeRequest consumeRequest//
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
分析了这么多,还是上一段代码来说明一下使用的方法,下面为 producer:
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
// producer.setNamesrvAddr("192.168.0.104:9876");
// producer.setNamesrvAddr("192.168.0.133:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
Random random = new Random();
random.setSeed(System.currentTimeMillis());
for (int i = 0; i < 100; i++) {
int orderId = Math.abs(random.nextInt());
Message msg =
new Message("TopicTestShunxu", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
System.out.println(mqs);
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
下面是 consumer:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestShunxu", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
this.consumeTimes.incrementAndGet();
for (MessageExt msg : msgs) {
System.out.println(msg.getStoreHost() + " " + msg.getQueueId() + " " + new String(msg.getBody(), Charset.forName("UTF-8")));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
注意,对于 consumer 而言,在暂时无法成功处理消息时,需要返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,这样就会在一段时间之后重试消费消息。
另外还有一点要注意的是,顺序消息不能保证消息只被消费一次:比如当某个 consumer 处理完消息但还没有更新消息 offset 到 broker 时挂了,其他的 consumer 会获取队列的锁,并且重新消费该消息。所以在 consumer 的业务逻辑中一定一定要对消息做去重处理,否则要是发了两份货或者转了两笔钱,你老板可能就会扣你工资了 😁。
事务消息
所谓的事务消息,是指将事务处理+消息发送结合起来,保证同时失败或同时成功。比如从账户 A 扣钱,发了一个消息给账户 B 增加一笔钱,那么必须保证扣钱成功就发出去消息,扣钱失败不能发出去消息。这样做的好处是什么呢?
在单机环境下,一个转账操作如下:
但是当用户十分多以后,两个账户可能不在一台服务器上,可能需要这样做:
但是像上图这样做,编程会十分复杂,要考虑到各种异常情况,同时效率也比较低。那么可能会有下面的这种解决方案,将大事务分解为小事务+消息,不追求完全的一致性,只需要最终一致就好:
最终一致性这种处理问题的思路我们其实经常会用到,一个典型的例子就是调用通过第三方支付平台给用户转账,我们在调用其 API 进行请求时,可能会返回成功,可能会返回失败,也可能返回未知状态。如果直接返回了成功或失败,就可以直接决定调用失败或者是调用成功,减少用户账户余额,但是如果返回未知,则可能需要从用户账户中扣款,然后记录用户有一笔转账在进行中,后续对该转账进行处理,查询其是否成功来决定完成扣款或返还金额到用户账户。这里就用一个转账记录实现了最终一致性。
但是这种场景有一个问题:那就是到底什么时发送消息。如果在事务完成之前发,那么事务失败的话怎么办?如果在事务完成之后发,那么消息发送失败了怎么办?当然还有一种选择是在事务中发送消息,先不 commit 事务,在消息发送后根据消息发送结果决定是 commit 还是 rollback,但是这样又会造成事务时间过长,可能会造成数据库查询效率下降。
RocketMQ 解决这个问题的方法是进行两阶段提交,在事务开始前先发送一个 prepared 消息,完成事务后再发送确认消息,之后,consumer 就可以读取到这个消息进行消费了。但是,这又引入了一个问题,确认消息发送失败了怎么办?RocketMQ 是这么做的:在收到 prepared 消息而未收到确认消息的情况下,每隔一段时间向消息发送端( producer )确认,事务是否执行成功。这样就能保证消息发送与本地事务同时成功或同时失败。
所以,使用事务消息要提供两种 callback:
- 执行事务的 callback,在执行完事务后根据执行结果发送确认消息;
- RocketMQ 查询事务结果的 callback,在这个 callback 里查询事务执行的结果。
下面,就来一个简单的例子:
//执行事务的 callback
public class TransactionExecuterImpl implements LocalTransactionExecuter {
private AtomicInteger transactionIndex = new AtomicInteger(1);
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement();
System.out.println("execute local transaction " + msg.toString());
if (value == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
//检查事务完成情况的 callback 比如可以在 msg 中带上 订单号,查询订单是否支付成功
public class TransactionCheckListenerImpl implements TransactionCheckListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.printf("server checking TrMsg " + msg.toString() + "%n");
int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
下面是 producer:
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("trans_group");
producer.setNamesrvAddr("192.168.0.133:9876");
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
try {
Message msg =
new Message("topicTrans", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
以上。