RabbitMQ
安装RabbitMQ
1:安装RabbitMQ需要先安装Erlang语言开发包。下载地址 http://www.erlang.org/download.html 在win7下安装Erlang最好默认安装。
配置环境变量 ERLANG_HOME C:\Program Files (x86)\erl5.9
添加到PATH %ERLANG_HOME%\bin;
2:安装RabbitMQ 下载地址 http://www.rabbitmq.com/download.html 安装教程:http://www.rabbitmq.com/install-windows.html
配置环境变量 RABBITMQ_SERVER C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-2.8.0
添加到PATH %RABBITMQ_SERVER%\sbin;
3:进入%RABBITMQ_SERVER%\sbin 目录以管理员身份运行 rabbitmq-plugins.bat
rabbitmq-plugins.bat enable rabbitmq_management
安装完成之后以管理员身份启动 rabbitmq-service.bat
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
4:浏览器访问 localhost:15672(RabbitMQ 3.0 之前的版本,管理界面端口为 55672) 默认账号:guest 密码:guest
Java 开发RabbitMQ
1.下载jar
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.5.4/rabbitmq-java-client-bin-3.5.4.zip
2.生产者
package com.rabbit;
import com.rabbitmq.client.*;
/**
 * Minimal producer: publishes a single "Hello World!" message to the {@code hello}
 * queue of a broker on localhost, using the default ("") exchange.
 */
public class Send {
    /** Name of the queue that is declared and used as the routing key. */
    private final static String QUEUE_NAME = "hello";

    /**
     * Connects to the broker, declares the queue, publishes one message and
     * releases the connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // Encode explicitly so the bytes on the wire do not depend on the
            // platform default charset.
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
        } finally {
            // Always release the connection, even when declare/publish failed.
            connection.close();
        }
    }
}
3.消费者
package com.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * Minimal consumer: prints every message that arrives on the {@code hello} queue
 * of a broker on localhost.
 */
public class Reqv {
    /** Name of the queue that is declared and consumed from. */
    private final static String QUEUE_NAME = "hello";

