RocketMQ入门案例

Stella981
• 阅读 739

  学习RocketMQ,先写一个Demo演示一下看看效果。

一、服务端部署

  因为只是简单的为了演示效果,服务端仅部署单Master模式 —— 一个Name Server节点,一个Broker节点。主要有以下步骤。

  1. 下载RocketMQ源码、编译(也可以网上下载编译好的文件),这里使用最新的4.4.0版本,下载好之后放在Linux上通过一下命令解压缩、编译。

    unzip rocketmq-all-4.4.0-source-release.zip
    cd rocketmq-all-4.4.0/
    mvn -Prelease-all -DskipTests clean install –U
    
  2. 编译之后到distribution/target/apache-rocketmq目录,后续所有操作都是在该路径下。

    cd distribution/target/apache-rocketmq
    
  3. 启动Name Server,查看日志确认启动成功。

    nohup sh bin/mqnamesrv &
    tail -f ~/logs/rocketmqlogs/namesrv.log
    
  4. 启动Broker,查看日志确认启动成功。

    nohup sh bin/mqbroker -n localhost:9876 &
    tail -f ~/logs/rocketmqlogs/broker.log
    

  Name Server和Broker都成功启动,服务器就部署完成了。更详细的参考官方文档手册,里面还包含在服务器上运行Producer、Customer示例,这里主要是在项目中使用。

  官网手册戳这里:Quick Start

