[toc]
一:入门
1.安装Erlang
2.安装RabbitMQ
3.配置
激活 RabbitMQ's Management Plugin
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>rabbitmq-plugins.bat enable rabbitmq
Error: The following plugins could not be found:
rabbitmq
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>rabbitmq-plugins.bat enable rabbitmq_management
The following plugins have been enabled:
amqp_client
cowlib
cowboy
rabbitmq_web_dispatch
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@DESKTOP-BugYang... started 6 plugins.
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>net stop RabbitMQ && net start RabbitMQ
RabbitMQ 服务正在停止......
RabbitMQ 服务已成功停止。
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>rabbitmqctl.bat list_users
Listing users
guest [administrator]
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.12\sbin>
http://localhost:15672/
账号密码:guest
4.下载maven
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
5.创建发送者
public class Send {
//队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "hello world!";
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭频道和连接
channel.close();
connection.close();
}
}
打印
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[x] Sent 'hello world!'
Process finished with exit code 0
6.创建接受者
public class Rec {
//队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
打印
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hello world!'
二:工作队列
1.发送消息
public class NewTask
{
//队列名称
private final static String QUEUE_NAME = "workqueue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建队列
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送10条消息,依次在消息后面附加1-10个点
for (int i = 0; i < 10; i++)
{
String dots = "";
for (int j = 0; j <= i; j++)
{
dots += ".";
}
//拼数据
String message = "helloworld" + dots+dots.length();
//推送到rabbitmq中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//推送完成,打印结束语句
System.out.println(" [x] Sent '" + message + "'");
}
//关闭队列
channel.close();
//关闭消息
connection.close();
}
}
[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'
[x] Sent 'helloworld......6'
[x] Sent 'helloworld.......7'
[x] Sent 'helloworld........8'
[x] Sent 'helloworld.........9'
[x] Sent 'helloworld..........10'
2.接收消息
运行两个Work类
public class Work
{
//队列名称
private final static String QUEUE_NAME = "workqueue";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
//区分不同工作进程的输出
int hashCode = Work.class.hashCode();
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建队列
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(hashCode
+ " [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消费队列
//关闭应答机制,会丢失消息
channel.basicConsume(QUEUE_NAME, true, consumer);
//打开应答机制,不会丢失消息
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
// doWork(message);
System.out.println(hashCode + " [x] Done");
}
}
/**
* 每个点耗时1s
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException
{
for (char ch : task.toCharArray())
{
if (ch == '.')
Thread.sleep(1000);
}
}
}
746292446 [x] Received 'helloworld.1'
746292446 [x] Done
746292446 [x] Received 'helloworld...3'
746292446 [x] Done
746292446 [x] Received 'helloworld.....5'
746292446 [x] Done
746292446 [x] Received 'helloworld.......7'
746292446 [x] Done
746292446 [x] Received 'helloworld.........9'
746292446 [x] Done
242131142 [x] Received 'helloworld..2'
242131142 [x] Done
242131142 [x] Received 'helloworld....4'
242131142 [x] Done
242131142 [x] Received 'helloworld......6'
242131142 [x] Done
242131142 [x] Received 'helloworld........8'
242131142 [x] Done
242131142 [x] Received 'helloworld..........10'
242131142 [x] Done
可以看到,默认的,RabbitMQ会一个一个的发送信息给下一个消费者(consumer),而不考虑每个任务的时长等等,且是一次性分配,并非一个一个分配。平均的每个消费者将会获得相等数量的消息。这样分发消息的方式叫做round-robin。
3.消息应答(message acknowledgments)
我们首先开启两个任务,然后执行发送任务的代码(NewTask.java),然后立即关闭第二个任务,两个加起来打印出来的数据会有缺失
一旦RabbItMQ交付了一个信息给消费者,会马上从内存中移除这个信息。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。
为了保证消息永远不会丢失,RabbitMQ支持消息应答(message acknowledgments)。
- 消费者发送应答给RabbitMQ,告诉它信息已经被接收和处理,然后RabbitMQ可以自由的进行信息删除。
- 如果消费者被杀死而没有发送应答,RabbitMQ会认为该信息没有被完全的处理,然后将会重新转发给别的消费者。通过这种方式,你可以确认信息不会被丢失,即使消者偶尔被杀死。
- 这种机制并没有超时时间这么一说,RabbitMQ只有在消费者连接断开是重新转发此信息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。
消息应答默认是打开的。上面的代码中我们通过显示的设置autoAsk=true关闭了这种机制。
boolean ack = false ; //打开应答机制
channel.basicConsume(QUEUE_NAME, ack, consumer);
//另外需要在每次处理完成一个消息后,手动发送一次应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
public class Work
{
//队列名称
private final static String QUEUE_NAME = "workqueue";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
//区分不同工作进程的输出
int hashCode = Work.class.hashCode();
//创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(hashCode
+ " [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消费队列
boolean ack = false ; //打开应答机制
channel.basicConsume(QUEUE_NAME, ack, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
doWork(message);
System.out.println(hashCode + " [x] Done");
//发送应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
4.消息持久化(Message durability)
我们已经学习了即使消费者被杀死,消息也不会被丢失。但是如果此时RabbitMQ服务被停止,我们的消息仍然会丢失
当RabbitMQ退出或者异常退出,将会丢失所有的队列和信息,除非你告诉它不要丢失。
我们需要做两件事来确保信息不会被丢失:我们需要给所有的队列
和消息
设置持久化的标志。
第一, 我们需要确认RabbitMQ永远不会丢失我们的队列。为了这样,我们需要声明它为持久化的。
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
注:RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性。
第二, 我们需要标识我们的信息为持久化的。通过设置MessageProperties(implements BasicProperties)值为PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
现在你可以执行一个发送消息的程序,然后关闭服务,再重新启动服务,运行消费者程序做下实验。
5.公平转发(Fair dispatch)
对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。
造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
public class NewTask
{
// 队列名称
private final static String QUEUE_NAME = "workqueue_persistence";
public static void main(String[] args) throws IOException
{
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
boolean durable = true;// 1、设置队列持久化
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 发送10条消息,依次在消息后面附加1-10个点
for (int i = 5; i > 0; i--)
{
String dots = "";
for (int j = 0; j <= i; j++)
{
dots += ".";
}
String message = "helloworld" + dots + dots.length();
// MessageProperties 2、设置消息持久化
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// 关闭频道和资源
channel.close();
connection.close();
}
}
public class Work
{
// 队列名称
private final static String QUEUE_NAME = "workqueue_persistence";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
// 区分不同工作进程的输出
int hashCode = Work.class.hashCode();
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println(hashCode
+ " [*] Waiting for messages. To exit press CTRL+C");
//设置最大服务转发消息数量
int prefetchCount = 1;
channel.basicQos(prefetchCount);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消费队列
boolean ack = false; // 打开应答机制
channel.basicConsume(QUEUE_NAME, ack, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
doWork(message);
System.out.println(hashCode + " [x] Done");
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
/**
* 每个点耗时1s
*
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException
{
for (char ch : task.toCharArray())
{
if (ch == '.')
Thread.sleep(1000);
}
}
}
三:发布/订阅
工作队列中的一个任务只会发给一个工作者
就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。
为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,
一类程序发动日志,另一类程序接收和处理日志。
我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。
1:转发器(Exchanges)
RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。
相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。
可用的转发器类型:
- Direct
- Topic
- Headers
- Fanout
声明转发器类型的代码:
channel.exchangeDeclare("logs","fanout");
fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的
2、匿名转发器(nameless exchange)
前面说到生产者只能发送消息给转发器(Exchange),但是我们前两篇博客中的例子并没有使用到转发器,我们仍然可以发送和接收消息。
这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。
现在我们可以指定消息发送到的转发器:
channel.basicPublish( "logs","", null, message.getBytes());
3、临时队列(Temporary queues)
前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
Java中我们可以使用queueDeclare()
方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue();
一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似
4、绑定(Bindings)
我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
channel.queueBind(queueName, “logs”, ””)
参数1:队列名称 ;参数2:转发器名称
5、完整的例子
1.创建发送器
public class EmitLog
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明转发器和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
//创建发送的数据
String message = new Date().toLocaleString()+" : log something";
// 往转发器上发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
2.创建接收器,数据写进文件里
public class ReceiveLogsToSave
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明转发器和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建一个非持久的、唯一的且自动删除的队列,临时队列
String queueName = channel.queueDeclare().getQueue();
// 为转发器指定队列,设置binding,绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
print2File(message);
}
}
private static void print2File(String msg)
{
try
{
String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
String logFileName = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
File file = new File(dir, logFileName+".txt");
FileOutputStream fos = new FileOutputStream(file, true);
fos.write((msg + "\r\n").getBytes());
fos.flush();
fos.close();
} catch (FileNotFoundException e)
{
e.printStackTrace();
} catch (IOException e)
{
e.printStackTrace();
}
}
}
3.创建接收器,打印出信息
public class ReceiveLogsToConsole
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明转发器和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建一个非持久的、唯一的且自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 为转发器指定队列,设置binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
四:路由(Routing)
需求:本篇博客我们准备给日志系统添加新的特性,让日志接收者能够订阅部分消息。例如,我们可以仅仅将致命的错误写入日志文件,然而仍然在控制面板上打印出所有的其他类型的日志消息。
1、绑定(Bindings)
在上一篇博客中我们已经使用过绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定表示转发器与队列之间的关系。
我们也可以简单的认为:队列对该转发器上的消息感兴趣。
绑定可以附带一个额外的参数routingKey
。为了与避免basicPublish
方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key
)。下面展示如何使用绑定键(binding key)来创建一个绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。
2、直接转发(Direct exchange)
上一篇的日志系统广播所有的消息给所有的消费者。
现在想:可能希望把致命类型的错误写入硬盘,而不把硬盘空间浪费在警告或者消息类型的日志上。
之前我们使用fanout
类型的转发器,但是并没有给我们带来更多的灵活性:仅仅可以愚蠢的转发。
我们将会使用direct
类型的转发器进行替代。direct
类型的转发器背后的路由转发算法很简单:
消息会被推送至绑定键(binding key
)和消息发布附带的选择键(routing key
)完全匹配的队列。
图解:
img
我们可以看到direct类型的转发器与两个队列绑定。
第一个队列与绑定键orange绑定
第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
这样的话,当一个消息附带一个选择键(routing key) orange发布至转发器将会被导向到队列Q1。消息附带一个选择键(routing key)black或者green将会被导向到Q2.所有的其他的消息将会被丢弃。
3、多重绑定(multiple bindings)
img
使用一个绑定键(binding key)绑定多个队列是完全合法的。如上图,一个附带选择键(routing key)的消息将会被转发到Q1和Q2。
4、发送日志(Emittinglogs)
我们将消息发送到direct
类型的转发器而不是fanout
类型。我们将把日志的严重性作为选择键(routing key
)。这样的话,接收程序可以根据严重性来选择接收。我们首先关注发送日志的代码:
像以前一样,我们需要先创建一个转发器:
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
然后我们准备发送一条消息:
channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());
为了简化代码,我们假定‘severity’是‘info’,‘warning’,‘error’中的一个。
5、订阅
接收消息的代码和前面的博客的中类似,只有一点不同:我们给我们所感兴趣的严重性类型的日志创建一个绑定。
StringqueueName = channel.queueDeclare().getQueue();
for(Stringseverity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
6、完整的实例
1.创建发送者
public class EmitLogDirect
{
private static final String EXCHANGE_NAME = "ex_logs_direct";
private static final String[] SEVERITIES = { "info", "warning", "error" };
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明转发器的类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送6条消息
for (int i = 0; i < 6; i++)
{
String severity = getSeverity();
String message = severity + "_log :" + UUID.randomUUID().toString();
// 发布消息至转发器,指定routingkey
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
channel.close();
connection.close();
}
/**
* 随机产生一种日志类型
*
* @return
*/
private static String getSeverity()
{
Random random = new Random();
int ranVal = random.nextInt(3);
return SEVERITIES[ranVal];
}
}
2.创建接受者
public class ReceiveLogsDirect
{
private static final String EXCHANGE_NAME = "ex_logs_direct";
private static final String[] SEVERITIES = { "info", "warning", "error" };
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置ip
factory.setHost("localhost");
//创建连接
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
// 声明direct类型转发器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 创建一个非持久的、唯一的且自动删除的队列,临时队列
String queueName = channel.queueDeclare().getQueue();
String severity = getSeverity();
// 指定binding_key
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println(" [*] Waiting for "+severity+" logs. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
/**
* 随机产生一种日志类型
*
* @return
*/
private static String getSeverity()
{
Random random = new Random();
int ranVal = random.nextInt(3);
return SEVERITIES[ranVal];
}
}
3.总结:
发送消息时可以设置routing_key,接收队列与转发器间可以设置binding_key,接收者接收与binding_key与routing_key相同的消息。
五:主题
1、 主题转发(Topic Exchange)
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
2.
3.完整例子
1.发送
public class EmitLogTopic
{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
{
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//指定topic的转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] routing_keys = new String[] { "kernal.info", "cron.warning","auth.info", "kernel.critical" };
for (String routing_key : routing_keys)
{
String msg = UUID.randomUUID().toString();
channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg.getBytes());
System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
}
channel.close();
connection.close();
}
}
2.接收1
public class ReceiveLogsTopicForCritical
{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
{
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明topic转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
// 接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + ".");
}
}
}
2.接收2
public class ReceiveLogsTopicForKernel
{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception
{
// 创建连接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明topic转发器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 随机生成一个队列
String queueName = channel.queueDeclare().getQueue();
//接收所有与kernel相关的消息
channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received routingKey = " + routingKey + ",msg = " + message + ".");
}
}
}
六:参考
http://blog.csdn.net/lmj623565791/article/details/37706355
http://blog.csdn.net/lmj623565791/article/details/37669573
http://blog.csdn.net/lmj623565791/article/details/37657225