MQ的那些事儿——第二季

Wesley13
• 阅读 716

安装依赖版本一览

java:1.8.0_144

ActiveMQ:5.15.0

安装包地址:https://pan.baidu.com/s/1hss2ltq

完整demo下载:百度网盘CSDN

部署ActiveMQ

1. 加入你不想下载上面提供的地址,那么可以这么做(PS:java8环境必须先准备好)。

wget http://mirrors.hust.edu.cn/apache//activemq/5.15.0/apache-activemq-5.15.0-bin.tar.gz
tar zxf apache-activemq-5.15.0-bin.tar.gz

# 启动
cd [activemq_install_dir]/bin
./activemq console
# daemon方式启动|停止
cd [activemq_install_dir]/bin
./activemq start|stop

2. 如果你下载了上面提供的地址,那么上传到服务器后,直接解压运行即可。

3. 控制台管理

浏览器输入:http://your\_host:8161/admin。用户名和密码默认为admin/admin

然后你就可以看到ActiveMQ的管理界面了。

MQ的那些事儿——第二季

基本上,你所需要关心的主要在这三个tab。

  • Queues,这是生产者/消费者传送队列消息的地方。他的特点就是可以达到消费者的负载均衡和故障转移的目的。里面可以看到消费者列表。
  • Topics,这是发布者/订阅者通信的地方,每个订阅者都会收到一份消息的拷贝来进行消费。
  • Subscribers,在这里可以看到主题和其订阅者的列表。5.8版本看不到发布者和非持久化订阅的订阅者。。。

上述的三个tab会在之后进行叙述。

生产者/消费者开发

注意,这仅仅是个demo,开发方式基于SpringBoot

先前准备

1. 首先是Maven

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.7.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

生产者

启动类

/*****************************************************************
 * Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
 *****************************************************************/
package boot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 类描述:启动类.
 *
 * @author leon.
 */
@SpringBootApplication(scanBasePackages = "publisher")
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

生产者

src/publisher

/*****************************************************************
 * Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
 *****************************************************************/
package publisher;

import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * 类描述:发布者.
 *
 * @author leon.
 */
@Component
public class Publisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(Publisher.class);
    private static final String DEFAULT_MSG = "hello";
    private static final String DEFAULT_QUEUE = "test";
    private static final String EMPTY = "";

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    public String send(String queue, String msg) {
        if (isBlank(queue)) {
            queue = DEFAULT_QUEUE;
        }
        if (isBlank(msg)) {
            msg = DEFAULT_MSG;
        }
        jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queue), msg);
        return "SUCCESS";
    }

    private boolean isBlank(String str) {
        if (null == str || str.trim().equals(EMPTY)) {
            return true;
        } else {
            return false;
        }
    }

}

服务配置,application.yml

server.port: 8081

spring.activemq.broker-url: failover:(tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=3600000)
spring.activemq.user: admin
spring.activemq.password: admin
spring.activemq.in-memory: true
spring.activemq.pool.enabled: false

消息发送

/*****************************************************************
 * Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
 *****************************************************************/
package publisher;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * 类描述:web.
 *
 * @author leon.
 */
@Controller
public class Web {

    @Autowired
    private Publisher publisher;

    @RequestMapping("/send")
    public void send(String queue, String msg) {
        publisher.send(queue, msg);
    }

}

通过web界面来发送消息即可

消费者

启动类

/*****************************************************************
 * Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
 *****************************************************************/
package boot;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 类描述:启动类.
 *
 * @author leon.
 */
@SpringBootApplication(scanBasePackages = "subscriber")
public class Application {

    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }



}

消费者

/*****************************************************************
 * Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
 *****************************************************************/
package subscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 类描述:订阅者.
 *
 * @author leon.
 */
@Component
public class Subscriber {

    private static final Logger LOGGER = LoggerFactory.getLogger(Subscriber.class);
    private AtomicInteger cnt = new AtomicInteger();

    // #############################注解启动方式#############################
    @JmsListener(destination = "test", containerFactory = "myFactory")
    public String receive(Message message) throws JMSException {
        String msg = ((TextMessage) message).getText();
        LOGGER.info("get message: {}", msg);
        cnt.addAndGet(1);
        return msg;
    }

    public int getDealCnt() {
        return cnt.get();
    }


    // #############################Container#############################
    @Autowired
    private Environment env;
    @Autowired
    private CtpListener ctpListener;

