Spring Cloud Bus 消息总线

Stella981
• 阅读 549

什么是消息总线?

在微服务架构的系统中, 我们通常会使用轻量级的消息代理来构建 一 个共用的消息主 题让系统中所有微服务实例都连接上来, 由于该主题中产生的消息会被所有实例监听和消 费, 所以我们称它为消息总线。 在总线上的各个实例都可以方便地广播 一 些需要让其他连 接在该主题上的实例都知道的消息, 例如配置信息的变更或者其他 一 些管理操作等。 由于消息总线在微服务架构系统中被广泛使用, 所以它同配置中心 一 样, 几乎是微服 务架构中的必备组件。 Spring Cloud 作为微服务架构综合性的解决方案,对此自然也有自己 的实现, 这就是本章我们将要具体介绍的 Spring Cloud Bus 。 通过使用 Spring Cloud Bus,

可以非常容易地搭建起消息总线, 同时实现了 一 些消息总线中的常用功能, 比如, 配合 Spring Cloud Config 实现微服务应用配置信息的动态更新等。

消息代理

消息代理 (Message Broker) 是 一 种消息验证、 传输、 路由的架构模式。 它在应用程序 之间起到通信调度并最小化应用之间的依赖的作用, 使得应用程序可以高效地解耦通信过 程。 消息代理是 一 个中间件产品, 它的核心是 一 个消息的路由程序, 用来实现接收和分发 消息, 并根据设定好的消息处理流来转发给正确的应用。 它包括独立的通信和消息传递协 议, 能够实现组织内部和组织间的网络通信。

• 将消息路由到 一 个或多个目的地。

• 消息转化为其他的表现方式。

• 执行消息的聚集、 消息的分解, 并将结果发送到它们的目的地, 然后重新组合响应

返回给消息用户。

• 调用Web服务来检索数据。

• 响应事件或错误。

• 使用发布-订阅模式来提供内容或基千主题的消息路由。

目前已经有非常多的开源产品可以供大家使用, 比如:

• ActiveMQ

• Kafka

• RabbitMQ

• RocketMQ

当前版本的Spring Cloud Bus仅支待两款中间件产品: RabbitMQ和Kafka。

RabbitMQ整合Spring Cloud Bus

因为SpringCloud基千Spring Boot, 在上 一 节中我们已经体验了Spring Boot与 RabbitMQ的整 合, 所以在SpringCloud Bus中使用RabbitMQ也是非常容易配置的。

下面我们来具体动手尝试整个配置过程。

• 准备工作: 这里我们不创建新的应用, 但需要用到上 一 章中已经实现的 关于Spring Cloud Config的几个工程, 若读者对其还不了解, 建议先阅读第8章的内容。

• con巨g-repo: 定义在Git仓库中的 一 个目录,其中存储了应用名为中dispace 的多环境配置文件, 配置文件中有 一 个 from参数。

• config-server-eureka: 配置了Git仓库, 并注册到了Eureka的服务端。

• con巨g-c巨ent-eureka: 通过 Eureka发现ConfigServer的客户端, 应用名 为 中也space, 用来访问配置服务器以获取配置信息。 该应用中提供了 一 个 /from接口, 它会获取 con巨g-repo/di尘space-dev.properties中的 from属性并返回。

306 第9章 · 消息总线: Spring Cloud Bus

• 扩展con巨g-c巨ent-eureka应用 。

• 修改pom.xml, 增加spring-cloud-s迳江er-bus-amqp模块(注意spring- boot-starter-actuator模块也是必需的, 用 来提供刷新端点)。

org.springframework.cloud

spring-cloud-starter-bus-amqp

• 在配置文件中增加关千RabbitMQ的连接和用户信息。

spring.rabbitrnq.host=localhost

spring.rabbitrnq.port=5672

spring.rabbitrnq.usernarne = springcloud

spring.rabbi 七 mq.password = l23456

• 启动 config-server-eureka, 再启动 两个con丘g-c巨ent-eureka C分别在

不同的端口上, 比如7002、 7003)。 我们可以在 config-c巨ent-eureka中的控 制台中看到如下内容, 在启动时, 客户端程序多了 一 个/bus/refresh请求。

o.s.b.a.e.mvc.EndpointHandlerMapping : Mapped "{[/bus/refresh],methods=[POST]}" onto public void org.springfrarnework.cloud.bus.endpoint.RefreshBusEndpoint.refresh(java.lang.String)

