RabbitMQ简介

Stella981
• 阅读 563

先决条件

这份指南会假定你已经安装并在localhost的标准端口(5672)上运行了RabbitMQ。但如果你使用了一个不同的主机、端口或认证机制,则连接设置也要做相应的修改。

RabbitMQ是一个消息的中继代理。它的主要概念非常简单:接收并转发消息。你可以把它想象成一个邮局:当你将信件放进邮箱的时候,你非常确定邮差最终会将信件送到你的收件人手里。使用这个例子来隐喻RabbitMQ,则它是一个邮箱,一个邮局及一个邮差。

RabbitMQ和邮箱最主要的区别在于,它不是处理纸质信件,而是接收、存储及转发二进制的数据块——即_消息_。

RabbitMQ,及一般的消息机制,使用了如下的一些术语:

_生产_仅指发送。一个发送消息的程序是一个_生产者_。我们将用下面这样的图来表示它,注意里面有一个字母“P”:

RabbitMQ简介

一个_队列_是一个邮箱的名字。它位于RabbitMQ的内部。尽管消息流过了RabbitMQ及你的应用程序,但它们只能存储在一个_队列_内部。一个队列不因任何限制而存在边界,它可以存储任意多的消息——它必须是一个无限大的buffer。许多_生产者_可以将消息发送到一个队列中,许多_消费者_可以从一个队列中接收数据。一个队列将用下面这样的图来表示,它的上面是它的名字:

RabbitMQ简介

_消费_的含义与接收相似。一个_消费者_是一个几乎总是在等待着接收消息的程序。我们用下面这样的图来表示,其中有一个“C”字符:
RabbitMQ简介
注意,生产者,消费者,及消息中继代理(broker)不一定要位于同一台机器;实际上在大多数应用中它们确实不在于同一台机器。

Hello World!

(使用pika 0.9.8 Python客户端)
我们的 "Hello world"不会太复杂 —— 让我们发送一条消息,接收它并把它的内容打印到屏幕上。要做到这一点,我们需要两个程序:一个发送消息,另一个接收并打印它。
我们的总体设计看起来像下面这样:

RabbitMQ简介

RabbitMQ库

RabbitMQ使用了一个称为AMQP的协议。要使用Rabbit,你需要一个库,它能够理解与Rabbit所用的相同的协议。几乎每种编程语言都有一个这样的库。Python也不例外,而且还有好多个这样的库可以选择:

py-amqplib
txAMQP
pika

在这个系列的指南中,我们将使用pika。要你可以安装它,使用pip包管理工具:

$ sudo pip install pika==0.9.8

安装操作依赖于pipgit-core包,你可能需要首先安装它们。

在Ubuntu上:

$ sudo apt-get install python-pip git-core

在Debian上:

$ sudo apt-get install python-setuptools git-core
$ sudo easy_install pip

在Windows上: 要安装easy_install,则运行MS Windows Installer的setuptools

> easy_install pip
> pip install pika==0.9.8

发送

RabbitMQ简介

我们的第一个程序send.py将向队列发送一条消息。我们所要做的第一件事就是建立一个与RabbitMQ服务器的连接。

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

现在我们已经连接到一个本地机器——即localhost上的broker 了。如果我们想要连接一个运行于不同机器上的broker,我们只需在这里指定它的名字或IP地址即可。

接下来,在发送之前,我们需要确认接收队列的存在。如果我们向不存在的位置发送了一条消息,RabbitMQ将是简单地丢弃这条消息。让我们创建一个消息将被发送到的目的队列,让我们把它命名为_hello_:

channel.queue_declare(queue='hello')

至此我们已经为发送消息做好了准备。我们的第一条消息将只包含一条字符串_Hello World!_,我们想要把它发送到我们的_hello_队列中。

