RocketMQ入门基础

Stella981
• 阅读 743

RocketMQ入门基础

点击箭头处

“JAVA日知录”

,关注并星标哟!!


概述&选型

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要用于三种典型场景:应用解耦流量消峰消息分发

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:

  • 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)

  • 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)

  • 支持18个级别的延迟消息(rabbitmq和kafka不支持)

  • 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)

  • 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)

  • 支持重复消费(rabbitmq不支持,kafka支持)

本文主要介绍RocketMQ的单机安装、双机主从高可用安装配置、运维管理平台搭建、与SpringBoot整合几个知识点,具备相关知识技能的同学请直接拉到最后点个 “在看” 即可。RocketMQ入门基础

文章开始之前需要先准备好JDK1.8或以上的服务器环境以及从rocketmq官网下载好二进制安装包,下载地址http://rocketmq.apache.org/dowloading/releases/

单机安装配置

工欲善其事必先利其器,要想深入了解RocketMQ得先把环境安装好,咱们先开始单机版RocketMQ的安装!

  • 解压安装
    unzip rocketmq-all-4.7.0-bin-release.zip

  • 启动 Name Server
    > nohup sh bin/mqnamesrv &

  • 查看 Name Server启动日志
    > tail -f ~/logs/rocketmqlogs/namesrv.logRocketMQ入门基础

  • 启动 Broker Server
    > nohup sh bin/mqbroker -n localhost:9876 &

  • 查看 Broker Server 启动日志
    > tail -f ~/logs/rocketmqlogs/broker.logRocketMQ入门基础

单机情况下安装使用RocketMQ很简单,只需要分别启动NameServer和Broker Server即可!

关闭RockerMQ需要使用下面的命令:

`# 先关闭Broker Server

sh bin/mqshutdown broker

再关闭NameServer

sh bin/mqshutdown namesrv
`

双机主从高可用搭建

为了消除单机故障,增加可靠性或增大吞吐量,可以在多台服务器上部署多个NameServer和Broker,并为每个Broker部署一个或多个Slave。本节将说明使用两台机器,搭建双主、双从、无单点故障的高可用RocketMQ集群。假设现在有两台服务器,IP地址分别为:192.168.100.43和192.168.100.44,部署架构如下:RocketMQ入门基础

启动多个NameServer 和 Broker

首先需要在两台服务器上分别启动NameServer(nohup sh bin/mqnamesrv &),这样我们就得到了一个无单点的NameServer服务,服务地址为192.168.100.43:9876和192.168.100.44:9876。

然后在两台服务器中RocketMQ的conf目录分别建立两个文件 broker-master.propertiesbroker-slave.properties,下面是不同服务器的配置说明:

  • 192.168.100.43 机器上的broker-master.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = SYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort = 10911 storePathRootDir = /app/rocketmq/store-a

  • 192.168.100.43 机器上的broker-slave.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH listenPort = 11011 storePathRootDir = /app/rocketmq/store-b

  • 192.168.100.44 机器上的broker-master.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-b brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = SYNC_MASTER flushDiskType = ASYNC_FLUSH listenPort = 10911 storePathRootDir = /app/rocketmq/store-b

  • 192.168.100.44 机器上的broker-slave.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876 brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH listenPort = 11011 storePathRootDir = /app/rocketmq/store-a

然后分别使用如下命令启动两台服务器的主节点和从节点
nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &

这样一个高可用的RockerMQ集群就搭建好了,我们登陆可视化运维管理界面查看集群状态,集群正常启动。RocketMQ入门基础

重要参数说明

本节主要是对Broker的配置文件中用到的参数进行说明

  • namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
    指定NameServer的地址,可以是多个。

  • brokerClusterName = DefaultCluster
    Cluster地址,如果集群数量比较多,可以分成多个Cluster,每个Cluster供一个业务群使用。

  • brokerName = broker-a
    Broker的名称,Master 和Slave 通过使用相同的 Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的 Slave。

  • brokerId = 1
    一个Master可以有多个Slave,0表示Master,大于0的表示不同Slave的ID。

  • fileReservedTime = 48
    在磁盘上保存消息的时长,单位是小时,自动删除超时的消息。

  • deleteWhen = 04
    与 fileReservedTime 参数对应,表明在几点做消息删除动作,默认是凌晨4点。

  • brokerRole = SYNC_MASTER
    brokerRole的可选参数有 SYNC_MASTERASYNC_MASTERSLAVE三种。 SYNCASYNC 表示 MASTERSLAVE 之间同步消息的机制, SYNC的意思是当 SlaveMaster 的消息同步完成后再返回发送成功的状态。

  • flushDiskType = ASYNC_FLUSH
    flushDiskType 表示刷盘策略,可选值有ASYNC_FLUSH 和 SYNC_FLUSH两种,分别代表同步刷盘和异步刷盘。同步情况下,消息只有真正写入磁盘才返回成功状态;异步情况下,消息写入page_cache后就返回成功状态。

  • listenPort = 11011
    Broker监听的端口,一台服务器启动多个Broker,需要设置不同的监听端口避免端口冲突。

  • storePathRootDir = /app/rocketmq/store-a
    存储消息以及配置信息的根目录。