二、客户端搭建:Spring Boot项目中使用

  客户端分为消息生产者和消息消费者,这里通过日志打印输出查看效果,为了看起来更清晰,我新建了两个模块分别作为消息生产者和消息消费者。

  1. 添加依赖,在两个模块的pom文件中添加以下配置。

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.4.0</version>
    </dependency>
    
  2. 配置生产者模块。

    • application.yml文件中增加用来初始化producer的相关配置,这里只配了一部分,更详细的配置参数可以查看官方文档。

      # RocketMQ生产者
      rocketmq:
        producer:
          # Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER
          producerGroup: ${spring.application.name}
          # namesrv地址
          namesrvAddr: 192.168.101.213:9876
          # 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB
          maxMessageSize: 4096
          # 发送消息超时时间,单位毫秒。默认10000
          sendMsgTimeout: 5000
          # 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2
          retryTimesWhenSendFailed: 2
          # 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096
          compressMsgBodyOverHowmuch: 4096
          # 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
          createTopicKey: XIAO_LIU
      
    • 新增producer配置类,系统启动时读取yml文件的配置信息初始化producer。集群模式下,如果在同一个jvm中,要往多个的MQ集群发送消息,则需要创建多个的producer并设置不同的instanceName,默认不需要设置该参数。

      @Configuration
      public class ProducerConfiguration {
          private static final Logger LOGGER = LoggerFactory.getLogger(ProducerConfiguration.class);
      
          /**
           * Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER
           */
          @Value("${rocketmq.producer.producerGroup}")
          private String producerGroup;
          /**
           * namesrv地址
           */
          @Value("${rocketmq.producer.namesrvAddr}")
          private String namesrvAddr;
          /**
           * 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB
           */
          @Value("${rocketmq.producer.maxMessageSize}")
          private Integer maxMessageSize;
          /**
           * 发送消息超时时间,单位毫秒。默认10000
           */
          @Value("${rocketmq.producer.sendMsgTimeout}")
          private Integer sendMsgTimeout;
          /**
           * 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2
           */
          @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
          private Integer retryTimesWhenSendFailed;
          /**
           * 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096
           */
          @Value("${rocketmq.producer.compressMsgBodyOverHowmuch}")
          private Integer compressMsgBodyOverHowmuch;
          /**
           * 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
           */
          @Value("${rocketmq.producer.createTopicKey}")
          private String createTopicKey;
      
          @Bean
          public DefaultMQProducer getRocketMQProducer() {
      
              DefaultMQProducer producer = new DefaultMQProducer(this.producerGroup);
              producer.setNamesrvAddr(this.namesrvAddr);
              producer.setCreateTopicKey(this.createTopicKey);
      
              if (this.maxMessageSize != null) {
                  producer.setMaxMessageSize(this.maxMessageSize);
              }
              if (this.sendMsgTimeout != null) {
                  producer.setSendMsgTimeout(this.sendMsgTimeout);
              }
              if (this.retryTimesWhenSendFailed != null) {
                  producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
              }
              if (this.compressMsgBodyOverHowmuch != null) {
                  producer.setCompressMsgBodyOverHowmuch(this.compressMsgBodyOverHowmuch);
              }
              if (Strings.isNotBlank(this.createTopicKey)) {
                  producer.setCreateTopicKey(this.createTopicKey);
              }
      
              try {
                  producer.start();
      
                  LOGGER.info("Producer Started : producerGroup:[{}], namesrvAddr:[{}]"
                          , this.producerGroup, this.namesrvAddr);
              } catch (MQClientException e) {
                  LOGGER.error("Producer Start Failed : {}", e.getMessage(), e);
              }
              return producer;
          }
      
      }
      
    • 使用producer实例向MQ发送消息。

      @RunWith(SpringRunner.class)
      @SpringBootTest
      public class ProducerServiceApplicationTests {
          private static final Logger LOGGER = LoggerFactory.getLogger(ProducerServiceApplicationTests.class);
          @Autowired
          private DefaultMQProducer defaultMQProducer;
      
          @Test
          public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
              for (int i = 0; i < 100; i++) {
                  User user = new User();
                  user.setUsername("用户" + i);
                  user.setPassword("密码" + i);
                  user.setSex(i % 2);
                  user.setBirthday(new Date());
                  Message message = new Message("user-topic", "user-tag", JSON.toJSONString(user).getBytes(RemotingHelper.DEFAULT_CHARSET));
                  SendResult sendResult = defaultMQProducer.send(message);
                  LOGGER.info(sendResult.toString());
              }
          }
      }
      
  3. 配置消费者模块。

    • application.yml文件中增加用来初始化consumer的相关配置,同样参数这里只配了一部分,更详细的配置参数可以查看官方文档。

      # RocketMQ消费者
      rocketmq:
        consumer:
          # Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组。默认DEFAULT_CONSUMER
          consumerGroup: ${spring.application.name}
          # namesrv地址
          namesrvAddr: 192.168.101.213:9876
          # 消费线程池最大线程数。默认10
          consumeThreadMin: 10
          # 消费线程池最大线程数。默认20
          consumeThreadMax: 20
          # 批量消费,一次消费多少条消息。默认1
          consumeMessageBatchMaxSize: 1
          # 批量拉消息,一次最多拉多少条。默认32
          pullBatchSize: 32
          # 订阅的主题
          topics: user-topic
      
    • 新增consumer配置。

      @Configuration
      public class ConsumerConfiguration {
          private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerConfiguration.class);
      
          @Value("${rocketmq.consumer.consumerGroup}")
          private String consumerGroup;
          @Value("${rocketmq.consumer.namesrvAddr}")
          private String namesrvAddr;
          @Value("${rocketmq.consumer.consumeThreadMin}")
          private int consumeThreadMin;
          @Value("${rocketmq.consumer.consumeThreadMax}")
          private int consumeThreadMax;
          @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
          private int consumeMessageBatchMaxSize;
          @Value("${rocketmq.consumer.pullBatchSize}")
          private int pullBatchSize;
          @Value("${rocketmq.consumer.topics}")
          private String topics;
      
          private final ConsumeMsgListener consumeMsgListener;
      
          @Autowired
          public ConsumerConfiguration(final ConsumeMsgListener consumeMsgListener) {
              this.consumeMsgListener = consumeMsgListener;
          }
      
          @Bean
          public DefaultMQPushConsumer getRocketMQConsumer() {
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
              consumer.setNamesrvAddr(namesrvAddr);
              consumer.setConsumeThreadMin(consumeThreadMin);
              consumer.setConsumeThreadMax(consumeThreadMax);
              consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
              consumer.setPullBatchSize(pullBatchSize);
              consumer.registerMessageListener(consumeMsgListener);
      
              try {
                  /**
                   * 设置消费者订阅的主题和tag。subExpression参数为*表示订阅该主题下所有tag,
                   * 如果需要订阅该主题下的指定tag,subExpression设置为对应tag名称,多个tag以||分割,例如"tag1 || tag2 || tag3"
                   */
                  consumer.subscribe(topics, "*");
                  consumer.start();
      
                  LOGGER.info("Consumer Started : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr);
              } catch (Exception e) {
                  LOGGER.error("Consumer Start Failed : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr, e);
                  e.printStackTrace();
              }
              return consumer;
          }
      }
      
    • 新增消息监听器,监听到新消息后,执行对应的业务逻辑。

      @Component
      public class ConsumeMsgListener implements MessageListenerConcurrently {
          private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeMsgListener.class);
      
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
              if (CollectionUtils.isEmpty(msgs)) {
                  LOGGER.info("Msgs is Empty.");
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
              for (MessageExt msg : msgs) {
                  try {
                      if ("user-topic".equals(msg.getTopic())) {
                          LOGGER.info("{} Receive New Messages: {}", Thread.currentThread().getName(), new String(msg.getBody()));
                          // do something
                      }
                  } catch (Exception e) {
                      if (msg.getReconsumeTimes() == 3) {
                          // 超过3次不再重试
                          LOGGER.error("Msg Consume Failed.");
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      } else {
                          // 重试
                          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                      }
                  }
              }
      
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      }
      

三、效果

  1. 运行生产者测试代码。系统启动时初始化Producer,然后执行测试代码,往MQ中发送消息。效果如下:
    RocketMQ入门案例

  2. 启动消费者服务。系统启动时先初始化Customer。此时1.已经往MQ中发送了一些消息,监听器监听到MQ中有消息,随即马上消费消息。
    RocketMQ入门案例

四、总结

  Demo很简单,但是里面还有很多东西需要慢慢研究。

  代码可以戳这里:spring-cloud-learn

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
RocketMq sendDefaultImpl call timeout异常解决
今天部署完RocketMq,使用客户端进行发送消息发现,异常如下:RemotingTooMuchRequestException:sendDefaultImplcalltimeout这肯定是produce没有连上Rocketmq的broker,按照rocketmq的官网实例进行检查,发现跟例子没啥区别,题外话:这里要吐槽一下阿里相关开源产品的文档
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
ES6 新增的数组的方法
给定一个数组letlist\//wu:武力zhi:智力{id:1,name:'张飞',wu:97,zhi:10},{id:2,name:'诸葛亮',wu:55,zhi:99},{id:3,name:'赵云',wu:97,zhi:66},{id:4,na
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这