    /**
     * Connects to the broker, declares the queue and blocks forever printing
     * deliveries. The loop never exits, so the connection stays open until the
     * process is terminated.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the queue or receiving fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Second argument true = auto-acknowledge deliveries.
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Decode explicitly instead of relying on the platform default charset.
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
Spring RabbitMQ
- 先看一个帖子
1.首先是生产者配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Broker connection settings -->
    <rabbit:connection-factory id="connectionFactory" host="localhost"
                               username="guest" password="guest" port="5672"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration -->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false"
                  exclusive="false" name="queue_one"/>

    <!-- Direct exchange: binds queue_one under the routing key queue_one_key -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true"
                            auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- JSON message converter. Per the original author, Spring AMQP's stock JSON
         converter is Jackson-based; it is replaced here by a fastjson-based
         implementation because fastjson is reported to be faster. -->
    <bean id="jsonMessageConverter" class="mq.convert.FastJsonMessageConverter"></bean>

    <!-- Template used by the producer; publishes to my-mq-exchange -->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"
                     connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter"/>
</beans>
2.fastjson MessageConverter 插件实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import
org.apache.commons.logging.Log;
import
org.apache.commons.logging.LogFactory;
import
org.springframework.amqp.core.Message;
import
org.springframework.amqp.core.MessageProperties;
import
org.springframework.amqp.support.converter.AbstractMessageConverter;
import
org.springframework.amqp.support.converter.MessageConversionException;
import
fe.json.FastJson;
public
class
FastJsonMessageConverter
extends
AbstractMessageConverter {
private
static
Log log = LogFactory.getLog(FastJsonMessageConverter.
class
);
public
static
final
String DEFAULT_CHARSET =
"UTF-8"
;
private
volatile
String defaultCharset = DEFAULT_CHARSET;
public
FastJsonMessageConverter() {
super
();
//init();
}
public
void
setDefaultCharset(String defaultCharset) {
this
.defaultCharset = (defaultCharset !=
null
) ? defaultCharset
: DEFAULT_CHARSET;
}
public
Object fromMessage(Message message)
throws
MessageConversionException {
return
null
;
}
public
<T> T fromMessage(Message message,T t) {
String json =
""
;
try
{
json =
new
String(message.getBody(),defaultCharset);
}
catch
(UnsupportedEncodingException e) {
e.printStackTrace();
}
return
(T) FastJson.fromJson(json, t.getClass());
}
protected
Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws
MessageConversionException {
byte
[] bytes =
null
;
try
{
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(
this
.defaultCharset);
}
catch
(UnsupportedEncodingException e) {
throw
new
MessageConversionException(
"Failed to convert Message content"
, e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(
this
.defaultCharset);
if
(bytes !=
null
) {
messageProperties.setContentLength(bytes.length);
}
return
new
Message(bytes, messageProperties);
}
}
3.生产者端调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import
java.util.List;
import
org.springframework.amqp.core.AmqpTemplate;
public
class
MyMqGatway {
@Autowired
private
AmqpTemplate amqpTemplate;
public
void
sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend(
"queue_one_key"
, obj);
}
}
4.消费者端配置(与生产者端大同小异)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Broker connection settings -->
    <rabbit:connection-factory id="connectionFactory" host="localhost"
                               username="guest" password="guest" port="5672"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration -->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false"
                  exclusive="false" name="queue_one"/>

    <!-- Direct exchange: binds queue_one under the routing key queue_one_key -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true"
                            auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Queue listener: the listener registered on a queue is notified when a
         message arrives. NOTE(review): the beans "taskExecutor" and
         "queueOneLitener" are referenced below but not declared in this file;
         they must be defined elsewhere in the application context. -->
    <rabbit:listener-container connection-factory="connectionFactory"
                               acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
    </rabbit:listener-container>
</beans>
5.消费者端调用
1
2
3
4
5
6
7
8
9
import
org.springframework.amqp.core.Message;
import
org.springframework.amqp.core.MessageListener;
/**
 * Listener that prints the body of every message it receives.
 */
public class QueueOneLitener implements MessageListener {

    /** Charset used to decode bodies; assumes the producer encodes text as UTF-8. */
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Prints the message body as text.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        // Decode the bytes: concatenating the byte[] itself would only print
        // the array's identity (e.g. "[B@1b2c3d"), not the payload.
        System.out.println(" data :" + new String(message.getBody(), UTF_8));
    }
}
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
网上的这篇帖子,将路由设置成了direct,这样子保证了一对一的读取和发布,对于路由的exchange请参照这里的说明:http://melin.iteye.com/blog/691265
要特别注意的是,queue队列,其实只是在xml中的配置,并没有实际的意义,读取的时候使用的键值其实就是key;在发送的时候,convertAndSend("queue_one_key", obj) 的第一个参数就是这个键值,不要混淆了
- 下面介绍我在项目中的应用,这里还可以实现如果有异常,实现了自动重发功能
1、生产者
app.xml配置,将其引入到spring.xml中即可
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Broker connection; values come from the rabbitmq.* properties -->
    <rabbit:connection-factory
            id="connectionFactory"
            host="${rabbitmq.host}"
            port="${rabbitmq.port}"
            username="${rabbitmq.username}"
            password="${rabbitmq.password}"/>

    <!-- RabbitAdmin for the connection factory above -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- RabbitTemplate used by the producer to send messages -->
    <rabbit:template
            id="rabbitTemplate"
            connection-factory="connectionFactory"/>
</beans>
发送数据
package com.zefun.web.service;
import java.util.HashMap;
import java.util.Map;
import net.sf.json.JSONObject;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import com.zefun.common.consts.App;
/**
 * Service that publishes messages to RabbitMQ queues.
 *
 * @author
 * @date Aug 24, 2015 3:51:04 PM
 */
@Service
public class RabbitService {

    /** Class logger. */
    private static Logger logger = Logger.getLogger(RabbitService.class);

