RabbitMQ指南之二:工作队列(Work Queues)

Stella981
• 阅读 638

在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息、消费从从该命名队列中消费消息。在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完成。如果你启动了多个工作者,这些任务将在多个工作者之间分享。

这个概念也即我们说的异步,在项目中,有时候一个简单的Web请求,后台要做一系统的操作,这时候,如果后台执行完成之后再给前台返回消息将会导致浏览器页面等待从而出现假死状态。因此,通常的做法是,在这个Http请求到后台,后台获取到正确的参数等信息后立即给前台返回一个成功标志,然后后台异步地进行后续的操作。

RabbitMQ指南之二:工作队列(Work Queues)

1、准备

  本章中,我们将发送字符串消息来模拟复杂的任务。这里因为没有一个真实的复杂任务,因此用Thread.sleep()方法来模拟复杂耗时的任务。我们用字符串中的含点(“.")的数量来表示任务的复杂程度,一个点表示一秒钟的耗时,例如:一个发送”Hello ...“字符串的任务将会耗时3秒钟。

  我们可以直接将上一章中的_Send.java_代码拿过来修改,允许从命令行发送消息。本程序将会把任务调试到工作队列,因此,我们将类名改为NewTask.java:

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

   此时完整的NewTask.java代码为:

 1 public class NewTask {
 2 
 3     private final static String QUEUE_NAME = "hello";
 4 
 5     public static void main(String[] argv) throws IOException, TimeoutException {
 6 
 7         ConnectionFactory connectionFactory = new ConnectionFactory();
 8         connectionFactory.setHost("HOST");
 9 
10         try(Connection connection = connectionFactory.newConnection();
11             Channel channel = connection.createChannel()) {
12 
13             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
14 
15             String message = String.join(" ", argv);
16             
17             channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
18             System.out.println(" [x] Sent '" + message + "'");
19         }
20     }
21 }

  之前的Recv.java也要做一些修改:模拟字符串消息中的每个点耗时1秒钟,它将处理传送过来的消息并执行任务,因此,我们修改为Work.java:

 1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 2   String message = new String(delivery.getBody(), "UTF-8");
 3 
 4   System.out.println(" [x] Received '" + message + "'");
 5   try {
 6     doWork(message);
 7   } finally {
 8     System.out.println(" [x] Done");
 9   }
10 };
11 boolean autoAck = true; // acknowledgment is covered below
12 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  我们模拟执行过程中耗时的伪任务:

1 private static void doWork(String task) throws InterruptedException {
2     for (char ch: task.toCharArray()) {
3         if (ch == '.') Thread.sleep(1000);
4     }
5 }

  此时完整的Work.java为:

 1 public class Worker {
 2     private final static String TASK_QUEUE_NAME = "hello";
 3 
 4     public static void main(String[] args) throws Exception {
 5 
 6         ConnectionFactory connectionFactory = new ConnectionFactory();
 7         connectionFactory.setHost("HOST");
 8 
 9         Connection connection = connectionFactory.newConnection();
10         Channel channel = connection.createChannel();
11         channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
12 
13         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
14             String message = new String(delivery.getBody(), "UTF-8");
15 
16             System.out.println(" [x] Received '" + message + "'");
17             try {
18                 doWork(message);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             } finally {
22                 System.out.println(" [x] Done");
23             }
24         };
25 
26         boolean autoAck = true; // acknowledgment is covered below
27         channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
28     }
29 
30     private static void doWork(String task) throws InterruptedException {
31         for (char ch: task.toCharArray()) {
32             if (ch == '.') Thread.sleep(1000);
33         }
34     }
35 }

2、循环调度

  使用工作队列的优点之一是能够轻松地进行并行化操作。假设我们在做一个后台日志收集系统,我们可以很容易地增加更多的Worker从而提高系统性能。

  首先,我们同时启动两个Worker,同样地,我这里也放到IDEA中启动:

  RabbitMQ指南之二:工作队列(Work Queues)

  接下来,我们先后启动5个Task,并分别通过main()参数传入五个字符串消息:

