前言
JMS的消息确认模式,定义了客户端(消息发送者或者消费者)与broker确认消息的方式,可以认为是客户端与Broker之间建立一种简单的“担保”机制。
在java的JMS标准 中,javax.jms.Session 包定义了4种消息确认模式,分别是:
- ** AUTO_ACKNOWLEDGE: ** 自动确认
- ** CLIENT_ACKNOWLEDGE : ** 客户端手动确认
- ** DUPS_OK_ACKNOWLEDGE: ** 自动批量确认
- ** SESSION_TRANSACTED: ** 事务提交并确认
其中,前面三个确认模式是针对消费端的,即消息被消息消费者接收之后,消费者应该如何告诉broker,它已经正确接收消息和处理消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理,这时,它会从broker的队列中,删除消息。
最后一个确认模式SESSION_TRANSACTED,为消息发送者提供了消息发送的事务处理方式。也就是指,消息发送者发送消息给broker后,broker只是暂存该消息,只有当发送者给broker进行事务确认消息后,broker才把消息加入到待发送队列中,换言之,如果消息发送者进行了事务回滚,消息会直接从broker中删除。
在 activemq中,通过下面API设置消息确认模式
Session createSession(boolean transacted, int acknowledgeMode)
transacted:是否开启事务,设置为true后,acknowledgeMode的设置无效,但当transacted为false以及acknowledgeMode设置为 SESSION_TRANSACTED 时,会冲突并报异常
acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session
acknowledgeMode:即确认模式
其实,客户端与broker的通信是通过指令完成的。客户端在不同的ACK_MODE消息确认模式时,根据不同的时机发送ACK指令,每个ACK Command中会包含ACK_TYPE,那么broker端就可以根据ACK_TYPE来决定此消息的后续操作,例如消息需不需要重发,消息需不需要删除等。
ACK_TYPE如下:
- ** DELIVERED_ACK_TYPE: ** 消息"已接收",但尚未处理结束
- **STANDARD_ACK_TYPE : ** "标准"类型,通常表示为消息"处理成功",broker端可以删除消息了
- ** POSION_ACK_TYPE : ** 消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者加入到DLQ(死信队列)
- ** REDELIVERED_ACK_TYPE : ** 消息需"重发",比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
- ** INDIVIDUAL_ACK_TYPE:** 表示只确认"单条消息",无论在任何ACK_MODE下
- ** UNMATCHED_ACK_TYPE: ** BROKER间转发消息时,接收端"拒绝"消息
下面我们来详细介绍一下四种消息确认模式。
AUTO_ACKNOWLEDGE
消息自动确认,意思是消费者接收消息后,不需要显示告诉broker已经接收到消息,而是由底层代码根据消息确认模式为AUTO_ACKNOWLEDGE,自动发送消息确认ack给broker,这样broker会从队列中删除消息。
有些文章说,采用AUTO_ACKNOWLEDGE模式,即使接收者正确接收消息后处理业务时发生了异常,接收者客户端也会被当作正常发送接收信息处理,那么这样不就在broker删除信息了吗?通过实验,事实并非如此,在接收消息处理消息过程中出现异常,broker会重发消息,或者把消息加入到死信队列中,这会在后面文章中介绍。
代码清单:消费者
public class Consumer {
public static void main(String[] args) throws IOException {
try {
Connection connection = ActiveMQManager.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //
Queue queue = session.createQueue(Utils.QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("-------- ----- --------- ");
System.out.println("触发消息接收");
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
handleMsg(text);
System.out.println("成功处理消息 : " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 线程一直等待
System.in.read();
consumer.close();
session.close();
connection.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
// 处理消息
private static void handleMsg(String msg) {
System.out.println("准备处理消息 : " + msg);
try {
Thread.sleep(500); //500毫秒处理一个
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟处理的时候发生了异常
//throw new RuntimeException("未知错误");
}
}
CLIENT_ACKNOWLEDGE
由客户端(消费者)手动确认。我们在正确接收和处理消息后,需要使用message.acknowledge(),确认消息,否则,消息不会被broker删除。当我们重启消费者后,会重新取这些未确认的消息。
由于代码与上面雷同,所以这里只列出相关代码
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("-------- ----- --------- ");
System.out.println("触发消息接收");
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
handleMsg(text);
message.acknowledge(); //确认消息,broker删除消息
System.out.println("成功处理消息 : " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
DUPS_OK_ACKNOWLEDGE
没研究,暂时不讨论。
SESSION_TRANSACTED
带事务的会话。在前面已经解释很清楚了,即消息发送者发送消息后,需要提交事务,否则消息不进入broker待发送队列中。
相关代码:
public void sendMessage(String message) throws JMSException {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); //开启事务
Queue queue = session.createQueue(Utils.QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
TextMessage toMessage = session.createTextMessage(message);
producer.send(toMessage);
session.commit(); //如果不提交事务,消息不会进入broker队列
producer.close();
session.close();
}