消息中间件——RocketMQ(一) 环境搭建(完整版)

求关注

环境搭建

每章一点正能量:自我控制是最强者的本能。——萧伯纳

前言

最近在学习消息中间件——RocketMQ,打算把这个学习过程记录下来。此章主要介绍环境搭建。此次主要是单机搭建(条件有限),包括在Windows、Linux环境下的搭建,以及console监控平台搭建,最后加一demo验证一下。

环境准备

在搭建RocketMQ之前,请先确保如下环境已经搭建完毕

  • Java环境(我的JDK1.8)
  • Maven环境(我的3.6.1目前最新版)
  • Git环境

没有搭建的同学走传送门:

JDK环境搭建: JAVA8环境搭建
Maven环境搭建: Windows环境下使用Nexus 3.X 搭建Maven私服及使用介绍
Git环境搭建:Git环境搭建及配置


1. Windows环境下搭建

1.1 下载

官方网站:http://rocketmq.apache.org/

官网

目前最新版的是V4.5.0,点击进去。


rocketmq-all-4.5.0-bin-release.zip

选择下载 rocketmq-all-4.5.0-bin-release.zip。弹出另外一个页面,这里选择rocketmq-all-4.5.0-bin-release.zip进行下载。


rocketmq-all-4.5.0-bin-release.zip

下载成功后,选择一个目录放好并解压。
解压

1.2 修改JVM配置

以上操作完毕之后,进入目录bin目录,我这里是
H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
找到runserver.cmdrunbroker.cmd中的JAVA_OPT。

在这里插入图片描述

原JAVA_OPT:

set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

将 Xms Xmx 这两个值改小一些,改为1g,如:

set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

自己根据虚拟机内存大小设置,超出内存大小可能会报错。

1.3 配置环境变量

上述步骤执行完毕后,我们需要将RocketMQ安装目录的bin目录配置到环境变量中。

RocketMQ_HOME

1.4 启动

以上配置都完成,接下来就是启动过程。中间有点坑,请务必按步骤安装。

在RocketMQ安装目录的bin目录下,执行命令cmd:

我的目录:

H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin

可以通过shift+鼠标右击 触发cmd窗口选项。也可以通过win+R 在窗口输入cmd,进入cmd窗口后移动到bin目录下。

1.4.1 启动NAMESERVER

  • 执行命令:start mqnamesrv.cmd

成功后会弹出提示框,此框勿关闭。


success

1.4.3 启动BROKER

  • 执行命令:‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’

注意:假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。

失败

打开 runbroker.cmd 进行修改
原:

set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"

修改后:

set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
修改后

再次执行命令:
启动成功!

成功

这时候一共有三个窗口。


2. 安装Console监控

2.1 下载

下载地址:https://github.com/apache/rocketmq-externals

下载完后如图所示:选择——>rocketmq-console


rocketmq-console

2.2 配置

下载完成之后,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。


修改配置

修改配置

2.2 编译启动

进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。中间有个比较慢的下载过程需要等待。


编译

编译成功之后,cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.1.jar’,启动‘rocketmq-console-ng-1.0.1.jar’。

rocketmq-console-ng-1.0.1.jar

rocketmq-console-ng-1.0.1.jar

2.3 查看

访问地址:localhost:8082


界面

2.Linux环境下搭建

2.1 环境准备

  • Java环境
  • Maven环境

2.1.1 Linux环境搭建Jdk

下载JDK:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

下载需要的版本:

下载地址

上传到创建的目录/usr/java

解压命令

tar -zxvf jdk-8u181-linux-x64.tar.gz

配置环境变量命令

vim /etc/profile
 
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
 
source /etc/profile

验证是否成功命令


java -version

JDK安装成功

按照以上操作,完成JDK的安装。接下来安装Maven环境。

2.1.2 Linux环境搭建Maven

  1. 下载命令:

wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz

  1. 解压命令:

tar -zxvf apache-maven-3.2.2-bin.tar.gz

  1. 配置Maven环境命令:

vim /etc/profile
 
#配置maven环境变量
export MAVEN_HOME=/usr/maven/apache-maven-3.5.4
export MAVEN_HOME
export PATH=$PATH:$MAVEN_HOME/bin
 
source /etc/profile

  1. 验证是否成功命令:

mvn -v


Maven安装成功

2.2 下载RocketMQ

  1. 下载命令:

wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip

  1. 解压命令:
unzip rocketmq-all-4.4.0-source-release.zip

[图片上传失败...(image-792c35-1557466617990)]

  1. 构建二进制文件命令

进入解压后的文件目录。

mvn -Prelease-all -DskipTests clean install -U

构建二进制成功

2.3 修改JVM配置

同Windows环境一样,修改JVM配置。
移动到目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin 中。编辑bin目录下runserver.shrunbroker.sh文件。

根据个人虚拟机大小进行修改


vim runserver.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"

修改JVM配置

2.4 配置RocketMQ环境变量

分别执行如下命令:

#修改环境变量
vim /etc/profile

export ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
export PATH=$PATH:$ROCKETMQ/bin

#更新配置
source /etc/profile

