Dubbo 是如何控制并发数和限流的?

Stella981
• 阅读 686

点击关注公众号,Java干货****及时送达Dubbo 是如何控制并发数和限流的?

ExecuteLimitFilter

ExecuteLimitFilter ,在服务提供者,通过 <dubbo:service /> 的 "executes" 统一配置项开启:表示每服务的每方法最大可并行执行请求数。

ExecuteLimitFilter是通过信号量来实现的对服务端的并发数的控制。

ExecuteLimitFilter执行流程:

  1. 首先会去获得服务提供者每服务每方法最大可并行执行请求数

  2. 如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例

  3. 通过RpcStatus实例获取一个信号量,若果获取的这个信号量调用tryAcquire返回false,则抛出异常

  4. 如果没有抛异常,那么久调用RpcStatus静态方法beginCount,给这个 URL + 方法维度开始计数

  5. 调用服务

  6. 调用结束后计数调用RpcStatus静态方法endCount,计数结束

  7. 释放信号量

ExecuteLimitFilter

@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {    URL url = invoker.getUrl();    String methodName = invocation.getMethodName();    Semaphore executesLimit = null;    boolean acquireResult = false;    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);    if (max > 0) {        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());        //            if (count.getActive() >= max) {        /**             * http://manzhizhen.iteye.com/blog/2386408             * use semaphore for concurrency control (to limit thread number)             */        executesLimit = count.getSemaphore(max);        if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");        }    }    long begin = System.currentTimeMillis();    boolean isSuccess = true;    RpcStatus.beginCount(url, methodName);    try {        Result result = invoker.invoke(invocation);        return result;    } catch (Throwable t) {        isSuccess = false;        if (t instanceof RuntimeException) {            throw (RuntimeException) t;        } else {            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);        }    } finally {        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);        if(acquireResult) {            executesLimit.release();        }    }}

我们接下来看看RpcStatus这个类

private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();public static RpcStatus getStatus(URL url, String methodName) {    String uri = url.toIdentityString();    ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);    if (map == null) {        METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());        map = METHOD_STATISTICS.get(uri);    }    RpcStatus status = map.get(methodName);    if (status == null) {        map.putIfAbsent(methodName, new RpcStatus());        status = map.get(methodName);    }    return status;}

这个方法很简单,大概就是给RpcStatus这个类里面的静态属性METHOD_STATISTICS里面设值。外层的map是以url为key,里层的map是以方法名为key。

private volatile int executesPermits;public Semaphore getSemaphore(int maxThreadNum) {    if(maxThreadNum <= 0) {        return null;    }    if (executesLimit == null || executesPermits != maxThreadNum) {        synchronized (this) {            if (executesLimit == null || executesPermits != maxThreadNum) {                executesLimit = new Semaphore(maxThreadNum);                executesPermits = maxThreadNum;            }        }    }    return executesLimit;}

这个方法是获取信号量,如果这个实例里面的信号量是空的,那么就添加一个,如果不是空的就返回。另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Dubbo 系列面试题和答案。

TPSLimiter

TpsLimitFilter 过滤器,用于服务提供者中,提供限流的功能。

配置方式:

通过 <dubbo:parameter key="tps" value="" /> 配置项,添加到 <dubbo:service /> 或 <dubbo:provider /> 或 <dubbo:protocol /> 中开启,例如:

dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" ><dubbo:parameter key="tps" value="100" /></dubbo:service>

通过 <dubbo:parameter key="tps.interval" value="" /> 配置项,设置 TPS 周期。

源码分析

TpsLimitFilter

private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {        throw new RpcException(            "Failed to invoke service " +            invoker.getInterface().getName() +            "." +            invocation.getMethodName() +            " because exceed max service tps.");    }    return invoker.invoke(invocation);}

invoke方法调用了DefaultTPSLimiter的isAllowable,我们进入到isAllowable方法看一下

DefaultTPSLimiter

private final ConcurrentMap<String, StatItem> stats    = new ConcurrentHashMap<String, StatItem>();@Overridepublic boolean isAllowable(URL url, Invocation invocation) {    //获取tps这个参数设置的大小    int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);    //获取tps.interval这个参数设置的大小,默认60秒    long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,                                     Constants.DEFAULT_TPS_LIMIT_INTERVAL);    String serviceKey = url.getServiceKey();    if (rate > 0) {        StatItem statItem = stats.get(serviceKey);        if (statItem == null) {            stats.putIfAbsent(serviceKey,                              new StatItem(serviceKey, rate, interval));            statItem = stats.get(serviceKey);        }        return statItem.isAllowable();    } else {        StatItem statItem = stats.get(serviceKey);        if (statItem != null) {            stats.remove(serviceKey);        }    }    return true;}

若要限流,调用 StatItem#isAllowable(url, invocation) 方法,根据 TPS 限流规则判断是否限制此次调用。

StatItem

private long lastResetTime;private long interval;private AtomicInteger token;private int rate;public boolean isAllowable() {    long now = System.currentTimeMillis();    // 若到达下一个周期,恢复可用种子数,设置最后重置时间。    if (now > lastResetTime + interval) {        token.set(rate);// 回复可用种子数        lastResetTime = now;// 最后重置时间    }    // CAS ,直到或得到一个种子,或者没有足够种子    int value = token.get();    boolean flag = false;    while (value > 0 && !flag) {        flag = token.compareAndSet(value, value - 1);        value = token.get();    }    return flag;}

关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Dubbo 系列面试题和答案。

作者:luozhiyun
出处:www.cnblogs.com/luozhiyun/p/10960593.html

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

关注Java技术栈看更多干货

Dubbo 是如何控制并发数和限流的?

Dubbo 是如何控制并发数和限流的?

戳原文,获取精选面试题!

本文分享自微信公众号 - Java技术栈(javastack)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Stella981 Stella981
3年前
200的大额人民币即将面世?央行:Yes!
点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/2a1c2ac00bf54458a78c48a6c2e547d5.png)点击上方“印象python”,选择“星标”公众号重磅干货,第一时间送达!!(
可莉 可莉
3年前
200的大额人民币即将面世?央行:Yes!
点击上方蓝字关注我们!(https://oscimg.oschina.net/oscnet/2a1c2ac00bf54458a78c48a6c2e547d5.png)点击上方“印象python”,选择“星标”公众号重磅干货,第一时间送达!!(
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这