RabbitMQ,Kafka与RPC原理,

Stella981
• 阅读 700

RabbitMQ,Kafka与RPC原理,

参考连接:

  https://www.rabbitmq.com/getstarted.html

rabbitmq默认端口:5672

 笔记整理:

# -*- coding: utf-8 -*-
# __author__ = "maple"
""""""

# 1. 你了解的消息队列
"""
    - Queue,将数据存储当前服务器的内存.
    - redis 列表,
    - rabbitMQ/kafka/zeroMQ(专业做消息队列)
    
    补充:saltstack
        - ssh:安装方便,但执行效率慢.
        - agent:执行效率高(基于消息队列zeroMQ做的RPC).
"""

# 2. 公司在什么情况下会用消息队列?
"""
    任务处理,请求数量太多,需要把消息临时放到某个地方.
    发布订阅,一旦发布消息,所有订阅者都会收到一条相同的消息.
    
    应用场景:
        - 长轮询
        - 智能玩具调用百度AI接口时,celery + RabbitMQ
        - 生产者&消费者
"""

# 3. rabbitMQ安装
"""
    服务端: 192.168.19.14
        安装配置epel源
           $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
        安装erlang
           $ yum -y install erlang
        安装RabbitMQ
           $ yum -y install rabbitmq-server
        启动(无用户名密码):
            service rabbitmq-server start/stop
            
        设置用户密码:
            sudo rabbitmqctl add_user xxx pwd  # 设置用户名跟密码
            # 设置用户为administrator角色
            sudo rabbitmqctl set_user_tags wupeiqi administrator
            # 设置权限
            sudo rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*" #此处的 * 代表对rabbitmq的所有队列都有权限.
            
            service rabbitmq-server start/stop
    客户端:
        pip3 install pika 
"""

# 4. 使用
"""
    生产者消费者
        n VS 1 
        n VS m
    发布订阅
        fanout,和exchange关联的所有队列都会接收到信息.
        direct,关键字精确匹配exchange关联的队列都会接收到信息.
        topic,关键字模糊匹配exchange关联的队列都会接收到信息.
"""

# 5. exchange是什么?
"""
    消息处理的重建建,可以帮助生成者将相关信息发送到指定相关队列.
"""

# 6. RPC
"""
    前戏:
        我   ->    去哪儿    ->      首都机场票务中心
    远程过程调用.
        我   ->    去哪儿    任务/结果      首都机场票务中心
    ...
"""

rabbitmq:在数据安全方面(保证数据不丢失比较擅长)
1.A向B发了一个请求,B把数据放到rabbitmq里了,然后B去rabbitmq里面取数据去处理,
还没有处理完B挂掉了,rabbitmq可以保证数据不丢.
2.A向B发了一个请求,B还没有把数据放到rabbitmq里了,B挂掉了,此时数据就会丢失,跟rabbitmq还没有关系呢;
3.A向B发了一个请求,B把数据放到rabbitmq里了,B还没有去rabbitmq取数据,此时rabbitmq挂掉了,此时数据不会丢失;

即:客户端挂掉或者服务端挂掉,数据都不会丢失的,

kafka:
在分布式以及消息的传递更快一些,消息的存储/发送要比rabbitmq快,

zeromq:集成在saltstack里面,

minion在监听队列,有数据的时候就去取,然后执行命令,执行结果放到另一个队列里面,

执行效率要比rabbitmq跟kafka都要高,

为什么要使用消息队列:
当前的服务器或者应用处理不了太多的数据,需要先把数据放到某个地方(队列),然后慢慢去处理.

代码: 生产者

# -*- coding: utf-8 -*-

import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14')) # 先建一个链接

# 有密码
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel() # 创建一个对象
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='duilie_1')

channel.basic_publish(exchange='',
                      routing_key='duilie_1', # 消息队列名称
                      body='msg7') # 发送的数据
connection.close() # 记得关闭链接

/

消费者:

# -*- coding: utf-8 -*-
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='s13q1')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(callback,queue='s13q1',no_ack=True)  # 有消息立即执行callback函数,没有消息就夯住了,

channel.start_consuming() # 开始进行消费.

 //

注意:

1.声明一个队列,如果存在就使用,不存在就创建一个;

有消息会立即执行函数,没有消息就会夯住了,

可以有多个生产者,多个消费者(代表多个人处理同一个队列)

默认处理队列,多个消费者的时候,是轮流来取数据的,

ack回复:带回复功能,针对客户端挂掉之后来做的.(客户端挂掉,数据的保留.服务端进行的数据保存)

ack就是来确保数据安全的,从服务端取走数据之后,如果开启回复的功能,需要等客户端回复,服务端才会把数据清掉,

  假如客户端取走数据之后,没来及处理,挂掉了,也就是没有回复服务端,服务端会保留数据的.