    /** RabbitMQ template injected by Spring; used for all publishing. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a coupon-delivery message.
     *
     * <p>{@code App.Queue.SEND_COUPONS} is passed as the routing key; the consumer
     * side reads using that same value.
     *
     * @author 高国藩
     * @date 2015-09-16 11:34:12
     * @param map message payload
     */
    public void sendCoupons(Map<String, Object> map) {
        rabbitTemplate.convertAndSend(App.Queue.SEND_COUPONS, map);
    }
}
2、消费者
app.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <!-- Broker connection; values come from the rabbitmq.* properties -->
    <rabbit:connection-factory
            id="connectionFactory"
            host="${rabbitmq.host}"
            port="${rabbitmq.port}"
            username="${rabbitmq.username}"
            password="${rabbitmq.password}"
            channel-cache-size="${rabbitmq.channel.cache.size}"/>

    <!-- RabbitAdmin for the connection factory above -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Coupon notification queue -->
    <rabbit:queue
            id="queue_member_service_coupon"
            name="${rabbitmq.wechat.template.notice.coupon}"
            durable="true"
            auto-delete="false"
            exclusive="false"/>

    <!-- Routing: direct exchange with the coupon queue bound to it -->
    <rabbit:direct-exchange
            id="directExchange"
            name="directExchange"
            durable="true"
            auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding
                    queue="queue_member_service_coupon"
                    key="${rabbitmq.wechat.template.notice.coupon}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Exposes the AcknowledgeMode.MANUAL constant as a bean -->
    <bean id="ackManual"
          class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
        <property name="staticField"
                  value="org.springframework.amqp.core.AcknowledgeMode.MANUAL"/>
    </bean>

    <!-- Listener container for coupon-delivery notifications -->
    <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="acknowledgeMode" ref="ackManual"/>
        <property name="queueNames" value="${rabbitmq.wechat.template.notice.coupon}"/>
        <property name="messageListener">
            <bean class="com.zefun.wechat.listener.MemberTranscationNoitceCoupon"/>
        </property>
        <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers}"/>
        <!-- Retry advice: exponential back-off, with MQRepublishMessageRecoverer
             configured as the message recoverer -->
        <property name="adviceChain">
            <bean class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
                <property name="messageRecoverer">
                    <bean class="com.zefun.wechat.utils.MQRepublishMessageRecoverer"/>
                </property>
                <property name="retryOperations">
                    <bean class="org.springframework.retry.support.RetryTemplate">
                        <property name="backOffPolicy">
                            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                                <property name="initialInterval" value="500"/>
                                <property name="multiplier" value="10.0"/>
                                <property name="maxInterval" value="10000"/>
                            </bean>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="errorHandler">
            <bean class="com.zefun.wechat.utils.MQErrorHandler"/>
        </property>
    </bean>

    <!-- Converter injected into the listener, recoverer and error handler -->
    <bean id="msgConverter"
          class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
</beans>
消费类
package com.zefun.wechat.listener;
import java.util.Map;
import net.sf.json.JSONObject;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import com.rabbitmq.client.Channel;
import com.zefun.wechat.service.RedisService;
import com.zefun.wechat.utils.App;
import com.zefun.wechat.utils.HttpClientUtil;
/**
 * Listener for coupon-delivery messages: acknowledges each delivery, then posts
 * the payload as JSON to the WeChat template-message endpoint of the store.
 */
public class MemberTranscationNoitceCoupon implements ChannelAwareMessageListener {

    // Bound to this class; the original referenced an unrelated listener class.
    private static final Logger logger = Logger.getLogger(MemberTranscationNoitceCoupon.class);

    @Autowired
    private MessageConverter msgConverter;

    @Autowired
    private RedisService redisService;