[图片上传失败...(image-18d4c3-1557466617991)]

2.5 启动NAMESERVER

依然在之前的目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq

  • 执行命令:
##启动命令
nohup sh bin/mqnamesrv  >/dev/null 2>&1 &

##查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

[图片上传失败...(image-88d435-1557466617991)]

可以看图已经成功了!

2.6 启动BROKER

  • 执行命令:
##启动命令
nohup sh bin/mqbroker -n localhost:9876 &

##查看日志
tail -f ~/logs/rocketmqlogs/broker.log

[图片上传失败...(image-62e759-1557466617991)]

注意防火墙,如果端口连接失败,注意开通。

2.7 关闭命令

sh bin/mqshutdown broker    //停止 broker
 
sh bin/mqshutdown namesrv   //停止 nameserver

2.8 配置Console监控平台

同Windows平台搭建

2.8.1 启动Console

我这里直接将Windows平台打包好的jar包直接丢到了Linux系统中

  • 启动命令:
java -jar rocketmq-console-ng-1.0.1.jar

[图片上传失败...(image-435d8d-1557466617991)]

2.8.2 访问Console管理界面

访问地址:http://192.168.220.72:8082

[图片上传失败...(image-635ed8-1557466617991)]


3. Console监控平台说明

这里不做过多介绍,可以参考以下文章

官网地址:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

其他博客地址:https://guozh.net/rocketmqzhiconsolejiankongpingtaishiyongxiangjiesan/

3. 案例测试

案例整合环境:SpringBoot环境
案例来源于网络

3.1 pom.xml文件


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.coderprogramming.rocketmq</groupId>
    <artifactId>rocketmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.2 Producer生产者


**
 * @Description: 生产者
 * @author Coder编程
 * @date 2019/5/8 17:08
 */

@Component
public class Producer {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;


    public void orderedProducer() throws MQClientException, InterruptedException {
        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
         * 注意:ProducerGroupName需要由应用来保证唯一
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();


        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        try {
            for (int i = 0; i < 10; i++) {
                    Message msg = new Message("Topic1",// topic
                            "TagA",// tag
                            "001",// key
                            ("Send Msg:Hello MetaQ1").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);

                    Message msg2 = new Message("Topic2",// topic
                            "TagB",// tag
                            "002",// key
                            ("Send Msg:Hello MetaQ2").getBytes());// body
                    SendResult sendResult2 = producer.send(msg2);
                    System.out.println(sendResult2);


                    Message msg3 = new Message("Topic3",// topic
                            "TagC",// tag
                            "003",// key
                            ("Send Msg:Hello MetaQ3").getBytes());// body
                    SendResult sendResult3 = producer.send(msg3);
                    System.out.println(sendResult3);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
    }

}


3.3 Consumer消费者



/**
 * @Description: 消费者
 * @author Coder编程
 * @date 2019/5/8 17:08
 */

@Component
public class Consumer {

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;


    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
     * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
     */
    public void orderedConsumer() throws InterruptedException,MQClientException {
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
        // consumer.setNamesrvAddr("10.10.0.102:9876");
        consumer.setNamesrvAddr(namesrvAddr);
        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe("Topic1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("Topic2", "*");
        consumer.subscribe("Topic3", "*");


        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("Topic1")) {
                    if (null != msg.getTags()) {
                        // 执行Topic1的消费逻辑
                        if (msg.getTags().equals("TagA")) {
                            // 执行TagA的消费
                            System.out.println("TagA开始。");
                        } else if (msg.getTags().equals("TagC")) {
                            System.out.println("TagC开始。");
                            // 执行TagC的消费
                        } else if (msg.getTags().equals("TagD")) {
                            // 执行TagD的消费
                            System.out.println("TagD开始。");
                        }
                    }
                } else if (msg.getTopic().equals("Topic2")) {
                    // 执行Topic2的消费逻辑
                    System.out.println("Topic2");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
         */
        consumer.start();
        System.out.println("Consumer Started.");
    }

}

3.3 properties配置文件


# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=192.168.220.72:9876
# 设置应用端口
server.port=8089

3.4 测试代码


/**
 * @author Coder编程
 * @Title: HelloWord
 * @ProjectName rocketmq
 * @Description: Hello World
 * @date 2019/5/814:14
 */

@RestController
public class Test {

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @RequestMapping("/test")
    public String testMQ2() {
        try {
            System.out.println("-----------------开始生产-----------------");
            producer.orderedProducer();
            System.out.println("-----------------开始消费-----------------");
            consumer.orderedConsumer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }

}


4.奉上源码

以上安装jar包和案例测试源码已经上传至GitHub/Gitee

源码图

源码地址:

Github地址

Gitee地址

文末

欢迎关注公众号:Coder编程
获取最新原创技术文章和相关免费学习资料,随时随地学习技术知识!

微信公众号

求关注
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,841评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,415评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,904评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,051评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,055评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,255评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,729评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,377评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,517评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,420评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,467评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,144评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,735评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,812评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,029评论 1 256
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,528评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,126评论 2 341

推荐阅读更多精彩内容