• 先访问两个con巨g-c止en七-eureka的/from 请求, 会返回当前config repo/d沁ispace-dev.proper巨es中的from属性。

• 接着, 修改con巨g-repo/di中space-dev .properties中的from属性值, 并发送POST请求到其中的 一 个/bus/refresh。

• 最后, 再分别访问启动的两个con巨g-c归ent-eureka的/from请求, 此时这 两个请求都会返回最新的 con丘g-repo/di中space-dev.proper巨es 中的 from属性。 到 这里, 我们已经能够通过SpringCloud Bus来实时更新 总线上的属性配置了 。

原理分析

Spring Cloud Bus 消息总线

当我们将系统启动起来之后, 图中"Service A"的三个实例会请求ConfigServer以 获 此时, 若我们需要修改"Service A"的属性。

首先, 通过Git管理工具去仓库中修改 对应的属性值,但是这个修改并不会触发"Service A"实例的属性更新。我们向"Service A" 的实例3发送POST请求, 访问/bus/refresh接口。 此时,"Service A"的实例3就会 将刷新请求发送到消息总线中, 该消息事件会被"Service A"的实例l和实例2从总线中 获取到, 并重新从 ConfigServer中获取它们的配置信息, 从而实现配置信息的动态更新。

而从Git仓库中配置的修改到发 起 /bus/refresh的POST请求这一步可以通过Git 所以在 Web Hook中就不需要维护所有节点内容来进行更新, 从而解决了上一章中仅通过 Web Hook来逐个进行刷新的问题。

指定刷新范围

在上面的例子中,我们通过向服务实例 请求SpringCloud Bus的/bus/refresh接口, 从而触发 总线上其他服务实例的/refresh。 但是在 一 些特殊场景下, 我们希望可以刷新 微服务中某个具体实例的配置。

Spring Cloud Bus 对这种场景也有很好的支待, /bus/refresh 接口提供了 一 个 destination参数, 用来定位具体要 刷新的应用程序。 比如, 我们可以请求 /bus/refresh?destination = customers:9000, 此时总线上的各应用实例会根据 destination属性的值来判断是否为自己的实例名, 若符合才进行配置刷新, 若不符合 就忽略该消息。

destination参数除了可以定位具体的实例之外, 还可以用来定位具体的服务。 定 位服 务 的原理是通过使用 Spring的PathMatcher (路径正则匹配)来实现的 , 比如 /bus/refresh?destination = customers:**, 该请求会触发 customers服务的所 有实例进行刷新。

架构优化

既然SpringCloud Bus的 /bus/refresh接口提供了针对服务和实例进行配置更新的 参数, 那么我们的架构也可以相应做出 一 些调整。 在之前的架构中, 服务的配置更新需要 通过向具体服务中的某个实例发送请求, 再触发对整个服务集群的配置更新。 虽然能实现 功能, 但是这样的结果是, 我们指定的应用实例会不同千集群中的其他应用实例, 这样会 增加集群内部的复杂度, 不利于将来的运维工作。 比如, 需要对服务实例进行迁移, 那么 我们不得不修改Web Hook中的配置等。 所以要尽可能地让服务集群中的各个节点是对等 的。

因此, 我们将之前的架构做了 一 些调整, 如下图所示。

Spring Cloud Bus 消息总线

我们主要做了以下这些改动:

1.在ConfigServer中也引入SpringCloud Bus, 将配置服务端也加入到消息总线中来。

2. /bus/refresh请求不再发送到具体服务实例上, 而是发送给Config Server, 并 通过destination参数来指定需要更新配置的服务或实例。

通过上面的改动,我们的服务实例不需要再承担触发配置更新的职责。 同时, 对于Git 的触发等配置都只需要针对ConfigServer即可, 从而简化了集群上的 一 些维护工作。

Kafka整合Spring Cloud Bus

在介绍Kafka之前,我们已经通过引入spring-cloud-starter-bus-amqp模块, 完成了使用RabbitMQ来实现消息总线。若我们要使用Kafka来实现消息总线 时, 只需把 spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka 模块,在pom.xml的dependency节点中进行修改, 具体如下:

<dependency> 
    <groupId>org.springframework.cloud</groupId> 
    <artifactId>spring-cloud-starter-bus-kafka</artifactId> 
</dependency>

如果在启动 Kafka 时均采用了默认配置, 那么我们不需要再做任何其他配置就能在本 地实现从 RabbitMQ 到 Kafka 的切换。可以尝试把刚刚搭建的 ZooKeeper 、 Kafka 启动起来, 并将修改为 spring-cloud-starter-bus-kafka 模块的 config-server 和 config-cotent 启动起来。

