Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 为消息中间件系统为基础,介绍 Spring Cloud Stream 的使用。如果你没有用过消息中间件,可以到 RabbitMQ 的官网看一下,或者参考这个 http://rabbitmq.mr-ping.com/。理解了消息中间件的设计,才能更好的使用它。
消息中间的几大应用场景
1、异步处理
比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。
2、应用解耦:
假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。
3、流量削峰
比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。
4、日志处理
kafka 最开始就是专门为了处理日志产生的。
当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。
使用 Spring Cloud Stream && RabbitMQ
介绍下面的例子之前,假定你已经对 RabbitMQ 有一定的了解。
首先来认识一下 Spring Cloud Stream 中的几个重要概念。
_Destination Binders_:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
_Destination Bindings_:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
_Message_:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
可能看完了上面的三个概念仍然是一头雾水,没有关系,实践过程中自然就明白了。
先来一个最简单的例子
因为用到的是 rabbitmq,所以在本地搭好 rabbitmq 环境,然后装好 rabbitmq-management 插件,这样就可以访问 web UI 界面了,默认是 15672 端口。
1、引用对应 rabbitmq 的 stream 包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2、在 application.yml 中增加配置
spring:
profiles: stream-rabbit-customer-group1
cloud:
stream:
bindings:
input:
destination: default.messages
binder: local_rabbit
output:
destination: default.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 32775
username: guest
password: guest
server:
port: 8201
理解配置文件很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎么回事了。
spring.cloud.stream.binders
,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 local_rabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。environment 就是设置使用的消息中间件的配置信息,包括 host、port、用户名、密码等。可以设置多了个 binder,适配不同的场景。
spring.cloud.stream.bindings
,对应上面提到到 「Destination Bindings」。这里面可以配置多个 input 或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了两个input 、两个output,名称分别为 input、log_input、output、log_output。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。
每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 local_rabbit 。
可以看到 input、output 对应的 destination 是相同的,log_input、log_output 对应的 destination 也相同, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。
另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue 就会根据 destination + 随机字符串的方式命名。
3、接下来做一个最简单的例子,来演示如何接收消息。
首先来介绍一下 stream 内置的简单消息通道(消息通道也就是指消息的来源和去向)接口定义,一个 Source 和 一个 Sink 。
Source.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
消息发送通道定义,定义了一个 MessageChannel 类型的 output() 方法,用 @Output
注解标示,并指定了 binding 的名称为 output。
Sink.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
消息接收通道定义,定义了一个 SubscribableChannel 类型的 input() 方法,表示订阅一个消息的方法,并用 @Input
注解标识,并且指定了 binging 的名称为 input 。
创建一个简单的消息接收方法:
@SpringBootApplication
@EnableBinding(value = {Processor.class})
@Slf4j
public class DefaultApplication {
public static void main(String[] args) {
SpringApplication.run(DefaultApplication.class, args);
}
}
在项目启动类上加上注解 @EnableBinding(value = {Processor.class})
,表明启用 stream ,并指定定义的 Channel 定义接口类。
然后,创建一个 service 服务类,用来订阅消息,并对消息进行处理。
@Slf4j
@Component
public class DefaultMessageListener {
@StreamListener(Processor.INPUT)
public void processMyMessage(String message) {
log.info("接收到消息:" + message);
}
}
在方法 processMyMessage()
上使用 @StreamListener
注解,表示对消息进行订阅监控,指定 binding 的名称,其中 Processor.INPUT 就是 Sink 的 input ,也就是字符串 input
,对应的上面的配置文件,就是 spring.cloud.stream.bindings.input。
启动 DefaultApplication ,可以在 rabbitmq 管理控制台的 exchanges 中看到增加的这几个 bindings 。
可以看到 exchange 的名称对应的就是 bindings 的两个 input 和 两个 output 的 destination 的值。
用 rabbitmq UI 控制台发送消息测试
点击上图的 default.input.messages 进入 exchange 详请页面,在 publish message 部分填写上 Payload ,然后点击 Publish message 按钮。
之后回到 DefaultApplication 的输出控制台,会看到消息已经被接收。
模拟一个日志处理
接下来模拟生产者和消费者处理消息的过程,模拟一个日志处理的过程。
- 原始日志发送到 kite.log.messages exchange
- 接收器在 kite.log.messages exchange 接收原始日志,经过处理格式化,发送到 kite.log.format.messages exchange
- 接收器在 kite.log.format.messages exchange 接收格式化后的日志
1、自定义消息通道接口,上面介绍了 stream 自带的 Sink 和 Source,也仅仅能做个演示,真正的业务中还是需要自己定义更加灵活的接口。
@Component
public interface MyProcessor {
String MESSAGE_INPUT = "log_input";
String MESSAGE_OUTPUT = "log_output";
String LOG_FORMAT_INPUT = "log_format_input";
String LOG_FORMAT_OUTPUT = "log_format_output";
@Input(MESSAGE_INPUT)
SubscribableChannel logInput();
@Output(MESSAGE_OUTPUT)
MessageChannel logOutput();
@Input(LOG_FORMAT_INPUT)
SubscribableChannel logFormatInput();
@Output(LOG_FORMAT_OUTPUT)
MessageChannel logFormatOutput();
}
2、创建消费者应用
**配置文件如下 **:
spring:
profiles: stream-rabbit-customer-group1
cloud:
stream:
bindings:
log_input:
destination: kite.log.messages
binder: local_rabbit
group: logConsumer-group1
log_output:
destination: kite.log.messages
binder: local_rabbit
group: logConsumer-group1
log_format_input:
destination: kite.log.format.messages
binder: local_rabbit
group: logFormat-group1
log_format_input:
destination: kite.log.format.messages
binder: local_rabbit
group: logFormat-group1
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 32775
username: guest
password: guest
server:
port: 8201
此配置文件要参照 MyProcessor 接口查看,定义了 4 个 binding,但是 destination 两两相同,也就是两个 exchange。
创建 spring boot 启动类
@SpringBootApplication
@EnableBinding(value = {MyProcessor.class})
@Slf4j
public class CustomerApplication {
public static void main(String[] args) {
SpringApplication.run(CustomerApplication.class, args);
}
}
用 @EnableBinding(value = {MyProcessor.class}) 注解引入 MyProcessor
创建消息接收处理服务
@Slf4j
@Component
public class LogMessageListener {
/**
* 通过 MyProcessor.MESSAGE_INPUT 接收消息
* 然后通过 SendTo 将处理后的消息发送到 MyProcessor.LOG_FORMAT_OUTPUT
* @param message
* @return
*/
@StreamListener(MyProcessor.MESSAGE_INPUT)
@SendTo(MyProcessor.LOG_FORMAT_OUTPUT)
public String processLogMessage(String message) {
log.info("接收到原始消息:" + message);
return "「" + message +"」";
}
/**
* 接收来自 MyProcessor.LOG_FORMAT_INPUT 的消息
* 也就是加工后的消息,也就是通过上面的 SendTo 发送来的
* 因为 MyProcessor.LOG_FORMAT_OUTPUT 和 MyProcessor.LOG_FORMAT_INPUT 是指向同一 exchange
* @param message
*/
@StreamListener(MyProcessor.LOG_FORMAT_INPUT)
public void processFormatLogMessage(String message) {
log.info("接收到格式化后的消息:" + message);
}
}
3、创建一个消息生产者,用于发送原始日志消息
配置文件:
spring:
cloud:
stream:
bindings:
log_output:
destination: kite.log.messages
binder: local_rabbit
group: logConsumer-group1
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 32775
username: guest
password: guest
server:
port: 8202
仅仅指定了一个 binding log_output,用来发送消息,如果只做生产者就不要指定 log_input,如果指定了 log_input ,应用就会认为这个生产者服务也会消费消息,如果这时没有在此服务中订阅消息,当消息被发送到这个服务时,因为并没有订阅消息,也就是没有 @StreamListener 注解的方法,就会出现如下异常:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel
创建 spring boot 启动类
@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MyMessageController {
@Autowired
private MyProcessor myProcessor;
@GetMapping(value = "sendLogMessage")
public void sendLogMessage(String message){
Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build();
myProcessor.logOutput().send(stringMessage);
}
}
同样的引入 @EnableBinding(value = {MyProcessor.class})
创建一个 Controller 用来发送消息
@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MyMessageController {
@Autowired
private MyProcessor myProcessor;
@GetMapping(value = "sendLogMessage")
public void sendLogMessage(String message){
Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build();
myProcessor.logOutput().send(stringMessage);
}
}
之后,访问链接:
http://localhost:8202/sendLogMessage?message=原始日志
可以在消费服务端看到如下输出:
其他
消息除了可以是字符串类型,还可以是其他类型,也可以是实体类型,例如
@GetMapping(value = "sendObjectLogMessage")
public void sendObjectLogMessage() {
LogInfo logInfo = new LogInfo();
logInfo.setClientIp("192.168.1.111");
logInfo.setClientVersion("1.0");
logInfo.setUserId("198663383837434");
logInfo.setTime(Date.from(Instant.now()));
Message < LogInfo > stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(logInfo).build();
myProcessor.logOutput().send(stringMessage);
}
上面代码发送了一个 LogInfo 实体对象,在消费者端依然可以用字符串类型接收,因为 @StreamListener 注解会默认把实体转为 json 字符串。
另外,可以试着启动两个消费者端,把 group 设置成相同的,这时,发送的消息只会被一个消费者接收。
如果把 group 设置成不一样的,那么发送的消息会被两个消费者接收。
还可以看其他 Spring Cloud 系列:
Spring Cloud Eureka 实现高可用服务发现注册中心
Spring Cloud Config 配置中心 看这一篇就够了
服务注册发现、配置中心集一体的 Spring Cloud Consul
不要吝惜你的「推荐」呦
欢迎关注,不定期更新本系列和其他文章 古时的风筝
,进入公众号可以加入交流群