RabbitMQ教程——工作队列

Stella981
• 阅读 668

工作队列

(使用 pika 0.9.8 Python客户端)

RabbitMQ教程——工作队列

在第一份教程中我们编写了用于从一个命名队列中接收消息和向一个命名队列中发送消息的程序。在这份教程中,我们将创建一个_工作队列_,它将被用于在多个工作者之间分发耗时的任务。

工作队列(aka:_任务队列_)背后主要的思想是,要避免立即执行一个资源密集的任务并等待它的完成。相反是调度任务稍后完成。我们把一个_任务_封装为一个消息,并把它发送到队列中。一个在后台运行的工作者进程将会pop任务,并最终执行工作。当你运行了很多工作者进程时,任务将在它们之间共享。消息的另外一种形式——任务(Task),或Work。

这个概念在web应用中特别有用,因为在那样的场景中,在一个很短的HTTP请求窗口中处理一个复杂的任务非常重要。

准备

在这份教程的前一部分,我们发送了一条包含有"Hello World!"字符串的消息。现在我们将发送代表复杂任务的字符串。我们并没有一个真正的任务,比如放缩图片,或渲染pdf文件,因而让我们假装我们处于忙碌状态——通过使用time.sleep()函数。我们将把字符串中点的个数作为它的复杂度;每个点占用一秒的"work"。比如,一个由Hello...描述的假想的任务将耗时3秒。

我们将稍微对来自于我们前面的例子的_send.py_代码做一些改动,以便于能够从命令行发送任意的消息。这个程序将会把任务调度到我们的工作队列,因而让我们把它命名为new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)

我们老的_receive.py_脚本也需要做一些修改:它需要针对消息体中的每一个点假装做一秒钟的工作。它将从队列中pop消息并执行task,因而让我们把它称为worker.py:

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"

由此可见,所谓task形式的消息,其实也是一个字符串消息。然后由消息的接收者将这个字符串解释为一个task。那个字符串应该是按照一定的格式对某一个task进行了序列化的结果,而接收者则将字符串反序列化,并执行task。与RPC远程过程调用好像。

循环调度

使用任务队列的一个好处是,可以很方便的将工作并行化。如果我们正在构建一个工作的backlog,我们可以通过简单地添加更多的工作者进程来扩展。

首先,让我们试着同时运行两个worker.py脚本。它们都将从队列中获取消息,但是究竟怎么样呢?让我们来看一下。

你需要打开3个终端。两个运行worker.py脚本。这些终端将是我们的两个消费者 - C1和C2。

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

在第三个终端中,我们将发布新的任务。一旦你启动了消费者,则你可以发布一些消息来看一下:

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

让我们看一下向我们的工作者进程传递了些什么:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默认情况下,RabbitMQ将顺序地把每一条消息发送给下一个消费者。平均下来,每个消费者将得到相同数量的消息。这种分发消息的方式被称为循环调度(round-robin)。试一下启动3个或更多workers进程的情况。

消息确认

执行一个task可能会耗费好几秒。你可能好奇,一个消费者启动了一个需要长时间运行的task,却在只完成了一部分工作时死掉了,会发生什么。在我们当前的代码中,一旦RabbitMQ把消息发送给了客户,它将立即把它从内存中移除。在这种情况下,如果你杀死了一个工作者进程,我们将会丢失它正在处理的消息。我们也会丢失那些分配给这个特定的worker进程但还没有来得及被处理的消息。

但我们不想丢失任何tasks。如果一个worker进程死掉了,我们希望task被分配给另外的一个worker进程来执行。

为了确保不会有消息丢失,RabbitMQ支持消息_确认_。一个ack(nowledgement)被从消费者发回给RabbitMQ以告诉RabbitMQ一个特定的消息已经被接收、处理过了,从而RabbitMQ可以自由地删除它了。

如果一个消费者死掉了却没有发送ack,RabbitMQ将明白,一个消息没有被完全处理掉,它将重新把那个消息发送给另一个消费者。通过这种方式你可以确保没有消息会丢失,即使workers进程突然死亡。

没有任何消息超时机制;RabbitMQ只有在worker连接死掉时,才会重新发送消息。即使处理一个消息需要耗费非常非常长的时间,它也会好好的。

默认情况下消息确认是打开的。在前面的例子中,我们显式地通过no_ack=True标记把它们给关掉了。是时候移除这个标记,并在任务完成时从worker进程中发送一个适当的确认了。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

使用这段代码,我们就可以确保,即使你在worker进程处理一条消息时使用CTRL+C杀死了它,也不会有任何东西丢失。worker进程死后不久所有的未确认消息就将被重新发送了。

忘记确认

一个常见的错误就是漏掉了basic_ack。它是一个简单的错误,但结果却很严重。消息将在你的客户端退出时被重新分发(这看起来将像是随机的重复分发),但RabbitMQ将由于它不能释放任何未确认的消息,而吃掉越来越多的内存。

为了调试这种类型的错误,你可以使用rabbitmqcti来打印messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久性

