不支持的操作
//判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时
throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
//判断topic是否以 **"%RETRY%"** 开头,如果是, 则抛出异常,原因为:批量发送消息不支持消息重试
throw new UnsupportedOperationException("Retry Group is not supported for batching");
//判断集合中的每个Message的topic与批量发送topic是否一致,如果不一致则抛出异常,原因为:批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。
throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
//判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
Producer
从客户端日志中看出,rocketmq 消息体大小有限制,
CODE: 13 DESC: the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(this.nameServer);
producer.setMaxMessageSize(10 * 1024 * 1024);
producer.start();
String topic = "topic1";
String tag = "tag6";
List<Message> messages = new ArrayList<>();
for (int i=0; i<3; i++) {
messages.add(new Message(topic, tag, new byte[1024 * 1024]));
}
//批量发送消息,一次发送的消息总量不能超过4MB。
producer.send(messages);
可以通过setMaxMessageSize()指定生产者发送消息的大小限制,单位是字节,默认是4MB。如果需要发送超过4MB大小的消息,除了通过生产者的setMaxMessageSize()调整生产者这端的消息大小,还需要在Broker端通过maxMessageSize指定Broker允许的最大消息大小。此外,如果在批量发送消息的时候集合中包含的消息体的总体积超过4MB的,还可以对集合进行拆分,分为多次发送。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.batch;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//split the large batch into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
}
}
class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > sizeLimit) {
//it is unexpected that single message exceeds the sizeLimit
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
Consumer
我们前面介绍的消息监听器的回调方法consumeMessage()的消息参数msgs是一个集合,但是默认情况下它每次都只包含一条消息。如果需要回调时一次传递多条消息,可以通过消费者的setConsumeMessageBatchMaxSize() 指定一次最多消费的消息数量,默认是1。可能你想着我一次消费的数量越多越好,那我就定义一次消费50条。当定义了消息的最大消费数量是50时,实际上一次可消费的消息数量最多也就32条,因为broker端对此做了限制。因为Consumer默认一次最多只能拉取32条消息,可以通过setPullBatchSize()指定一次可以拉取消息的数量。而由于拉取到的一批消息会立刻拆分成N(取决于consumeMessageBatchMaxSize)批消费任务,所以集合中msgs的最大大小是consumeMessageBatchMaxSize和pullBatchSize的较小值。
Broker端控制批量消息
maxTransferBytesOnMessageInMemory =1024 * 256
maxTransferCountOnMessageInMemory = 32
下面代码指定了一次批量拉取消息的最大数量是30,一次消费消息的最大数量是5。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr(this.nameServer);
//指定批量消费的最大值,默认是1
consumer.setConsumeMessageBatchMaxSize(5);
//批量拉取消息的数量,默认是32
consumer.setPullBatchSize(30);
consumer.subscribe("topic1", "tag6");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "一次收到" + msgs.size() + "消息");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
当指定了批量消费消息的最大数量后,这一批消息将拥有同一个消费状态,即如果消息的消费状态返回的是CONSUME_SUCCESS,则它们都消费成功。如果返回的是RECONSUME_LATER,则它们都将再次投递。另外如果批量消费消息的数量不是1,则一次批量传递的消息都将在一个消费者线程中进行消费。比如指定了批量消费消息的最大数量是5,则如果拉取到了10条消息,则这10条消息会分别传递给两个消费者线程,每个线程各5条消息;而如果批量消费消息的数量使用默认值1则10条消息将传递给10个消费者线程,这样可能消费的性能更佳。读者朋友应该基于这两种情况慎重的考虑是否应该更改批量消费消息的默认值。
小结
1.DefaultMQPushConsumer定义了consumeMessageBatchMaxSize属性,默认值为1;
2.DefaultMQPushConsumerImpl的checkConfig方法会校验defaultMQPushConsumer.getConsumeMessageBatchMaxSize(),要求其值必须大于等于且小于等于1024;
ConsumeMessageConcurrentlyService的submitConsumeRequest方法在msgs.size()小于等于consumeBatchSize时会创建ConsumeRequest,然后提交到consumeExecutor执行,若出现RejectedExecutionException则执行submitConsumeRequestLater;
3.对于msgs.size()大于consumeBatchSize的,则按consumeBatchSize分批创建ConsumeRequest提交给consumeExecutor执行,若出现RejectedExecutionException则将剩余的msg添加到msgThis,然后执行submitConsumeRequestLater
文章转载自:
RocketMQ(06)——消息的批量发送和消费
聊聊rocketmq的consumeMessageBatchMaxSize