    @Bean
    public ActiveMQConnectionFactory myFactory() {
        LOGGER.info(env.getProperty("spring.activemq.broker-url"));
        ActiveMQConnectionFactory myFactory = new ActiveMQConnectionFactory();
        myFactory.setBrokerURL(env.getProperty("spring.activemq.broker-url"));
        myFactory.setAlwaysSyncSend(false);
        myFactory.setUserName(env.getProperty("username"));
        myFactory.setPassword(env.getProperty("password"));
        return myFactory;
    }

    @Bean
    public DefaultMessageListenerContainer myContainer() {
        DefaultMessageListenerContainer myContainer = new DefaultMessageListenerContainer();
        myContainer.setConnectionFactory(myFactory());
        myContainer.setDestination(new ActiveMQQueue(env.getProperty("ctp.topic.queue")));
        myContainer.setMessageListener(ctpListener);
        myContainer.setSubscriptionDurable(true);
        myContainer.setDurableSubscriptionName("ctp_name_2");
        myContainer.setClientId("ctp_id_2");
        myContainer.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
        return myContainer;
    }

}

上述消费者提供了两种启动方式

1. 基于JmsListener注解的监听器方式

2. 基于DefaultMessageListenerContainer的启动方式

使用时候可以随意挑选一种。

如果使用第二种方式,需要制定监听器:

/*****************************************************************
 * Copyright (c) 2017 www.noryar.com Inc. All rights reserved.
 *****************************************************************/
package subscriber;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 订阅者.
 *
 * @author leon.
 */
@Component
public class CtpListener implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(CtpListener.class);
    AtomicInteger cnt = new AtomicInteger();
    private LinkedBlockingQueue queue = new LinkedBlockingQueue<MyTask>();
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, queue);

    @Override
    public void onMessage(Message message) {
        executor.execute(new MyTask(message));
    }

    public int getCnt() {
        return cnt.get();
    }

    class MyTask implements Runnable {

        private Message message;

        public MyTask(Message message) {
            this.message = message;
        }

        @Override
        public void run() {
            String msg = ((MapMessage) message).toString();
            LOGGER.info("get message: {}", msg);
            cnt.addAndGet(1);
        }
    }
}

这里的监听器使用了异步的方式,当然也可以配置Container的并发消费数量来实现多线程消费。

服务配置,application.yml

server.port: 8283

spring.activemq.broker-url: failover:(tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=3600000)
spring.activemq.user: admin
spring.activemq.password: admin
spring.activemq.in-memory: true
spring.activemq.pool.enabled: false

username: system
password: manager
ctp.topic.queue: test.queue

这里监听的是test.queue

虚拟主题的配置

如果你想了解虚拟主题的用处,可以参考这里

ActiveMQ有两种方式来配置虚拟主题。

直接修改Topic名称法

这种方法的优点在于不需要修改ActiveMQ的配置,只需要发布者和订阅者协定好Topic名称即可。

缺点是如果发布者和订阅者已经存在,现在想使用虚拟主题,且可能发布者或订阅者不在一个平台内,那么就需要双方统一进行升级或者回滚,比较麻烦。

配置方式:

1. 虚拟主题的名称需要以『VirtualTopic.』这样的前缀来命名。比如说我们有一个topic名称为orders,那么就需要把这个主题命名为VirtualTopic.orders。然后生发布者将消息发布到VirtualTopic.orders这个主题中。订阅者需要以『Consumer.*.VirtualTopic.orders』这样的方式进行订阅,注意需要订阅的是队列。

拦截器法

这种方法的优点在于发布者和不需要改造的订阅者不需要做任何变动,需要改造的订阅者使用虚拟主题的方式进行订阅即可达到负载均衡和故障转移的目的。

缺点也明显,需要修改ActiveMQ的配置,也就是说需要重启ActiveMQ,这可能导致消息丢失。还有一个就是订阅者的上线和回滚也会比较麻烦。后面会介绍我们在上线时候采用的方案。

需要在ActiveMQ的配置文件中进行修改:

<active_mq>/conf/activemq.xml

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="orders" prefix="VirtualTopicConsumers.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

这个时候发布者只需要往orders主题上发布消息。订阅者通过订阅VirtualTopicConsumers.A.orders的队列的方式来消费就可以了。

假如你不需要负载均衡和故障转移(或者系统自己已经实现了),那么你仍然可以订阅orders主题来进行消费即可。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
HIVE 时间操作函数
日期函数UNIX时间戳转日期函数: from\_unixtime语法:   from\_unixtime(bigint unixtime\, string format\)返回值: string说明: 转化UNIX时间戳(从19700101 00:00:00 UTC到指定时间的秒数)到当前时区的时间格式举例:hive   selec
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这