我们已经学习了如何确保,即使消费者死掉了,task也不会丢失。但如果RabbitMQ服务器停掉了,我们的tasks将依然会丢失。

当RabbitMQ停止或崩溃时,它将忘记所有的队列和消息,除非你告诉它不要那样做。需要做两个事情来确保消息不会丢失:我们需要把队列和消息都标记为durable。

首先,我们需要确保RabbitMQ将从不丢失我们的队列。要做到这一点,我们需要把它声明为_durable_:

channel.queue_declare(queue='hello', durable=True)

尽管这个命令本身是正确的,但它在我们的设置中不起作用。那是因为我们已经定义了一个不是durable的称为hello的队列。RabbitMQ不允许你以不同的参数重新定义一个已经存在的队列,它将会向尝试那样做的任何程序返回一个error。但有一个快速的workaround - 让我们以不同的名字声明一个队列,比如task_queue:

channel.queue_declare(queue='task_queue', durable=True)

这个queue_declare的改变需要都被应用到生产者和消费者代码中。

那时我们确定task_queue队列不会丢失,即使RabbitMQ重启。现在我们需要把我们的消息标记为persistent - 通过提供一个值为2的delivery_mode属性。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

注意消息的持久性

把消息标记为persistent不完全保证一条消息不会丢失。尽管它告诉RabbitMQ把消息保存到磁盘上,但当RabbitMQ已经接收了一条消息但还没有保存它时,还是有一个短暂的时间窗口。此外,RabbitMQ不会为每一条消息执行fsync --- 消息可能只是被保存到了cache而没有被真正地写入到磁盘。持久性的保证不是很强,但对于我们的简单任务队列,它足够了。如果你需要一个更强的保证,则你可以使用publisher confirms

公平分发

你可能已经注意到了,分发依然没有完全按照我们期待的那样来做。比如有两个workers进程的情形,但所有的奇数消息都很重,而偶数消息比较轻时,一个worker进程将持续处于忙碌状态,而另一个则几乎没有工作做。很好,RabbitMQ不知道任何关于那些的东西,并将继续公平地分发消息。

这样的事情会发生是由于,RabbitMQ只是在消息进入队列时分发消息。它不去关心一个特定的消费者未确认的消息的个数。它只是盲目地将第n个消息分发给第n个消费者。

RabbitMQ教程——工作队列

为了解决那个问题,我们可以使用basic.qos方法,同时带有的prefetch_count=1设置。这告诉RabbitMQ不要一次给一个特定的worker进程多于一个的消息。或者,换句话说,在一个工作者进程处理完成并且确认了前一个消息之前不要给它分发一个新的消息。想反,它将给下一个不处于忙碌状态的worker进程分发。

channel.basic_qos(prefetch_count=1)

小心队列大小

如果所有的workers进程都处于忙碌状态,你的队列可能填满。你可能想要关注这一点,并可能添加更多的worker进程,或使用其它的策略。

完整代码

new_task.py脚本最后的代码

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

(new_task.py source)

我们的worker:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
channel.start_consuming()

(worker.py source)

你可以使用消息确认和prefetch_count来建立一个工作队列。持久性选项使得tasks即使在RabbitMQ被重启的情况下也能够存活。

现在我们移向tutorial 3来学习如何将相同的消息传送给多个消费者了。

Done。

原文地址

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
RabbitMQ_消息队列基本使用_2
简介RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。pika&使用rabbitmq使用的协议是amqp,用于python的推荐客户端是pikapipinstallpika
Stella981 Stella981
3年前
RabbitMQ如何高效的消费消息
在上篇介绍了如何简单的发送一个消息队列之后,我们本篇来看下RabbitMQ的另外一种模式,工作队列。什么是工作队列我们上篇文章说的是,一个生产者生产了消息被一个消费者消费了,如下图!(https://usergoldcdn.xitu.io/2020/5/15/1721768c1b303014?w1824&h55
Stella981 Stella981
3年前
Android消息循环分析
我们的常用的系统中,程序的工作通常是有事件驱动和消息驱动两种方式,在Android系统中,Java应用程序是靠消息驱动来工作的。消息驱动的原理就是:1\.有一个消息队列,可以往这个队列中投递消息;2\.有一个消息循环,不断从消息队列中取出消息,然后进行处理。在Android中通过Looper来封装消息循环,同时在其中封装了一个消息队
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Stella981 Stella981
3年前
RabbitMQ指南之二:工作队列(Work Queues)
在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息、消费从从该命名队列中消费消息。在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完
Stella981 Stella981
3年前
Beanstalkd工作队列
Beanstalkd工作队列Beanstalkd是什么Beanstalkd是目前一个绝对可靠,易于安装的消息传递服务,主要用例是管理不同部分和工人之间的工作流应用程序的部署通过工作队列和消息堆栈,类似于其他受欢迎的解决方案,比如RabbitMQ。然而,创建Beanstalkd使它有别于其他工作。Beanstalkd旨在成为一个工作队列,而不是一
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这