一. 为什么有消息确认机制
在RabbitMq中,一个消息从产生到最终的消息接受,中间大致会有三个环节,首先是消息到达交换机、然后是消息通过交换机到达队列,最后消费者消费绑定的队列消息。
但是在这个过程中,如果出现网络或者系统的异常,就会导致消息不能被正常消费。如果不能正常消费消息,会造成两方面的问题。
1.1 在服务端
消息到达队列,但是没有消费者去消费,就会造成消息积压,被积压的消息会存入缓存,直到有消费者进行消费。如果一直没有消费者进行消费,那么就会直接将内存占满,影响服务器性能。
1.2 消费端
一个消息一旦被消费后,那么就会从队列删除。如果说消息已经到达消费者,但是消费者处理消息之前系统出现了异常,那么就相当于这条消息丢失了,是个很大的问题。
所以RabbitMq才会出现消息确认机制。对应的也是服务端客客户端两个方面解决
二、 怎么使用消息确认机制
2.1 消息发送确认
发送的确认也是分为两个步骤:到交换机的确认 ConfirmCallback 和到队列的确认 ReturnCallback
这些确认机制默认都是不开启的,在SpringBoot 项目中,我们可以在配置文件中开启:
spring.rabbitmq.publisher-confirms = true
spring.rabbitmq.publisher-returns = true
或者 在配置连接工厂的时候开启:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
//开启到交换机的确认
connectionFactory.setPublisherConfirms(true);
//开启到队列的确认
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
在代码中实现 RabbitTemplate.ConfirmCallback 接口,如果消息被交换机正常接受,就会回调confirm 方法,参数的含义通过代码可以知晓。
实现 RabbitTemplate.ReturnCallback 接口,如果消息不能被发送到队列,就会调用ReturnedMessage 方法。
注意:一个是接收成功调用,一个是接收失败调用
@Component
public class ASender implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 回调
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info(" 回调id:" + correlationData);
if (ack) {
logger.info("消息成功消费");
} else {
logger.info("消息消费失败:" + cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
logger.info("消息内容:{}", new String(message.getBody()));
logger.info("回复文本:{},回复代码:{}", replyText, replyCode);
logger.info("交换器名称:{},路由键:{}", exchange, routingKey);
}
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(EXCHANGE_C, "aa.apple.big", content, correlationId);
}
}
2.2 消息接受确认
消费端消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
ACK 确认模式分为三种:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
默认是自动确认,开启手动确认的方式也是两种方式:
配置文件配置:
spring: rabbitmq: listener: simple: acknowledge-mode: manual另一种是在RabbitListenerContainerFactory配置:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
return factory;
}
在客户端接受消息:
@RabbitHandler
public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
System.out.println(message);
try {
channel.basicAck(tag, false); // 确认消息
logger.info("消费者成功确认" + message);
} catch (IOException e) {
e.printStackTrace();
}
}
确认 basicAck 参数解释:
- deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
- multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
部分参考: https://www.jianshu.com/p/2c5eebfd0e95
Demo: https://github.com/zhuanzhiBUG/springboot-rabbitmq.git