RabbitMq + Spring 实现ACK机制

Wesley13
• 阅读 742

概念性解读(Ack的灵活)

         首先啊,有的人不是太理解这个Ack是什么,讲的接地气一点,其实就是一个通知,怎么说呢,当我监听消费者,正常情况下,不会出异常,但是如果是出现了异常,甚至是没有获取的异常,那是不是这条数据就会作废,但是我们肯定不希望这样的情况出现,我们想要的是,如果在出现异常的时候,我们识别到,如果确实是一个不良异常,肯定希望数据重新返回队列中,再次执行我们的业务逻辑代码,此时我就需要一个Ack的通知,告诉队列服务,我是否已经成功处理了这条数据,而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。下面是通话机制的四条原则:

  • Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。
  • Basic.Ack 未被 consumer 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。
  • 在 no_ack=true 的情况下,RabbitMQ 认为 message 一旦被 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。
  • 来自 consumer 侧的 Basic.Ack 与 发送给 Producer 侧的 Basic.Ack 没有直接关系。

正题部分(配置手动Ack,实现异常消息回滚)

A. 在消费者端的mq配置文件上添加,配置  关键代码为 acknowledeg = "manual",意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener queues="queue_xxx" ref="MqConsumer"/>
    <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/>
</rabbit:listener-container>

B. 新建一个类 MqConsumer ,并实现接口  ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。

springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法

C. 关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。

一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

其中deliveryTag是tag的id,由生产者生成。第二个参数我其实也没理解用途,暂时还没有模拟出场景,所以先不讨论。

同样的,如果要Nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(当然要制定tagId)。注意如果抛异常或Nack(并且requeue为true),消息会一直重新入队列,一不小心就会重复一大堆消息不断出现~。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息

D. 针对上面所描述的情况,我们在搭建一个消息队列时候,我们的思路应该是这样的,首先,我们要启动ack的手动方式,紧接着,我们处理代码逻辑,如果发生了异常信息,我们首先通知到ack,我已经表示接受到这条数据了,你可以进行删除了,不需要让他自动的重新进入队列中,然后,我们启用一个错误处理,手动将其重新插入队列中,在此之前,有几个类和Api一起来看一下。

    1. SimpleMessageListenerContainer

    这个是我们的基础监听,他的作用就是队列的总监听,可以为其配置ack模式,异常处理类等。。

    2. org.springframework.amqp.support.converter.SimpleMessageConverter

    这个类和下面的Converter类很容易搞混淆,这个类的作用是可以解析队列中的 message 转 obj

    3. org.springframework.amqp.rabbit.retry.MessageRecoverer

    这个接口,需要我们开发者自定义实现,其中的一个方法recover(Message message, Throwable cause),就可以看出来他是干嘛的,就是说在监听出错,也就是没有抓取的异常而是抛出的异常会触发该方法,我们就会在这个接口的实现中,将消息重新入队列

    4. org.springframework.util.ErrorHandler

    这个接口也是在出现异常时候,会触发他的方法

E.  完整实例

    1. spring配置队列xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <!-- 连接服务配置 -->
    <rabbit:connection-factory id="connectionFactory"
        host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
        password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" />
    
    <!-- 设置Ack模式为手动 -->    
    <bean id="ackManual"
        class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
        <property name="staticField"
            value="org.springframework.amqp.core.AcknowledgeMode.MANUAL" />
    </bean>

    <!-- 异常处理,记录异常信息 --> 
    <bean id="mqErrorHandler" class="com.zefun.wechat.utils.MQErrorHandler"/>
    <!-- 将类自动注入,可解析msg信息 -->    
    <bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />

    <!-- 创建rabbitAdmin 代理类 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
    <rabbit:admin connection-factory="connectionFactory" />
    
    <!-- 创建SimpleMessageListenerContainer的理想通道,主要实现异常事件处理逻辑 -->
    <bean id="retryOperationsInterceptorFactoryBean"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer">
            <bean class="com.zefun.wechat.utils.MQRepublishMessageRecoverer"/>
        </property>
        <property name="retryOperations">
            <bean class="org.springframework.retry.support.RetryTemplate">
                <property name="backOffPolicy">
                    <bean
                        class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                        <property name="initialInterval" value="500" />
                        <property name="multiplier" value="10.0" />
                        <property name="maxInterval" value="10000" />
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
    
    <!-- 定义队列,在下面的交换机中引用次队列,实现绑定 -->     
    <rabbit:queue id="queue_system_error_logger_jmail" name="${rabbitmq.system.out.log.error.mail}" durable="true"
        auto-delete="false" exclusive="false" />
                         
    <!--路由设置 将队列绑定,属于direct类型 -->
    <rabbit:direct-exchange id="directExchange"
        name="directExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queue_system_error_logger_jmail" key="${rabbitmq.system.out.log.error.mail}" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    
    <!-- logger 日志发送功能 -->
    <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="acknowledgeMode" ref="ackManual" />
        <property name="queueNames" value="${rabbitmq.system.out.log.error.mail}" />
        <property name="messageListener">
            <bean class="com.zefun.wechat.listener.SystemOutLogErrorMessageNoitce" />
        </property>
        <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers}" />
        <property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
        <property name="errorHandler" ref="mqErrorHandler" />
    </bean>    
