什么是activeMQ呢?
答:activeMQ是传送企业间应用的消息或者数据的一种中间件,为什么会有这个activeMQ的产生呢?
在各大企业中有很多的应用并不是单独的应用,而是组合起来的,分布式存在,他们之间相互传递消息或者事件,完全通过消息传递来发送和接收商业信息,能够在各种不同的数据类型、操作系统、协议甚至是不同的编程语言中交流数据,activeMQ为其提供了异步消息传递。
JMS从图上来看是一种规范,提供消息传递的API,而activeMQ是JMS的一种实现,即我们利用JMS提供的各大消息的接口可以实现应用之间的数据传送,但是需要实现接口的内容才能将消息发送出去。
1 写个简单的demo理解前需要搭建一个环境
1.首先下载activeMQ
去官方网站下载:http://activemq.apache.org/
2.运行activeMQ
解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\Win64或者Win32\activemq.bat运行ActiveMQ程序。
启动ActiveMQ以后,登陆:http://localhost:8161/admin/显示登录的账号和密码,账号/密码是admin
2 写个简单的demo来理解一下吧
写一个消息的传递者和消息接收者
JMSProducer
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 测试使用JMS API 连接ActiveMQ
*/
public class JMSProducer {
/**
* 因为是演示代码,这里就不用添加异常处理,但是正式代码是需要添加异常处理的
* 在activeMQ中实现JMS的规范的步骤:
* 1.首先需要确定是实现那个消息协议,创建一个消息协议
* 2.创建会话
* 3.创建消息的目的地
* 4.创建JMS和ActiveMq的连接
* @param args
*/
public static void main(String[] args) throws Exception{
//定义JMS-ActiveMQ连接信息(默认为Openwire协议)
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
/* connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
**/
Session session = null;
Destination sendQueue;
Connection connection = null;
//进行连接,会有异常抛出
connection = connectionFactory.createQueueConnection();
connection.start();
//建立会话(设置一个带有事务的会话)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
//建立queue
sendQueue = session.createQueue("/test");
//建立消息发送者对象
MessageProducer messageProducer = session.createProducer(sendQueue);
//建立文本信息对象
TextMessage textMessage = session.createTextMessage();
textMessage.setText("这是发送的内容");
//发送,JMS是支持事务的
messageProducer.send(textMessage);
session.commit();
//关闭
messageProducer.close();
connection.close();
}
}
JMSConsumer
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 消费者从消息队列获取消息
*/
public class JMSConsumer {
public static void main(String[] args) throws Exception {
//建立连接
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
/* connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
*/
//定义各种需要的变量/
Connection connection = null;
Destination sendQueue;
Session session = null;
//进行连接
connection = connectionFactory.createQueueConnection();
connection.start();
//创建session,createSession 中的参数(第一个是是否使用参数,第二个是签收参数)
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//创建消息队列
sendQueue = session.createQueue("/test");
//创建消息接收者
MessageConsumer consumer = session.createConsumer(sendQueue);
//创建监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
//接收到消息后启动该方法,接收到消息后,不需要再发送ack了
System.out.println("Message"+message);
}
});
synchronized (JMSConsumer.class){
JMSConsumer.class.wait();
}
//关闭
consumer.close();
connection.close();
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.phpdragon</groupId>
<artifactId>jms-activemq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>jms-activemq-demo</name>
<url>http://maven.apache.org</url>
<developers>
<developer>
<id>phpdragon</id>
<name>phpdragon</name>
<email>phpdragon@qq.com</email>
</developer>
</developers>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<fastjson.vesrion>1.2.35</fastjson.vesrion>
<spring.version>4.3.10.RELEASE</spring.version>
<junit.version>4.12</junit.version>
<testng.version>6.11</testng.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.vesrion}</version>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>
<!-- spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- spring -->
<!--单元测试-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!--自动化测试-->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testng.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
因为采用的maven管理项目,而不是导入各种jar包,所以需要在项目中添加maven,导入pom文件,
**当你已经在项目中添加了maven,并且在activeMQ中已经启动了ActiveMQ,在网页上也已经可以观察该消息中间件的各种连接,运行JMSProducer和JMSConsumer无论优先级,
这是运行的结果
MessageActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:ZB-PC0DAGZS-51385-1520579894242-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:ZB-PC0DAGZS-51385-1520579894242-1:1:1:1, destination = queue:///test, transactionId = TX:ID:ZB-PC0DAGZS-51385-1520579894242-1:1:1, expiration = 0, timestamp = 1520579894414, arrival = 0, brokerInTime = 1520579894415, brokerOutTime = 1520579894492, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@29edaf12, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 这是发送的内容}
可以看一下管理端的运行效果
OK!!!