在 config-server 启动时, 我们可以在控制台中看到如下输出:

2016-09-28 22:11:29.627 INFO 15144 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: 
springCloudBus
2016-09-28 22:11:29.642 INFO 15144 --- [-localhost:2181] 
org.IOitec.zkclien.ZkEventThread : Starting ZkClient event thread. 
016-09-28 22:11:30.290 INFO 15144 --- [ main] 
o.s.i.kafka.support.ProducerFactoryBean : Using producer properties =>
{bootstrap.servers=localhost:9092, linger.ms=O, acks=l, compression.type=none,
batch.size=l6384)
2016-09-28 22:11:30.298 INFO 15144 --- [ main] 
o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
2016-09-28 22: 11: 30. 322 INFO 15144 --- [ main) 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding
{message-handler:ou七bound.springCloudBus} as a subscriber to the'springCloudBusOutput'
channel
2016-09-28 22: 11: 30. 322 INFO 15144 --- [ main) 
o.s.integration.channel.DirectChannel : Channel 
'config-server:7001.springCloudBusOutput' has 1 subscriber(s). 
2016-09-28 22:11:30.322 INFO 15144 --- [ main) 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
2016-09-28 22: 11: 31. 465 INFO 15144 --- [ main] 
s.i.k.i.KafkaMessageDrivenChannelAdapter : started
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@4178
cb34
2016-09-28 22: 11: 31. 467 INFO 15144 --- [ main] 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : Adding
{message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282aOd5a
30b) as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:11:31.467 INFO 15144 --- [ main] 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : started
inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b

从控制台的输出内容我们可以看到, config-server 连接到了 Kafka 中, 并使用了

名为 springCloudBus 的 Topico 此时, 我们可以使用 kafka-topics --list --zookeeper localhost:2181 命令来查看当前 Kafka 中的 Topic 。 若已成功启动了 config-server 并配置正确, 可以 在 Kafka 中看到已经多了 一 个名为 springCloudBus 的 Topico

我们再启动配置了 spring-cloud-starter-bus- kafka 模块的 config-client, 可以看到控制台中输出了如下内容:

2016-09-28 22:43:55.067 INFO 6136 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: 
springCloudBus 
2016-09-28 22:43:55.078 INFO 6136 --- [-localhost:2181] 
org.IOitec.zkclient.ZkEventThread : Starting ZkClient event thread. 
2016-09-28 22:50:38.584 INFO 828 --- [ main] 
o.s.i.kafka.support.ProducerFactoryBean : Using producer proper七ies => 
{bootstrap.servers=localhost:9092, manger.ms=O, acks=l, compression.type=none, 
batch.size=l6384} 
2016-09-28 22:50:38.592 INFO 828 --- [ main] 
o.a.k.clients.producer.ProducerConfig : ProducerConfig values: 
2016-09-28 22:50:38.615 INFO 828 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding 
{message-handler: outbound. springCloudBus} as a subscriber to the 'springCloudBusOutput' 
channel 
2016-09-28 22:50:38.616 INFO 828 --- [ main] 
o.s.integration.channel.DirectChannel : Channel 
'di立space:7002.springCloudBusOutput' has 1 subscriber(s). 
2016-09-28 22:50:38.616 INFO 828 --- [ main] 
o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus 
2016-09-28 22:50:39.162 INFO 828 --- [ main) 
s.i.k.i.KafkaMessageDrivenChannelAdapter : started
org.springframework.in七egration.kafka.inbound.KafkaMessageDrivenChannelAdapter@60cf
855e 
2016-09-28 22:50:39.162 INFO 828 --- [ main) 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : Adding 
{message-handler:inbound.springClouc!Bus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee 
216) as a subscriber to the 'bridge.springClouc!Bus' channel 
2016-09-28 22:50:39.163 INFO 828 --- [ main) 
o.s.c.s.b.k.KafkaMessageChanne1Binder$7 : started 
inbound.springClouc!Bus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216

