RockeMQ quickstart

1、RocketMQ

[官网地址] (http://rocketmq.apache.org)

function : 应用解耦、流量消峰、消息分发、保证最终一致性、方便动态扩容等

2、linux 单机RocketMQ

可以参考官网quick-start

1、准备RocketMQ
  1. 官网下载编译好的二进制文件,或者下载源码自己编译。

    RocketMQ 当前的最新版本是4.2.0

  2. 系统要求: 64bit 的Linux 、Unix 或Mac 。
    Java 版本大于等于JDKl.8 。
    如果需要从GitHub 上下载源码和编译的话需要安装Maven 3.2.x 和Git 。

[root@aliyun rocketmq-all-4.2.0-bin]#>  unzip rocketmq-all-4.2.0-bin-release.zip -d ./rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]#  cd rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]#  ls 

里面含有以下内容: LICENSE NOTICE README.md benchmark/ bin/ conf/ lib/

+ LICENSE 、NOTICE 和README.md 包括一些版权声明和功能说明信息;

+ benchmark 里包括运行benchmark 程序的shell 脚本; 

+ bin 文件夹里含有各种使用RocketMQ的shell脚本和cmd 脚本,比如启动NameServer的mqnamesrv启动Broker的mqbroker,集群管理脚本mqadmin 等; 

+ conf 文件夹里有一些示例配置文件,包括三种方式的broker 配置文件、logback 日志配置文件等,用户在写配置文件的时候,一般基于这些示例配置文件,加上自己特殊的需求即可; 

+ lib 文件夹里包括RocketMQ各个模块编译成的jar 包,以及RocketMQ 依赖的一些jar包,比如Netty、commons-lang、FastJSON 等。
2、启动RocketMQ服务

启动单机的消息队列服务比较简单,不需要写配置文件,只需要依次启动本机的NameServer 和Broker 即可。

启动NameServer:
[root@aliyun rocketmq-all-4.2.0-bin]#> nohup sh bin/mqnamesrv &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/namesrv.log

The Name Server boot success . ..

启动B roker :
[root@aliyun rocketmq-all-4.2.0-bin]# nohup sh bin/mqbroker -n localhost:9876 &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/broker.log

The broker[%s, 192.168.0.233 : 10911] boot success .. .

!!!!!! 1->内存不足

如果Java运行时环境的内存不足,修改jdk参数配置

rocketmq-all-4.2.0-bin/bin/runserver.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" 

rocketmq-all-4.2.0-bin/bin/runbroker.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"

!!!!!! 2-> org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

3、用命令行发送和接收消息

实际上就是运行写好的demo 程序,后续我们可以参考这些demo 来写自己的发送和接收程序。

运行示例程序,发送和接收消息:

[root@aliyun rocketmq-all-4.2.0-bin]# export NAMESRV_ADDR=localhost:9876

[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

SendResult [sendStatus=SEND OK, msgid= ...

[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

ConsumeMessageThread 主d Receive New Messages : [MessageExt . ..
4、关闭消息队列
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown broker

Send shutdown request to mqbroker (36695 ) OK

[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown namesrv

Send shutdown request t o mqnamesrv (36664) OK
5、java client发送、消费消息 demo
  • java client
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.0.0-incubating</version>
        </dependency> 
  • Producer
    创建DefaultMQProducer对象,设置好GroupName和NameServer后启动,把待发送的消息拼装成Message对象,用Producer发送。
     public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("producer1");
            // 设置NameServer地址 , 多个地址之间用;分隔 ,但是也可以通过环境变量的方式设置 。
            producer.setNamesrvAddr("47.104.209.137:9876");
            producer.start();
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message(
                         "TopicTest",
                         "TagA",
                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                    );
                    //这里调用的是同步的方式,所以会有返回结果
                    SendResult sendResult = producer.send(msg);
                    //打印返回结果,可以看到消息发送的状态以及一些相关信息
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
        }
    } 
  • Consumer
    设置GroupName 、NameServer 地址以及端口号。然后指明要操作的Topic 名称,最后进入发送和接收逻辑。
     public class Consumer {
             public static void main(String[] args) throws InterruptedException, MQClientException {
                 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
                 consumer.setNamesrvAddr("47.104.209.137:9876");
                 //这里设置的是一个consumer的消费策略
                 //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
                 //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
                 //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
                 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                 //设置consumer所订阅的Topic和Tag,*代表全部的Tag
                 consumer.subscribe("TopicTest", "*");
                 //设置一个Listener,主要进行消息的逻辑处理
                 consumer.registerMessageListener(new MessageListenerConcurrently() {
                     @Override
                     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                     ConsumeConcurrentlyContext context) {
                         System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                         //返回消费状态
                         //CONSUME_SUCCESS 消费成功
                         //RECONSUME_LATER 消费失败,需要稍后重新消费
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     }
                 });
                 //调用start()方法启动consumer
                 consumer.start();
                 System.out.println("Consumer Started.");
             }
     }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容