#1 前言 前一篇介绍了JMS有两种通信模型,一种是点对点通信,另一种是发布/订阅模型,本篇将会继续探讨这两种模型。本篇文章需要按照严谨的实验顺序才能获得相同的结果,这是因为消息持久化和持久订阅这两个特性的原因,在文章结尾和下一篇文章会做解答。** 所有的实验在启动之前都必须到管理后台删除相关的队列或者topic,否则数据也可能不同 **
在 ActiveMQ-5.11.x版本单机安装 中,由于设置了JMS通信端口的密码,所以下文在建立连接工厂时,都使用了密码。
为了方便,提取了一个专门创建connection的管理类。
public class ActiveMQManager {
private static ConnectionFactory connectionFactory;
static {
// connectionFactory = new ActiveMQConnectionFactory("admin", "123456", "tcp://192.168.88.18:61616");
connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
}
public static Connection createConnection() throws JMSException {
return connectionFactory.createConnection();
}
}
OK,下面开始正题吧。
#2 点对点模型 本节将实现消费者通过队列"test-queue"给消费者发布消息,消费者以异步方式获取消息并打印消息。先把消息生产者和消费者的代码贴一下。
##2.1 实验代码 ** 消息生产者 **:
public class Producer {
public static final String QUEUE_NAME = "test-queue";
public static void main(String[] args) {
System.out.println("Producer started!");
String message_body = "消息 : " + System.currentTimeMillis();
try {
//获取连接
Connection connection = ActiveMQManager.createConnection();
//启动连接
connection.start();
//开启会话,第一个参数指定是否使用事务,第二个参数指示消费者是否需要手动应答自己已经接收到消息
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//建立队列
Queue queue = session.createQueue(QUEUE_NAME);
//获取消息生产者对象
MessageProducer producer = session.createProducer(queue);
//建立消息对象
Message message = session.createTextMessage(message_body);
//发送消息
producer.send(message);
System.out.println("成功发送消息:" + message_body);
producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Producer end!");
}
}
** 消息消费者 **:
public class Consumer {
public static void main(String[] args) throws IOException {
System.out.println("Consumer started!");
try {
//获取连接
Connection connection = ActiveMQManager.createConnection();
//启动连接
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立队列,与消息生产者使用的队列名一致
Queue queue = session.createQueue(Producer.QUEUE_NAME);
//建立消费者
MessageConsumer consumer = session.createConsumer(queue);
//监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Consumer 获取消息 ---->" + text);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 线程一直等待
System.in.read();
consumer.close();
session.close();
connection.stop();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer end!");
}
}
2.2 启动程序和小结论
** (1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者 **
消息可以被消费者正常接收。
** (2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者 ** 消息可以被消费者正常接收,通过查看消息内容,可以知道该消息是停止MQ服务之前发送的。
** (3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者 ** 消息可以被消费者正常接收。
看到这里,大家可能会感到奇怪,为什么MQ服务重启后,之前发布的消息仍然可以被后面启动的消费者收到呢?这与activeMQ的持久化消息机制有关。下一篇会对此做解答。
#3 发布/订阅模型 本节将定义两个消息订阅者,它们共同订阅名叫"test-topic"这个主题,消息生产者发布消息后,两个消费者都能接收到同一个消息。
##2.1 实验代码 下面是生产者和订阅者的代码。
** 消息生产者 **
public class Producer {
public static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) {
System.out.println("Producer started!");
String message_body = "消息 : " + System.currentTimeMillis();
try {
//获取连接
Connection connection = ActiveMQManager.createConnection();
//启动连接
connection.start();
//开启会话,第一个参数指定是否使用事务,第二个参数指示消费者是否需要手动应答自己已经接收到消息
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//建立话题
Topic topic = session.createTopic(TOPIC_NAME);
//获取消息生产者对象
MessageProducer producer = session.createProducer(topic);
//建立消息对象
Message message = session.createTextMessage(message_body);
//发送消息
producer.send(message);
System.out.println("成功发送消息:" + message_body);
producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Producer end!");
}
}
** 消息订阅者 ** :
public class Subscriber1 {
public static void main(String[] args) throws IOException {
System.out.println("Subscriber1 started!");
try {
//获取连接
Connection connection = ActiveMQManager.createConnection();
//启动连接
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立话题
Topic topic = session.createTopic(Producer.TOPIC_NAME);
//建立消费者
MessageConsumer consumer = session.createConsumer(topic);
//监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Subscriber1 获取消息 ---->" + text);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 线程一直等待
System.in.read();
consumer.close();
session.close();
connection.stop();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Subscriber1 end!");
}
}
3.2 启动程序和小结论
** (1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动两个消息订阅者 **
消息订阅者没有接收到任何消息。
** (2)顺序:启动MQ broker服务 -> 启动两个消息订阅者 -> 启动消息发布者 **
两个消息订阅者都可以被消费者正常接收。
** (3)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动两个消息订阅者 **
wtf!消息没有接收到,如果是重要的消息,那岂不是要哭死了。
对于第三条,由于MQ服务重启导致订阅者消息获取不到,activeMQ提供了解决方法,即持久主题订阅(durable topic subscription)机制,嗯,下一篇将会对该机制详细解读。