RabbitMQ消息持久化和消息确认机制

Stella981
• 阅读 927
  • 消息持久化

消息在传输过程中,可能会出现各种异常失败甚至宕机情况,为了保证消息传输的可靠性,需要进行持久化,也就是在数据写在磁盘上。消息队列持久化包括三部分:

1.Message持久化,也就是发送时消息持久化。(Message包含body,body为我们需要发送的消息具体内容,一般以json字符串发送,消费端再解析;MessageProperties为Message的一些额外的属性,做一些扩展作用)

Message message = MessageBuilder.withBody("我们发送的消息内容存放在message的body里面".getBytes()).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

2.队列持久化

    @Bean
    public Queue helloQueue() {
        return new Queue("hello",true);
    }

第一个参数为队列名,第二个参数为durable,是否持久化。Queue的其他属性

a) Exclusive属性:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道channel是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

b)   Auto-delete属性:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

3.交换机持久化

    @Bean
    public DirectExchange helloExchange(){
        return new DirectExchange("helloexchange",true,false);
    }

第一个参数为交换机名,第二个参数为durable,是否持久化。第二个参数为autoDelete,当交换机没有绑定队列时会自动删除交换机。

  • 消息确认机制

在保证了消息队列持久化的基础上来做消息确认机制,在两个地方确认。

a) 确认发送(生产者)

有两种方式,只能使用其中一种。

1.transaction 事务

//原生客户端
try {
    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    int result = 1 / 0;
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}

//springRabbitMQ客户端采用confirm机制就不能使用事物。两者只能用一个,rabbittemplate开启事务
 rabbittemplate.setChannelTransacted(true);

    使用事务的方式确保消息送达,只有消息成功被rabbiMQ的broker接收后,事务提交才能成功,异常中进行事务回滚操作,其实就是同步的,事务会影响性能,可选择confirm。

2.confirm

confirm是异步的,直接发送不需要同步等待,会触发confirm方法,ack为true时消息发送成功,false发送失败可以做一些处理,先要设置为confirm模式。

  @Bean  
  public ConnectionFactory connectionFactory() {  
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
      connectionFactory.setAddresses("127.0.0.1:5672");
      connectionFactory.setUsername("develop");
      connectionFactory.setPassword("123456");
      connectionFactory.setVirtualHost("dev"); 
      connectionFactory.setPublisherConfirms(true); //必须要设置  
      return connectionFactory;  
  }  

//设置ConfirmCallback
rabbittemplate.setConfirmCallback(new MyConfirmCallBack());


import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;

import cn.zkz.takeaway.service.amqp.conf.MessageCache;

public class MyConfirmCallBack implements ConfirmCallback {