1 First message.
2 Second message..
3 Third message...
4 Fourth message....
5 Fifth message.....

  RabbitMQ指南之二:工作队列(Work Queues)

   执行五个发送任务之后,来看一下两个Worker都接收到了什么样的消息:

  RabbitMQ指南之二:工作队列(Work Queues)

        RabbitMQ指南之二:工作队列(Work Queues)

  默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息。这种消息的调度方式称之为循环调度,你可以开启更多的Worker来进行测试。

3、消息回执

  因为消费者执行一个任务会有时间耗时,假设一个消费者在执行一个任务执行一半的时候挂掉了将会怎样?消息会不会因此丢失?在我们目前的代码里,一旦RabbitMq将一条消息转发给了一个消费者后,将会立即将消息删除(注意Worker.java里的autoAck),因此,在我们上面例子里,如kill掉一个正在处理数据的Worker,那么该数据将会丢失。不仅如此,所有那些指派给该Worker的还未处理的消息也会丢失。

  但在实际工作的,我们并不希望一个Worker挂掉之后就会丢失数据,我们希望的是:如果该Worker挂掉了,所有转发给该Worker的消息将会重新转发给其他Worker进行处理(包括处理了一半的消息)。为了确保一条消息永不丢失,RabbitMq支持消息回执。消费者在接收到一条消息,并且成功处理完成之后会给RabbitMq回发一条确认ack确认消息,RabbitMq此时才会删除该条消息。

  如果一个Worker正在处理一条消息时挂掉了(信道关闭、连接关闭、TCP连接丢失),它将没有机会发送ack回执,RabbitMq就认为该消息没有消费成功,于是便会将该消息重新放到队列中,如果此时有其他消费者还是在线状态,RabbitMq会立即将该条消息再转发给其他在线的消费者。这种机制可以保证任何消息都不会丢失。

  默认情况下,需要手动进行消息确认,在前面的例子里,我们通过autoAck=true显示地关闭了手动消息确认,因此,RabbitMq将采用自动消息确认的机制。现在,我们修改我们的程序,采用手动发送回执的方式,当我们完成对消息的处理后,再手动发送回执确认:

 1 channel.basicQos(1); // accept only one unack-ed message at a time (see below)
 2 
 3 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 4   String message = new String(delivery.getBody(), "UTF-8");
 5 
 6   System.out.println(" [x] Received '" + message + "'");
 7   try {
 8     doWork(message);
 9   } finally {
10     System.out.println(" [x] Done");
11     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
12   }
13 };
14 boolean autoAck = false;
15 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  ack发送信道必须和接收消息的信道(channel)是同一个,如果尝试通过一个不同的信道发送ack回执,将会抛出channel等级协议异常(官网说会抛出异常,但是我在实际测试中并没有抛异常,只是该条消息得不到回执,从而也无法删除)。

  一个常见的错误是忘了手动回执,虽然只是一个简单的错误,但是带来的后果却是严重的,它将导致已经消费掉的消费不会被删除,并且当消费该消息的消费者在退出之后,RabbitMq会将该条消息重新进行转发,内存将被慢慢耗尽。我们可以通过正面的命令来检查这种错误:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

RabbitMQ指南之二:工作队列(Work Queues)

RabbitMQ指南之二:工作队列(Work Queues)

  该命令有三列内容,第一列是在监听的队列名称,第二列是Ready状态的消息数量,第三列是Unacked的消息数量。

4、消息的持久化

  在3中我们讲解了如何保证当消费者挂掉之后消息不被丢失,但是,如果RabbitMq服务或者部署RabbitMq的服务器挂掉了之后,消息仍然会丢失。当RabbitMq崩溃之后,它将会忘记所有的队列和消息,除非,有什么机制让RabbitMq将队列信息和消息保存下来。

  要确保消息和队列不会丢失,我们必须要确保两件事情。

  首先,我们要确保RabbitMq永远不丢失队列,要做到这点,我们在定义的时候就需要告诉RabbitMq它是需要持久化的,通过指定durable参数实现:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

  虽然这个命令本身是正确的,但是在我们目前它不能工作。因为我们前面已经定义了一个非持久化的hello队列,RabbitMq不允许重新定义一个已经存在的队列(用不同的参数),否则会抛出异常:

Exception in thread "main" java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:23)
    Suppressed: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:29)

  要么重启RabbitMq让该临时队列消失,要么在控制台将该队列删除,或者重新创建一个新的队列:

