Spring Boot实践

Stella981
• 阅读 827

一. 认识JMS

1.1 概述

对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简短来说,JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。

JMS1.0是jsr 194里规定的规范(关于jsr规范,请点击)。目前最新的规范是JSR 343,JMS2.0。

好了,说了这么多,其实只是在说,JMS只是sun公司为了统一厂商的接口规范,而定义出的一组api接口。

1.2 JMS体系结构

描述如下:

  • JMS提供者(JMS的实现者,比如activemq jbossmq等)
  • JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)
  • JMS消息(在JMS客户之间传递数据的对象)
  • JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
  • JMS主题(一种支持发送消息给多个订阅者的机制)

1.3. JMS对象模型

  • 连接工厂(connectionfactory)客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
  • JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  • JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。
  • JMS目的 Destinatio 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型,分为队列类型(优先先进先出)以及订阅类型

二. ActiveMQ安装和思路

2.1. ActiveMQ的安装

从官网下载安装包, http://activemq.apache.org/download.html 赋予运行权限 chmod +x,windows可以忽略此步 运行 ./active start | stop 启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。这里有个问题是启动mq后,很长时间管理界面才可以显示出来。

==》启动MQ服务器:

根据操作系统不同,进入相应win64/win32位目录,双击activemq.bat启动MQ。

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用 进入ActionMQ服务监控地址:浏览器输入http://127.0.0.1:8161/admin

2.2 整合思路

Spring最厉害的地方就是它的Bean了,还有它特有的IOC(控制反转)和AOP(面向切面编程)技术。有了这些,我们就可以不用new关键字构造对象,同时,可以方便地使用注入往类中的属性进行初始化。如果你编写过ActiveMQ之类的JMS应用程序,无论对于消息的生产者还是消费者,最重要的接口有以下两个: 1.ConnectionFactory 2.Destination

ConnectionFactory是一切的基础,有了它才有了Connection,然后才有Session,只有通过Session对象,我们才能创建消息队列、构建生产者/消费者,继而发送/接收消息。 Destination是一切的归宿,它就像总线一样,生产者发出消息要发到它上面,消费者取消息也要从这上面取。

试想,如果这一切都能借助Spring强大的Bean管理的话,我们在编写程序的时候会更加的方便简洁。幸运的是,ActiveMQ官方提供了完美的Spring框架支持,一切只需要在xml文件中配置即可~

Spring官方提供了一个叫JmsTemplate的类,这个类就专门用来处理JMS的,在该类的Bean配置标签中有两个属性connectionFactory-ref和defaultDestination-ref正好对应JMS中的ConnectionFactory和Destination,如果你有兴趣查看源码的话,就可以发现JmsTemplate帮我们做了大量创建的工作,我们只需要用它来进行收发信息就ok了,而ActiveMQ官方也提供了对应的实现包。

三、Spring 整合

3.1 xml配置方式一

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms" xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd    
        http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd 
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"></bean>
    <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:targetConnectionFactory-ref="jmsConnectionFactory" p:sessionCacheSize="10"></bean>
    <bean id="queue_test" class="org.apache.activemq.command.ActiveMQQueue">
        <!--消息队列名称 -->
        <constructor-arg value="queue_test" />
    </bean>
    <!-- Spring JMS Template -->
    <!-- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="cachedConnectionFactory" p:defaultDestination-ref="queue_test"></bean> -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="jmsConnectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
        <!-- Session.AUTO_ACKNOWLEDGE 1消息自动签收 Session.CLIENT_ACKNOWLEDGE 2客户端调用acknowledge方法手动签收 Session.DUPS_OK_ACKNOWLEDGE 3不必必须签收,消息可能会重复发送 -->
        <property name="sessionAcknowledgeMode" value="2" />
    </bean>

    <jms:listener-container container-type="default" connection-factory="jmsConnectionFactory" acknowledge="auto">
        <jms:listener destination="queue_test" ref="simpleMsgListener" method="onMessage"></jms:listener>
    </jms:listener-container>
</beans>

3.2 xml配置方式二:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:util="http://www.springframework.org/schema/util"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.5.xsd
        ">

    <context:annotation-config />
    <context:component-scan base-package="com.troy" />

    <!-- 读取配置文件 -->
    <bean id="propertyPlaceholderConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <array>
                <value>classpath:conf/spring-config.properties</value>
            </array>
        </property>
    </bean>

    <!-- 连接 activemq -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq_url}" userName="${activemq_username}" password="${activemq_password}" />

    <!-- 这里可以采用连接池的方式连接PooledConnectionFactoryBean -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 配置连接 -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory" />
        <!-- 会话的最大连接数 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定义消息队列topic类型,queue的方式差不多 -->
    <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- 定义名称 -->
        <constructor-arg index="0" value="topic" />
    </bean>

    <!-- 配置JMS模板(topic),Spring提供的JMS工具类,它发送、接收消息。 -->
    <!-- 为了测试发送消息,保留jmsTemplate的配置,实际不存在发送,只需要配置监听即可 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="topic" />
        <!-- 非pub/sub模型(发布/订阅),true为topic,false为queue -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- 监听方式,这种方式更实用,可以一直监听消息 -->
    <bean id="topicMessageListen" class="com.troy.activemq.TopicMessageListen" />
    <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- 注册activemq名称 -->
        <property name="destination" ref="topic" />
        <property name="messageListener" ref="topicMessageListen" />
    </bean>