可以看到, config-content 启 动时输出了类似的内容, 它们都订阅了名为 springCloudBus 的 Topic 。 从这里我们也可以知道, 在消息总线上的节点, 从结构上来 说, 不论是 config-server 还是 config-content, 它们都是对等的。 在启动了 config-server 和 config-c让ent 之后,为了更明显地观察消息总线刷 新配置 的效果, 我们可以在本地启动多个不同端口的 config-content 。 此时, 我们的 config-server 以及多个 config-content 都已经连接到了由 Kafka 实现的消息总线 上。 我们可以先访问各个 con巨g-c让ent 上 的 /from 请求, 查看它获取到的配置内容。 然后, 修改 Git 中对应的参数内容, 再访问各个 con丘g-c巨ent 上的 /from 请求, 可以 看到配置内容并没有改变。 最后, 我们向 con巨g-server 发 送 POST 请 求: /bus/ refresh, 此时再去访间各个config-content 上的 /from请求, 就能获得最新 的配置信息,各客户端上的配置都已经加载为最新的Git 配置内容。

从config-client的控制台中, 我们可以看到如下内容:

2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1] 
o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from) 

RefreshListener监听类记录了收到远程刷新请求, 并刷新了from属性的日志。

深入理解

启动消费者控制台之后,我们向con丘g-server发送POST请求: /bus/refresh, 此时在控制台中可以看到类似如下的内容:

content Type "application/json"
{
"type": "RefreshRemoteApplicationEvent", 
"timestamp": 1475073160814, 
"originService": "config-server: 7001", 
"destinationService": "*: * *", 
"id": "bbfbf495-39d8-4ff9-93d6-174873ff7299"
}

contentType "application/json"
{
“type": "AckRemoteApplicationEvent", 
"timestamp": 1475073160821, 
"originService": "config-server: 7001", 
"destinationService": "*: **", 
"id": "lf794774-10d6-4140-a80d-470983c6c0ff", 
"ackid": "bbfbf495-39d8-4ff9-93d6-174873ff7299", 
"ackDestinationService": "*: **", 
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}

contentType "application/json"
{
"type": "AckRemoteApplicationEvent", 
"timestamp": 1475075467554, 
"originService": "didispace: 7002", 
"destinationService": "*: **", 
"id": "7 56015le-f60c-4 9cd-8167-b69le84 6ad08", 
"ackid": "21502725-28f5-4dl 9-a98a-f8114fa4 fldc", 
"ackDestinationService": "*:**", 
"event": "org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"
}

• type: 消息的事件类型。在上面的例子中,包含了RefreshRemoteApplicationEvent 和AckRemoteAppticationEvent。其中,RefreshRemoteApplicationEvent 事件就是我们用来刷新配置的事件,而AckRemoteApp巨ca巨onEvent是响应消 息已经正确接收的告知消息 事件。

• timestamp: 消息的时间戳。

• originService: 消息的来源服务实例。

• destinationService: 消息的目标服务实例。 上面示例中的 * : ** 代表了总线上 的所有服务实例。 如果想要指定服务或是实例, 在之前介绍RabbitMQ实现消息总 线时已经提过, 只需要通过使用des巨nation参数来定位具体要刷新的应用实例 即可, 比如发起/bus/refresh?des巨nation = d沁ispace请求, 就可以得到 如下的刷新事件消息,其中destinationService为didispace:**, 表示总 线上所有didispace服务的实例。

content Type "application/json"
{
    "type": "RefreshRemoteApplicationEvent", 
    "timestamp": 1475131215007, 
    "originService": "config-server: 7001", 
    "destinationService": "didispace:**", 
    "id": "667fe948-e9b2-447f-be22-3c8acf647ead"
}

• id: 消息的唯 一 标识。

上面的消息内容是RefreshRemoteApplicationEvent和AckRemoteApplicationEvent 类型共有的, 下面几个属性是AckRemoteApplicationEvent所特有的,分别表示如下 含义。

• ackid:Ack消息对应的消息来源。我们可以看到第 一 条AckRemoteApplication Event的ackid对应了 RefreshRemoteApplicationEvent的id, 说明这条 Ack是告知该 RefreshRemoteApplicationEvent事件的消息已经被收到。

• ackDestinationService: Ack 消息的目标服务实例。 可以看到这里使用的是 * : ** , 所以消息总线上所有的实例都会收到该Ack消息。

• event: Ack 消息的来源事件。 可以看到上例中的两个Ack均来源于刷新配置的 RefreshRemoteApplicationEvent事件, 我们在测试的时候由于启动了两个 config-client, 所以有两个实例接收到了配置刷新事件, 同时它们都会返回 一 个 Ack消息 。 由于ackDestinationService为 * : ** , 所以两个 config-client 都会收到对RefreshRemoteApplicationEvent事件的Ack消息。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这