Introduction
有很多人问过我这么一类问题:**RabbitMQ如何确保消息可靠?**很多时候,笔者的回答都是:说来话长的事情何来长话短说。的确,要确保消息可靠不只是单单几句就能够叙述明白的,包括Kafka也是如此。可靠并不是一个绝对的概念,曾经有人也留言说过类似全部磁盘损毁也会导致消息丢失,笔者戏答:还有机房被炸了也会导致消息丢失。可靠性是一个相对的概念,在条件合理的范围内系统所能确保的多少个9的可靠性。一切尽可能的趋于完美而无法企及于完美。 我们可以尽可能的确保RabbitMQ的消息可靠。在详细论述RabbitMQ的消息可靠性之前,我们先来回顾下消息在RabbitMQ中的经由之路。
如图所示,从AMQP协议层面上来说:
消息先从生产者Producer出发到达交换器Exchange; 交换器Exchange根据路由规则将消息转发对应的队列Queue之上; 消息在队列Queue上进行存储; 消费者Consumer订阅队列Queue并进行消费。 我们对于消息可靠性的分析也从这四个阶段来一一探讨。
Phase 1
消息从生产者发出到达交换器Exchange,在这个过程中可以发生各种情况,生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失。一般情况下如果不采取措施,生产者无法感知消息是否已经正确无误的发送到交换器中。如果消息在传输到Exchange的过程中发生失败而可以让生产者感知的话,生产者可以进行进一步的处理动作,比如重新投递相关消息以确保消息的可靠性。
为此AMQP协议在建立之初就考虑到这种情况而提供了事务机制。RabbitMQ客户端中与事务机制相关的方法有三个:
- channel.txSelect、
- channel.txCommit
- channel.txRollback。
channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,而channel.txRollback用于事务回滚。在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。注意这里的RabbitMQ中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。
事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并没有更好的办法,但是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)。
生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。
事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。
生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式,之后RabbitMQ会返回 Confirm.Select-Ok命令表示同意生产者将当前信道设置为confirm模式。所有被发送的后续消息都被ack或者nack一次,不会出现一条消息即被ack又被nack的情况。并且RabbitMQ也并没有对消息被confirm的快慢做任何保证。
事务机制和publisher confirm机制两者是互斥的,不能共存。如果企图将已开启事务模式的信道再设置为publisher confirm模式,RabbitMQ会报错:{amqp_error, precondition_failed, “cannot switch from tx to confirm mode”, ‘confirm.select’},或者如果企图将已开启publisher confirm模式的信道在设置为事务模式的话,RabbitMQ也会报错:{amqp_error, precondition_failed, “cannot switch from confirm to tx mode”, ‘tx.select’ }。
事务机制和publisher confirm机制确保的是消息能够正确的发送至RabbitMQ,这里的“发送至RabbitMQ”的含义是指消息被正确的发往至RabbitMQ的交换器,如果此交换器没有匹配的队列的话,那么消息也将会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。更进一步的讲,发送方要配合mandatory参数或者备份交换器一起使用来提高消息传输的可靠性。
Phase 2
mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。而RabbitMQ提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。 RabbitMQ 3.0版本开始去掉了对于immediate参数的支持,对此RabbitMQ官方解释是:immediate参数会影响镜像队列的性能,增加代码复杂性,建议采用TTL和DLX的方法替代。所以本文只简单介绍mandatory和备份交换器。 当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形的话,消息直接被丢弃。 那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel.addReturnListener来添加ReturnListener监听器实现。使用mandatory参数的关键代码如下所示:
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return返回的结果是:" + message);
}
});
上面代码中生产者没有成功的将消息路由到队列,此时RabbitMQ会通过Basic.Return返回“mandatory test”这条消息,之后生产者客户端通过ReturnListener监听到了这个事件,上面代码的最后输出应该是“Basic.Return返回的结果是:mandatory test”。
生产者可以通过ReturnListener中返回的消息来重新投递或者其它方案来提高消息的可靠性。 备份交换器,英文名称Alternate Exchange,简称AE,或者更直白的可以称之为“备胎交换器”。生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失,如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化。如果你不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。 可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也可以通过策略的方式实现。如果两者同时使用的话,前者的优先级更高,会覆盖掉Policy的设置。
参考下图,如果此时我们发送一条消息到normalExchange上,当路由键等于“normalKey”的时候,消息能正确路由到normalQueue这个队列中。如果路由键设为其他值,比如“errorKey”,即消息不能被正确的路由到与normalExchange绑定的任何队列上,此时就会发送给myAe,进而发送到unroutedQueue这个队列。
备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,如若读者想设置为direct或者topic的类型也没有什么不妥。需要注意的是消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。备份交换器的实质就是原有交换器的一个“备胎”,所有无法正确路由的消息都发往这个备份交换器中,可以为所有的交换器设置同一个AE,不过这里需要提前确保的是AE已经正确的绑定了队列,最好类型也是fanout的。如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
Phase 3
mandatory或者AE可以让消息在路由到队列之前得到极大的可靠性保障,但是消息存入队列之后的可靠性又如何保证?
首先是持久化。持久化可以提高队列的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。队列的持久化是通过在声明队列时将durable参数置为true实现的,如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据将会丢失,此时数据也会丢失。正所谓“皮之不存,毛将焉附”,队列都没有了,消息又能存在哪里呢?队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依旧存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。
在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。RabbitMQ并不会为每条消息都做同步存盘(调用内核的fsync6方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
如果在Phase1中采用了事务机制或者publisher confirm机制的话,服务端的返回是在消息落盘之后执行的,这样可以进一步的提高了消息的可靠性。但是即便如此也无法避免单机故障且无法修复(比如磁盘损毁)而引起的消息丢失,这里就需要引入镜像队列。镜像队列相当于配置了副本,绝大多数分布式的东西都有多副本的概念来确保HA。在镜像队列中,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全的保证RabbitMQ消息不丢失(比如机房被炸。。。),但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。
Phase 4
进一步的从消费者的角度来说,如果在消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了这些消息。
采用消息确认机制后,只要设置autoAck参数为false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显式调用Basic.Ack命令为止。
当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认,如果只是简单的拒绝那么消息会丢失,需要将相应的requeue参数设置为true,那么RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果requeue参数设置为false的话,RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
还有一种情况需要考虑:requeue的消息是存入队列头部的,即可以快速的又被发送给消费,如果此时消费者又不能正确的消费而又requeue的话就会进入一个无尽的循环之中。对于这种情况,笔者的建议是在出现无法正确消费的消息时不要采用requeue的方式来确保消息可靠性,而是重新投递到新的队列中,比如设定的死信队列中,以此可以避免前面所说的死循环而又可以确保相应的消息不丢失。对于死信队列中的消息可以用另外的方式来消费分析,以便找出问题的根本。