    /**
     * Converts, acknowledges and processes one delivery.
     *
     * <p>The message is acknowledged once, before processing; the delivery tag is
     * then overwritten with {@code App.DELIVERIED_TAG} so that a re-invocation
     * with the same {@link Message} object does not acknowledge it again.
     * An exception thrown by the HTTP call propagates to the caller.
     *
     * @param message the delivered message
     * @param channel channel the message arrived on; used for the manual ack
     * @throws Exception if acknowledging or posting the notification fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Object obj = null;
        try {
            obj = msgConverter.fromMessage(message);
        } catch (MessageConversionException e) {
            logger.error("convert MQ message error.", e);
        } finally {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            if (deliveryTag != App.DELIVERIED_TAG) {
                channel.basicAck(deliveryTag, false);
                message.getMessageProperties().setDeliveryTag(App.DELIVERIED_TAG);
                logger.info("revice and ack msg: " + describe(obj, message));
            }
        }
        if (obj == null) {
            return;
        }
        if (!(obj instanceof Map)) {
            // Already acknowledged; retrying a payload of the wrong type cannot succeed.
            logger.error("unexpected MQ payload type " + obj.getClass().getName() + ", message dropped.");
            return;
        }
        Map<?, ?> map = (Map<?, ?>) obj;
        Object storeId = map.get("storeId");
        if (storeId == null) {
            logger.error("MQ message " + obj + " has no storeId, message dropped.");
            return;
        }
        // NOTE(review): the result of sendPost is ignored, as in the original. If it
        // reports failure through its return value rather than an exception, check
        // it here and throw so the failure is not silently lost. The original
        // threw unconditionally (hard-coded flag), so every message failed.
        HttpClientUtil.sendPost(getTemplSendUrl(storeId.toString()), JSONObject.fromObject(map).toString(), null);
    }

    /** Renders the converted payload (or the raw message when conversion failed) for logging. */
    private static String describe(Object converted, Message raw) {
        if (converted == null) {
            return String.valueOf(raw);
        }
        if (converted instanceof byte[]) {
            return new String((byte[]) converted, java.nio.charset.Charset.forName("UTF-8"));
        }
        return String.valueOf(converted);
    }

    /**
     * Builds the template-message URL using the store's access token from Redis.
     *
     * @param storeId store whose token is looked up
     * @return the endpoint URL including the access token
     * @throws IllegalStateException if no token is stored for the store
     */
    private String getTemplSendUrl(String storeId) {
        String accessToken = redisService.hget(App.Redis.STORE_WECHAT_ACCESS_TOKEN_KEY_HASH, storeId);
        if (accessToken == null || accessToken.isEmpty()) {
            throw new IllegalStateException("no wechat access token found for store " + storeId);
        }
        // Use the looked-up token; never embed a literal token in source code.
        return "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=" + accessToken;
    }
}
消息转换类
package com.zefun.wechat.utils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Recoverer that annotates a failed message with details of the failure and
 * publishes it again to the exchange and routing key it was received from.
 */
public class MQRepublishMessageRecoverer implements MessageRecoverer {

    private static final Logger logger = Logger.getLogger(MQRepublishMessageRecoverer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageConverter msgConverter;

    /**
     * Adds the {@code x-exception-*} and {@code x-original-*} headers to the
     * message, re-sends it and logs the failure.
     *
     * @param message the message whose processing failed
     * @param cause   the failure
     */
    @Override
    public void recover(Message message, Throwable cause) {
        // Prefer the nested cause's message when there is one.
        Throwable nested = cause.getCause();
        String reason;
        if (nested != null) {
            reason = nested.getMessage();
        } else {
            reason = cause.getMessage();
        }
        String exchange = message.getMessageProperties().getReceivedExchange();
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();

        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
        headers.put("x-exception-message", reason);
        headers.put("x-original-exchange", exchange);
        headers.put("x-original-routingKey", routingKey);

        // Put the message back where it came from.
        this.rabbitTemplate.send(exchange, routingKey, message);
        logger.error("handler msg (" + msgConverter.fromMessage(message) + ") err, republish to mq.", cause);
    }

    /** Renders the throwable's stack trace as a string. */
    private String getStackTraceAsString(Throwable cause) {
        StringWriter sink = new StringWriter();
        cause.printStackTrace(new PrintWriter(sink, true));
        return sink.toString();
    }
}
package com.zefun.wechat.utils;
import java.lang.reflect.Field;
import java.util.Date;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ErrorHandler;
import com.zefun.wechat.service.RedisService;
/**
 * Error handler that records failed MQ messages in a Redis sorted set, scored by
 * the time of the failure.
 */
public class MQErrorHandler implements ErrorHandler {

