理论上来说,我们需要保证消息中间件上的消息,只有被消费者确认过以后才会被签收,相当于我们寄一个快递出去,收件人没有收到快递,就认为这个包裹还是属于待签收状态,这样才能保证包裹能够安全达到收件人手里。
消息中间件也是一样。消息的消费通常包含 3 个阶段:客户接收消息、客户处理消息、消息被确认
首先,来简单了解 JMS 的事务性会话和非事务性会话的概念JMS Session 接口提供了 commit 和 rollback 方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。 事务性的会话总是牵涉到事务处理中,commit 或 rollback 方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务
1.在事务型会话中
在事务状态下进行发送操作,消息并未真正投递到中间件,而只有进行 session.commit 操作之后,消息才会发送到中间件,再转发到适当的消费者进行处理。如果是调用rollback 操作,则表明,当前事务期间内所发送的消息都取消掉。通过在创建 session 的时候使用 true or false 来决定当前的会话是事务性还是非事务性connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
在事务性会话中,消息的确认是自动进行,也就是通过session.commit()以后,消息会自动确认。
2.在非事务型会话中
消息何时被确认取决于创建会话时的应答模式(acknowledgement mode). 有三个可选项
Session.AUTO_ACKNOWLEDGE
当客户成功的从 receive 方法返回的时候,或者从MessageListenner.onMessage 方法成功返回的时候,会话自动确认客户收到消息。Session.CLIENT_ACKNOWLEDGE
客户通过调用消息的 acknowledge 方法确认消息。CLIENT_ACKNOWLEDGE 特性
在这种模式中,确认是在会话层上进行,确认一个被消费的消息将自动确认所有已被会话消费的消息。列如,如果一个消息消费者消费了 10 个消息,然后确认了第 5 个消息,那么 0~5 的消息都会被确认 .
演示如下:发送端发送 10 个消息,接收端接收 10 个消息,但是在 i==5 的时候,调用 message.acknowledge()进行确认,会发现 0~4 的消息都会被确认Session.DUPS_ACKNOWLEDGE
消息延迟确认。指定消息提供者在消息接收者没有确认发送时重新发送消息,这种模式不在乎接受者收到重复的消息。
3.消息的持久化存储
消息的持久化存储也是保证可靠性最重要的机制之一,也就是消息发送到 Broker 上以后,如果 broker 出现故障宕机了,那么存储在 broker 上的消息不应该丢失。可以通过下面的代码来设置消息发送端的持久化和非持久化特性
MessageProducer producer=session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
对于非持久的消息,JMS provider 不会将它存到文件/数据库等稳定的存储介质中。也就是说非持久消息驻留在内存中,如果 jms provider 宕机,那么内存中的非持久消息会丢失
对于持久消息,消息提供者会使用存储-转发机制,先将消息存储到稳定介质中,等消息发送成功后再删除。如果 jms provider 挂掉了,那么这些未送达的消息不会丢失;jms provider 恢复正常后,会重新读取这些消息,并传送给对应的消费者。
4.持久化消息和非持久化消息的存储原理
正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的。能够存储的最大消息数据在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点SystemUsage配置设置了一些系统内存和硬盘容量
<systemUsage>
<systemUsage>
<memoryUsage>
//该子标记设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过ActiveMQ本身设置的最大内存大小。其中的
percentOfJvmHeap属性表示百分比。占用70%的堆内存
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
//该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的limit属性必须要进行设置
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
//一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,非持久化消息就会被转储到 temp store区域,虽然
我们说过非持久化消息不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时非持久化消息大量堆积致使内存耗
尽的情况出现,还是会将非持久化消息写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp
store区域的“可用磁盘空间限制”
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
从上面的配置我们需要get到一个结论,当非持久化消息堆积到一定程度的时候,也就是内存超过指定的设置阀值时,ActiveMQ会将内存中的非持久化消息写入到临时文件
,以便腾出内存。但是它和持久化消息的区别是,重启之后,持久化消息会从文件中恢复,非持久化的临时文件会直接删除
5.消息的持久化策略分析
消息持久性对于可靠消息传递来说是一种比较好的方法,即时发送者和接受者不是同时在线或者消息中心在发送者,发送消息后宕机了,在消息中心重启后仍然可以将消息发送出去。消息持久性的原理很简单,就是在发送消息出去后,消息中心首先将消息存储在本地文件、内存或者远程数据库,然后把消息发送给接受者,发送成功后再把消息从存储中删除,失败则继续尝试。接下来我们来了解一下消息在broker上的持久化存储实现方式
(1) 持久化存储支持类型
这里可以参考最新文档的http://activemq.apache.org/features.html的Persistence的章节
ActiveMQ支持多种不同的持久化方式,主要有以下几种,不过,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
- KahaDB存储(默认存储方式)
- JDBC存储
- Memory存储
- LevelDB存储
- JDBC With ActiveMQ Journal
(2) KahaDB存储
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃
- KahaDB的配置方式
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
- KahaDB的存储原理
在data/kahadb这个目录下,会生成四个文件, db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-.log里面存储的消息
db.redo 用来进行消息恢复
db-.log 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序入,因此消息存储是比较快的。默认是32M,达到阀值会自动递增
lock文件 锁,表示当前获得kahadb读写权限的broker
(3)JDBC存储
使用JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。
ACTIVEMQ_MSGS 消息表,queue和topic都存在这个表中
ACTIVEMQ_ACKS 存储持久订阅的信息和最后一个持久订阅接收的消息ID
ACTIVEMQ_LOCKS 锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
- JDBC存储实践
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="# MySQL-DS " createTablesOnStartup="true" />
</persistenceAdapter>
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false
- Mysql持久化Bean配置
<bean id="Mysql-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.11.156:3306/activemq?
relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
</bean>
-
添加Jar包依赖
(4)LevelDB存储
LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB。并且,在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。
不过,据ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB
<persistenceAdapter>
<levelDBdirectory="activemq-data"/>
</persistenceAdapter>
(5)Memory 消息存储
基于内存的消息存储,内存消息存储主要是存储所有的持久化的消息在内存中。persistent=”false”,表示不设置持久化存储,直接存储到内存中
<beans>
<broker brokerName="test-broker" persistent="false"xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors>
</broker>
</bean
(6)JDBC Message store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。举个例子,生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的10%的消息到DB。
如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
在服务端循环发送消息。可以看到数据是延迟同步到数据
配置如下
<persistenceFactory>
<journalPersistenceAdapterFactory dataSource="#Mysql-DS" dataDirectory="activemq-data"/>
</persistenceFactory