</beans>   
        



import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;
@Component(value = "simpleMsgListener")
public class SimpleMsgListener implements MessageListener {
    //收到信息时的动作
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("收到的信息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

四、spring boot配置

4.1、引入依赖配置

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

4.2 application.properties配置

# ActiveMQ--------------------------------------------------------------------------------------------------------------
# Specify if the default broker URL should be in memory. Ignored if an explicit broker has been specified.
#spring.activemq.in-memory=false
# URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
spring.activemq.broker-url=tcp://127.0.0.1:61616
# Login user of the broker.
spring.activemq.user=admin
# Login password of the broker.
spring.activemq.password=admin
# Trust all packages.
#spring.activemq.packages.trust-all=false
# Comma-separated list of specific packages to trust (when not trusting all packages).
#spring.activemq.packages.trusted=
# See PooledConnectionFactory.
#spring.activemq.pool.configuration.*=
# Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.
spring.activemq.pool.enabled=true
# Maximum number of pooled connections.
spring.activemq.pool.max-connections=50
# Connection expiration timeout in milliseconds.
spring.activemq.pool.expiry-timeout=10000
# Connection idle timeout in milliseconds.
spring.activemq.pool.idle-timeout=30000
spring.jms.pub-sub-domain=false

4.3 boot类配置

import javax.jms.Queue;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
public class ActiveMQConfig {
    @Value("${queueName}")
    private String queueName;

    @Value("${topicName}")
    private String topicName;

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;
    
    /**
     * <p>@describe: 配置队列 </p>
     * <p>@author: whh </p>
     * <p>@date: 2019年8月9日 下午5:56:22</p>
     * @return
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }
    
    /**
     * <p>@describe:配置主题  </p>
     * <p>@author: whh </p>
     * <p>@date: 2019年8月9日 下午5:56:40</p>
     * @return
     */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }
    
    /**
     * <p>@describe:连接工厂 </p>
     * <p>@author: whh </p>
     * <p>@date: 2019年8月9日 下午5:57:14</p>
     * @return
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }
    
    /**
     * <p>@describe: 监听队列bean </p>
     * <p>@author: whh </p>
     * <p>@date: 2019年8月9日 下午5:57:43</p>
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
    
    /**
     * <p>@describe:监听主题 bean  </p>
     * <p>@author: whh </p>
     * <p>@date: 2019年8月9日 下午5:58:17</p>
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        //设置为发布订阅方式, 默认情况下使用的生产消费者方式
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }
}

五、发送者和接收者使用

5.1 发送者

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;


@Service
public class JmsSendDemo {
    private  static final Logger LOGGER = LoggerFactory.getLogger(JmsSendDemo.class);    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    public void execute(){
        LOGGER.debug("执行定时任务-----------------直接违规数据定时:下发至医院端首页进行展示-----------------");

        Destination destination = (Destination) AppContext.getBean("queue_test");
        jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
        //启动另一个方法
        boolean flag=true;
        LOGGER.debug("执行定时任务结束------------------------------");
    }
    
    
    public void sendMessage(Destination destination,final String message) {
        LOGGER.debug("发送消息到AMQ,消息内容为:"+message);
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

5.1 接收者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    @JmsListener(destination = "mailbox", containerFactory = "myFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
ActiveMQ的使用
ActiveMQ是Apache出品的开源消息总线。完全支持JMS1.1规范首先我们要了解一下JMSJMS简介Java消息服务(JavaMessageService,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异
Wesley13 Wesley13
3年前
JavaAPI
1.ActiveMQ是什么ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。1.1JMS概述全称:JavaMessageService,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)实现了JMS标准的系统,称之
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
JMS是一种应用于异步消息传递的标准API
JMS是一种应用于异步消息传递的标准API,作为Java平台的一部分,JMS可以允许不同应用、不同模块之间实现可靠、异步数据通信。一些概念JMS provider    An implementation of the JMS interface for a Message Oriented Middleware (MOM
Wesley13 Wesley13
3年前
Java消息服务JMS详解
JMS:Java消息服务(JavaMessageService)JMS是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。JMS的编程过程很简单,概括为:应用程序A发送一条消息到消息服务器的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应
Stella981 Stella981
3年前
JMS(Java消息服务)与消息队列ActiveMQ基本使用(一)
最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下,把自己所有看下来的文档和了解总结一下。一.认识JMS1.概述对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(JavaMessageService)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这