RabbitMq学习(二)RabbitMQ的消息确认机制

Stella981
• 阅读 492

一. 为什么有消息确认机制

在RabbitMq中,一个消息从产生到最终的消息接受,中间大致会有三个环节,首先是消息到达交换机、然后是消息通过交换机到达队列,最后消费者消费绑定的队列消息。

 但是在这个过程中,如果出现网络或者系统的异常,就会导致消息不能被正常消费。如果不能正常消费消息,会造成两方面的问题。

1.1 在服务端

消息到达队列,但是没有消费者去消费,就会造成消息积压,被积压的消息会存入缓存,直到有消费者进行消费。如果一直没有消费者进行消费,那么就会直接将内存占满,影响服务器性能。

1.2 消费端

一个消息一旦被消费后,那么就会从队列删除。如果说消息已经到达消费者,但是消费者处理消息之前系统出现了异常,那么就相当于这条消息丢失了,是个很大的问题。

所以RabbitMq才会出现消息确认机制。对应的也是服务端客客户端两个方面解决

二、 怎么使用消息确认机制

2.1 消息发送确认

发送的确认也是分为两个步骤:到交换机的确认 ConfirmCallback 和到队列的确认 ReturnCallback

       这些确认机制默认都是不开启的,在SpringBoot 项目中,我们可以在配置文件中开启:

spring.rabbitmq.publisher-confirms = true

spring.rabbitmq.publisher-returns = true

或者 在配置连接工厂的时候开启:

@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        //开启到交换机的确认
        connectionFactory.setPublisherConfirms(true);
        //开启到队列的确认
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

在代码中实现 RabbitTemplate.ConfirmCallback 接口,如果消息被交换机正常接受,就会回调confirm 方法,参数的含义通过代码可以知晓。

实现 RabbitTemplate.ReturnCallback 接口,如果消息不能被发送到队列,就会调用ReturnedMessage 方法。

注意:一个是接收成功调用,一个是接收失败调用

@Component
public class ASender implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:" + correlationData);
        if (ack) {
            logger.info("消息成功消费");
        } else {
            logger.info("消息消费失败:" + cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode,
                                String replyText, String exchange, String routingKey) {
        logger.info("消息内容:{}", new String(message.getBody()));
        logger.info("回复文本:{},回复代码:{}", replyText, replyCode);
        logger.info("交换器名称:{},路由键:{}", exchange, routingKey);
    }

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(EXCHANGE_C, "aa.apple.big", content, correlationId);

    }
}

2.2 消息接受确认

 消费端消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK

  ACK 确认模式分为三种:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

默认是自动确认,开启手动确认的方式也是两种方式:

配置文件配置:

spring:  rabbitmq:    listener:          simple:            acknowledge-mode: manual另一种是在RabbitListenerContainerFactory配置:

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //开启手动 ack
    return factory;
}

在客户端接受消息:

@RabbitHandler
public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    System.out.println(message);
    try {
        channel.basicAck(tag, false);            // 确认消息
        logger.info("消费者成功确认" + message);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

确认 basicAck 参数解释:
  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
  • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

部分参考: https://www.jianshu.com/p/2c5eebfd0e95

Demo:      https://github.com/zhuanzhiBUG/springboot-rabbitmq.git

点赞
收藏
评论区
推荐文章
Stella981 Stella981
3年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Stella981 Stella981
3年前
Spring Boot(七):RabbitMQ 详解
一、RabbitMQ简介RabbitMQ即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Stella981 Stella981
3年前
RabbitMQ 消息中间件搭建详解
1.RabbitMQ简介消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包
Stella981 Stella981
3年前
RabbitMq学习笔记——概念
1、RabbitMQ简介  MQ全称为MessageQueue(消息队列),是一种“应用程序”<—“应用程序”的通信方法。MQ是一个典型的“消费”<—“生产者”模型的代表,生成者往消息队列中写入消息,消费者从消息队列中读取消息。2、MQ的应用场景  对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者
Stella981 Stella981
3年前
RabbitMQ如何保证队列里的消息99.99%被消费?
1\.本篇概要其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里的队列,积分中心收到这个消息,准备给这个下单的用户增加20积分,但积分还没增加成功呢,积分中心自己挂掉了,导致数据出现问题。那么如何解
Stella981 Stella981
3年前
RabbitMQ小技巧
导读在使用RabbitMQ消息中间件时,因为消息的投递是异步的,默认情况下,RabbitMQ会删除那些无法路由的消息。为了能够检出消息是否顺利投递到队列,我们需要相应的处理机制。今天就来验证一下相关的验证机制。!RabbitMQ小技巧确定消息投递情况RabbitMQ小技巧确定消息投递情况(https://imgblog.csdnim
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ学习:RabbitMQ的基本概念及RabbitMQ使用场景(二)
1、RabbitMQ的基本概念RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统
Stella981 Stella981
3年前
RabbitMQ——队列消息数
背景在实际使用过程,会遇到这么些情况:生产者发送的消息数量与消费者接收的消息数量不一致。例如生产者向rabbitmq投递了100条消息,消费者只从队列中接收到了80条消息,并且当前队列中已经没有任何消息。要定位这个问题,通常是分段来定位,一方面统计生产者到底发送了多少消息,一方面统计有多少消息是正确路由到