</beans>

    2. MessageRecoverer 配置,将小心重新入队列

package com.zefun.wechat.utils;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;

public class MQRepublishMessageRecoverer implements MessageRecoverer {
    
    private static final Logger logger = Logger.getLogger(MQRepublishMessageRecoverer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private MessageConverter msgConverter;

    @Override
    public void recover(Message message, Throwable cause) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
        headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
        headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
        this.rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message);
        logger.error("handler msg (" + msgConverter.fromMessage(message) + ") err, republish to mq.", cause);
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }
}

    3. MQErrorHandler 写法,在出现异常时,记录异常

package com.zefun.wechat.utils;

import java.lang.reflect.Field;
import java.util.Date;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ErrorHandler;

import com.zefun.wechat.service.RedisService;

public class MQErrorHandler implements ErrorHandler {

    private static final Logger logger = Logger.getLogger(MQErrorHandler.class);

    @Autowired
    private RedisService redisService;
    @Autowired
    private MessageConverter msgConverter;

    @Override
    public void handleError(Throwable cause) {
        Field mqMsgField = FieldUtils.getField(MQListenerExecutionFailedException.class, "mqMsg", true);
        if (mqMsgField != null) {
            try {
                Message mqMsg = (Message) mqMsgField.get(cause);
                Object msgObj = msgConverter.fromMessage(mqMsg);
                logger.error("handle MQ msg: " + msgObj + " failed, record it to redis.", cause);
                redisService.zadd(App.MsgErr.MQ_MSG_ERR_RECORD_KEY, new Double(new Date().getTime()), msgObj.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            logger.error("An error occurred.", cause);
        }
    }

}

    4. SystemOutLogErrorMessageNoitce 实现 ChannelAwareMessageListener接口,处理邮件服务

package com.zefun.wechat.listener;

import javax.mail.internet.MimeMessage;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;

import com.rabbitmq.client.Channel;
import com.zefun.wechat.utils.App;
import net.sf.json.JSONObject;

public class SystemOutLogErrorMessageNoitce  implements ChannelAwareMessageListener {

    private static final Logger logger = Logger.getLogger(MemberWechatMessageTextNoitce.class);
    @Autowired
    private MessageConverter msgConverter;
    /** logger b */
    @Autowired
    private JavaMailSenderImpl senderImpl;
    
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        Object obj = null;
        try {
            obj = msgConverter.fromMessage(message);
        } catch (MessageConversionException e) {
            logger.error("convert MQ message error.", e);
        } finally {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            if (deliveryTag != App.DELIVERIED_TAG) {
                channel.basicAck(deliveryTag, false);
                message.getMessageProperties().setDeliveryTag(App.DELIVERIED_TAG);
                logger.info("revice and ack msg: " + (obj == null ? message : new String((byte[]) obj)));
            }
        }
        if (obj == null) {
            return;
        }
        JSONObject map = JSONObject.fromObject(obj);
        sendMailSystemLoggerError(map.getString("date"), map.getString("subject"), map.getString("domain"), map.getString("requestURL"), map.getString("message"));
    }
    
    /**
     * jmail logger 
    * @author 小高
    * @date 2016年10月25日 下午3:24:46
    * @param date          日期
    * @param subject       主题账户
    * @param domain        域名环境
    * @param message       logger日志
    * @param requestURL    请求路径
    * @throws Exception    异常信息
     */
    public void sendMailSystemLoggerError(String date, String subject, String domain, String requestURL, String message) throws Exception{
        MimeMessage mailMessage = this.senderImpl.createMimeMessage();
        MimeMessageHelper messageHelper = new MimeMessageHelper(mailMessage, true);
        messageHelper.setTo("1043851832@qq.com");
        messageHelper.setFrom("18734911338@163.com");
        messageHelper.setSubject(date + " 系统异常");
        String msg = "<p>异常时间:" + date + "</p><p>门店企业:" + subject + "</p>"
                    + "<p>部署环境:" + domain + "</p><p>异常连接:" + requestURL + "</p>"
                    + "<p>异常内容:</p>" + message;
        messageHelper.setText("<html><head></head><body>" + msg + "</body></html>", true);
        senderImpl.send(mailMessage);
        logger.info("jmail push message success");
    }

}

E. rabbitMq中文文档,方便查阅API http://rabbitmq.mr-ping.com/AMQP/amqp-0-9-1-quickref.html

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这