RabbitMQ详解(二)

Stella981
• 阅读 655

RabbitMQ详解(二)------消息通信的概念

消息通信,有很多种,邮箱 qq 微信 短信等,这些通信方式都有发送者,接受者,还有一个中间存储离线消息的容器.但是这些通信方式和RabbitMQ的通信模型是不一样的,比如邮件,邮件服务器基于_POP3/SMTP_协议,通信双方需要明确指定,并且发送的邮件内容有固定的结构.而RabbitMQ服务器基于AMQP协议,这个协议是不需要明确指定发送方和接受方的,而且发送的消息也没有固定的结构,甚至可直接存储二进制数据,并且和邮件服务器一样,也能存储离线消息,最关键的是_RabbitMQ_既能够以一对一的方式进行路由,还能够以一对多的方式进行广播.


生产者和消费者

在RabbitMQ的通信过程中,有两个主要的角色:生产者和消费者.类比于邮件通信的发送方和接收方.

首先我们需要明确RabbitMQ服务器是不能够产生数据的,它是——消息中间件,是一个用来传递消息的中间商.生产者产生创建消息,然后发布到代理服务器(RabbitMQ),而消费者则从代理服务器获取消息(不是直接找生产者要消息),而且在实际应用中,生产者和消费者也是可以角色互相转换的,所以当我们应用程序连接到RabbitMQ服务器时,必须要明确我是生产者还是消费者.


消息

生产者创建消息,然后发布到RabbitMQ服务器中.

这里的消息分为两部分:有效内容内容标签.

  1. 有效内容:可以是任何内容,一个数组,一个集合,甚至二进制数据都可以.RabbitMQ不会在意你发什么数据,尽管发就可以了.
  2. 内容标签:描述有效内容,是RabbitMQ用来决定谁将获得消息.前面说的邮件通信,必须明确指定发送方地址和收件方地址,而基于AMQP协议的RabbitMQ则是通过生产者发送消息附带的内容标签将消息发送给感兴趣的消费者.

一般来说生产者创建消息会设置标签,但是传输到消费者那里就没有标签了,除非在有效内容中说明谁是生产者,一般消费者是不知道谁产生的消息的.


信道

生产者产生了消息,然后发布到RabbitMQ服务器,发布之前肯定要先连接上服务器,也就是要在应用程序和RabbitMQ服务器之间建立一条TCP连接,一旦连接建立,应用程序就可以创建一条AMQP信道.

信道是建立在"真实的TCP"连接内的虚拟连接,AMQP命令都是通过信道发送出去的,每条信道都会被指派一个唯一的ID(AMQP库会帮你记住ID),不论是发布消息,订阅队列或者接受消息,这些动作都是通过信道来完成的.

为什么不直接通过TCP连接来发送AMQP命令呢?

这是因为效率问题,对于操作系统来说,每次建立和销毁TCP会话是非常昂贵的开销,而实际系统中,比如电商双十一,每秒钟高峰期成千上万条连接,一般来说操作系统建立TCP连接是有数量限制的,那这就会遇到瓶颈.

引入信道的概念,我们可以在一条TCP连接上创建N多个信道,这样既能发送命令,也能够保证每条信道的私密性,我们可以将其想象为光纤电缆.


交换器和队列

交换器和队列都是RabbitMQ服务器的一部分,我们知道生产者会将消息发送到RabbitMQ服务器,而进入该服务器后,首先进入交换器部分,然后交换器根据消息附带的内容标签,将消息绑定到相应的队列.什么是队列?

  1. 容纳消息的场所,生产者发送到RabbitMQ服务器的消息会在队列中等待消费者消费.
  2. 队列是RabbitMQ服务器的终点(除非消息进入了黑洞).
  3. 队列可以实现负载均衡,我们可以增加一堆消费者,然后让RabbitMQ以循环的方式来均匀的分配消息.

消息进入RabbitMQ服务器时,会首先将消息发送到交换器,然后交换器会根据特定的路由算法以及消息的内容标签将消息绑定到相应的队列.在AMQP协议中有四种交换器:direct、fanout、topic、headers,每种交换器都实现了不同的路由算法,这也对应RabbitMQ工作的几种不同工作方式.


虚拟主机

首先我们抛出一个问题,一个RabbitMQ肯定不是只服务一个应用程序,那么多个应用程序同时使用RabbitMQ服务器,如何保证彼此之间不会冲突?

答案就是虚拟主机,虚拟主机其实就是一个迷你版的RabbitMQ服务器,它拥有自己的交换器和队列,更重要的是虚拟主机拥有自己的权限机制,一个服务器能够创建多个虚拟主机.那么我们在使用RabbitMQ服务器的时候,只需要讲一个应用程序对应一个虚拟主机,这种各个实例间逻辑上的分离就能够保证不同的应用程序安全的传递信息.

默认的虚拟主机是"/".


简单实例

开启RabbitMQ服务,这里使用的是docker,具体安装可看上篇博文.

pom.xml

<!--RabbitMQ-client-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>

ConnectionUtil.java

package org.alva.Utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句话描述>,RabbitMQ的连接工具类
 * <详细介绍>,
 *
 */
public class ConnectionUtil {
    public static Connection getConnection(String host, int port, String vhost, String username, String password) throws IOException, TimeoutException {
        //1.定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置服务器地址
        connectionFactory.setHost(host);
        //3.设置端口
        connectionFactory.setPort(port);
        //4.设置虚拟主机,用户名,密码
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        //5.通过连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

Producer.java

package org.alva.RabbitMQ;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.alva.Utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句话描述>,生产者
 * <详细介绍>,
 *
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.声明通道
        Channel channel = connection.createChannel();
        //3.声明(创建)队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4.定义消息内容
        String message = "hello rabbitmq";
        //5.发布消息
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("[x] send'"+message+"'");
        //6.关闭通道和连接
        channel.close();
        connection.close();

    }
}

Consumer.java

package org.alva.RabbitMQ;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.alva.Utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <一句话描述>,消费者
 * <详细介绍>,
 *
 */
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.声明通道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4.定义队列的消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //5.监听队列
        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
        //6.获取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received '" + message + "'");
        }
    }
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这