什么是消息总线?
在微服务架构的系统中, 我们通常会使用轻量级的消息代理来构建 一 个共用的消息主 题让系统中所有微服务实例都连接上来, 由于该主题中产生的消息会被所有实例监听和消 费, 所以我们称它为消息总线。 在总线上的各个实例都可以方便地广播 一 些需要让其他连 接在该主题上的实例都知道的消息, 例如配置信息的变更或者其他 一 些管理操作等。 由于消息总线在微服务架构系统中被广泛使用, 所以它同配置中心 一 样, 几乎是微服 务架构中的必备组件。 Spring Cloud 作为微服务架构综合性的解决方案,对此自然也有自己 的实现, 这就是本章我们将要具体介绍的 Spring Cloud Bus 。 通过使用 Spring Cloud Bus,
可以非常容易地搭建起消息总线, 同时实现了 一 些消息总线中的常用功能, 比如, 配合 Spring Cloud Config 实现微服务应用配置信息的动态更新等。
消息代理
消息代理 (Message Broker) 是 一 种消息验证、 传输、 路由的架构模式。 它在应用程序 之间起到通信调度并最小化应用之间的依赖的作用, 使得应用程序可以高效地解耦通信过 程。 消息代理是 一 个中间件产品, 它的核心是 一 个消息的路由程序, 用来实现接收和分发 消息, 并根据设定好的消息处理流来转发给正确的应用。 它包括独立的通信和消息传递协 议, 能够实现组织内部和组织间的网络通信。
• 将消息路由到 一 个或多个目的地。
• 消息转化为其他的表现方式。
• 执行消息的聚集、 消息的分解, 并将结果发送到它们的目的地, 然后重新组合响应
返回给消息用户。
• 调用Web服务来检索数据。
• 响应事件或错误。
• 使用发布-订阅模式来提供内容或基千主题的消息路由。
目前已经有非常多的开源产品可以供大家使用, 比如:
• ActiveMQ
• Kafka
• RabbitMQ
• RocketMQ
当前版本的Spring Cloud Bus仅支待两款中间件产品: RabbitMQ和Kafka。
RabbitMQ整合Spring Cloud Bus
因为SpringCloud基千Spring Boot, 在上 一 节中我们已经体验了Spring Boot与 RabbitMQ的整 合, 所以在SpringCloud Bus中使用RabbitMQ也是非常容易配置的。
下面我们来具体动手尝试整个配置过程。
• 准备工作: 这里我们不创建新的应用, 但需要用到上 一 章中已经实现的 关于Spring Cloud Config的几个工程, 若读者对其还不了解, 建议先阅读第8章的内容。
• con巨g-repo: 定义在Git仓库中的 一 个目录,其中存储了应用名为中dispace 的多环境配置文件, 配置文件中有 一 个 from参数。
• config-server-eureka: 配置了Git仓库, 并注册到了Eureka的服务端。
• con巨g-c巨ent-eureka: 通过 Eureka发现ConfigServer的客户端, 应用名 为 中也space, 用来访问配置服务器以获取配置信息。 该应用中提供了 一 个 /from接口, 它会获取 con巨g-repo/di尘space-dev.properties中的 from属性并返回。
306 第9章 · 消息总线: Spring Cloud Bus
• 扩展con巨g-c巨ent-eureka应用 。
• 修改pom.xml, 增加spring-cloud-s迳江er-bus-amqp模块(注意spring- boot-starter-actuator模块也是必需的, 用 来提供刷新端点)。
• 在配置文件中增加关千RabbitMQ的连接和用户信息。
spring.rabbitrnq.host=localhost
spring.rabbitrnq.port=5672
spring.rabbitrnq.usernarne = springcloud
spring.rabbi 七 mq.password = l23456
• 启动 config-server-eureka, 再启动 两个con丘g-c巨ent-eureka C分别在
不同的端口上, 比如7002、 7003)。 我们可以在 config-c巨ent-eureka中的控 制台中看到如下内容, 在启动时, 客户端程序多了 一 个/bus/refresh请求。
o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/bus/refresh],methods=[POST]}" onto public void org.springfrarnework.cloud.bus.endpoint.RefreshBusEndpoint.refresh(java.lang.String)
• 先访问两个con巨g-c止en七-eureka的/from 请求, 会返回当前config repo/d沁ispace-dev.proper巨es中的from属性。
• 接着, 修改con巨g-repo/di中space-dev .properties中的from属性值, 并发送POST请求到其中的 一 个/bus/refresh。
• 最后, 再分别访问启动的两个con巨g-c归ent-eureka的/from请求, 此时这 两个请求都会返回最新的 con丘g-repo/di中space-dev.proper巨es 中的 from属性。 到 这里, 我们已经能够通过SpringCloud Bus来实时更新 总线上的属性配置了 。
原理分析
当我们将系统启动起来之后, 图中"Service A"的三个实例会请求ConfigServer以 获 此时, 若我们需要修改"Service A"的属性。
首先, 通过Git管理工具去仓库中修改 对应的属性值,但是这个修改并不会触发"Service A"实例的属性更新。我们向"Service A" 的实例3发送POST请求, 访问/bus/refresh接口。 此时,"Service A"的实例3就会 将刷新请求发送到消息总线中, 该消息事件会被"Service A"的实例l和实例2从总线中 获取到, 并重新从 ConfigServer中获取它们的配置信息, 从而实现配置信息的动态更新。
而从Git仓库中配置的修改到发 起 /bus/refresh的POST请求这一步可以通过Git 所以在 Web Hook中就不需要维护所有节点内容来进行更新, 从而解决了上一章中仅通过 Web Hook来逐个进行刷新的问题。
指定刷新范围
在上面的例子中,我们通过向服务实例 请求SpringCloud Bus的/bus/refresh接口, 从而触发 总线上其他服务实例的/refresh。 但是在 一 些特殊场景下, 我们希望可以刷新 微服务中某个具体实例的配置。
Spring Cloud Bus 对这种场景也有很好的支待, /bus/refresh 接口提供了 一 个 destination参数, 用来定位具体要 刷新的应用程序。 比如, 我们可以请求 /bus/refresh?destination = customers:9000, 此时总线上的各应用实例会根据 destination属性的值来判断是否为自己的实例名, 若符合才进行配置刷新, 若不符合 就忽略该消息。
destination参数除了可以定位具体的实例之外, 还可以用来定位具体的服务。 定 位服 务 的原理是通过使用 Spring的PathMatcher (路径正则匹配)来实现的 , 比如 /bus/refresh?destination = customers:**, 该请求会触发 customers服务的所 有实例进行刷新。
架构优化
既然SpringCloud Bus的 /bus/refresh接口提供了针对服务和实例进行配置更新的 参数, 那么我们的架构也可以相应做出 一 些调整。 在之前的架构中, 服务的配置更新需要 通过向具体服务中的某个实例发送请求, 再触发对整个服务集群的配置更新。 虽然能实现 功能, 但是这样的结果是, 我们指定的应用实例会不同千集群中的其他应用实例, 这样会 增加集群内部的复杂度, 不利于将来的运维工作。 比如, 需要对服务实例进行迁移, 那么 我们不得不修改Web Hook中的配置等。 所以要尽可能地让服务集群中的各个节点是对等 的。
因此, 我们将之前的架构做了 一 些调整, 如下图所示。
我们主要做了以下这些改动:
1.在ConfigServer中也引入SpringCloud Bus, 将配置服务端也加入到消息总线中来。
2. /bus/refresh请求不再发送到具体服务实例上, 而是发送给Config Server, 并 通过destination参数来指定需要更新配置的服务或实例。
通过上面的改动,我们的服务实例不需要再承担触发配置更新的职责。 同时, 对于Git 的触发等配置都只需要针对ConfigServer即可, 从而简化了集群上的 一 些维护工作。
Kafka整合Spring Cloud Bus
在介绍Kafka之前,我们已经通过引入spring-cloud-starter-bus-amqp模块, 完成了使用RabbitMQ来实现消息总线。若我们要使用Kafka来实现消息总线 时, 只需把 spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka 模块,在pom.xml的dependency节点中进行修改, 具体如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
如果在启动 Kafka 时均采用了默认配置, 那么我们不需要再做任何其他配置就能在本 地实现从 RabbitMQ 到 Kafka 的切换。可以尝试把刚刚搭建的 ZooKeeper 、 Kafka 启动起来, 并将修改为 spring-cloud-starter-bus-kafka 模块的 config-server 和 config-cotent 启动起来。
在 config-server 启动时, 我们可以在控制台中看到如下输出:
2016-09-28 22:11:29.627 INFO 15144 --- [ main]
o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound:
springCloudBus
2016-09-28 22:11:29.642 INFO 15144 --- [-localhost:2181]
org.IOitec.zkclien.ZkEventThread : Starting ZkClient event thread.
016-09-28 22:11:30.290 INFO 15144 --- [ main]
o.s.i.kafka.support.ProducerFactoryBean : Using producer properties =>
{bootstrap.servers=localhost:9092, linger.ms=O, acks=l, compression.type=none,
batch.size=l6384)
2016-09-28 22:11:30.298 INFO 15144 --- [ main]
o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
2016-09-28 22: 11: 30. 322 INFO 15144 --- [ main)
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding
{message-handler:ou七bound.springCloudBus} as a subscriber to the'springCloudBusOutput'
channel
2016-09-28 22: 11: 30. 322 INFO 15144 --- [ main)
o.s.integration.channel.DirectChannel : Channel
'config-server:7001.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:11:30.322 INFO 15144 --- [ main)
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
2016-09-28 22: 11: 31. 465 INFO 15144 --- [ main]
s.i.k.i.KafkaMessageDrivenChannelAdapter : started
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@4178
cb34
2016-09-28 22: 11: 31. 467 INFO 15144 --- [ main]
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : Adding
{message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282aOd5a
30b) as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:11:31.467 INFO 15144 --- [ main]
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : started
inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b
从控制台的输出内容我们可以看到, config-server 连接到了 Kafka 中, 并使用了
名为 springCloudBus 的 Topico 此时, 我们可以使用 kafka-topics --list --zookeeper localhost:2181 命令来查看当前 Kafka 中的 Topic 。 若已成功启动了 config-server 并配置正确, 可以 在 Kafka 中看到已经多了 一 个名为 springCloudBus 的 Topico
我们再启动配置了 spring-cloud-starter-bus- kafka 模块的 config-client, 可以看到控制台中输出了如下内容:
2016-09-28 22:43:55.067 INFO 6136 --- [ main]
o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound:
springCloudBus
2016-09-28 22:43:55.078 INFO 6136 --- [-localhost:2181]
org.IOitec.zkclient.ZkEventThread : Starting ZkClient event thread.
2016-09-28 22:50:38.584 INFO 828 --- [ main]
o.s.i.kafka.support.ProducerFactoryBean : Using producer proper七ies =>
{bootstrap.servers=localhost:9092, manger.ms=O, acks=l, compression.type=none,
batch.size=l6384}
2016-09-28 22:50:38.592 INFO 828 --- [ main]
o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
2016-09-28 22:50:38.615 INFO 828 --- [ main]
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding
{message-handler: outbound. springCloudBus} as a subscriber to the 'springCloudBusOutput'
channel
2016-09-28 22:50:38.616 INFO 828 --- [ main]
o.s.integration.channel.DirectChannel : Channel
'di立space:7002.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:50:38.616 INFO 828 --- [ main]
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
2016-09-28 22:50:39.162 INFO 828 --- [ main)
s.i.k.i.KafkaMessageDrivenChannelAdapter : started
org.springframework.in七egration.kafka.inbound.KafkaMessageDrivenChannelAdapter@60cf
855e
2016-09-28 22:50:39.162 INFO 828 --- [ main)
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : Adding
{message-handler:inbound.springClouc!Bus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee
216) as a subscriber to the 'bridge.springClouc!Bus' channel
2016-09-28 22:50:39.163 INFO 828 --- [ main)
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : started
inbound.springClouc!Bus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216
可以看到, config-content 启 动时输出了类似的内容, 它们都订阅了名为 springCloudBus 的 Topic 。 从这里我们也可以知道, 在消息总线上的节点, 从结构上来 说, 不论是 config-server 还是 config-content, 它们都是对等的。 在启动了 config-server 和 config-c让ent 之后,为了更明显地观察消息总线刷 新配置 的效果, 我们可以在本地启动多个不同端口的 config-content 。 此时, 我们的 config-server 以及多个 config-content 都已经连接到了由 Kafka 实现的消息总线 上。 我们可以先访问各个 con巨g-c让ent 上 的 /from 请求, 查看它获取到的配置内容。 然后, 修改 Git 中对应的参数内容, 再访问各个 con丘g-c巨ent 上的 /from 请求, 可以 看到配置内容并没有改变。 最后, 我们向 con巨g-server 发 送 POST 请 求: /bus/ refresh, 此时再去访间各个config-content 上的 /from请求, 就能获得最新 的配置信息,各客户端上的配置都已经加载为最新的Git 配置内容。
从config-client的控制台中, 我们可以看到如下内容:
2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1]
o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from)
RefreshListener监听类记录了收到远程刷新请求, 并刷新了from属性的日志。
深入理解
启动消费者控制台之后,我们向con丘g-server发送POST请求: /bus/refresh, 此时在控制台中可以看到类似如下的内容:
content Type "application/json"
{
"type": "RefreshRemoteApplicationEvent",
"timestamp": 1475073160814,
"originService": "config-server: 7001",
"destinationService": "*: * *",
"id": "bbfbf495-39d8-4ff9-93d6-174873ff7299"
}
contentType "application/json"
{
“type": "AckRemoteApplicationEvent",
"timestamp": 1475073160821,
"originService": "config-server: 7001",
"destinationService": "*: **",
"id": "lf794774-10d6-4140-a80d-470983c6c0ff",
"ackid": "bbfbf495-39d8-4ff9-93d6-174873ff7299",
"ackDestinationService": "*: **",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
contentType "application/json"
{
"type": "AckRemoteApplicationEvent",
"timestamp": 1475075467554,
"originService": "didispace: 7002",
"destinationService": "*: **",
"id": "7 56015le-f60c-4 9cd-8167-b69le84 6ad08",
"ackid": "21502725-28f5-4dl 9-a98a-f8114fa4 fldc",
"ackDestinationService": "*:**",
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}
• type: 消息的事件类型。在上面的例子中,包含了RefreshRemoteApplicationEvent 和AckRemoteAppticationEvent。其中,RefreshRemoteApplicationEvent 事件就是我们用来刷新配置的事件,而AckRemoteApp巨ca巨onEvent是响应消 息已经正确接收的告知消息 事件。
• timestamp: 消息的时间戳。
• originService: 消息的来源服务实例。
• destinationService: 消息的目标服务实例。 上面示例中的 * : ** 代表了总线上 的所有服务实例。 如果想要指定服务或是实例, 在之前介绍RabbitMQ实现消息总 线时已经提过, 只需要通过使用des巨nation参数来定位具体要刷新的应用实例 即可, 比如发起/bus/refresh?des巨nation = d沁ispace请求, 就可以得到 如下的刷新事件消息,其中destinationService为didispace:**, 表示总 线上所有didispace服务的实例。
content Type "application/json"
{
"type": "RefreshRemoteApplicationEvent",
"timestamp": 1475131215007,
"originService": "config-server: 7001",
"destinationService": "didispace:**",
"id": "667fe948-e9b2-447f-be22-3c8acf647ead"
}
• id: 消息的唯 一 标识。
上面的消息内容是RefreshRemoteApplicationEvent和AckRemoteApplicationEvent 类型共有的, 下面几个属性是AckRemoteApplicationEvent所特有的,分别表示如下 含义。
• ackid:Ack消息对应的消息来源。我们可以看到第 一 条AckRemoteApplication Event的ackid对应了 RefreshRemoteApplicationEvent的id, 说明这条 Ack是告知该 RefreshRemoteApplicationEvent事件的消息已经被收到。
• ackDestinationService: Ack 消息的目标服务实例。 可以看到这里使用的是 * : ** , 所以消息总线上所有的实例都会收到该Ack消息。
• event: Ack 消息的来源事件。 可以看到上例中的两个Ack均来源于刷新配置的 RefreshRemoteApplicationEvent事件, 我们在测试的时候由于启动了两个 config-client, 所以有两个实例接收到了配置刷新事件, 同时它们都会返回 一 个 Ack消息 。 由于ackDestinationService为 * : ** , 所以两个 config-client 都会收到对RefreshRemoteApplicationEvent事件的Ack消息。