Java消息服务JMS详解

Wesley13
• 阅读 768

JMS:

Java消息服务(Java Message Service

JMS是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。

JMS的编程过程很简单,概括为:应用程序A发送一条消息到消息服务器的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A和应用程序B没有直接的代码关连,所以两者实现了解偶。

消息驱动bean(message-driven bean)

它是专门用于异步处理java消息的组件.具有处理大量并发消息的能力.

何时使用JMS

在某些情况下,由于SessionBean方法的执行时间比较长这就需要异步地调用该方法,否则客户端就需要等待比较长的时间。要实现异步调用, 就需要使用消息驱动Bean。

消息驱动Bean的基本原理是客户端向消息服务器发送一条消息后,消息服务器会将该消息保存在消息队列中。在这时消 息服务器中的某个消费者(读取并处理消息的对象)会读取该消息,并进行处理。发送消息的客户端被称为消息生产者。

JMS中的消息

消息传递系统的中心就是消息。一条 Message 由三个部分组成:

头(header),属性(property)和主体(body)。

消息有下面几种类型,他们都是派生自 Message 接口

StreamMessage:一种主体中包含 Java 基元值流的消息。其填充和读取均按顺序进行。

MapMessage:一种主体中包含一组名-值对的消息。没有定义条目顺序。

TextMessage:一种主体中包含 Java 字符串的消息(例如,XML 消息)。

ObjectMessage:一种主体中包含序列化 Java 对象的消息。

BytesMessage:一种主体中包含连续字节流的消息

JMS消息详解

消息的传递模型

JMS 支持两种消息传递模型:点对点(point-to-point,简称 PTP)和发布/订阅(publish/subscribe,简称 pub/sub)。

这两种消息传递模型非常相似,但有以下区别:

PTP 消息传递模型规定了一条消息只能传递给一个接收方。 采用javax.jms.Queue 表示。

Pub/sub 消息传递模型允许一条消息传递给多个接收方。采用javax.jms.Topic表示

这两种模型都通过扩展公用基类来实现。例如:javax.jms.Queue 和javax.jms.Topic 都扩展自javax.jms.Destination 类。

配置目标地址

**开始JMS编程前,我们需要先配置消息到达的目标地址(Destination)**,因为只有目标地址存在了,我们才能发送消息到这个地址。由于每个应用服务器关于目标地址的配置方式都有所不同,下面以jboss为例,配置一个queue类型的目标地址。

Xml代码   Java消息服务JMS详解

  1. <server>
  2. <mbean code="org.jboss.mq.server.jmx.Queue"
  3. name="jboss.mq.destination:service=Queue,name=foshanshop">
  4. <attribute name="JNDIName">queue/foshanshop</attribute>
  5. <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
  6. </mbean>
  7. </server>

(项目中用的这个:

Xml代码   Java消息服务JMS详解

  1. <server>
  2. <mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=InstanceQueue" code="org.jboss.jms.server.destination.QueueService">
  3. <attribute name="JNDIName">/queues/InstanceQueue</attribute>
  4. <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  5. <depends>jboss.messaging:service=PostOffice</depends>
  6. </mbean>
  7. </server>

放在**server\default\deploy\**queues\InstanceQueue-service.xml 中)

模版可以在E:\jboss-5.1.0.GA-jdk6\jboss-5.1.0.GA\docs\examples\jms中的example-destinations-service.xml中找到。

Jboss使用一个XML文件配置队列地址,文件的取名格式应遵守*-service.xml

属性指定了该目标地址的全局JNDI名称。如果你不指定JNDIName属性,jboss会为你生成一个默认的全局JNDI,其名称由“queue”+“/”+目标地址名称组成。另外在任何队列或主题被部署之前,应用服务器必须先部署Destination Manager Mbean,所以我们通过节点声明这一依赖。

在java类中发送消息

一般发送消息有以下步骤:

(1) 得到一个JNDI初始化上下文(Context)

InitialContext ctx = new InitialContext();

(2) 根据上下文查找一个连接工厂 QueueConnectionFactory 。该连接工厂是由JMS提供的,不需我们自己创建,每个厂商都为它绑定了一个全局JNDI,我们通过它的全局JNDI便可获取它;

QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

(3) 从连接工厂得到一个连接 QueueConnection

conn = factory.createQueueConnection();

(4) 通过连接来建立一个会话(Session);

session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

这句代码意思是:建立不需要事务的并且能自动确认消息已接收的会话。

(5) 查找目标地址

Destination destination = (Destination ) ctx.lookup("queue/foshanshop"); //上面配置的那个目标地址

(6) 根据会话以及目标地址来建立消息生产者MessageProducer (QueueSender和TopicPublisher都扩展自MessageProducer接口)

例子对应代码:

Java代码   Java消息服务JMS详解

  1. MessageProducer producer = session.createProducer(destination);
  2. TextMessage msg = session.createTextMessage("您好,这是我的第一个消息驱动Bean");
  3. producer.send(msg);

项目用:

JMS工厂和队列JNDI配在配置文件jmsqueue.properties里,内容如下:

Xml代码   Java消息服务JMS详解

  1. connectionFactoryName=ConnectionFactory
  2. queueName=/queues/InstanceQueue

Java代码   Java消息服务JMS详解

  1. /**

  2. * 目的:读取jmsqueue.properties中消息队列的信息,初始化消息队列,提供发送消息的函数

  3. *

  4. */

  5. public class MsgQueueSender {

  6. private static final Logger logger = LoggerFactory.getLogger(MsgQueueSender.class);

  7. private static final MsgQueueSender ms = new MsgQueueSender();

  8. private Properties info = new Properties();

  9. /**

  10. * jms

  11. */

  12. private QueueConnection conn;

  13. private Queue que;

  14. private MsgQueueSender() {

  15. initJMSInfo();

  16. initMsgQueue();

  17. }

  18. /**

  19. * 初始化jndi队列

  20. *

  21. */

  22. private void initMsgQueue() {

  23. InitialContext iniCtx;

  24. try {

  25. iniCtx = new InitialContext();

  26. Object tmp = iniCtx.lookup(info.getProperty("connectionFactoryName", "ConnectionFactory"));

  27. QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;

  28. conn = qcf.createQueueConnection();

  29. que = (Queue) iniCtx.lookup(info.getProperty("queueName", "/queues/InstanceQueue"));

  30. conn.start();

  31. } catch (Exception e) {

  32. logger.error("[MsgQueueSender.initMsgQueue] \u65e0\u6cd5\u8fde\u63a5\u5230\u6d88\u606f\u670d\u52a1\u5668\uff0c\u8bf7\u68c0\u67e5jms\u914d\u7f6e\u548c\u670d\u52a1\u7aef" + e.getMessage(),e);

  33. }

  34. }

  35. /**

  36. * 读取jms配置

  37. */

  38. private void initJMSInfo() {

  39. InputStream is = this.getClass().getClassLoader().getResourceAsStream("jmsqueue.properties");

  40. if (is != null) {

  41. try {

  42. info.load(is);

  43. } catch (IOException e) {

  44. logger.error("[MsgQueueSender.initJMSInfo] \u8bfb\u53d6jmsqueue.properties\u51fa\u9519\uff0c\u5c06\u4f7f\u7528\u9ed8\u8ba4\u914d\u7f6e " + e.getMessage(),e);

  45. }

  46. }

  47. }

  48. public static MsgQueueSender getInstance() {

  49. return ms;

  50. }

  51. public void sendTextMsg(String msg) throws JMSException{

  52. QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);

  53. session.createSender(que).send(session.createTextMessage(msg));

  54. session.close();

  55. }

  56. public void sendObjMsg(Serializable obj) throws JMSException{

  57. QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);

  58. session.createSender(que).send(session.createObjectMessage(obj));

  59. session.close();

  60. }

  61. }

