前景:目前很多公司已经都在使用阿里云产品,数据库、监控、k8s、短信业务、直播等,而接下来描述的则是其中之一,如何对接快速使用阿里RocketMQ,而使用的是ons-client
初级版本Springboot2.X+RocketMq(本地安装RocketMq)
1、pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.4.Final</version> </dependency>
_<_dependency_> _ _<_groupId_>cn.hutool</_groupId_> _ _<_artifactId_>hutool-all</_artifactId_> _ <_version_>5.4.4</_version_> </_dependency_>
2、配置文件
一般情况下,公司可不止一个topic
rocket: mq: aliyun: namesrvAddr: http://onsaddr.... accessKey: secretKey: mobile: topic: live_Channel tag: '*' group: GID_MDXL_MJT clearChannel: topic: live_clear tag: '*' group: GID_live_clea updateChannel: topic: live_update_channel_info tag: '*' group: GID_live_update updateAllChannel: topic: live_updateallchannels tag: '*' group: GID_live_updateallchannels
3、RocketMQ配置
配置里mobile、clearChannel、updateChannel、updateAllChannel则是对应多个topic
import com.aliyun.openservices.ons.api.PropertyKeyConst; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.Properties; /** * 配置 * @author mujiutian * @date 2020/10/26 */ @Data @Component @ConfigurationProperties(prefix = "rocket.mq.aliyun") public class RocketMqConfig {
private String accessKey;
private String secretKey; private String namesrvAddr; /** mobile结构同步 */ private Config mobile; private Config clearChannel; private Config updateChannel; private Config updateAllChannel; public Properties getProperties() { java.util.Properties properties = new java.util.Properties(); properties.setProperty(PropertyKeyConst.AccessKey, accessKey); properties.setProperty(PropertyKeyConst.SecretKey, secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); return properties; }
@Data
public static class Config { /** 主题 */ private String topic; /** 标签 */ private String tag; /** 组 */ private String group; } }
4、消费者配置
因为有多个topic,则配置以下多个bean对应topic的消费者
import com.aliyun.openservices.ons.api.*; import com.sports.live.mq.ClearChannelListener; import com.sports.live.mq.MobileStatusListener; import com.sports.live.mq.UpdateChannelListener; import org.springframework.context.annotation.*; import org.springframework.context.annotation.Bean; import javax.annotation.Resource; /** * 消费者配置 * @author mujiutian * @date 2020/10/26 */ @Configuration public class ConsumerConfig { @Resource private RocketMqConfig rocketMqConfig; @Resource private MobileStatusListener mobileStatusListener; @Resource ClearChannelListener clearChannelListener; @Resource UpdateChannelListener updateChannelListener; @Bean(initMethod = "start", destroyMethod = "shutdown", name = "mobileConsumer") public Consumer buildMobileConsumer() { return buildConsumer(rocketMqConfig.getMobile(), PropertyValueConst.BROADCASTING, mobileStatusListener); }
@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "clearConsumer")
public Consumer buildClearConsumer() {
return buildConsumer(rocketMqConfig.getClearChannel(), PropertyValueConst.BROADCASTING, clearChannelListener);
}
@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "updateConsumer")
public Consumer buildUpdateChannelConsumer(){
return buildConsumer(rocketMqConfig.getClearChannel(), PropertyValueConst.CLUSTERING, updateChannelListener);
}
private Consumer buildConsumer(RocketMqConfig.Config config, MessageListener messageListener) {
return buildConsumer(config.getTopic(), config.getTag(), config.getGroup(), messageListener);
}
private Consumer buildConsumer(RocketMqConfig.Config config, String messageModel, MessageListener messageListener) {
return buildConsumer(config.getTopic(), config.getTag(), config.getGroup(), messageModel, messageListener);
}
private Consumer buildConsumer(String topic, String tag, String groupId, MessageListener messageListener) {
java.util.Properties properties = rocketMqConfig.getProperties();
properties.put(PropertyKeyConst.GROUP_ID, groupId); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, tag, messageListener); return consumer; }
private Consumer buildConsumer(String topic, String tag, String groupId, String messageModel, MessageListener messageListener){
java.util.Properties properties = rocketMqConfig.getProperties();
properties.put(PropertyKeyConst.GROUP_ID, groupId); properties.put(PropertyKeyConst.MessageModel, messageModel); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, tag, messageListener); return consumer; } }
5、创建者配置
同消费者配置,多个topic也是配置多个bean对应生产者
import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.context.annotation.*; import javax.annotation.Resource; import java.util.Properties; /** * 生产者配置 * @author mujiutian * @date 2020/10/26 */ @Configuration public class ProducerConfig { @Resource private RocketMqConfig rocketMqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown",name = "mobileProducer") public Producer buildMobileProducer() { Properties properties = rocketMqConfig.getProperties(); properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getMobile().getGroup()); return ONSFactory.createProducer(properties); }
@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "clearProducer")
public Producer buildClearChannelProducer(){
Properties properties = rocketMqConfig.getProperties();
properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getClearChannel().getGroup()); return ONSFactory.createProducer(properties); }
@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "updateProducer")
public Producer buildUpdateChannelProducer(){
Properties properties = rocketMqConfig.getProperties();
properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getUpdateChannel().getGroup()); return ONSFactory.createProducer(properties); }
@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "updateAllProducer")
public Producer buildUpdateAllChannelProducer(){
Properties properties = rocketMqConfig.getProperties();
properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getUpdateAllChannel().getGroup()); return ONSFactory.createProducer(properties); } }
6、创建者发送消息,监听器接收消息处理
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import tv.zhangyu.onsconfig.JacksonUtil; import javax.annotation.Resource; import java.util.HashMap; /** * 通知其他机器清除内存中的信息 * @author mujiutian * @date 2020/10/26 */ @Slf4j @Component public class ClearChannelProducer { @Value("${rocket.mq.aliyun.clearChannel.topic}") private String topic; private static final String TAGS = "clear_channel_info"; @Resource Producer clearProducer; public void send(String uid){ java.util.Map<String,String> map = new HashMap<>(); map.put("channel_info_uid", uid); String body = JacksonUtil.toJson(map); String messageId = null; boolean ret = true; try { Message msg = new Message(topic, TAGS, body.getBytes()); SendResult sendResult = clearProducer.send(msg); messageId = sendResult.getMessageId(); } catch (Exception e) { ret = false; log.error("doChannelInfoClearProducer send error", e); e.printStackTrace(); } log.info("zhangyu_clear_channel_info MessageId:{}, body:{},ret:{}", messageId, body, ret); } }
import cn.hutool.json.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.sports.live.service.impl.ChannelRoomService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 接受消息,清除内存中的信息 * @author mujiutian * @date 2020/10/26 */ @Slf4j @Component public class ClearChannelListener implements MessageListener {
@Resource
ChannelRoomService channelRoomService; @Override public Action consume(Message message, ConsumeContext consumeContext) { String messageId = message.getMsgID(); String result = new String(message.getBody()); log.info("ChannelInfoClearConsumer start messageId{}, result{}", new Object[]{messageId, result}); return doUpdate(result); }
private Action doUpdate(String json){
//参数判断
if(json == null){ return Action.CommitMessage; }
try {
JSONObject obj = cn.hutool.json.JSONUtil.parseObj(json.trim());
String uid = obj.getStr("channel_info_uid"); if (org.apache.commons.lang3.StringUtils.isNotEmpty(uid)) { channelRoomService.clearChannelInfoFromMem(uid); } } catch (Exception e) { log.error("doChannelInfoClearConsumer error", e); e.printStackTrace(); } return Action.CommitMessage; } }
以上主要是业务逻辑了,其他的service不用考虑缺失与否。
7、自己测试发送消息
@Resource Producer mobileProducer; @GetMapping(value = "/test1") @ResponseBody public String test1(){ mobileProducer.start(); Message message = new Message(); message.setTopic("live_Channel..."); message.setTag("111"); java.util.Map<String,String> map = new HashMap<>(2); map.put("name","chengjian"); map.put("tag","mobile8888"); message.setBody(cn.hutool.json.JSONUtil.toJsonStr(map).getBytes()); mobileProducer.send(message); return "OK"; }
这篇博客的精髓在RocketMqConfig枚举类的使用以及多个topic的配置,它会让你会更接近java的面向编程、抽象思维开发。