1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);

  生产者和消费者要做同步修改。

  上面这一步,我们保证了队列(task_quee)的持久化,此时,即便RabbitMq崩溃了也不会丢失该队列,当RabbitMq重启后将自动重新加载该队列。

  其次,我们需要确保我们的消息也被持久化,要做到这一点,在生产者发布消息的时候需要指定消息的属性为:PERSISTENT_TEXT_PLAIN。

1 import com.rabbitmq.client.MessageProperties;
2 
3 channel.basicPublish("", "task_queue",
4             MessageProperties.PERSISTENT_TEXT_PLAIN,
5             message.getBytes());

  注意,即便设置了消息的持久化属性也不能保证消息会被100%地写入到磁盘中,因为RabbitMq在接收到消息和写入到磁盘不是同步的,有可能消息只是被写入到缓存中而还没来和及写入磁盘的时候,RabbitMq崩溃了,此时也会丢失消息。但无论如何,比前面简单的消息队列已经强大了很多。

5、公平调度

  您可能已经注意到,任务调度仍然不能完全按照我们希望的方式工作。举个例子,在只有两个Worker的环境中,奇数的消息比较重,偶数的消息比较轻时,一个Worker将会一直处于忙碌状态,而另一个Worker将会一直处于空闲状态,但RabbitMq并不知道这种情况,它会依然均衡地向两个Worker传递消息。

  发生这种情况是因为,当一个消息进入队列之后,RabbitMq只是盲目地将该第n个消息转发给第n个消费者,它并不关注每个消费者发了多少个回执。

RabbitMQ指南之二:工作队列(Work Queues)

  为了解决这个问题,我们可以通过调用basicQos方法,给它传入1。这将告诉RabbitMq不要同时给一个队列转发多于1条的消息,换句话说,在一个消费者没有完成并回执前一条消息时,不要再给它转发其他消息。

1 int prefetchCount = 1;
2 channel.basicQos(prefetchCount);

6、完整的代码

  一、NewTask.java

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.MessageProperties;
 5 
 6 public class NewTask {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     try (Connection connection = factory.newConnection();
14          Channel channel = connection.createChannel()) {
15         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
16 
17         String message = String.join(" ", argv);
18 
19         channel.basicPublish("", TASK_QUEUE_NAME,
20                 MessageProperties.PERSISTENT_TEXT_PLAIN,
21                 message.getBytes("UTF-8"));
22         System.out.println(" [x] Sent '" + message + "'");
23     }
24   }
25 
26 }

二、Worker.java

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.DeliverCallback;
 5 
 6 public class Worker {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     final Connection connection = factory.newConnection();
14     final Channel channel = connection.createChannel();
15 
16     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
17     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
18 
19     channel.basicQos(1);
20 
21     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
22         String message = new String(delivery.getBody(), "UTF-8");
23 
24         System.out.println(" [x] Received '" + message + "'");
25         try {
26             doWork(message);
27         } finally {
28             System.out.println(" [x] Done");
29             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
30         }
31     };
32     channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
33   }
34 
35   private static void doWork(String task) {
36     for (char ch : task.toCharArray()) {
37         if (ch == '.') {
38             try {
39                 Thread.sleep(1000);
40             } catch (InterruptedException _ignored) {
41                 Thread.currentThread().interrupt();
42             }
43         }
44     }
45   }
46 }

点关注,不迷路,这是一个程序员都想要关注的二维码

RabbitMQ指南之二:工作队列(Work Queues)

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
RabbitMQ如何高效的消费消息
在上篇介绍了如何简单的发送一个消息队列之后,我们本篇来看下RabbitMQ的另外一种模式,工作队列。什么是工作队列我们上篇文章说的是,一个生产者生产了消息被一个消费者消费了,如下图!(https://usergoldcdn.xitu.io/2020/5/15/1721768c1b303014?w1824&h55
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ教程——工作队列
工作队列(使用pika0.9.8Python客户端)!(http://static.oschina.net/uploads/space/2015/0606/135027_1PPx_919237.png)在第一份教程中我们编写了用于从一个命名队列中接收消息和向一个命名队列中发送消息的程序。在这份教程中,我们将创建一个_工作队
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这