文章

过滤器(九)之TpsLimitFilter

过滤器(九)之TpsLimitFilter

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文分享 TpsLimitFilter 过滤器,用于服务提供者中,提供 限流 的功能。

配置方式

① 通过 配置项,添加到 或 或 中开启,例如:

```plain text plain

1
2
3
4
5
6
7
8
9
10
11
---

② 通过  配置项,设置 TPS **周期**。

**注意**

笔者阅读的 Dubbo 版本,目前暂未配置 TpsLimitFilter 到 Dubbo SPI 文件里,所以我们需要添加到 com.alibaba.dubbo.rpc.Filter 中,例如:

```plain text
plain tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

2. TpsLimitFilter

com.alibaba.dubbo.rpc.filter.TpsLimitFilter ,实现 Filter 接口,TPS 限流过滤器实现类。代码如下:

```plain text plain 1: @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY) 2: public class TpsLimitFilter implements Filter { 3: 4: private final TPSLimiter tpsLimiter = new DefaultTPSLimiter(); 5: 6: @Override 7: public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { 8: if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) { 9: throw new RpcException( 10: new StringBuilder(64) 11: .append(“Failed to invoke service “) 12: .append(invoker.getInterface().getName()) 13: .append(“.”) 14: .append(invocation.getMethodName()) 15: .append(“ because exceed max service tps.”) 16: .toString()); 17: } 18: // 服务调用 19: return invoker.invoke(invocation); 20: } 21: 22: }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---

- 第 8 至 17 行:调用
TPSLimiter#isAllowable(url, invocation)
方法,根据 tps 限流规则判断是否限制此次调用。若是,抛出 RpcException 异常。目前使用 TPSLimiter 作为限流器的实现类。
- 第 19 行:调用
Invoker#invoke(invocation)
方法,服务调用。

# 3. TPSLimiter

com.alibaba.dubbo.rpc.filter.tps.TPSLimiter ,TPS 限制器接口。代码如下:

```plain text
plain public interface TPSLimiter {      /**      * judge if the current invocation is allowed by TPS rule      *      * 根据 tps 限流规则判断是否限制此次调用.      *      * @param url        url      * @param invocation invocation      * @return true allow the current invocation, otherwise, return false      */     boolean isAllowable(URL url, Invocation invocation);  }

3.1 DefaultTPSLimiter

com.alibaba.dubbo.rpc.filter.tps.DefaultTPSLimiter ,实现 TPSLimiter 接口,默认 TPS 限制器实现类,以服务为维度。代码如下:

```plain text plain 1: public class DefaultTPSLimiter implements TPSLimiter { 2: 3: /** 4: * StatItem 集合 5: * 6: * key:服务名 7: */ 8: private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>(); 9: 10: @Override 11: public boolean isAllowable(URL url, Invocation invocation) { 12: // 获得 TPS 大小配置项 13: int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); 14: // 获得 TPS 周期配置项,默认 60 秒 15: long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL); 16: String serviceKey = url.getServiceKey(); 17: // 要限流 18: if (rate > 0) { 19: // 获得 StatItem 对象 20: StatItem statItem = stats.get(serviceKey); 21: // 不存在,则进行创建 22: if (statItem == null) { 23: stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval)); 24: statItem = stats.get(serviceKey); 25: } 26: // 根据 TPS 限流规则判断是否限制此次调用. 27: return statItem.isAllowable(url, invocation); 28: // 不限流 29: } else { 30: // 移除 StatItem 31: StatItem statItem = stats.get(serviceKey); 32: if (statItem != null) { 33: stats.remove(serviceKey); 34: } 35: // 返回通过 36: return true; 37: } 38: } 39: 40: }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
---

- stats**即以服务为维度**
属性,StatItem 集合,Key 为 服务名,
。
- 第 13 行:获得 TPS 大小配置项
“tps”
。
- 第 15 行:获得 TPS 周期配置项
“tps.interval”
,默认 60 * 1000 毫秒。
- 第 17 至 27 行:若要限流,调用
StatItem#isAllowable(url, invocation)
方法,根据 TPS 限流规则判断是否限制此次调用。
- 第 28 至 37 行:若不限流,移除 StatItem 对象。

## 3.2 StatItem

com.alibaba.dubbo.rpc.filter.tps.StatItem ,统计项。

### 3.2.1 构造方法

```plain text
plain /**  * 统计名,目前使用服务名  */ private String name; /**  * 周期  */ private long interval; /**  * 限制大小  */ private int rate; /**  * 最后重置时间  */ private long lastResetTime; /**  * 当前周期,剩余种子数  */ private AtomicInteger token;  StatItem(String name, int rate, long interval) {     this.name = name;     this.rate = rate;     this.interval = interval;     this.lastResetTime = System.currentTimeMillis();     this.token = new AtomicInteger(rate); }

3.2.2 isAllowable

plain text plain public boolean isAllowable(URL url, Invocation invocation) { // 若到达下一个周期,恢复可用种子数,设置最后重置时间。 long now = System.currentTimeMillis(); if (now > lastResetTime + interval) { token.set(rate); // 回复可用种子数 lastResetTime = now; // 最后重置时间 } // CAS ,直到或得到一个种子,或者没有足够种子 int value = token.get(); boolean flag = false; while (value > 0 && !flag) { flag = token.compareAndSet(value, value - 1); value = token.get(); } // 是否成功 return flag; }


666. 彩蛋

实际在服务的限流时,更推荐使用 令牌桶算法 ,在 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 中,我们有详细分享。

本文由作者按照 CC BY 4.0 进行授权