SpringBoot2.X + 阿里云RocketMQ(78)

Stella981
• 阅读 658

《SpringBoot2.X心法总纲》

前景:目前很多公司已经都在使用阿里云产品,数据库、监控、k8s、短信业务、直播等,而接下来描述的则是其中之一,如何对接快速使用阿里RocketMQ,而使用的是ons-client

初级版本Springboot2.X+RocketMq(本地安装RocketMq)

1、pom依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>

<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.4.Final</version> </dependency>

_<_dependency_> _ _<_groupId_>cn.hutool</_groupId_> _ _<_artifactId_>hutool-all</_artifactId_> _ <_version_>5.4.4</_version_> </_dependency_>

2、配置文件

一般情况下,公司可不止一个topic

rocket: mq: aliyun: namesrvAddr: http://onsaddr.... accessKey: secretKey: mobile: topic: live_Channel tag: '*' group: GID_MDXL_MJT clearChannel: topic: live_clear tag: '*' group: GID_live_clea updateChannel: topic: live_update_channel_info tag: '*' group: GID_live_update updateAllChannel: topic: live_updateallchannels tag: '*' group: GID_live_updateallchannels

3、RocketMQ配置

配置里mobile、clearChannel、updateChannel、updateAllChannel则是对应多个topic

import com.aliyun.openservices.ons.api.PropertyKeyConst; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.Properties; /** * 配置 * @author mujiutian
 * @date 2020/10/26 */ @Data @Component @ConfigurationProperties(prefix = "rocket.mq.aliyun") public class RocketMqConfig {

private String accessKey;

private String secretKey; private String namesrvAddr; /** mobile结构同步 */ private Config mobile; private Config clearChannel; private Config updateChannel; private Config updateAllChannel; public Properties getProperties() { java.util.Properties properties = new java.util.Properties(); properties.setProperty(PropertyKeyConst.AccessKey, accessKey); properties.setProperty(PropertyKeyConst.SecretKey, secretKey); properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); return properties; }

@Data

public static class Config { /** 主题 */ private String topic; /** 标签 */ private String tag; /** 组 */ private String group; } }

4、消费者配置

因为有多个topic,则配置以下多个bean对应topic的消费者

import com.aliyun.openservices.ons.api.*; import com.sports.live.mq.ClearChannelListener; import com.sports.live.mq.MobileStatusListener; import com.sports.live.mq.UpdateChannelListener; import org.springframework.context.annotation.*; import org.springframework.context.annotation.Bean; import javax.annotation.Resource; /** * 消费者配置 * @author mujiutian
 * @date 2020/10/26 */ @Configuration public class ConsumerConfig { @Resource private RocketMqConfig rocketMqConfig; @Resource private MobileStatusListener mobileStatusListener; @Resource ClearChannelListener clearChannelListener; @Resource UpdateChannelListener updateChannelListener; @Bean(initMethod = "start", destroyMethod = "shutdown", name = "mobileConsumer") public Consumer buildMobileConsumer() { return buildConsumer(rocketMqConfig.getMobile(), PropertyValueConst.BROADCASTING, mobileStatusListener); }

@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "clearConsumer")
public Consumer buildClearConsumer() {
    return buildConsumer(rocketMqConfig.getClearChannel(), PropertyValueConst.BROADCASTING, clearChannelListener);

}

@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "updateConsumer")
public Consumer buildUpdateChannelConsumer(){
    return buildConsumer(rocketMqConfig.getClearChannel(), PropertyValueConst.CLUSTERING, updateChannelListener);

}

private Consumer buildConsumer(RocketMqConfig.Config config, MessageListener messageListener) {
    return buildConsumer(config.getTopic(), config.getTag(), config.getGroup(), messageListener);

}

private Consumer buildConsumer(RocketMqConfig.Config config, String messageModel, MessageListener messageListener) {
    return buildConsumer(config.getTopic(), config.getTag(), config.getGroup(), messageModel, messageListener);

}

private Consumer buildConsumer(String topic, String tag, String groupId, MessageListener messageListener) {
    java.util.Properties properties = rocketMqConfig.getProperties();

properties.put(PropertyKeyConst.GROUP_ID, groupId); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, tag, messageListener); return consumer; }

private Consumer buildConsumer(String topic, String tag, String groupId, String messageModel, MessageListener messageListener){
    java.util.Properties properties = rocketMqConfig.getProperties();

properties.put(PropertyKeyConst.GROUP_ID, groupId); properties.put(PropertyKeyConst.MessageModel, messageModel); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, tag, messageListener); return consumer; } }

5、创建者配置

同消费者配置,多个topic也是配置多个bean对应生产者

