1.maven依赖
<!-- RocketMQ Spring Boot starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
2.配置文件yml
# RocketMQ starter settings; child keys must be nested under "rocketmq"
rocketmq:
  # Name server address
  name-server: http://127.0.0.1:9876
  producer:
    # Producer group name
    group: GROUP
3.顺序发送消息
/** RocketMQ template used by the sending examples; injected by Spring via {@code @Autowired}. */
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
 * Sends an ordered message.
 *
 * <p>The tag is appended to the destination as {@code topic:tag}, which is the form
 * rocketmq-spring reads the message tag from. When a key is supplied it is set as the
 * message key and also used as the hash key for {@code syncSendOrderly}, so messages
 * sharing a key are routed to the same queue. Without a key the message is sent with
 * the plain (non-ordered) {@code send}.
 *
 * @param topic   topic to publish to
 * @param message message payload
 * @param tag     message tag; may be {@code null} or empty for an untagged message
 * @param key     message key, also used as the ordering hash key; may be {@code null}
 */
public void send(String topic, String message, String tag, String key) {
    // A TAGS header is not used as the message tag on send; the tag has to travel
    // in the destination string.
    String destination = (tag == null || tag.isEmpty()) ? topic : topic + ":" + tag;

    if (key == null) {
        // No hash key available: ordered routing is impossible, fall back to a plain send.
        rocketMQTemplate.send(destination, MessageBuilder.withPayload(message).build());
        return;
    }

    rocketMQTemplate.syncSendOrderly(
            destination,
            MessageBuilder.withPayload(message)
                    .setHeader(RocketMQHeaders.KEYS, key)
                    .build(),
            key);
}
4.异步发送消息
/**
 * Sends a message asynchronously and logs the outcome from the send callback.
 *
 * @param topic destination passed to {@code RocketMQTemplate#asyncSend}
 * @param msg   message payload
 */
public void sendASyncMsg(String topic, String msg) {
    rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // Success callback
            log.info("异步发送消息成功:{}", JSON.toJSONString(sendResult));
        }

        @Override
        public void onException(Throwable e) {
            // Failure callback: log at ERROR and pass the throwable as the last
            // argument so the stack trace is kept instead of only the message text.
            log.error("异步发送消息失败:{}", e.getMessage(), e);
        }
    });
}
5.延时消息
* 发送异步延迟消息
* 消息内容为json格式
*/
public void sendAsyncMsgByJsonDelay(String topic, String json,int level) throws JsonProcessingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(topic, json.getBytes(Charset.forName("utf-8")));
//设置延迟等级 0不延时从1开始分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(level);
//发送异步消息
this.rocketMQTemplate.getProducer().send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable.getMessage());
}
});
}
6.分组消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Consumer-group example: listens on {@code test-topic} as a member of consumer
 * group {@code test_group}. No tag selector is configured.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test_group")
public class TestConsumer implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a String
     */
    @Override
    public void onMessage(String message) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("received message: {}", message);
    }
}
7.tag消费:同一消费组内所有消费者的订阅关系(topic和tag)必须一致,因此订阅不同tag时要使用不同的消费组。第一个tag消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Tag-filtered consumer: listens on {@code test-topic} in consumer group
 * {@code group1} and selects messages with the expression {@code tag1}.
 *
 * <p>Tag filtering is the listener annotation's default selector type, so no
 * {@code selectorType} attribute (and no {@code SelectorType} import) is needed.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group1", selectorExpression = "tag1")
public class TagOneConsumer implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a String
     */
    @Override
    public void onMessage(String message) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("received message: {}", message);
    }
}
第二个tag消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Tag-filtered consumer: listens on {@code test-topic} in consumer group
 * {@code group2} and selects messages with the expression {@code tag2}.
 *
 * <p>Tag filtering is the listener annotation's default selector type, so no
 * {@code selectorType} attribute (and no {@code SelectorType} import) is needed.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group2", selectorExpression = "tag2")
public class TagTwoConsumer implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a String
     */
    @Override
    public void onMessage(String message) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("received message: {}", message);
    }
}
8.消费多个tag消息,用||分隔
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Multi-tag consumer: listens on {@code test-topic} in consumer group
 * {@code group} and selects messages with the expression {@code tag1||tag2},
 * where {@code ||} separates the accepted tags.
 *
 * <p>Tag filtering is the listener annotation's default selector type, so no
 * {@code selectorType} attribute (and no {@code SelectorType} import) is needed.
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "group", selectorExpression = "tag1||tag2")
public class TagConsumer implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a String
     */
    @Override
    public void onMessage(String message) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("received message: {}", message);
    }
}