Dubbo RPC远程调用过程源码分析(服务消费者)

Stella981
• 阅读 507

Dubbo RPC远程调用过程源码分析(服务消费者)

关注 “Java艺术”一起来充电吧!

上篇我们分析了服务提供者处理一个请求的全过程,当然,是跳过信息交换层和传输层的。本篇继续分析服务提供者发起一个远程RPC调用的全过程,也是跳过信息交换层和传输层,但发起请求的逻辑会复杂些,包括负载均衡和失败重试的过程,以及当消费端配置与每个服务提供端保持多个长连接时的处理逻辑。

本篇内容:

  • 多个Bean依赖同一个Service创建多个引用?

  • RPC层的服务引入及发起请求过程

  • 消费者发起调用的完整全链路

多个Bean依赖同一个Service创建多个引用?

回答一个疑惑,也是我一直以来的疑惑。假设我在一个微服务中,有两个bean都依赖同一个Service(dubbo接口),那么服务引入会创建两个ReferenceBean吗?

Dubbo RPC远程调用过程源码分析(服务消费者)

Dubbo RPC远程调用过程源码分析(服务消费者)

DemoServiceComponent1和DemoServiceComponent2都依赖DemoService服务。

在分析Dubbo与Spring整合时,我们已经知道,被@Reference注释的字段将由AnnotationInjectedBeanPostProcessor这个BeanPostProcessor处理,负责给bean注入依赖。看下这个BeanPostProcessor处理依赖的入口。

