文章

过滤器(九)之TpsLimitFilter

过滤器(九)之TpsLimitFilter

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文分享 TpsLimitFilter 过滤器,用于服务提供者中,提供 限流 的功能。

配置方式

① 通过 配置项,添加到 或 或 中开启,例如:

1
2
3
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
    <dubbo:parameter key="tps" value="100" />
</dubbo:service>

② 通过 配置项,设置 TPS 周期

注意

笔者阅读的 Dubbo 版本,目前暂未配置 TpsLimitFilter 到 Dubbo SPI 文件里,所以我们需要添加到 com.alibaba.dubbo.rpc.Filter 中,例如:

1
tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

2. TpsLimitFilter

com.alibaba.dubbo.rpc.filter.TpsLimitFilter ,实现 Filter 接口,TPS 限流过滤器实现类。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    new StringBuilder(64)
                            .append("Failed to invoke service ")
                            .append(invoker.getInterface().getName())
                            .append(".")
                            .append(invocation.getMethodName())
                            .append(" because exceed max service tps.")
                            .toString());
        }
        // 服务调用
        return invoker.invoke(invocation);
    }

}

  • 第 8 至 17 行:调用 TPSLimiter#isAllowable(url, invocation) 方法,根据 tps 限流规则判断是否限制此次调用。若是,抛出 RpcException 异常。目前使用 TPSLimiter 作为限流器的实现类。
  • 第 19 行:调用 Invoker#invoke(invocation) 方法,服务调用。

3. TPSLimiter

com.alibaba.dubbo.rpc.filter.tps.TPSLimiter ,TPS 限制器接口。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface TPSLimiter {

    /**
     * judge if the current invocation is allowed by TPS rule
     *
     * 根据 tps 限流规则判断是否限制此次调用.
     *
     * @param url        url
     * @param invocation invocation
     * @return true allow the current invocation, otherwise, return false
     */
    boolean isAllowable(URL url, Invocation invocation);
}

3.1 DefaultTPSLimiter

com.alibaba.dubbo.rpc.filter.tps.DefaultTPSLimiter ,实现 TPSLimiter 接口,默认 TPS 限制器实现类,以服务为维度。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class DefaultTPSLimiter implements TPSLimiter {

    /**
     * StatItem 集合
     *
     * key:服务名
     */
    private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();

    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        // 获得 TPS 大小配置项
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        // 获得 TPS 周期配置项,默认 60 秒
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        String serviceKey = url.getServiceKey();
        // 要限流
        if (rate > 0) {
            // 获得 StatItem 对象
            StatItem statItem = stats.get(serviceKey);
            // 不存在,则进行创建
            if (statItem == null) {
                stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            // 根据 TPS 限流规则判断是否限制此次调用.
            return statItem.isAllowable(url, invocation);
        // 不限流
        } else {
            // 移除 StatItem
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                stats.remove(serviceKey);
            }
            // 返回通过
            return true;
        }
    }

}

  • stats即以服务为维度属性,StatItem 集合,Key 为 服务名。
  • 第 13 行:获得 TPS 大小配置项 tps
  • 第 15 行:获得 TPS 周期配置项 tps.interval,默认 60 * 1000 毫秒。
  • 第 17 至 27 行:若要限流,调用 StatItem#isAllowable(url, invocation) 方法,根据 TPS 限流规则判断是否限制此次调用。
  • 第 28 至 37 行:若不限流,移除 StatItem 对象。

3.2 StatItem

com.alibaba.dubbo.rpc.filter.tps.StatItem ,统计项。

3.2.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * 统计名,目前使用服务名
 */
private String name;
/**
 * 周期
 */
private long interval;
/**
 * 限制大小
 */
private int rate;
/**
 * 最后重置时间
 */
private long lastResetTime;
/**
 * 当前周期,剩余种子数
 */
private AtomicInteger token;

StatItem(String name, int rate, long interval) {
    this.name = name;
    this.rate = rate;
    this.interval = interval;
    this.lastResetTime = System.currentTimeMillis();
    this.token = new AtomicInteger(rate);
}

3.2.2 isAllowable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean isAllowable(URL url, Invocation invocation) {
    // 若到达下一个周期,恢复可用种子数,设置最后重置时间。
    long now = System.currentTimeMillis();
    if (now > lastResetTime + interval) {
        token.set(rate); // 回复可用种子数
        lastResetTime = now; // 最后重置时间
    }

    // CAS ,直到或得到一个种子,或者没有足够种子
    int value = token.get();
    boolean flag = false;
    while (value > 0 && !flag) {
        flag = token.compareAndSet(value, value - 1);
        value = token.get();
    }

    // 是否成功
    return flag;
}

666. 彩蛋

实际在服务的限流时,更推荐使用 令牌桶算法 ,在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 中,我们有详细分享。

本文由作者按照 CC BY 4.0 进行授权