发送消息:
1 package org.study.workfair;
2
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import org.junit.Test;
6 import org.study.utils.ConnectionUtils;
7
8 import java.io.IOException;
9 import java.util.concurrent.TimeoutException;
10
11 public class Sender {
12 public static final String QUEUE_NAME = "test_simple_queue";
13
14 @Test
15 public void send() throws IOException, TimeoutException, InterruptedException {
16 // 获取连接
17 Connection conn = ConnectionUtils.getConnection();
18 // 获取通道
19 Channel channel = conn.createChannel();
20 //创建队列
21 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
22 //每个消费者发送确认消息前,只发送一条消息
23 channel.basicQos(1);
24 String msg = "hello rabbitmq!";
25
26 for (int i = 0; i < 50; i++) {
27 String tempStr = i + " " + msg;
28 //发送消息
29 channel.basicPublish("", QUEUE_NAME, null, tempStr.getBytes());
30 System.out.println("[send] msg " + i + ": " + msg);
31 Thread.sleep(100);
32 }
33
34 channel.close();
35 conn.close();
36 }
37 }
接收消息:
1 package org.study.workfair;
2
3 import com.rabbitmq.client.*;
4 import org.junit.Test;
5 import org.study.utils.ConnectionUtils;
6
7 import java.io.IOException;
8 import java.util.concurrent.TimeoutException;
9
10 public class Recv {
11 public static final String QUEUE_NAME = "test_simple_queue";
12
13 @Test
14 public void recv() throws IOException, TimeoutException, InterruptedException {
15 Connection conn = ConnectionUtils.getConnection();
16 Channel channel = conn.createChannel();
17 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
18 channel.basicQos(1);
19
20 //定义消费者
21 DefaultConsumer consumer = new DefaultConsumer(channel) {
22 //重写获取到达消息
23 @Override
24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25 // super.handleDelivery(consumerTag, envelope, properties, body);
26 String msg = new String(body, "utf-8");
27 System.out.println("[1] recv: " + msg);
28
29 try {
30 Thread.sleep(1000);
31 } catch (InterruptedException e) {
32 e.printStackTrace();
33 }finally {
34 System.out.println("[1] done!");
35 channel.basicAck(envelope.getDeliveryTag(),false);
36 }
37 }
38 };
39
40 while (true) {
41 //监听队列
42 channel.basicConsume(QUEUE_NAME, false, consumer);
43 Thread.sleep(100);
44 }
45
46
47 }
48 }