JMS API中约定了Client端可以使用四种ACK_MODE,在javax.jms.Session接口中:
- AUTO_ACKNOWLEDGE = 1 自动确认
- CLIENT_ACKNOWLEDGE = 2 客户端手动确认
- DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
- SESSION_TRANSACTED = 0 事务提交并确认
- INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认(AcitveMQ补充的一个自定义的ACK_MODE,只有ActiveMQ支持)
ACK_MODE描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了。ACK在consumer与Broker之间建立一种简单的“担保”机制。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
我们需要在创建Session时指定ACK_MODE,由此可见,ACK_MODE将是session共享的,意味着一个session下所有的 consumer都使用同一种ACK_MODE。
但如果此session为事务类型,用户指定的ACK_MODE将被忽略,而强制使用"SESSION_TRANSACTED"类型。如果session非事务类型时,也将不能将ACK_MODE设定为"SESSION_TRANSACTED"。
Client端指定了ACK_MODE,但是在Client与broker在交换ACK指令的时候,还需要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,比如Consumer消费消息时出现异常,就需要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会重新发送此消息。在JMS API中并没有定义ACT_TYPE,因为它通常是一种内部机制,并不会面向开发者。ActiveMQ中定义了如下几种ACK_TYPE(参看MessageAck类):
- DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未处理结束
- STANDARD_ACK_TYPE = 2 "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
- POSION_ACK_TYPE = 1 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
- REDELIVERED_ACK_TYPE = 3 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
- INDIVIDUAL_ACK_TYPE = 4 表示只确认"单条消息",无论在任何ACK_MODE下
- UNMATCHED_ACK_TYPE = 5 BROKER间转发消息时,接收端"拒绝"消息
ACK指令中不同的ACK_MODE,将意味着在不同的时机发送ACK指令,每个ACK指令中又会包含不同的ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作。
ACK_MODE:
AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将由consumer择机确认。对于consumer而言,optimizeAcknowledge(优化应答)属性只会在AUTO_ACK模式下有效。AUTO_ACK又分为两种情况:
在同步(receive)情况下,单条消息将立即确认,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,可能造成消息的丢失。
在异步(messageListener)情况下,将会调用listener.onMessage(message)
,此后再ACK。如果onMessage方法异常且没有被catch,将导致client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令,此消息会被redelivery。如果重发次数到达阈值,此消息将会进入DLQ。
public class Listener implements MessageListener {
public void onMessage(Message message) {
int i = 8/0;//会导致redelivery
try {
if(message instanceof ActiveMQTextMessage){
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
System.out.println("收到的消息:" + textMessage.getText()); }
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Listener implements MessageListener {
public void onMessage(Message message) {
try {
int i = 8/0;//不会导致redelivery
if(message instanceof ActiveMQTextMessage){
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
System.out.println("收到的消息:" + textMessage.getText());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息。
CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会自动ACK任何消息。如果一个conmuser在消费结束前没有调用message.acknowledge()确认一个消息,之后调用其他conmuser时会再次消费它,因为对于broker而言,那些尚未真正ACK的消息被视为未消费,直到它被确认。
DUPS_ACKNOWLEGE:也是一种潜在的AUTO_ACK,在确认消息的条数和时间上有所不同。
SESSION_TRANSACTED : 当session使用事务时,就是使用此模式。
在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。commit()方法将会导致当前session的事务中所有消息立即被确认。 当session.commit()异常时,会自动调用inner-rollback回滚事务(也可以捕捉到异常,手动调用session.rollback()回滚事务),也可以在事务开始之后的任何时候手动调用session.rollback()。rollback意味着当前事务的结束,事务中所有的消息都将被重发。