在RabbitMQ中,消息从来都不会被直接发送给队列,它总是需要通过一个_exchange_。但让我们先不要纠缠于这些细节 —— 你可以在这份指南的第三部分读到更多关于_exchange_的东西。现在我们需要了解的全部即是如何使用由一个空字符串标识的默认exchange。这个exchange有点特别 —— 它允许我们精确的指定消息应该被发送到哪个队列。队列的名字需要通过routing_key参数来指定:

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"

在退出程序之前,我们需要确保网络缓冲区有被刷出(flush),并且我们的消息被实际传送到了RabbitMQ。我们可以通过优雅的关闭连接来做到这一点。

connection.close()

发送操作不起作用

如果这是你第一次使用RabbitMQ,而且你没有看到“被发送”的消息,你可能因此而抓耳挠腮的想到底是什么地方出了问题。可能是由于broker没有足够的空闲磁盘空间(默认情况下它需要至少1Gb的空闲空间),因而它拒绝接收消息。检查broker logfile来确认,并在需要的时候降低限制。配置文件文档将向你说明如何设置disk_free_limit。

接收

RabbitMQ简介

我们的第二个程序receive.py将从队列接收消息,并把它们打印到屏幕上。

再一次,我们需要做的第一件事就是连接RabbitMQ服务器。用于连接到Rabbit的代码与前面的一样。

下一步,像之前一样,是确保队列的存在。使用queue_declare来创建一个队列是幂等的(idempotent) —— 对于下面的命令,我们可以执行任意多次,但只有一个队列会被创建:

channel.queue_declare(queue='hello')

你可能会问,我们为什么又声明了一次队列 —— 我们已经在前面的代码中声明了它。如果我们能确定队列存在的话我们可以省略这行代码。比如,如果send.py程序已经被在前面运行过了。但我们还无法确定哪个程序会先运行。在这种情况下,在两个程序中重复声明队列是一种比较好的实践。

列出队列

你可能想要查看RabbitMQ有什么队列,及它们之中有多少条消息。你可以(以一个特权用户的身份)通过使用rabbitmqctl工具来做到这一点:

$ sudo rabbitmqctl list_queues
Listing queues ...
hello    0
...done.

(在Windows上则省略sudo)

从一个队列接收消息要更复杂一点。它通过提交一个callback函数给一个队列来实现。无论何时我们接收一条消息,这个callback函数被Pika库调到。在我们的例子中,这个函数将向屏幕打印消息的内容。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

接着我们需要告诉RabbitMQ这个特定的callback函数应该从我们的_hello_队列中接收消息:

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

要使这个命令成功执行,我们必须确保我们想要提交的队列是存在的。很幸运,我们对这一点很自信——我们已经在上面创建了一个队列——使用queue_declare。

稍后我们将描述no_ack参数。

最后,我们进入一个无限循环,一直等待数据,并在任何需要的时候运行callbacks。

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

完整代码

send.py的完整的代码:

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

(send.py source)

receive.py的完整代码:

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
channel.start_consuming()

(receive.py source)

现在我们可以在终端中试一下我们程序了。首先,让我们使用我们的send.py程序发送

 $ python send.py
 [x] Sent 'Hello World!'

生产者程序send.py将在每次运行之后就停止。让我们接收它:

Hurray!我们能够通过RabbitMQ发送我们的第一条消息了。正如你可能已经注意到的,receive.py程序没有退出。它将准备着接收更多的消息,并可以通过Ctrl-C来中断。

试着在一个新的终端中再一次运行send.py。

我们已经学习了如何向/从一个命名队列中发送和接收一条消息了。是时候移向第二部分,并构建一个简单的_工作队列_了。

除了前文中已经有过说明,会在后续文档中进行说明的exchange的概念外,connection的channel又是什么概念呢?发送消息时,除了用channel.basic_publish()函数之外,还能使用哪些函数?那些函数与这个函数的区别又是什么?这分文档中的send.py程序中发送消息时,其body为一个字符串,除字符串形式的body外,消息的body还有哪些形式?工作队列的含义又是什么?

Done。

原文地址

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这