采用消息驱动Bean (Message Driven Bean)接收消息

消息驱动Bean(MDB)是设计用来专门处理基于消息请求的组件。它和无状态Session Bean一样也使用了实例池技术,容器可以使用一定数量的bean实例并发处理成百上千个JMS消息。正因为MDB具有处理大量并发消息的能力,所以非常适合应用在一些消息网关产品。如果一个业务执行的时间很长,而执行结果无需实时向用户反馈时,也很适合使用MDB。如订单成功后给用户发送一封电子邮件或发送一条短信等。

一个MDB通常要实现MessageListener接口,该接口定义了onMessage()方法。Bean通过它来处理收到的JMS消息

Java代码   Java消息服务JMS详解

  1. package javax.jms;
  2. public interface MessageListener {
  3. public void onMessage(Message message);
  4. }

当容器检测到bean守候的目标地址有消息到达时,容器调用onMessage()方法,将消息作为参数传入MDB。MDB在onMessage()中决定如何处理该消息。你可以使用注释指定MDB监听哪一个目标地址(Destination)。当MDB部署时,容器将读取其中的配置信息。

Java代码   Java消息服务JMS详解

  1. @MessageDriven(activationConfig =

  2. {

  3. @ActivationConfigProperty(propertyName="destinationType",

  4. propertyValue="javax.jms.Queue"),

  5. @ActivationConfigProperty(propertyName="destination",

  6. propertyValue="queue/foshanshop"),

  7. @ActivationConfigProperty(propertyName="acknowledgeMode",

  8. propertyValue="Auto-acknowledge")

  9. })

  10. public class PrintBean implements MessageListener {

  11. public void onMessage(Message msg) {

  12. }

  13. }