针对消费者进行的改动,生产者不用改动.import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='s13q2')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)
    # int('asdfasdf')
    ch.basic_ack(delivery_tag=method.delivery_tag) # 此行代码代表告诉服务端,数据已经取走了,只要执行到此,服务端的数据就清掉了.

channel.basic_consume(callback,queue='s13q2',no_ack=False) # 此处的no_ack=False,就是开启回复

channel.start_consuming()

//

durable持久化,服务端挂掉,数据的保留.

在生产者处进行的持久化,

生产者进行的改动,消费者不用改动.import pika
# 无密码
# connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14'))

# 有密码
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)-支持持久化
channel.queue_declare(queue='s13q3',durable=True) # 标红加粗是新加的参数.另外,一个队列如果建立时没有进行持久化,后续加参数是不管用.

channel.basic_publish(exchange='',
                      routing_key='s13q3', # 消息队列名称
                      body='msg7',
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))  # 标红的是新加的参数.
connection.close()

/

避免排队,充分利用消费者

生产者不用动,消费者进行修改.import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 声明一个队列(创建一个队列)
channel.queue_declare(queue='s13q1')

def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_qos(prefetch_count=1) # 此处是进行的配置,哪个闲置,哪个就进行额外的多工作.
channel.basic_consume(callback,queue='s13q1',no_ack=True)

channel.start_consuming()

 /

 发布订阅

发布订阅的时候,队列还是要随机生成的,

生成一个exchange(exchange只要创建了就不会再创建,),消费者脚本只要运行一个,就生成一个队列

消费者_订阅者

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout') #exchange='m1',是随意起的名字(和生产者的要对应上) fanout就是所有队列都放一份任务,  代指的是消息传送的模式

# 随机生成一个队列,
result = channel.queue_declare(exclusive=True) # 随机的生成一个队列
queue_name = result.method.queue # 队列名称
# 让exchange和queue进行绑定. 
channel.queue_bind(exchange='m1',queue=queue_name)


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

/

生产者_发布者

import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m1',exchange_type='fanout')  # 指定 exchange

channel.basic_publish(exchange='m1',
                      routing_key='', # 此处routing_key是空的,
                      body='gb')

connection.close()

//

RabbitMQ,Kafka与RPC原理,

/

RabbitMQ,Kafka与RPC原理,

/

 带关键字的发布订阅(相当于指定对哪个队列进行发布)

 消费者_订阅者1

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m2',exchange_type='direct') # 注意exchange_type的类型要变一下

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='test1') # 此处指定关键字,就是队列的名字
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='test2')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

/

消费者_订阅者2

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='test1')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

/

生产者_发布者

import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m2',exchange_type='direct')

channel.basic_publish(exchange='m2',
                      routing_key='test1', # 此处有哪个关键字, 绑定关键字的消费者就能拿到数据
                      body='xxxx')

connection.close()

/

 关键字的模糊匹配

 生产者_发布

import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='m3',exchange_type='topic') # 注意:exchange_type的类型又变了

channel.basic_publish(exchange='m3',
                      routing_key='old.alex.py', # 队列名称
                      body='x1')

connection.close()

/

消费者_订阅者_1

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='old.*') # *是匹配一个单词,#是可以匹配多个单词的,


def callback(ch, method, properties, body):
    print(method.routing_key)
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

/

消费者_订阅者_2

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m3',exchange_type='topic')

# 随机生成一个队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='old.#')


def callback(ch, method, properties, body):
    print("消费者接受到了任务: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

/

 rabbitmq实现的rpc

rpc:远程过程调用

rpc:远程过程调用.

例子:
就是去哪网,携程网,都去票务中心订票,
他们把任务放到一个队列里面(q1队列),同时建立一个自己的队列(q_qn,q_xc)票务中心监控队列(q1),有任务就去处理,
处理完,把对应的结果放到对应的队列里面(q_qn,q_xc,),
去哪网,携程网,在往队列里面放任务时, 就把自己创建的队列带过去了,票务中心知道自己该往哪个队列里面放结果.

/

去哪网

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("root", "123")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14', credentials=credentials))
        self.channel = self.connection.channel()

        # 随机生成一个消息队列(用于接收结果)
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        # 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
        self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # 去哪网 给 票务中心 发送一个任务:  任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue', # 票务中心接收任务的队列名称
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue, # 用于接收结果的队列
                                         correlation_id = self.corr_id, # 任务ID
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()

        return self.response

fibonacci_rpc = FibonacciRpcClient()

response = fibonacci_rpc.call(50)
print('返回结果:',response)

/

票务中心

import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.19.14',credentials=credentials))
channel = connection.channel()

# 票务中心监听任务队列
channel.queue_declare(queue='rpc_queue')

def on_request(ch, method, props, body):
    n = int(body)
    response = n + 100
    # props.reply_to  要放结果的队列.
    # props.correlation_id  任务
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
channel.start_consuming()

/

/

/

/

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这