可视化管理平台

RocketMQ可以使用rocketmq-externals作为运维管理平台,Github地址https://github.com/apache/rocketmq-externals,我们需要将源码下载下来后再进行手动编译,过程如下:

  • 下载
    从github(https://github.com/apache/rocketmq-externals) 下载RocketMQ可视化管理工具 rocketmq-externals 的源码;

  • 打包
    下载完成后切换进rocketmq-console目录,使用maven命令对其打包 mvn clean package -Dmaven.test.skip=true ,打包完成后生成可执行文件rocketmq-console-ng-1.0.1.jar

  • 运行
    使用 java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=xxxx.xxx.xxx.xxx:9876 命令启动,这里注意需要设置两个参数: 
    --server.port 为运行的这个web应用的端口,如果不设置的话默认为8080;
    --rocketmq.config.namesrvAddr 为RocketMQ命名服务地址,若NameServer为集群则使用英文 ; 分割

  • 访问
    浏览器访问 xxx.xxx.xxx.xxx:8080 进入控制台界面,效果如下RocketMQ入门基础

SpringBoot整合RocketMQ

在SpringBoot中整合RocketMQ主要用到 rocketmq-spring-boot-starter 组件,下面是详细整合过程。

  • 引入组件 rocketmq-spring-boot-starter 依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>

  • 修改application.yml,添加RocketMQ相关配置

rocketmq: name-server: 192.168.100.43:9876;192.168.100.44:9876 producer: group: test-group send-message-timeout: 3000

如果是集群,多个name-server使用英文 ; 分割。

  • 编写消息生产者 MessageProduce

`/**

  • Description:
  • rocketMQ消息发送方法
  • @author javadaily

*/
@Component
public class MessageProduce {

@Autowired  
private RocketMQTemplate rocketMQTemplate;  

/**  
 * 发送消息  
 * @param topic 主题  
 * @param message 消息体  
 */  
public void sendMessage(String topic,String message){  
    this.rocketMQTemplate.convertAndSend(topic,message);  
}  

}
`

使用RocketMQTemplate发送消息

  • 编写消息消费者 MessageConsumer

@Slf4j @Component @RocketMQMessageListener( topic = "test-topic", consumerGroup = "test-group", selectorExpression = "*" ) public class MessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("received message is {}", message); } }

消费者只需要继承RocketMQListener类即可,主要关注实现类上的 @RocketMQMessageListener 注解,配置的 topicconsumerGroup 需要跟消息生产者的配置保持一致。

  • 编写单元测试发送消息

`@RunWith(SpringRunner.class)
@SpringBootTest
public class MessageProduceTest {
@Autowired
private MessageProduce messageProduce;

@Test  
public void testSendMessage() {  
    messageProduce.sendMessage("test-topic","Hello,JAVA日知录");  
}  

}
`

  • 测试 先启动springboot应用,再执行测试用例。 RocketMQ入门基础

好了,各位朋友们,本期的内容到此就全部结束啦,能看到这里的同学都是优秀的同学,下一个升职加薪的就是你了!
如果觉得这篇文章对你有所帮助的话请扫描下面二维码加个关注。" 转发 " 加 " 在看 ",养成好习惯!咱们下期再见!

RocketMQ入门基础

热文推荐

☞ 数据库优化之SQL优化
☞ 数据库优化之实例优化
☞ Docker基础与实战,看这一篇就够了!
☞ Docker-Compose基础与实战,看这一篇就够了!
OAuth2.0最简向导(多图预警)
☞ 构建三维一体立体化监控体系
☞ SpringCloud实战系列

RocketMQ入门基础

JAVA日知录

长按左边二维码关注我们,精彩文章第一时间推送

         >>>技术交流群<<<

朕已阅 RocketMQ入门基础

本文分享自微信公众号 - JAVA日知录(javadaily)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这