RocketMQ集群消费时队列分配

RocketMQ集群消费时队列分配

何时需要消息队列
业务解耦
最终一致性
广播
错峰流控
RocketMQ的核心概念

详情见于文档

PushConsumer调用负载均衡的方法
    public void doRebalance() {

        if (this.rebalanceImpl != null && !this.pause) {

            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());

        }

    }

可以看出真正负载均衡的是rebalanceImpl这个成员变量的工作,在RebelanceImpl类中代码为


public void doRebalance(final boolean isOrder) {

        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

        if (subTable != null) {

            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {

                final String topic = entry.getKey();

                try {

                    this.rebalanceByTopic(topic, isOrder);

                } catch (Exception e) {

                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

                        log.warn("rebalanceByTopic Exception", e);

                    }

                }

            }

        }



        this.truncateMessageQueueNotMyTopic();

    }

  1. 获取订阅关系,订阅关系是在consumer.start()中从consumer中复制过来的,订阅关系是以topic为key,tags组成的subscriptionData。

  2. 第二步遍历订阅关系,调用rebalanceByTopc(topic,isOrder)方法,根据topic和isOrder进行负载均衡

  3. 遍历结束,调用truncateMessageQueueNotMyTopic()方法,去除不属于当前consumer的topic对应的消息队列

分析rebalanceByTopic方法


private void rebalanceByTopic(final String topic, final boolean isOrder) {

        switch (messageModel) {

            case CLUSTERING: {

                //SD:集群模式下,从订阅信息表中获取topic对应的所有消息队列

                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);

                //SD:根据topic和consumerGroupName获取所有相关的consumerId

                //SD:此处如果同一个consumerGroupName下面的订阅关系不一致的话,会导致消息消费失败

                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                if (null == mqSet) {

                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {

                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);

                    }

                }

                if (null == cidAll) {

                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);

                }

                if (mqSet != null && cidAll != null) {

                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();

                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);

                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;

                    //把所有的消息队列按照配置的分配策略进行分配,获取当前consumer获得的消息队列

                    try {

                        allocateResult = strategy.allocate(//

                                this.consumerGroup, //

                                this.mQClientFactory.getClientId(), //

                                mqAll, //

                                cidAll);

                    } catch (Throwable e) {

                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),

                                e);

                        return;

                    }



                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();

                    if (allocateResult != null) {

                        allocateResultSet.addAll(allocateResult);

                    }

                    //根据负载均衡的结果更新处理队列,consumer根据消息队列拉取消息

                    //移除不属于当前consumer的队列或者距离上次拉取时间超过最大间隔的队列

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

                    if (changed) {

                        log.info(

                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",

                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),

                                allocateResultSet.size(), allocateResultSet);

                        this.messageQueueChanged(topic, mqSet, allocateResultSet);

                    }

                }

                break;

            }

            default:

                break;

        }

    }

默认情况下,使用平均分配消息队列的策略;
分析truncateMessageQueueNotMyTopic方法


private void truncateMessageQueueNotMyTopic() {

        //获取从consumer处copy的订阅关系

        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();

        //遍历自己的执行队列的消息队列集合

        // 如果目标消息队列的topic不存在当前的订阅关系中,移除这个消息队列

        for (MessageQueue mq : this.processQueueTable.keySet()) {

            if (!subTable.containsKey(mq.getTopic())) {



                ProcessQueue pq = this.processQueueTable.remove(mq);

                if (pq != null) {

                    pq.setDropped(true);

                    log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);

                }

            }

        }

    }

什么时候负载均衡

在consumer启动的过程中,rebalanceImpl会从consumer处复制订阅关系


public void start() throws MQClientException {

        switch (this.serviceState) {

            case CREATE_JUST:

                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),

                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());

                this.serviceState = ServiceState.START_FAILED;



                this.checkConfig();

                //复制订阅关系

                this.copySubscription();



                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {

                    this.defaultMQPushConsumer.changeInstanceNameToPID();

                }



                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);



                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());

                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());

                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);



                this.pullAPIWrapper = new PullAPIWrapper(//

                        mQClientFactory, //

                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());

                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);



                if (this.defaultMQPushConsumer.getOffsetStore() != null) {

                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();

                } else {

                    switch (this.defaultMQPushConsumer.getMessageModel()) {

                        case BROADCASTING:

                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

                            break;

                        case CLUSTERING:

                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());

                            break;

                        default:

                            break;

                    }

                }

                this.offsetStore.load();



                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

                    this.consumeOrderly = true;

                    this.consumeMessageService =

                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {

                    this.consumeOrderly = false;

                    this.consumeMessageService =

                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

                }

                //启动消费消息的服务

                this.consumeMessageService.start();



                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);

                if (!registerOK) {

                    this.serviceState = ServiceState.CREATE_JUST;

                    this.consumeMessageService.shutdown();

                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()

                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),

                            null);

                }

                //启动工厂方法

                mQClientFactory.start();

                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());

                this.serviceState = ServiceState.RUNNING;

                break;

            case RUNNING:

            case START_FAILED:

            case SHUTDOWN_ALREADY:

                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//

                        + this.serviceState//

                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),

                        null);

            default:

                break;

        }

        //当订阅关系发生变化省,更新订阅关系

        this.updateTopicSubscribeInfoWhenSubscriptionChanged();



        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        //立即负载均衡

        this.mQClientFactory.rebalanceImmediately();

    }

工厂方法启动逻辑


public void start() throws MQClientException {



        synchronized (this) {

            switch (this.serviceState) {

                case CREATE_JUST:

                    this.serviceState = ServiceState.START_FAILED;

                    // If not specified,looking address from name server

                    if (null == this.clientConfig.getNamesrvAddr()) {

                        this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());

                    }

                    // Start request-response channel

                    this.mQClientAPIImpl.start();

                    // Start various schedule tasks

                    this.startScheduledTask();

                    // Start pull service

                    this.pullMessageService.start();

                    // Start rebalance service

                    this.rebalanceService.start();

                    // Start push service

                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

                    log.info("the client factory [{}] start OK", this.clientId);

                    this.serviceState = ServiceState.RUNNING;

                    break;

                case RUNNING:

                    break;

                case SHUTDOWN_ALREADY:

                    break;

                case START_FAILED:

                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

                default:

                    break;

            }

        }

    }

rebalanceService是一个线程任务类,在线程任务中定时factory执行调用负载均衡


    public void run() {

        log.info(this.getServiceName() + " service started");



        while (!this.isStoped()) {

            this.waitForRunning(WaitInterval);

            this.mqClientFactory.doRebalance();

        }



        log.info(this.getServiceName() + " service end");

    }

factory执行负载均衡,其实就是遍历factory中的所有consumer,调用doRebalance()方法


    public void doRebalance() {

        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {

            MQConsumerInner impl = entry.getValue();

            if (impl != null) {

                try {

                    impl.doRebalance();

                } catch (Exception e) {

                    log.error("doRebalance exception", e);

                }

            }

        }

    }

AllocateMessageQueueStrategy 队列分配策略

默认情况下PushConsumer的AllocateMessageQueueStrategy
    public DefaultMQPushConsumer() {

        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());

    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容