安装依赖版本一览
java:1.8.0_144
ActiveMQ:5.15.0
安装包地址:https://pan.baidu.com/s/1hss2ltq
部署ActiveMQ
1. 加入你不想下载上面提供的地址,那么可以这么做(PS:java8环境必须先准备好)。
wget http://mirrors.hust.edu.cn/apache//activemq/5.15.0/apache-activemq-5.15.0-bin.tar.gz
tar zxf apache-activemq-5.15.0-bin.tar.gz
# 启动
cd [activemq_install_dir]/bin
./activemq console
# daemon方式启动|停止
cd [activemq_install_dir]/bin
./activemq start|stop
2. 如果你下载了上面提供的地址,那么上传到服务器后,直接解压运行即可。
3. 控制台管理
浏览器输入:http://your\_host:8161/admin。用户名和密码默认为admin/admin
然后你就可以看到ActiveMQ的管理界面了。
基本上,你所需要关心的主要在这三个tab。
- Queues,这是生产者/消费者传送队列消息的地方。他的特点就是可以达到消费者的负载均衡和故障转移的目的。里面可以看到消费者列表。
- Topics,这是发布者/订阅者通信的地方,每个订阅者都会收到一份消息的拷贝来进行消费。
- Subscribers,在这里可以看到主题和其订阅者的列表。5.8版本看不到发布者和非持久化订阅的订阅者。。。
上述的三个tab会在之后进行叙述。
生产者/消费者开发
注意,这仅仅是个demo,开发方式基于SpringBoot
先前准备
1. 首先是Maven
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
生产者
启动类
/*****************************************************************
* Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
*****************************************************************/
package boot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 类描述:启动类.
*
* @author leon.
*/
@SpringBootApplication(scanBasePackages = "publisher")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
生产者
src/publisher
/*****************************************************************
* Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
*****************************************************************/
package publisher;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
/**
* 类描述:发布者.
*
* @author leon.
*/
@Component
public class Publisher {
private static final Logger LOGGER = LoggerFactory.getLogger(Publisher.class);
private static final String DEFAULT_MSG = "hello";
private static final String DEFAULT_QUEUE = "test";
private static final String EMPTY = "";
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public String send(String queue, String msg) {
if (isBlank(queue)) {
queue = DEFAULT_QUEUE;
}
if (isBlank(msg)) {
msg = DEFAULT_MSG;
}
jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queue), msg);
return "SUCCESS";
}
private boolean isBlank(String str) {
if (null == str || str.trim().equals(EMPTY)) {
return true;
} else {
return false;
}
}
}
服务配置,application.yml
server.port: 8081
spring.activemq.broker-url: failover:(tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=3600000)
spring.activemq.user: admin
spring.activemq.password: admin
spring.activemq.in-memory: true
spring.activemq.pool.enabled: false
消息发送
/*****************************************************************
* Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
*****************************************************************/
package publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
/**
* 类描述:web.
*
* @author leon.
*/
@Controller
public class Web {
@Autowired
private Publisher publisher;
@RequestMapping("/send")
public void send(String queue, String msg) {
publisher.send(queue, msg);
}
}
通过web界面来发送消息即可
消费者
启动类
/*****************************************************************
* Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
*****************************************************************/
package boot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 类描述:启动类.
*
* @author leon.
*/
@SpringBootApplication(scanBasePackages = "subscriber")
public class Application {
private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
消费者
/*****************************************************************
* Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
*****************************************************************/
package subscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 类描述:订阅者.
*
* @author leon.
*/
@Component
public class Subscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(Subscriber.class);
private AtomicInteger cnt = new AtomicInteger();
// #############################注解启动方式#############################
@JmsListener(destination = "test", containerFactory = "myFactory")
public String receive(Message message) throws JMSException {
String msg = ((TextMessage) message).getText();
LOGGER.info("get message: {}", msg);
cnt.addAndGet(1);
return msg;
}
public int getDealCnt() {
return cnt.get();
}
// #############################Container#############################
@Autowired
private Environment env;
@Autowired
private CtpListener ctpListener;
@Bean
public ActiveMQConnectionFactory myFactory() {
LOGGER.info(env.getProperty("spring.activemq.broker-url"));
ActiveMQConnectionFactory myFactory = new ActiveMQConnectionFactory();
myFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
myFactory.setAlwaysSyncSend(false);
myFactory.setUserName(env.getProperty("username"));
myFactory.setPassword(env.getProperty("password"));
return myFactory;
}
@Bean
public DefaultMessageListenerContainer myContainer() {
DefaultMessageListenerContainer myContainer = new DefaultMessageListenerContainer();
myContainer.setConnectionFactory(myFactory());
myContainer.setDestination(new ActiveMQQueue(env.getProperty("ctp.topic.queue")));
myContainer.setMessageListener(ctpListener);
myContainer.setSubscriptionDurable(true);
myContainer.setDurableSubscriptionName("ctp_name_2");
myContainer.setClientId("ctp_id_2");
myContainer.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
return myContainer;
}
}
上述消费者提供了两种启动方式
1. 基于JmsListener注解的监听器方式
2. 基于DefaultMessageListenerContainer的启动方式
使用时候可以随意挑选一种。
如果使用第二种方式,需要制定监听器:
/*****************************************************************
* Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
*****************************************************************/
package subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 订阅者.
*
* @author leon.
*/
@Component
public class CtpListener implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(CtpListener.class);
AtomicInteger cnt = new AtomicInteger();
private LinkedBlockingQueue queue = new LinkedBlockingQueue<MyTask>();
private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, queue);
@Override
public void onMessage(Message message) {
executor.execute(new MyTask(message));
}
public int getCnt() {
return cnt.get();
}
class MyTask implements Runnable {
private Message message;
public MyTask(Message message) {
this.message = message;
}
@Override
public void run() {
String msg = ((MapMessage) message).toString();
LOGGER.info("get message: {}", msg);
cnt.addAndGet(1);
}
}
}
这里的监听器使用了异步的方式,当然也可以配置Container的并发消费数量来实现多线程消费。
服务配置,application.yml
server.port: 8283
spring.activemq.broker-url: failover:(tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=3600000)
spring.activemq.user: admin
spring.activemq.password: admin
spring.activemq.in-memory: true
spring.activemq.pool.enabled: false
username: system
password: manager
ctp.topic.queue: test.queue
这里监听的是test.queue
虚拟主题的配置
如果你想了解虚拟主题的用处,可以参考这里。
ActiveMQ有两种方式来配置虚拟主题。
直接修改Topic名称法
这种方法的优点在于不需要修改ActiveMQ的配置,只需要发布者和订阅者协定好Topic名称即可。
缺点是如果发布者和订阅者已经存在,现在想使用虚拟主题,且可能发布者或订阅者不在一个平台内,那么就需要双方统一进行升级或者回滚,比较麻烦。
配置方式:
1. 虚拟主题的名称需要以『VirtualTopic.』这样的前缀来命名。比如说我们有一个topic名称为orders,那么就需要把这个主题命名为VirtualTopic.orders。然后生发布者将消息发布到VirtualTopic.orders这个主题中。订阅者需要以『Consumer.*.VirtualTopic.orders』这样的方式进行订阅,注意需要订阅的是队列。
拦截器法
这种方法的优点在于发布者和不需要改造的订阅者不需要做任何变动,需要改造的订阅者使用虚拟主题的方式进行订阅即可达到负载均衡和故障转移的目的。
缺点也明显,需要修改ActiveMQ的配置,也就是说需要重启ActiveMQ,这可能导致消息丢失。还有一个就是订阅者的上线和回滚也会比较麻烦。后面会介绍我们在上线时候采用的方案。
需要在ActiveMQ的配置文件中进行修改:
<active_mq>/conf/activemq.xml
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="orders" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
这个时候发布者只需要往orders主题上发布消息。订阅者通过订阅VirtualTopicConsumers.A.orders的队列的方式来消费就可以了。
假如你不需要负载均衡和故障转移(或者系统自己已经实现了),那么你仍然可以订阅orders主题来进行消费即可。