    private static final Logger logger = Logger.getLogger(MQErrorHandler.class);

    @Autowired
    private RedisService redisService;

    @Autowired
    private MessageConverter msgConverter;

    /**
     * Logs the failure and, when it carries the failed message (that is, when an
     * {@link MQListenerExecutionFailedException} is found in the cause chain),
     * stores that message in Redis.
     *
     * @param cause the error reported to this handler
     */
    @Override
    public void handleError(Throwable cause) {
        Message mqMsg = findFailedMessage(cause);
        if (mqMsg == null) {
            logger.error("An error occurred.", cause);
            return;
        }

        Object msgObj;
        try {
            msgObj = msgConverter.fromMessage(mqMsg);
        } catch (RuntimeException e) {
            // Fall back to the raw message so the failure is still recorded.
            logger.error("convert failed MQ msg error.", e);
            msgObj = mqMsg;
        }
        logger.error("handle MQ msg: " + msgObj + " failed, record it to redis.", cause);

        try {
            redisService.zadd(App.MsgErr.MQ_MSG_ERR_RECORD_KEY, Double.valueOf(new Date().getTime()),
                    String.valueOf(msgObj));
        } catch (Exception e) {
            logger.error("record failed MQ msg to redis error.", e);
        }
    }

    /**
     * Walks the cause chain for an {@link MQListenerExecutionFailedException}.
     * The original used reflection on its {@code mqMsg} field, which failed for
     * every other throwable type.
     *
     * @return the message attached to the first such exception, or {@code null}
     */
    private static Message findFailedMessage(Throwable cause) {
        for (Throwable t = cause; t != null; t = t.getCause()) {
            if (t instanceof MQListenerExecutionFailedException) {
                return ((MQListenerExecutionFailedException) t).getMqMsg();
            }
        }
        return null;
    }
}
package com.zefun.wechat.utils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException;
/**
 * {@link ListenerExecutionFailedException} that can additionally carry the
 * message whose processing failed.
 */
public class MQListenerExecutionFailedException extends ListenerExecutionFailedException {

    private static final long serialVersionUID = 1L;

    /** The failed message; {@code null} when none was supplied. */
    private final Message mqMsg;

    /**
     * Creates an exception without an attached message.
     *
     * @param msg   detail message
     * @param cause underlying failure
     */
    public MQListenerExecutionFailedException(String msg, Throwable cause) {
        this(msg, null, cause);
    }

    /**
     * Creates an exception that carries the failed message.
     *
     * @param msg   detail message
     * @param mqMsg the message whose processing failed
     * @param cause underlying failure
     */
    public MQListenerExecutionFailedException(String msg, Message mqMsg, Throwable cause) {
        super(msg, cause);
        this.mqMsg = mqMsg;
    }

    /** @return the failed message, or {@code null} if none was attached */
    public Message getMqMsg() {
        return this.mqMsg;
    }
}
package com.zefun.wechat.utils;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
/**
 * Recoverer that always throws an {@link MQListenerExecutionFailedException}
 * carrying the failed message, with the original failure wrapped in an
 * {@link AmqpRejectAndDontRequeueException}.
 */
public class MQRejectAndDontRequeueRecoverer extends RejectAndDontRequeueRecoverer {

    /**
     * @param message the message whose processing failed
     * @param cause   the failure
     * @throws MQListenerExecutionFailedException always
     */
    @Override
    public void recover(Message message, Throwable cause) {
        AmqpRejectAndDontRequeueException rejection = new AmqpRejectAndDontRequeueException(cause);
        throw new MQListenerExecutionFailedException("Retry Policy Exhausted", message, rejection);
    }
}
properties内容
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.concurrentConsumers=5
rabbitmq.channel.cache.size=50
rabbitmq.wechat.template.notice.coupon=queue_member_service_coupon
maven
<!-- springmvc rabbitmq -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.0.3.RELEASE</version>
</dependency>