RabbitMQ,Kafka与RPC原理,
参考连接:
https://www.rabbitmq.com/getstarted.html
rabbitmq默认端口:5672
笔记整理:
# -*- coding: utf-8 -*-
# __author__ = "maple"
""""""
# 1. 你了解的消息队列
"""
- Queue,将数据存储当前服务器的内存.
- redis 列表,
- rabbitMQ/kafka/zeroMQ(专业做消息队列)
补充:saltstack
- ssh:安装方便,但执行效率慢.
- agent:执行效率高(基于消息队列zeroMQ做的RPC).
"""
# 2. 公司在什么情况下会用消息队列?
"""
任务处理,请求数量太多,需要把消息临时放到某个地方.
发布订阅,一旦发布消息,所有订阅者都会收到一条相同的消息.
应用场景:
- 长轮询
- 智能玩具调用百度AI接口时,celery + RabbitMQ
- 生产者&消费者
"""
# 3. rabbitMQ安装
"""
服务端: 192.168.19.14
安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
安装erlang
$ yum -y install erlang
安装RabbitMQ
$ yum -y install rabbitmq-server
启动(无用户名密码):
service rabbitmq-server start/stop
设置用户密码:
sudo rabbitmqctl add_user xxx pwd # 设置用户名跟密码
# 设置用户为administrator角色
sudo rabbitmqctl set_user_tags wupeiqi administrator
# 设置权限
sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*" #此处的 * 代表对rabbitmq的所有队列都有权限.
service rabbitmq-server start/stop
客户端:
pip3 install pika
"""
# 4. 使用
"""
生产者消费者
n VS 1
n VS m
发布订阅
fanout,和exchange关联的所有队列都会接收到信息.
direct,关键字精确匹配exchange关联的队列都会接收到信息.
topic,关键字模糊匹配exchange关联的队列都会接收到信息.
"""
# 5. exchange是什么?
"""
消息处理的重建建,可以帮助生成者将相关信息发送到指定相关队列.
"""
# 6. RPC
"""
前戏:
我 -> 去哪儿 -> 首都机场票务中心
远程过程调用.
我 -> 去哪儿 任务/结果 首都机场票务中心
...
"""
rabbitmq:在数据安全方面(保证数据不丢失比较擅长)
1.A向B发了一个请求,B把数据放到rabbitmq里了,然后B去rabbitmq里面取数据去处理,
还没有处理完B挂掉了,rabbitmq可以保证数据不丢.
2.A向B发了一个请求,B还没有把数据放到rabbitmq里了,B挂掉了,此时数据就会丢失,跟rabbitmq还没有关系呢;
3.A向B发了一个请求,B把数据放到rabbitmq里了,B还没有去rabbitmq取数据,此时rabbitmq挂掉了,此时数据不会丢失;
即:客户端挂掉或者服务端挂掉,数据都不会丢失的,
kafka:
在分布式以及消息的传递更快一些,消息的存储/发送要比rabbitmq快,
zeromq:集成在saltstack里面,
minion在监听队列,有数据的时候就去取,然后执行命令,执行结果放到另一个队列里面,
执行效率要比rabbitmq跟kafka都要高,
为什么要使用消息队列:
当前的服务器或者应用处理不了太多的数据,需要先把数据放到某个地方(队列),然后慢慢去处理.
代码: 生产者
# -*- coding: utf-8 -*-
import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14')) # 先建一个链接
# 有密码
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel() # 创建一个对象
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='duilie_1')
channel.basic_publish(exchange='',
routing_key='duilie_1', # 消息队列名称
body='msg7') # 发送的数据
connection.close() # 记得关闭链接
/
消费者:
# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='s13q1')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(callback,queue='s13q1',no_ack=True) # 有消息立即执行callback函数,没有消息就夯住了,
channel.start_consuming() # 开始进行消费.
//
注意:
1.声明一个队列,如果存在就使用,不存在就创建一个;
有消息会立即执行函数,没有消息就会夯住了,
可以有多个生产者,多个消费者(代表多个人处理同一个队列)
默认处理队列,多个消费者的时候,是轮流来取数据的,
ack回复:带回复功能,针对客户端挂掉之后来做的.(客户端挂掉,数据的保留.服务端进行的数据保存)
ack就是来确保数据安全的,从服务端取走数据之后,如果开启回复的功能,需要等客户端回复,服务端才会把数据清掉,
假如客户端取走数据之后,没来及处理,挂掉了,也就是没有回复服务端,服务端会保留数据的.
针对消费者进行的改动,生产者不用改动.import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='s13q2')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
# int('asdfasdf')
ch.basic_ack(delivery_tag=method.delivery_tag) # 此行代码代表告诉服务端,数据已经取走了,只要执行到此,服务端的数据就清掉了.
channel.basic_consume(callback,queue='s13q2',no_ack=False) # 此处的no_ack=False,就是开启回复
channel.start_consuming()
//
durable持久化,服务端挂掉,数据的保留.
在生产者处进行的持久化,
生产者进行的改动,消费者不用改动.import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14'))
# 有密码
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)-支持持久化
channel.queue_declare(queue='s13q3',durable=True) # 标红加粗是新加的参数.另外,一个队列如果建立时没有进行持久化,后续加参数是不管用.
channel.basic_publish(exchange='',
routing_key='s13q3', # 消息队列名称
body='msg7',
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
)) # 标红的是新加的参数.
connection.close()
/
避免排队,充分利用消费者
生产者不用动,消费者进行修改.import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='s13q1')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_qos(prefetch_count=1) # 此处是进行的配置,哪个闲置,哪个就进行额外的多工作.
channel.basic_consume(callback,queue='s13q1',no_ack=True)
channel.start_consuming()
/
发布订阅
发布订阅的时候,队列还是要随机生成的,
生成一个exchange(exchange只要创建了就不会再创建,),消费者脚本只要运行一个,就生成一个队列
消费者_订阅者
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout') #exchange='m1',是随意起的名字(和生产者的要对应上) fanout就是所有队列都放一份任务, 代指的是消息传送的模式
# 随机生成一个队列,
result = channel.queue_declare(exclusive=True) # 随机的生成一个队列
queue_name = result.method.queue # 队列名称
# 让exchange和queue进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
/
生产者_发布者
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m1',exchange_type='fanout') # 指定 exchange
channel.basic_publish(exchange='m1',
routing_key='', # 此处routing_key是空的,
body='gb')
connection.close()
//
/
/
带关键字的发布订阅(相当于指定对哪个队列进行发布)
消费者_订阅者1
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m2',exchange_type='direct') # 注意exchange_type的类型要变一下
# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='test1') # 此处指定关键字,就是队列的名字
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='test2')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
/
消费者_订阅者2
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m2',exchange_type='direct')
# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='test1')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
/
生产者_发布者
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m2',exchange_type='direct')
channel.basic_publish(exchange='m2',
routing_key='test1', # 此处有哪个关键字, 绑定关键字的消费者就能拿到数据
body='xxxx')
connection.close()
/
关键字的模糊匹配
生产者_发布
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m3',exchange_type='topic') # 注意:exchange_type的类型又变了
channel.basic_publish(exchange='m3',
routing_key='old.alex.py', # 队列名称
body='x1')
connection.close()
/
消费者_订阅者_1
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m3',exchange_type='topic')
# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='old.*') # *是匹配一个单词,#是可以匹配多个单词的,
def callback(ch, method, properties, body):
print(method.routing_key)
print("消费者接受到了任务: %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
/
消费者_订阅者_2
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m3',exchange_type='topic')
# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='old.#')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
/
rabbitmq实现的rpc
rpc:远程过程调用
rpc:远程过程调用.
例子:
就是去哪网,携程网,都去票务中心订票,
他们把任务放到一个队列里面(q1队列),同时建立一个自己的队列(q_qn,q_xc)票务中心监控队列(q1),有任务就去处理,
处理完,把对应的结果放到对应的队列里面(q_qn,q_xc,),
去哪网,携程网,在往队列里面放任务时, 就把自己创建的队列带过去了,票务中心知道自己该往哪个队列里面放结果.
/
去哪网
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
credentials = pika.PlainCredentials("root", "123")
self.connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14', credentials=credentials))
self.channel = self.connection.channel()
# 随机生成一个消息队列(用于接收结果)
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
# 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
# 去哪网 给 票务中心 发送一个任务: 任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称
self.channel.basic_publish(exchange='',
routing_key='rpc_queue', # 票务中心接收任务的队列名称
properties=pika.BasicProperties(
reply_to = self.callback_queue, # 用于接收结果的队列
correlation_id = self.corr_id, # 任务ID
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return self.response
fibonacci_rpc = FibonacciRpcClient()
response = fibonacci_rpc.call(50)
print('返回结果:',response)
/
票务中心
import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# 票务中心监听任务队列
channel.queue_declare(queue='rpc_queue')
def on_request(ch, method, props, body):
n = int(body)
response = n + 100
# props.reply_to 要放结果的队列.
# props.correlation_id 任务
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
channel.start_consuming()
/
/
/
/