ActiveMQ简述,使用

Wesley13
• 阅读 682

官网地址:http://activemq.apache.org/

参考文章:http://my.oschina.net/nk2011/blog/366395

JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。

ActiveMQ简述,使用

另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。

ActiveMQ简述,使用

ActiveMQ的安装

下载最新的安装包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是针对linux系统进行阐述,当然ActiveMQ也有win版的,这里就不赘述了),可以去官网下载,也可以在下方留言区留下你的邮箱,博主会发给你的~

下载之后解压: tar -zvxf apache-activemq-5.13.2-bin.tar.gz

ActiveMQ目录内容有:

  • bin目录包含ActiveMQ的启动脚本

  • conf目录包含ActiveMQ的所有配置文件

  • data目录包含日志文件和持久性消息数据

  • example: ActiveMQ的示例

  • lib: ActiveMQ运行所需要的lib

  • webapps: ActiveMQ的web控制台和一些相关的demo

运行命令:activemq start(在activemq/bin下运行)

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')

查看activemq是否运行命令:ps -aux | grep activemq

shr        986  1.2  9.7 1281720 201936 pts/5  Sl   19:43   0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr       1501  0.0  0.0   5176   724 pts/5    S+   20:06   0:00 grep activemq

关闭命令: activemq stop

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre
  Heap sizes: current=63232k  free=62218k  max=932096k
    JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:
  [/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED

ActiveMQ的默认服务端口为61616,这个可以在conf/activemq.xml配置文件中修改:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

案例

在下载的apache-activemq-5.13.2-bin.tar.gz包中解压有一个jar包:activemq-all-5.13.2.jar,引入这个jar到你的项目中即可开始编写案例代码。

博主的activemq服务器地址为10.10.195.187,这个在下面代码中会有体现。

按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:

  1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。

  2. 利用factory构造JMS connection

  3. 启动connection

  4. 通过connection创建JMS session.

  5. 指定JMS destination.

  6. 创建JMS producer或者创建JMS message并提供destination.

  7. 创建JMS consumer或注册JMS message listener.

  8. 发送和接收JMS message.

  9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

下面来看代码举例(P2P式)。

通过Java实现的基于ActiveMQ的请求提交:

package com.zzh.activemq;
 
import java.io.Serializable;
import java.util.HashMap;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class RequestSubmit
{
    //消息发送者
    private MessageProducer producer;
    //一个发送或者接受消息的线程
    private Session session;
 
    public void init() throws Exception
    {
        //ConnectionFactory连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //Connection:JMS客户端到JMS Provider的连接,从构造工厂中得到连接对象
        Connection connection = connectionFactory.createConnection();
        //启动
        connection.start();
        //获取连接操作
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destinatin = session.createQueue("RequestQueue");
        //得到消息生成(发送)者
        producer = session.createProducer(destinatin);
        //设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
 
    public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception
    {
        ObjectMessage message = session.createObjectMessage(requestParam);
        producer.send(message);
        session.commit();
    }
 
    public static void main(String[] args) throws Exception{
        RequestSubmit submit = new RequestSubmit();
        submit.init();
        HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();
        requestParam.put("郏高阳", "zzh");
        submit.submit(requestParam);
    }
}

创建Session时有两个非常重要的参数,第一个boolean类型的参数用来表示是否采用事务消息。如果是事务消息,对于的参数设置为true,此时消息的提交自动有comit处理,消息的回滚则自动由rollback处理。加入消息不是事务的,则对应的该参数设置为false,此时分为三种情况:

  • Session.AUTO_ACKNOWLEDGE表示Session会自动确认所接收到的消息。

  • Session.CLIENT_ACKNOWLEDGE表示由客户端程序通过调用消息的确认方法来确认所接收到的消息。

  • Session.DUPS_OK_ACKNOWLEDGE使得Session将“懒惰”地确认消息,即不会立即确认消息,这样有可能导致消息重复投递。

提供Java实现的基于ActiveMQ的请求处理:

package com.zzh.activemq;
 
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class RequestProcessor
{
    public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception
    {
        System.out.println("requestHandler....."+requestParam.toString());
        for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet())
        {
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
    }
 
    public static void main(String[] args) throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("RequestQueue");
        //消息消费(接收)者
        MessageConsumer consumer = session.createConsumer(destination);
 
        RequestProcessor processor = new RequestProcessor();
 
        while(true)
        {
            ObjectMessage message = (ObjectMessage) consumer.receive(1000);
            if(null != message)
            {
                System.out.println(message);
                HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();
                processor.requestHandler(requestParam);
            }
            else
            {
                break;
            }
        }
    }
}

输出结果:

ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{郏高阳=zzh}
郏高阳:zzh

可以通过页面查看队列的使用情况,在浏览器中输入http://10.10.195.187:8161/admin/queues.jsp,用户名和密码都是:admin,看到以下页面:

ActiveMQ简述,使用

这个是在jetty服务器下跑的,可以修改conf/jetty.xml来修改相关jetty配置。

上面的例子是关于P2P模式的,不过有个不妥之处,就是没有资源的释放。下面举一个Pub/Sub模式的。

通过JMS创建ActiveMQ的topic,并给topic发送消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Produce;
 
public class TopicRequest
{
    //消息发送者
    private MessageProducer producer;
    //一个发送或者接受消息的线程
    private Session session;
    //Connection:JMS客户端到JMS Provider的连接
    private Connection connection;
 
    public void init() throws Exception
    {
        //ConnectionFactory连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //从构造工厂中得到连接对象
        connection = connectionFactory.createConnection();
        //启动
        connection.start();
        //获取连接操作
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        producer = session.createProducer(topic);
        //设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
 
    public void submit(String mess) throws Exception
    {
        TextMessage message = session.createTextMessage();
        message.setText(mess);
        producer.send(message);
    }
 
    public void close()
    {
        try
        {
            if(session != null)
                session.close();
            if(producer != null)
                producer.close();
            if(connection !=null )
                connection.close();
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws Exception
    {
        TopicRequest topicRequest = new TopicRequest();
        topicRequest.init();
        topicRequest.submit("I'm first");
        topicRequest.close();
    }
}

消息发送到对应的topic后,需要将listener注册到需要订阅的topic上,以便能够接收该topic的消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class TopicReceive
{
    private MessageConsumer consumer;
    private Session session;
 
    public void init() throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        consumer = session.createConsumer(topic);
 
        consumer.setMessageListener(new MessageListener(){
            @Override
            public void onMessage(Message message)
            {
                TextMessage tm = (TextMessage) message;
                System.out.println(tm);
                try
                {
                    System.out.println(tm.getText());
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });
    }
 
    public static void main(String[] args) throws Exception
    {
        TopicReceive receive = new TopicReceive();
        receive.init();
    }
}

输出结果:

ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
AvctiveMQ和RabbitMQ的区别
ActiveMQ:传统的消息队列,使用Java语言编写。基于JMS(JavaMessageService),采用多线程并发,资源消耗比较大。支持P2P和发布订阅两种模式。RabbitMQ:是使用Erlang语言开发的开源消息队列系统。基于AMQP协议来实现的。AMQP的主要特征是面向消息、队列、路由(
Wesley13 Wesley13
3年前
JMS 实例讲解
1\.JMS基本概念    JMS(JavaMessageService)即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(pointtopoint)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P模型规定了一个消息只能有一个接收者;Pub/Sub模型允许一个消息可以
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
JMS介绍
JMS消息传送模型    点对点消息传送模型  在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。点对点消息模型有一些特性,如下:每个消息只有一个接收者;消息发送者和接收者并没有时间依
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这