项目中用:

META-INF下

ejb-jar.xml

Xml代码   Java消息服务JMS详解

  1. <ejb-jar xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xmlns="http://java.sun.com/xml/ns/javaee" xmlns:ejb="http://java.sun.com/xml/ns/javaee/ejb-jar\_3\_0.xsd"
  3. xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar\_3\_0.xsd"
  4. version="3.0">
  5. <display-name>msgreceiver</display-name>
  6. <enterprise-beans>
  7. <message-driven>
  8. <display-name>instanceMDB</display-name>
  9. <ejb-name>instanceMDB</ejb-name>
  10. <ejb-class>com.project.soa.msgreceiver.InstanceReceiver</ejb-class>
  11. <activation-config>
  12. <activation-config-property>
  13. <activation-config-property-name>destinationType</activation-config-property-name>
  14. <activation-config-property-value>javax.jms.Queue</activation-config-property-value>
  15. </activation-config-property>
  16. <activation-config-property>
  17. <activation-config-property-name>destination</activation-config-property-name>
  18. <activation-config-property-value>/queues/InstanceQueue</activation-config-property-value>
  19. </activation-config-property>
  20. </activation-config>
  21. </message-driven>
  22. </enterprise-beans>
  23. </ejb-jar>

persistence.xml

Xml代码   Java消息服务JMS详解

  1. <persistence xmlns="http://java.sun.com/xml/ns/persistence"

  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  3. xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence\_1\_0.xsd"

  4. version="1.0">

  5. <persistence-unit name="msgreceiver-ds">

  6. <jta-data-source>java:/datasources/visesbdb</jta-data-source>

  7. <jar-file>com.project.soa.bean-2.2.0.jar</jar-file>

  8. <properties>

  9. <property name="hibernate.hbm2ddl.auto" value="none" />

  10. <property name="hibernate.show_sql" value="false" />

  11. <property name="hibernate.format_sql" value="false" />

  12. </properties>

  13. </persistence-unit>

  14. </persistence>

Java代码   Java消息服务JMS详解

  1. public class InstanceReceiver implements javax.jms.MessageListener {

  2. private static final Logger logger = LoggerFactory.getLogger(InstanceReceiver.class);

  3. @EJB(name="InstanceService")

  4. private InstanceService is;

  5. @Override

  6. public void onMessage(Message msg) {

  7. try {

  8. is.processMsg(((ObjectMessage)msg).getObject());

  9. } catch (JMSException e) {

  10. logger.error("[InstanceReceiver.onMessage] " + e.getMessage());

  11. e.printStackTrace();

  12. }

  13. }

  14. }

  15. @Stateless

  16. @Local ({InstanceService.class})

  17. public class InstanceServiceImpl implements InstanceService{

  18. private static final Logger logger = LoggerFactory.getLogger(InstanceServiceImpl.class);

  19. @PersistenceContext

  20. private EntityManager em;

  21. ...

  22. }

JMS中消息的 同步消费 和 异步消费

同步消费 比如

connection.start();

Message message=queueReceiver.receive();

同步消费 receive 就执行一次,并返回message对象。

同步消费中,消息的接收者会一直等待下去,直到有消息到达,或者超时

异步消费 比如

connection.start();

receiver.setMessageListener(new MyMessageListener());  //MyMessageListener实现了MessageListener接口

System.in.read();  //这句话是为了人为的阻塞程序不然 还没接收到消息 ,程序一下子就执行完了,关闭了。

异步消费会注册一个监听器,当有消息到达的时候,会回调它的onMessage()方法,没有次数限制

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Wesley13 Wesley13
3年前
ActiveMQ的使用
ActiveMQ是Apache出品的开源消息总线。完全支持JMS1.1规范首先我们要了解一下JMSJMS简介Java消息服务(JavaMessageService,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异
Wesley13 Wesley13
3年前
JavaAPI
1.ActiveMQ是什么ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。1.1JMS概述全称:JavaMessageService,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)实现了JMS标准的系统,称之
Stella981 Stella981
3年前
Spring Boot实践
一.认识JMS1.1概述对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(JavaMessageService)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
JMS 实例讲解
1\.JMS基本概念    JMS(JavaMessageService)即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(pointtopoint)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P模型规定了一个消息只能有一个接收者;Pub/Sub模型允许一个消息可以
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这