import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.springframework.context.annotation.*; import javax.annotation.Resource; import java.util.Properties; /** * 生产者配置 * @author mujiutian
 * @date 2020/10/26 */ @Configuration public class ProducerConfig { @Resource private RocketMqConfig rocketMqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown",name = "mobileProducer") public Producer buildMobileProducer() { Properties properties = rocketMqConfig.getProperties(); properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getMobile().getGroup()); return ONSFactory.createProducer(properties); }

@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "clearProducer")
public Producer buildClearChannelProducer(){
    Properties properties = rocketMqConfig.getProperties();

properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getClearChannel().getGroup()); return ONSFactory.createProducer(properties); }

@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "updateProducer")
public Producer buildUpdateChannelProducer(){
    Properties properties = rocketMqConfig.getProperties();

properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getUpdateChannel().getGroup()); return ONSFactory.createProducer(properties); }

@Bean(initMethod \= "start", destroyMethod \= "shutdown",name \= "updateAllProducer")
public Producer buildUpdateAllChannelProducer(){
    Properties properties = rocketMqConfig.getProperties();

properties.put(PropertyKeyConst.GROUP_ID, rocketMqConfig.getUpdateAllChannel().getGroup()); return ONSFactory.createProducer(properties); } }

6、创建者发送消息,监听器接收消息处理

import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import tv.zhangyu.onsconfig.JacksonUtil; import javax.annotation.Resource; import java.util.HashMap; /** * 通知其他机器清除内存中的信息 * @author mujiutian
 * @date 2020/10/26 */ @Slf4j @Component public class ClearChannelProducer { @Value("${rocket.mq.aliyun.clearChannel.topic}") private String topic; private static final String TAGS = "clear_channel_info"; @Resource Producer clearProducer; public void send(String uid){ java.util.Map<String,String> map = new HashMap<>(); map.put("channel_info_uid", uid); String body = JacksonUtil.toJson(map); String messageId = null; boolean ret = true; try { Message msg = new Message(topic, TAGS, body.getBytes()); SendResult sendResult = clearProducer.send(msg); messageId = sendResult.getMessageId(); } catch (Exception e) { ret = false; log.error("doChannelInfoClearProducer send error", e); e.printStackTrace(); } log.info("zhangyu_clear_channel_info MessageId:{}, body:{},ret:{}", messageId, body, ret); } }

import cn.hutool.json.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.sports.live.service.impl.ChannelRoomService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * 接受消息,清除内存中的信息 * @author mujiutian
 * @date 2020/10/26 */ @Slf4j @Component public class ClearChannelListener implements MessageListener {

@Resource

ChannelRoomService channelRoomService; @Override public Action consume(Message message, ConsumeContext consumeContext) { String messageId = message.getMsgID(); String result = new String(message.getBody()); log.info("ChannelInfoClearConsumer start messageId{}, result{}", new Object[]{messageId, result}); return doUpdate(result); }

private Action doUpdate(String json){
    //参数判断

if(json == null){ return Action.CommitMessage; }

    try {
        JSONObject obj = cn.hutool.json.JSONUtil.parseObj(json.trim());

String uid = obj.getStr("channel_info_uid"); if (org.apache.commons.lang3.StringUtils.isNotEmpty(uid)) { channelRoomService.clearChannelInfoFromMem(uid); } } catch (Exception e) { log.error("doChannelInfoClearConsumer error", e); e.printStackTrace(); } return Action.CommitMessage; } }

以上主要是业务逻辑了,其他的service不用考虑缺失与否。

7、自己测试发送消息

@Resource Producer mobileProducer; @GetMapping(value = "/test1") @ResponseBody public String test1(){ mobileProducer.start(); Message message = new Message(); message.setTopic("live_Channel..."); message.setTag("111"); java.util.Map<String,String> map = new HashMap<>(2); map.put("name","chengjian"); map.put("tag","mobile8888"); message.setBody(cn.hutool.json.JSONUtil.toJsonStr(map).getBytes()); mobileProducer.send(message); return "OK"; }

这篇博客的精髓在RocketMqConfig枚举类的使用以及多个topic的配置,它会让你会更接近java的面向编程、抽象思维开发。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
SpringBoot2.X + 阿里云RocketMQ(78)
《SpringBoot2.X心法总纲》(https://my.oschina.net/mdxlcj/blog/3118723)前景:目前很多公司已经都在使用阿里云产品,数据库、监控、k8s、短信业务、直播等,而接下来描述的则是其中之一,如何对接快速使用阿里RocketMQ,而使用的是onsclient初级版本Springboot2.XRo
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这