Apache RocketMQ之JMS基本概念及使用
Apache RocketMQ 系列:
Apache RocketMQ之JMS基本概念及使用:https://www.jianshu.com/p/d2e3fd77c4f4
Apache RocketMQ 基础概念及架构解析:https://www.jianshu.com/p/95ab928960b3
Apache RocketMQ 的基础特性介绍:https://www.jianshu.com/p/570680b32590
Apache RocketMQ 集群搭建(两主两从):https://www.jianshu.com/p/b090138cf52c
Apache RocketMQ 刷盘策略与复制策略: https://www.jianshu.com/p/d66b381428bb
优秀博客:
https://blog.csdn.net/canot/article/details/53676350
https://blog.csdn.net/caidaoqq/article/details/45938919
https://blog.csdn.net/u013123635/article/details/78362360
介绍流程:
RocketMQ 是什么?
是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
RocketMQ是一个消息中间件,那什么是消息中间件?
关注于数据的发送与接收,利用高效可靠的异步消息传递机制集成分布式系统。
对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)
消息+中间件
- 消息:消息即为数据,数据就会有规划,有长度,有大小。
- 中间件:为我们提供发送消息的程序或者服务.
消息:
JMS(Java Message Service)
Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件(Message Oriented Middleware)的API。
用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
在提到JMS时,我们通常会说到一些术语,解释如下:
消息中间件(JMS Provider) : 指提供了对JMS协议的第三方组件,比如RocketMQ就是一个消息中间件,另外比较知名的还有KafKa、 Rabbit MQ、ActiveMQ等。
消息(Message): 通信内容的载体,其结构主要分为消息头,属性和消息体,并且根据存储结构的不同分为好几种,后面会详细提到。
消息模式:分为点对点(Point to Point,即P2P)和发布/订阅(Pub/Sub),对应的数据结构分别是队列(Queue)和主题(Topic)
消息生产者:产生消息的一方,在P2P模式下,指消息发送者(Sender),在P/S模式下指消息发布者(Publisher)
消息消费者:接收消息的一方,对应于两种模式分别是消息接收者(Receiver)和消息订阅者(Subscriber)
中间件:
- 为我们提供发送消息的程序或者服务,
- 目前主流的有 rocketMq 、kafka、rabbitMq、activemq等。
JMS基本概念及原理详解
基本概念:
- JMS的客户端之间可以通过JMS服务进行异步的消息传输。
体系架构
JMS由以下元素组成。
元素 | 描述 |
---|---|
JMS提供者 | 连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。 |
JMS客户 | 生产或消费消息的基于Java的应用程序或对象。 |
JMS生产者 | 创建并发送消息的JMS客户。 |
JMS消费者 | 接收消息的JMS客户。 |
JMS消息 | 包括可以在JMS客户之间传递的数据的对象。 |
JMS队列 | 一个容纳那些被发送的等待阅读的消息的区域。这些消息将按照顺序发送,一旦一个消息被阅读,该消息将被从队列中移走。 |
JMS主题 | 一种支持发送消息给多个订阅者的机制。 |
JMS 消息(Message)
每个在JMS规范中概念都是围绕处理一个JMS消息,因为它包含了业务数据和事件是怎么被传输的。
JMS消息允许任何内容作为其一部分发送消息,包括文本和二进制数据以及标题中的信息。
JMS消息包含三部分,包括消息头、消息的属性和消息载体(类似于我们常用的大部分协议,如http等)。
消息头提供消息的接受端和发送端两个客户端和JMS规范的元数据。
消息载体是消息的实际内容,并且可以保存文本和承载了各种各样数据的二进制数据(如图片,流等等)。
JMS消息设计宗旨是在易于理解和可扩展,所有的复杂的内容包含在JMS消息头中。
如之前所说的,JMS消息的复杂性在消息头中,消息头有两种类型的报文头,它们具有相同的逻辑概念,但是语义上不同的。
由JMS规范提供的,客户端调用send()方法时,自动设置的消息头。
开发者分配的消息头。
标准的JMS消息头与JMS API提供的方法一起工作。大多数头信息是被自动分配的。
接下来描述每个头的含义,以及如何被分配到消息中的。
客户端调用send()方法时,自动设置的消息头。
JMSDestination
JMS发送消息的目的地。这对于使用来自多个目的地的消息的客户端很有价值。
使用一个Topic或Queue对象来标识目的地,二者都是Destination类型
相关方法:public abstract Destination getJMSDestination()
JMSDeliveryMode
JMS传送模式。支持两种模式:持久模式和非持久模式。默认的传递模式是持久。
一条持久性消息应该被传送“一次而且仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失;它会在服务器恢复正常之后再次传送。一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失。在持久性和非持久性者两种传送模式中,消息服务器都不会讲一条消息向同一消费者发送一次以上,不过,这在技术上最有可能实现的。
Persistent:持久模式,通知消息提供者去持久化消息。即使消息提供者挂掉,消息也不会丢失。在这个模式下,JMS提供者必须对消息进行持久化并且只进行一次。如果JMS提供者挂了,此时该JMS提供者的消息并不会丢失,但消息只能被消费者使用一次。
由于持久化消息提供了额外的可靠性保护,因此也需要更多的空间和性能消耗。Nonpersistent:非持久模式, 使得JMS提供者不需求持久化消息。JMS提供者必须最多传递一次非持久消息。如果JMS提供者挂了,此时该JMS提供者的消息会丢失,但不会出现第二次。非持久消息会提供更高的性能和较低的可靠性。
发送模式在消息发送者上设置,并应用于从发送的所有消息。 但是也可以针对单个消息覆盖发送模式。
相关方法:public abstract int getJMSDeliveryMode()
JMSMessageID
JMS消息ID。它是一个String类型的值,唯一标识了一条消息,并且必须以ID开头。
JMSMessageID对于JMS中消费者应用程序的历史仓库来说非常有用,它是仓库中的消息需要的唯一索引。
因为消息ID可能导致JMS提供程序产生一些开销,消息提供者可以建议JMS提供程序,JMS应用程序不依赖于这个消息头的值。
通过 MessageProducer.setDisableMessageID()方法设置。
如果JMS提供程序同意该建议,则消息标识必须设置为null。但JMS提供程序可以忽略此调用并始终分配消息ID。
相关方法:public abstract String getJMSMessageID()
JMSTimestamp
JMS时间戳。它包含的是JMS提供者接受消息的时间,而不是该消息实际传送的时间。这条消息头用于确认发送消息和它被消费者实际接受的时间间隔。
此标头的值使用标准Java millis时间值。
与JMSMessageID头类似,JMS提供者建议JMS生产者不设置JMSTimestamp头
通过MessageProducer.setDisableMessageTimestamp()方法设置,如果JMS生产者接受此建议,则它则将JMSTimestamp设置为零。
相关方法:public abstract long getJMSTimestamp()
JMSExpiration
JMS消息的超时时间。这个头信息被用来阻止过期消息的传递。对于那些数据仅在某一个时间段内有效的消息来说,非常有用的。
消息的超时值可以使用MessageProducer.setTimeToLive()方法设置该生产者发送的所有消息的生存时间,或使用一个MessageProducer.send()方法来设置单个消息的超时值。调用这两个方法都会设置时间,以毫秒为单位。
通过将生存时间添加到JMSExpiration消息头中来计算超时时间。默认情况下,超时时间为零,意味着消息不会过期。
如果未指定超时时间,则使用默认值并且消息不会过期。如果超时时间明确指定为零,那么同样的消息不会过期。
此消息头对于时间敏感的消息很有用。但要注意,JMS提供者程序不应传递已过期的消息和JMS客户端应该被写入以便不处理已经过期的消息。
相关方法:public abstract long getJMSExpiration()
JMSRedelivered
JMS重发。表示该消息将被重新传送给消费者。如果该消息被重新传送,JMSRedelivered消息头就为true,否则为false。
相关方法:public abstract boolean getJMSRedelivered()
JMSPriority
- JMS优先级。在传送一条消息时,消息生产者能够为该消息分配一个优先级,这个头是也设置在消息提供者者。
一旦在生产者上设置了优先级,适用于从该生产者发送的所有消息,也可以对单个消息进行单独设置。
JMS定义了10个级别的消息优先级,范围从 0(最低)到9(最高):
优先级0-4 - 这些优先级是普通优先级。
优先级5-9 - 这些优先级是加急优先级。
JMS提供者不需要实现消息排序,尽管大部分情况是需要排序。
他们应该简单地尝试提供更高优先级的消息在低优先级消息之前。
相关方法:public abstract int getJMSPriority()
开发者分配的消息头:
JMSReplyTo
JMS响应。一个JMS消息生产者可能会要求消费者对一条消息作出应答,JMSReplyTo消息头包含了一个javax.jms.Destination,表明JMS消费者应该应答的地址。
这个信息头通常用于消息的请求/回复模式。
已发送消息使用此标题通常希望得到接受者的响应,它是可选的头信息。
客户端必须做出是否对发送者的响应。
相关方法:public abstract Destination getJMSReplyTo()。
JMSCorrelationID
- JMS关联ID。提供了一个消息头,用于将当前的消息和先前的某些消息或应用程序特定的ID关联起来,这个消息头通常用于将响应消息与请求消息。
JMSCorrelationID的值可以是以下之一:
特定于提供者的消息ID
应用程序特定的字符串
提供程序自身的byte[]值
提供者的消息,以 ID: 为前缀开头,而应用程序特定的消息不能以 ID: 前缀开头。
如果是JMS提供者支持 native correlation ID的概念,JMS客户端可能需要分配一个特定的JMSCorrelationID值,以匹配非JMS客户端所期望的值,但是这不是必须的。
相关方法:public abstract String getJMSCorrelationID()。
JMSType
JMS类型。用于语义标识消息类型,是由JMS客户端设置的一个可选消息头。它的主要作用是标示消息结构和有效负载的类型。
这个消息头并未指明正被发送的消息类型,而是JMS提供者使用的内部消息仓库的一个条目。。
在使用请求/应答场景时,通过这条消息头属性可以进一步实现消息生产者和消息消费者之间的去耦。
这个头信息只有很少厂商使用并且和消息承载的Java类型无关。
相关方法:public abstract String getJMSType()。
消息属性
消息属性就像可以分配一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加消息。它们还用于暴露消息选择器在消息过滤时使用的数据。
message接口为读取和写入属性提供了若干个取值函数和赋值函数方法。
包括布尔值,字节,短整型,长整型,浮点型,双精度型,以及String对象类型的方法。
下面方法取自Message接口。
public interface Message {
...
boolean getBooleanProperty(String name) throws JMSException;
byte getByteProperty(String name) throws JMSException;
short getShortProperty(String name) throws JMSException;
int getIntProperty(String name) throws JMSException;
long getLongProperty(String name) throws JMSException;
float getFloatProperty(String name) throws JMSException;
double getDoubleProperty(String name) throws JMSException;
String getStringProperty(String name) throws JMSException;
Object getObjectProperty(String name) throws JMSException;
...
Enumeration getPropertyNames() throws JMSException;
boolean propertyExists(String name) throws JMSException;
...
void setBooleanProperty(String name, boolean value) throws JMSException;
void setByteProperty(String name, byte value) throws JMSException;
void setShortProperty(String name, short value) throws JMSException;
void setIntProperty(String name, int value) throws JMSException;
void setLongProperty(String name, long value) throws JMSException;
void setFloatProperty(String name, float value) throws JMSException;
void setDoubleProperty(String name, double value) throws JMSException;
void setStringProperty(String name, String value) throws JMSException;
void setObjectProperty(String name, Object value) throws JMSException;
...
}
- 还要注意用于处理消息上的泛型属性的两个方便方法:
- getPropertyNames() 方法,返回给定消息上所有属性的枚举,以便轻松地遍历所有属性。
- propertyExists() 方法用于测试消息上是否存在给定属性。
- 注意,特定于JMS的头部不被认为是通用属性,也不包含在getPropertyNames()方法返回的枚举中。
有三种类型的属性:自定义属性、JMS定义的属性和特定于提供者的属性。
自定义属性:
- 自定义属性是任意的由JMS应用程序定义。JMS应用程序的开发人员可以通过使用前一部分
- getBooleanProperty()/setBooleanProperty()、
- getStringProperty()/ setStringProperty()
- 所示的泛型方法,自由定义任何必需的Java属性。
例如:
TextMessage message = pubSession .createTextMessage();
message.setText(userName +":" +text );
message.setStringProperty("username" , userName );
publisher.publish(message );
JMS定义的属性
- JMS定义的属性具有和应用程序属性相同的特性,除了前者大多数在消息发送时由JMS提供者来设置之外JMS定义的属性可以作为可选的JMS消息头;对于某些另有声明的例外。各厂商可以分别选择不支持、部分支持或全部支持。下面是JMS定义的9个属性清单:
属性 | 描述 |
---|---|
JMSXAppID | 标识发送消息的应用程序 |
JMSXConsumerTXID | 使用此消息的事务的事务标识符 |
JMSXDeliveryCount | 消息传递尝试的数量 |
JMSXGroupID | 此消息是其一部分的消息组 |
JMSXGroupSeq | 组中此消息的序列号 |
JMSXProducerTXID | 生成此消息的事务的事务标识符 |
JMSXRcvTimestamp | JMS提供者向消费者传递消息的时间 |
JMSXState | 用于定义特定于提供程序的状态 |
JMSXUserID | 标识发送消息的用户 |
- 规范为使用这些属性提供的唯一建议 是JMSXGroupID 和 JMSXGroupSeq 属性,并且当按特定顺序对消息或消息分组时,客户端应该使用这些属性。
特定于提供者的属性
- 每个JMS提供者都可以定义一组私有属性,这些属性可以由客户端或提供者自动设置。
- 提供者特定的属性必须以前缀JMS开头,后面紧接着是属性名称。
- 提供者特定的属性,其作用就是支持厂商的私有特性。
既然已经讨论了消息的JMS头和属性,那么它们究竟用于什么呢?
- 在筛选订阅目的地的客户端接收的消息时,报头和属性非常重要。
消息体(Message Body)
JMS为消息体定义了六种类型载体,通过这个类型,你可以发送各种各样的数据。
类型 | 描述 |
---|---|
Message | 最基础的消息体,没有数据载体。仅仅包含了消息体和属性,一般用做简单的时间通知。 |
TextMessage | 文本消息 字符串数据载体。一般用来发送简单的文本,XML数据。 |
MapMessage | key-value键值队作为数据载体。key一般使用字符串,value可以为Java原始类型。 |
BytesMessage | 使用一个二进制数据来做数据载体。 |
StreamMessage | Java原始类型的流数据 |
ObjectMessage | 序列化后的完整Java类。通过使用与复杂Java类型。也支持集合 |
JMS消息模型 (即点对点和发布订阅模型)
- Point-to-Point(P2P)
- Publish/Subscribe(Pub/Sub)
JMS应用程序接口
接口 | 描述 |
---|---|
ConnectionFactory 接口(连接工厂) | 创建Connection对象的工厂,根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂分,别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。 |
Destination 接口(目标) | Destination是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。 |
Connection 接口(连接) | Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。 |
Session 接口(会话) | Session是我们操作消息的接口。表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。 |
MessageProducer 接口(消息的生产者) | 消息生产者由Session创建,并用于将消息发送到Destination。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。 |
MessageConsumer 接口(消息消费者) | 消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。 |
Message 接口(消息) | 是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:1、消息头(必须):包含用于识别和为消息寻找路由的操作设置。2、一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。3、一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口非常灵活,并提供了许多方式来定制消息的内容。 |
MessageListener | 消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。 |
如下图:
其他专题:
Redis:https://www.jianshu.com/nb/32287093