protected Object getInjectedObject(A annotation, Object bean, String beanName, Class<?> injectedType,

所以,会不会为DemoServiceComponent1和DemoServiceComponent2都创建一个服务引入代理对象,是由@Reference注释配置的属性决定的。connections参数是可以忽略的,mock参数是不能忽略的,所以DemoServiceComponent1中配置mock,而DemoServiceComponent2中不配置mock就会创建不同的代理对象。至于哪些属性是不可忽略的,可自行看代码,我只关心mock和connections,因为我在项目中用到这两个参数。其它分组、版本号、是否在引入时判断提供者是否可用,这些都是决定缓存key是否相同的关键参数。

但是这里是代理对象,底层依然使用同一个ReferenceBean,且注册中心中也只存在一个消费者,而实际上,这两个代理类也没什么不一样。可以继续看doGetInjectedBean方法。

@Override

决定是否使用同一个ReferenceBean,只由接口名、版本号、分组决定的,与其它参数配置无关。所以,不管DemoServiceComponent1和DemoServiceComponent2的connections参数是否相同,都只会按照Spring初始化bean的顺序决定使用哪个@Reference配置的connections。

总结,拿笔记下来:只要是同一个接口、同一个版本号、同一个分组,那么不管有多少个依赖它的bean配置的@Reference的参数怎么不同,都只会使用同一个ReferenceBean对象。至于不同bean中@Reference配置的参数不同,会有哪些起作用,就取决于Spring初始化一个bean的过程。简单说,就是只有一个@Reference起作用。如果不指定Spring初始化bean的顺序,那么就给每个@Reference使用相同的属性配置,这样就确保配置都能起作用。

// 在处理bean的依赖注入时,如果字段是被@Reference注释的,

上面当是回顾下Spring阶段的服务引入过程。在配置层ReferenceConfig调用createProxy方法开始,就进入到注册中心层的服务引入。注册中心层的服务引用也不重复分析了。忘记回头看下这篇Dubbo分层架构之服务注册中心层的源码分析(下)

RPC层的服务引入

以dubbo协议为例,由注册中心委托给RegistryDirectory实现事件订阅,通过订阅获取所有可用的服务提供者,并依次调用RPC层的DubboProtocol的refer方创建Invoker实例,调用多少次就是有多少个服务提供者。

DubboProtocol的refer方法在其父类中实现。

@Override

refer方法创建一个支持异步转同步的Invoker。我们先看protocolBindingRefer方法,protocolBindingRefer返回一个Invoker,由子类DubboProtocol实现,所以AsyncToSyncInvoker持有的就是DubboProtocol创建的Invoker。

@Override

protocolBindingRefer创建一个DubboInvoker,只负责RPC层的调用。由于任何远程调用都是异步的,所以异步转同步的逻辑由AsyncToSyncInvoker实现。看下AsyncToSyncInvoker的invoke方法。

@Override

调用Invoker的invoke方法返回的Result实际上是一个Future,然后根据@Reference配置的属性,看下是否声明为同步调用,默认true,如果是,则调用Result的get方法,开始阻塞当前线程,直接接收到服务端的返回。异常则抛出一个RpcException。因为此处的Invoker也是经过层层包装的,上层会处理异常,比如Mock 机制。

AsyncToSyncInvoker包装了DubboInvoker,那么DubboInvoker包装的是什么呢?为何调用它的invoke方法就能调用远端服务呢?

DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, 

在new DubboInvoker时,调用getClients(url)方法获取远程连接,这里将涉及到信息交换层,我们只会分析到这里。

Dubbo RPC远程调用过程源码分析(服务消费者)

图为debug下断点的截图,能够看到,当前我配置的连接数为10,所以会创建10个ExchangeClient对象,这10个ExchangeClient是什么时候被使用的,稍后分析。

在new DubboInvoker时,除了调用getClients方法获取远程连接ExchangeClient之外,还给DubboInvoker传入了一个对象,就是invokers,这个invokers是DubboProtocol的一个字段,每创建一个DubboInvoker都会将其加入到这个字段中,所以每个DubboInvoker都能拿到所有的DubboInvoker。

DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

那么要分析服务消费端发起一次远程调用,就可以从这个DubboInvoker的invoker方法下断点。在此之前的调用路径先不分析。

【org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke】

Dubbo RPC远程调用过程源码分析(服务消费者)

例子中我给@Reference配置的连接数为10个,所以将会从10个连接中,通过当前调用次数与10取模,选出本次调用将要使用的长连接,说白了就是轮询。对应图中红框1部分代码。至于这个长连接数配置多少个合适,我也给不出一个准确的答案,就算配置一个也没多大问题,因为请求转为数据包发送出去之后,就不占用连接了。而等待服务端响应结果是NIO非阻塞模式等待的。消费端之所以能知道哪次响应是当前请求的响应,是通过在请求头中添加一个请求id识别的,服务端响应时也带上该请求id,后面分析传输层源码时介绍。

而图中2和3部分的代码是分别处理请求是否需要获取返回结果,即从url中获取return参数是否为true,比如

@Reference(check = false, mock = "org.apache.dubbo.demo.consumer.mock.DemoMock",

配置中指定sayHello方法不获取返回值,那么isOneway就为false,否则为true。当不需要获取返回值时,调用RpcContext.getContext().setFuture(null)设置当前线程上下文的Future为null,也就没办法通过RpcContext获取返回值。所以通过分析源码,我们能够知道很多配置的作用,以及怎么去用。

消费者发起调用的完整全链路

前面我们直接分析了RPC层的DubboInvoker的invoke方法,但似乎漏掉了很多内容,比如负载均衡呢?

Dubbo RPC远程调用过程源码分析(服务消费者)

服务引入的Invoker的包装链

  • Spring层

现在我们从ReferenceBeanInvocationHandler开始分析,这是一个InvocationHandler,就是jdk动态代理的InvocationHandler。而ReferenceBeanInvocationHandler是在Spring初始化bean过程中,由ReferenceAnnotationBeanPostProcessor处理依赖注入时,创建一个ReferenceBean的代理对象时创建的,由于是使用jdk动态代理,所以你应该知道ReferenceBeanInvocationHandler的作用。

Dubbo RPC远程调用过程源码分析(服务消费者)

ReferenceBeanInvocationHandler代理的是ReferenceBean,但是ReferenceBean是一个FactoryBean,所以是代理ReferenceBean.getObject返回的对象。

  • 配置层

InvokerInvocationHandler则是在配置层ReferenceConfig的createProxy方法中,使用javassist代理的由注册中心层返回的Invoker。代理来代理去的确实绕得很。

  • 注册中心层

配置层ReferenceConfig调用注册中心层的refer方法引入服务。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

在RegistryProtocol的refer方法中,RegistryDirectory由Cluster的join方法转为Invoker,这里会比较难以理解,因为服务提供者是有多个的,而且是会改变的,所以不能像服务导出那样,一条链路无缝衔接。这里只能返回一个抽象的Invoker,只有在RegistryDirectory订阅到提供者时,才会生成具体的Invoker。

这个cluster默认为FailoverCluster。

public class FailoverCluster implements Cluster {

但是这个FailoverCluster会被包装成MockClusterWrapper。Cluster的join方法被声明为自适应扩展点机制,由于我们并没有配置cluster,所以默认是FailoverCluster。

但为什么会被包装成MockClusterWrapper呢?这就是SPI自适应扩展机制最另人捉狂的地方。先看下dubbo-cluster模块下的resources目录下的org.apache.dubbo.rpc.cluster.Cluster配置文件。

Dubbo RPC远程调用过程源码分析(服务消费者)

dubbo在Cluster的SPI配置文件中加入了MockClusterWrapper。Dubbo的SPI在加载该配置文件时会解析成<name,Class>的映射,同时将构造方法需要传入一个同类型对象的类视为包装类,不管自适应扩展机制最终获取的是哪个Cluster,都会被包装上注册的所有包装类,而Cluster的SPI配置文件中只注册了MockClusterWrapper这一个包装类。

public class MockClusterWrapper implements Cluster {

MockClusterWrapper中创建了MockClusterInvoker,所以MockClusterInvoker以及FailoverClusterInvoker都是在注册中心层包上的。MockClusterInvoker包装了FailoverClusterInvoker。

  • RPC层

RPC层DubboProtocol的refer方法是由注册中心层RegistryDirectory调用的,在RegistryDirectory订阅到服务提供者时,根据提供者的数量循环遍历调用。而DubboInvoker在RPC层DubboProtocol创建的,并且也是在RPC层封装为AsyncToSyncInvoker的。所以RegistryDirectory完成将AsyncToSyncInvoker包装为InvokerDelegate,也就是图中的InvokerWrapper。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {

所以知道了Invoker的包装链路,顺着这个链路也就能知道整个调用过程了。

负载均衡是何时起作用的

最后,再回答一个问题,负载均衡是何时起作用的?

当cluster配置或默认使用FailoverCluster时,FailoverCluster的join方法创建的就是FailoverClusterInvoker,而负载均衡的调用入口就在FailoverClusterInvoker的doInvoke方法中。因为FailoverClusterInvoker持有RegistryDirectory的引用,所以FailoverClusterInvoker能够获取到服务的所有提供者,自然负责从所有提供者中选择一个调用,这也是集群层要做的事情。

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

select方法中的第一个参数就是负载均衡器,而loadbalance是doInvoke的参数,所以要看是从哪里传递过来的。这就要看FailoverClusterInvoker的父类的invoke方法。

@Override

initLoadBalance方法就是从所有可调用的Invoker中取第一个,然后通过SPI自适应扩展点机制从Invoker中获取URL,并从URL中获取负载均衡的参数配置,最后获取到负载均衡器。关于负载均衡的一些介绍可看下往期的两篇文章:源码分析Dubbo负载均衡策略的权重如何动态修改Dubbo自适应随机负载均衡策略的实现

Dubbo RPC远程调用过程源码分析(服务消费者)

Dubbo RPC远程调用过程源码分析(服务消费者)

公众号:Java艺术

扫码关注最新动态

本文分享自微信公众号 - Java艺术(javaskill)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
1小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(