消息持久化
消息在传输过程中,可能会出现各种异常失败甚至宕机情况,为了保证消息传输的可靠性,需要进行持久化,也就是在数据写在磁盘上。消息队列持久化包括三部分:
1.Message持久化,也就是发送时消息持久化。(Message包含body,body为我们需要发送的消息具体内容,一般以json字符串发送,消费端再解析;MessageProperties为Message的一些额外的属性,做一些扩展作用)
Message message = MessageBuilder.withBody("我们发送的消息内容存放在message的body里面".getBytes()).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
2.队列持久化
@Bean
public Queue helloQueue() {
return new Queue("hello",true);
}
第一个参数为队列名,第二个参数为durable,是否持久化。Queue的其他属性
a) Exclusive属性:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道channel是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
b) Auto-delete属性:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
3.交换机持久化
@Bean
public DirectExchange helloExchange(){
return new DirectExchange("helloexchange",true,false);
}
第一个参数为交换机名,第二个参数为durable,是否持久化。第二个参数为autoDelete,当交换机没有绑定队列时会自动删除交换机。
消息确认机制
在保证了消息队列持久化的基础上来做消息确认机制,在两个地方确认。
a) 确认发送(生产者)
有两种方式,只能使用其中一种。
1.transaction 事务
//原生客户端
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
//springRabbitMQ客户端采用confirm机制就不能使用事物。两者只能用一个,rabbittemplate开启事务
rabbittemplate.setChannelTransacted(true);
使用事务的方式确保消息送达,只有消息成功被rabbiMQ的broker接收后,事务提交才能成功,异常中进行事务回滚操作,其实就是同步的,事务会影响性能,可选择confirm。
2.confirm
confirm是异步的,直接发送不需要同步等待,会触发confirm方法,ack为true时消息发送成功,false发送失败可以做一些处理,先要设置为confirm模式。
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("develop");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("dev");
connectionFactory.setPublisherConfirms(true); //必须要设置
return connectionFactory;
}
//设置ConfirmCallback
rabbittemplate.setConfirmCallback(new MyConfirmCallBack());
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import cn.zkz.takeaway.service.amqp.conf.MessageCache;
public class MyConfirmCallBack implements ConfirmCallback {
/**
* 目前调用情况
* 1.echange存在时,routeKey路由无论是否存在,ack为true
* 2.echange不存在时,ack为false。
* 3.当exchange为null或者空字符串时,ack为true
* 网络中断会导致无confirm
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 消息id
String msgId = correlationData.getId();
if (ack) {// 发送成功
//LOGGER.info("confirm:消息id:[{}] 投递成功" + msgId);
MessageCache.deleteSend(msgId);
} else { // 发送失败
//发送失败后 重试 cause 失败原因
}
}
}
b) 确认消费(消费者)
直接写在了代码注释里面,就不多写了
@Bean
public SimpleMessageListenerContainer userMessageContainer(Queue helloQueue,ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(helloQueue);
//需要将channel暴露给listener才能手动确认,AcknowledgeMode.MANUAL时必须为ture
container.setExposeListenerChannel(true);
//并发消费设置最大消费者数量,并发消费的时候需要设置,且>=concurrentConsumers
container.setMaxConcurrentConsumers(3);
//设置默认当前消费者数量
container.setConcurrentConsumers(1);
//设置确认模式手工确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置listener
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
//业务处理
} catch (Exception e) {
// TODO: handle exception
} finally {
//可以根据业务需求调用三个当中的其中一个,如果未调用,消息一直时UnAck状态,unack数量等于PrefetchCount时,不会收到消息
//Queue会一直保存,多个消费者存在时,不会发给其他消费者。
//确认收到,第二个参数是否为批量回复
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//Nack,第二个参数是否为批量回复,第三个参数值该消息是否回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
//拒绝,第二个参数值该消息是否回到队列,只能一个一个地拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
});
//会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,
//即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
container.setPrefetchCount(5);
return container;
}