    /**
     * 目前调用情况 
     * 1.echange存在时,routeKey路由无论是否存在,ack为true 
     * 2.echange不存在时,ack为false。
     * 3.当exchange为null或者空字符串时,ack为true 
     * 网络中断会导致无confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息id
        String msgId = correlationData.getId();
        if (ack) {// 发送成功
            //LOGGER.info("confirm:消息id:[{}] 投递成功" + msgId);
            MessageCache.deleteSend(msgId);
        } else { // 发送失败
            //发送失败后 重试 cause 失败原因
            
        }
    }
    

}

b) 确认消费(消费者)

直接写在了代码注释里面,就不多写了

    @Bean  
    public SimpleMessageListenerContainer userMessageContainer(Queue helloQueue,ConnectionFactory connectionFactory) {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);  
        container.setQueues(helloQueue);  
        //需要将channel暴露给listener才能手动确认,AcknowledgeMode.MANUAL时必须为ture
        container.setExposeListenerChannel(true);  
        //并发消费设置最大消费者数量,并发消费的时候需要设置,且>=concurrentConsumers
        container.setMaxConcurrentConsumers(3);  
        //设置默认当前消费者数量
        container.setConcurrentConsumers(1);  
        //设置确认模式手工确认  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
        //设置listener
        container.setMessageListener(new ChannelAwareMessageListener() {
            
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                
                try {
                    //业务处理
                } catch (Exception e) {
                    // TODO: handle exception
                } finally {
                    //可以根据业务需求调用三个当中的其中一个,如果未调用,消息一直时UnAck状态,unack数量等于PrefetchCount时,不会收到消息
                    //Queue会一直保存,多个消费者存在时,不会发给其他消费者。
                    
                    //确认收到,第二个参数是否为批量回复
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                    //Nack,第二个参数是否为批量回复,第三个参数值该消息是否回到队列
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                    //拒绝,第二个参数值该消息是否回到队列,只能一个一个地拒绝
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        });  
        //会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,
        //即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
        container.setPrefetchCount(5);
        return container;  
    }
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Stella981 Stella981
3年前
RabbitMQ如何通过持久化保证消息99.99%不丢失?
1\.本篇概要要解决该问题,就要用到RabbitMQ中持久化的概念,所谓持久化,就是RabbitMQ会将内存中的数据(Exchange交换器,Queue队列,Message消息)固化到磁盘,以防异常情况发生时,数据丢失。其中,RabblitMQ的持久化分为三个部分:1.交换器(Exchange
Stella981 Stella981
3年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Stella981 Stella981
3年前
RabbitMQ存储和队列结构
本文讲解RabbitMQ的存储,主要有以下内容:1.存储原理2.队列结构3.惰性队列存储原理首先确认一个点,持久化和非持久化的消息都会落地磁盘,区别在于持久化的消息一定会写入磁盘(并且如果可以在内存中也会有一份),而非持久化的消息只有在内存吃紧的时候落地磁盘。两种类型消息的落盘都是在Rabb
Stella981 Stella981
3年前
RabbitMQ系列三 (深入消息队列)
消息持久化是RabbitMQ最为人津津乐道的特性之一,RabbitMQ能够在付出最小的性能代价的基础上实现消息的持久化,最大的奥秘就在于RabbitMQ多层消息队列的设计上。下面,本文就从MessageQueue的设计和消息在MessageQueue的生命周期两个方面全面介绍 RabbitMQ的消息队列。RabbitMQ完全实现
Stella981 Stella981
3年前
RabbitMQ学习:RabbitMQ的基本概念及RabbitMQ使用场景(二)
1、RabbitMQ的基本概念RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统
Stella981 Stella981
3年前
RabbitMQ学习:安装RabbitMQ及RabbitMQ的初步配置(一)
RabbitMQ基础含义RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
Wesley13 Wesley13
3年前
JMS消息的概念解释
1、默认生产者消息是持久的:会存数据库\消费者的持久:createDurableSubscriber是指消费者能收到所有它订阅时间点之后的消息,即使消费者注册后关闭,当它重启就能收到注册时间点之后所有的消息;即当此消费用户ID(AAA)在producer发送之前就已经注册,那么此id能收到producer发送的所有消息,如果是在produce
Stella981 Stella981
3年前
RabbitMQ数据丢失分析
RabbitMQ数据丢失分析简要系统流程场景图!简要系统流程场景图(http://wx1.sinaimg.cn/mw690/9e2b10fagy1fst744xqz0j20jf03a746.jpg)数据丢失场景以下场景分析前提是队列持久化,交换器持久化,消息持久化,非持久化场
融云IM即时通讯 融云IM即时通讯
1个月前
融云IM干货丨IM服务消息推送,客户端版本更新后,如何确保消息不丢失?
确保客户端版本更新后消息不丢失,可以采取以下几种策略:消息持久化:确保消息被存储在可靠的存储介质中,如数据库或磁盘,这样即使客户端或服务端发生故障,消息也不会丢失。对于RabbitMQ等消息队列,需要开启持久化机制,将消息持久化到硬盘上,即使服务重启也能从