文章

集群容错(七)之Router实现

集群容错(七)之Router实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 一文,分享 dubbo-cluster 模块, router 包,实现 Dubbo的路由规则功能。

Router 相关类,如下图:

Router相关类

老艿艿:本文对应 《Dubbo 用户指南 —— 路由规则》 文档。如果之前没了解过该功能的胖友,请先阅读了解下哈。

2. RouterFactory

com.alibaba.dubbo.rpc.cluster.RouterFactory ,Router 工厂接口。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SPI
public interface RouterFactory {

    /**
     * Create router.
     *
     * 创建 Router 对象
     *
     * @param url
     * @return router
     */
    @Adaptive("protocol")
    Router getRouter(URL url);

}
  • @SPI拓展点注解,Dubbo SPI,无默认值。
  • @Adaptive(“protocol”)注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Router 实现,使用URL.protocol属性。
  • #getRouter(URL url)接口方法,获得 Router 对象。

2.1 ConditionRouterFactory

com.alibaba.dubbo.rpc.cluster.router.condition.ConditionRouterFactory ,实现 RouterFactory 接口,ConditionRouter 工厂实现类。代码如下:

1
2
3
4
5
6
7
8
9
10
public class ConditionRouterFactory implements RouterFactory {

    public static final String NAME = "condition";

    @Override
    public Router getRouter(URL url) {
        return new ConditionRouter(url);
    }

}
  • 对应 Router 实现类为 ConditionRouter 。

2.2 ScriptRouterFactory

com.alibaba.dubbo.rpc.cluster.router.script.ScriptRouterFactory ,实现 RouterFactory 接口,ScriptRouter 工厂实现类。代码如下:

1
2
3
4
5
6
7
8
9
10
public class ScriptRouterFactory implements RouterFactory {

    public static final String NAME = "script";

    @Override
    public Router getRouter(URL url) {
        return new ScriptRouter(url);
    }

}
  • 对应 Router 实现类为 ScriptRouter 。

2.3 FileRouterFactory

com.alibaba.dubbo.rpc.cluster.router.file.FileRouterFactory ,实现 RouterFactory 接口,基于文件读取路由规则,创建对应的 Router 实现类的对象。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class FileRouterFactory implements RouterFactory {

    public static final String NAME = "file";

    /**
     * RouterFactory$Adaptive 对象
     */
    private RouterFactory routerFactory;

    public void setRouterFactory(RouterFactory routerFactory) {
        this.routerFactory = routerFactory;
    }

    @Override
    public Router getRouter(URL url) {
        try {
            // Transform File URL into Script Route URL, and Load
            // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=<file-content>
            // 获得 router 配置项,默认为 script
            String protocol = url.getParameter(Constants.ROUTER_KEY, ScriptRouterFactory.NAME); // Replace original protocol (maybe 'file') with 'script'
            // 使用文件后缀做为类型
            String type = null; // Use file suffix to config script type, e.g., js, groovy ...
            String path = url.getPath();
            if (path != null) {
                int i = path.lastIndexOf('.');
                if (i > 0) {
                    type = path.substring(i + 1);
                }
            }
            // 读取规则内容
            String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath())));

            // 创建路由规则 URL
            boolean runtime = url.getParameter(Constants.RUNTIME_KEY, false);
            URL script = url.setProtocol(protocol).addParameter(Constants.TYPE_KEY, type)
                    .addParameter(Constants.RUNTIME_KEY, runtime)
                    .addParameterAndEncoded(Constants.RULE_KEY, rule);

            // 通过 Dubbo SPI Adaptive 机制,获得 Router 对象
            return routerFactory.getRouter(script);
        } catch (IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

}
  • 第 20 行:获得”router”配置项,默认为”script”。
  • 第 21 至 29 行:获得类型,基于文件后缀。
  • 第 31 行:从文件规则内容中,读取。
  • 第 33 至 37 行:创建路由规则 URL 对象。
  • 第 40 行:通过 Dubbo SPI Adaptive对应的 Router 对象机制,获得。

3. Router

com.alibaba.dubbo.rpc.cluster.Router ,实现 Comparable 接口,路由规则接口。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Router extends Comparable<Router> {

    /**
     * get the router url.
     * <p>
     * 路由规则 URL
     *
     * @return url
     */
    URL getUrl();

    /**
     * route.
     *
     * 路由,筛选匹配的 Invoker 集合
     *
     * @param invokers   Invoker 集合
     * @param url        refer url
     * @param invocation
     * @return routed invokers 路由后的 Invoker 集合
     * @throws RpcException
     */
    <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
  • 一个 Router 对象,对应一条路由规则
  • Configurator 有优先级的要求,所以实现 Comparable 接口。
  • #getUrl()接口方法,获得路由 URL ,里面带有路由规则。
  • #route(List invokers, URL url, Invocation invocation)**匹配的**接口方法,路由,筛选Invoker 集合。

3.1 ConditionRouter

com.alibaba.dubbo.rpc.cluster.router.condition.ConditionRouter ,实现 Router 接口,基于条件表达式的 Router 实现类。

基于条件表达式的路由规则,如:host = 10.20.153.10 => host = 10.20.153.11

注意,胖友一定要看了 《Dubbo 用户指南 —— 路由规则》条件路由规则 部分,不然下面影响理解。

3.1.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * 分组正则匹配,详细见 {@link #parseRule(String)} 方法
 *
 * 前 [] 为匹配,分隔符
 * 后 [] 为匹配,内容
 */
private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");

/**
 * 路由规则 URL
 */
private final URL url;
/**
 * 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为 0 。
 */
private final int priority;
/**
 * 当路由结果为空时,是否强制执行,如果不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为 false 。
 */
private final boolean force;
/**
 * 消费者匹配条件集合,通过解析【条件表达式 rule 的 `=>` 之前半部分】
 */
private final Map<String, MatchPair> whenCondition;
/**
 * 提供者地址列表的过滤条件,通过解析【条件表达式 rule 的 `=>` 之后半部分】
 */
private final Map<String, MatchPair> thenCondition;

public ConditionRouter(URL url) {
    this.url = url;
    this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
    this.force = url.getParameter(Constants.FORCE_KEY, false);
    try {
        // 拆分条件变大时为 when 和 then 两部分
        String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
        if (rule == null || rule.trim().length() == 0) {
            throw new IllegalArgumentException("Illegal route rule!");
        }
        rule = rule.replace("consumer.", "").replace("provider.", "");
        int i = rule.indexOf("=>");
        String whenRule = i < 0 ? null : rule.substring(0, i).trim();
        String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
        // 解析 `whenCondition`
        Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
        // 解析 `thenCondition`
        Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
        // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
        this.whenCondition = when;
        this.thenCondition = then;
    } catch (ParseException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

3.1.2 MatchPair

MatchPair 为 ConditionRouter 的内部静态类,用于匹配的值每个属性条件,例如 methodhost 等,对应一个 MatchPair 对象。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private static final class MatchPair {

    /**
     * 匹配的值集合
     */
    final Set<String> matches = new HashSet<String>();
    /**
     * 不匹配的值集合
     */
    final Set<String> mismatches = new HashSet<String>();

    /**
     * 判断 value 是否匹配 matches + mismatches
     *
     * @param value 值
     * @param param URL
     * @return 是否匹配
     */
    private boolean isMatch(String value, URL param) {
        // 只匹配 matches
        if (!matches.isEmpty() && mismatches.isEmpty()) {
            for (String match : matches) {
                if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                    return true;
                }
            }
            return false; // 如果没匹配上,认为为 false ,即不匹配
        }

        // 只匹配 mismatches
        if (!mismatches.isEmpty() && matches.isEmpty()) {
            for (String mismatch : mismatches) {
                if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                    return false;
                }
            }
            return true; // 注意,这里和上面不同。原因,你懂的。
        }

        // 匹配 mismatches + matches
        if (!matches.isEmpty()) {
            //when both mismatches and matches contain the same value, then using mismatches first
            for (String mismatch : mismatches) {
                if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                    return false;
                }
            }
            for (String match : matches) {
                if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                    return true;
                }
            }
            return false; // 如果没匹配上,认为为 false ,即不匹配
        }
        return false;
    }
}
  • #isMatch(String value, URL param)匹配方法,判断value是否matches和mismatches。
    • 那么为什么会有param参数呢?因为要支持$从 URL 中,读取参数。
    • #UrlUtils#isMatchGlobPattern(match, value, URL)方法,支持-通配,判断match和value是否匹配。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static boolean isMatchGlobPattern(String pattern, String value, URL param) {
    // 以美元符 `$` 开头,表示引用参数
    if (param != null && pattern.startsWith("$")) {
        pattern = param.getRawParameter(pattern.substring(1));
    }
    // 匹配
    return isMatchGlobPattern(pattern, value);
}

public static boolean isMatchGlobPattern(String pattern, String value) {
    // 全匹配
    if ("*".equals(pattern)) {
        return true;
    }
    // 全部为空,匹配
    if ((pattern == null || pattern.length() == 0) && (value == null || value.length() == 0)) {
        return true;
    }
    // 有一个为空,不匹配
    if ((pattern == null || pattern.length() == 0) || (value == null || value.length() == 0)) {
        return false;
    }

    // 支持 * 的通配
    int i = pattern.lastIndexOf('*');
    // doesn't find "*"
    if (i == -1) {
        return value.equals(pattern);
    }
    // "*" is at the end
    else if (i == pattern.length() - 1) {
        return value.startsWith(pattern.substring(0, i));
    }
    // "*" is at the beginning
    else if (i == 0) {
        return value.endsWith(pattern.substring(i + 1));
    }
    // "*" is in the middle
    else {
        String prefix = pattern.substring(0, i);
        String suffix = pattern.substring(i + 1);
        return value.startsWith(prefix) && value.endsWith(suffix);
    }
}

代码比较简单,所以胖友自己读下。

3.1.3 parseRule

#parseRule(rule) 方法,解析路由配置内容 “rule” 。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private static Map<String, MatchPair> parseRule(String rule) throws ParseException {
//    System.out.println("rule: " + rule); // add by 芋艿,方便大家看
    Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
    if (StringUtils.isBlank(rule)) {
        return condition;
    }
    // Key-Value pair, stores both match and mismatch conditions
    MatchPair pair = null;
    // Multiple values
    Set<String> values = null;
    final Matcher matcher = ROUTE_PATTERN.matcher(rule);
    while (matcher.find()) { // Try to match one by one
        String separator = matcher.group(1);
        String content = matcher.group(2);
//        System.out.println(separator + "\t" + content); // add by 芋艿,方便大家看
        // Start part of the condition expression.
        if (separator == null || separator.length() == 0) {
            pair = new MatchPair();
            condition.put(content, pair);
        }
        // The KV part of the condition expression
        else if ("&".equals(separator)) {
            if (condition.get(content) == null) {
                pair = new MatchPair();
                condition.put(content, pair);
            } else {
                pair = condition.get(content);
            }
        }
        // The Value in the KV part.
        else if ("=".equals(separator)) {
            if (pair == null) {
                throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start());
            }
            values = pair.matches;
            values.add(content);
        }
        // The Value in the KV part.
        else if ("!=".equals(separator)) {
            if (pair == null) {
                throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start());
            }
            values = pair.mismatches;
            values.add(content);
        }
        // The Value in the KV part, if Value have more than one items.
        else if (",".equals(separator)) { // Should be seperateed by ','
            if (values == null || values.isEmpty()) {
                throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start());
            }
            values.add(content);
        } else {
            throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start());
        }
    }
    return condition;
}
  • 第 11 至 14 行:通过 循环ROUTE_PATTERN正则匹配rule,多次,直到结束。如下是两个例子:
1
2
3
4
5
rule: host = 192.168.3.17 & method = say01
host =  192.168.3.17 &  method =    say01
---------- 分割线 ----------
rule: host = 192.168.3.17
host =  192.168.3.17
  • 第 16 至 29 行:处理条件属性的情况,例如:host和& method等等,此时会获得对应的 MatchPair 对象。若不存在,则进行创建 MatchPair 对象。
  • 第 30 至 45 行:处理条件条件值的情况,例如:= 192.168.3.17和!= say01等等,此时会添加到 MatchPair 的matches或mismatches中。
    • 第 46 至 51 行:处理条件条件值也会以逗号(,)分隔多个值的情况,此时添加到 MatchPair 的matches或mismatches中。
  • 第 52 至 54 行:非法,抛出 ParseException 异常。

3.1.4 route

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    // 为空,直接返回空 Invoker 集合
    if (invokers == null || invokers.isEmpty()) {
        return invokers;
    }
    try {
        // 不匹配 `whenCondition` ,直接返回 `invokers` 集合,因为不需要走 `whenThen` 的匹配
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        // `whenThen` 为空,则返回空 Invoker 集合
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
            return result;
        }
        // 使用 `whenThen` ,匹配 `invokers` 集合。若符合,添加到 `result` 中
        for (Invoker<T> invoker : invokers) {
            if (matchThen(invoker.getUrl(), url)) {
                result.add(invoker);
            }
        }
        // 若 `result` 非空,返回它
        if (!result.isEmpty()) {
            return result;
        // 如果 `force=true` ,代表强制执行,返回空 Invoker 集合
        } else if (force) {
            logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
            return result;
        }
    } catch (Throwable t) {
        logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
    }
    // 如果 `force=false` ,代表不强制执行,返回 `invokers` 集合,即忽略路由规则
    return invokers;
}
  • 第 3 至 6 行:若 invokers为空,直接返回Invoker 集合。
  • 第 8 至 11 行:调用 消费者#matchWhen(url, invocation)方法,使用服务url匹配whenCondition。代码如下:
1
2
3
boolean matchWhen(URL url, Invocation invocation) {
    return whenCondition == null || whenCondition.isEmpty() || matchCondition(whenCondition, url, null, invocation);
}
  • 如果匹配条件为空,表示对所有消费方应用,如:=> host != 10.20.153.11。
  • 不匹配,则直接返回invokers集合,因为不需要走whenThen的匹配。
  • #matchCondition(…) 方法的详细解析,见 「3.1.5 matchCondition」

  • 第 13 至 17 行:若 whenThen为空,则返回Invoker 集合。
  • 第 18 至 23 行:循环提供者者调用#matchThen(url, invocation)方法,使用服务invokers的 URL ,匹配whenThen集合。代码如下:
1
2
3
private boolean matchThen(URL url, URL param) {
    return !(thenCondition == null || thenCondition.isEmpty()) && matchCondition(thenCondition, url, param, null);
}
  • 如果过滤条件为空,表示禁止访问,如:host = 10.20.153.10 =>。
  • 匹配,添加到result中。

  • ========== 处理 三种result+force的情况 ==========
  • 第 24 至 26 行:若 非空它result,返回。
  • 第 27 至 31 行:若 为空空result,如果force=true,代表强制执行,返回Invoker 集合。
  • 第 36 行:若 为空不全忽略result,如果force=false,代表强制执行,返回invokers集合,即路由规则。

情况比较多,胖友可以回过头在理一理。

3.1.5 matchCondition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
    Map<String, String> sample = url.toMap();
    boolean result = false; // 是否匹配
    for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
        // 获得条件属性
        String key = matchPair.getKey();
        String sampleValue;
        // get real invoked method name from invocation
        if (invocation != null && (Constants.METHOD_KEY.equals(key) || Constants.METHODS_KEY.equals(key))) {
            sampleValue = invocation.getMethodName();
        } else {
            sampleValue = sample.get(key);
            if (sampleValue == null) {
                sampleValue = sample.get(Constants.DEFAULT_KEY_PREFIX + key);
            }
        }
        // 匹配条件值
        if (sampleValue != null) {
            if (!matchPair.getValue().isMatch(sampleValue, param)) { // 返回不匹配
                return false;
            } else {
                result = true;
            }
        } else {
            // not pass the condition
            if (!matchPair.getValue().matches.isEmpty()) { // 无条件值,但是有匹配条件 `matches` ,则返回不匹配。
                return false;
            } else {
                result = true;
            }
        }
    }
    return result;
}

3.1.5 compareTo

1
2
3
4
5
6
7
8
@Override
public int compareTo(Router o) {
    if (o == null || o.getClass() != ConditionRouter.class) {
        return 1;
    }
    ConditionRouter c = (ConditionRouter) o;
    return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1);
}
  • 优先,按照 降序“priority”。
  • 其次,按照 升序“url”。

3.2 ScriptRouter

com.alibaba.dubbo.rpc.cluster.router.script.ScriptRouter ,实现 Router 接口,基于脚本的 Router 实现类。

脚本路由规则 4 支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。

1
"script://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("function route(invokers) { ... } (invokers)")

基于脚本引擎的路由规则,如:

1
2
3
4
5
6
7
8
9
function route(invokers) {
    var result = new java.util.ArrayList(invokers.size());
    for (i = 0; i < invokers.size(); i ++) {
        if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) {
            result.add(invokers.get(i));
        }
    }
    return result;
} (invokers); // 表示立即执行方法

3.2.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * 脚本类型 与 ScriptEngine 的映射缓存
 */
private static final Map<String, ScriptEngine> engines = new ConcurrentHashMap<String, ScriptEngine>();

/**
 * 路由规则 URL
 */
private final ScriptEngine engine;
/**
 * 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为 0 。
 */
private final int priority;
/**
 * 路由规则内容
 */
private final String rule;
/**
 * 路由规则 URL
 */
private final URL url;

public ScriptRouter(URL url) {
    this.url = url;
    String type = url.getParameter(Constants.TYPE_KEY);
    this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
    String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
    // 初始化 `engine`
    if (type == null || type.length() == 0) {
        type = Constants.DEFAULT_SCRIPT_TYPE_KEY;
    }
    if (rule == null || rule.length() == 0) {
        throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule));
    }
    ScriptEngine engine = engines.get(type);
    if (engine == null) { // 在缓存中不存在,则进行创建 ScriptEngine 对象
        engine = new ScriptEngineManager().getEngineByName(type);
        if (engine == null) {
            throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule));
        }
        engines.put(type, engine);
    }
    this.engine = engine;
    this.rule = rule;
}

3.2.2 route

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    try {
        // 执行脚本
        List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);
        Compilable compilable = (Compilable) engine;
        Bindings bindings = engine.createBindings();
        bindings.put("invokers", invokersCopy);
        bindings.put("invocation", invocation);
        bindings.put("context", RpcContext.getContext());
        CompiledScript function = compilable.compile(rule); // 编译
        Object obj = function.eval(bindings); // 执行
        // 根据结果类型,转换成 (List<Invoker<T>>) 类型返回
        if (obj instanceof Invoker[]) {
            invokersCopy = Arrays.asList((Invoker<T>[]) obj);
        } else if (obj instanceof Object[]) {
            invokersCopy = new ArrayList<Invoker<T>>();
            for (Object inv : (Object[]) obj) {
                invokersCopy.add((Invoker<T>) inv);
            }
        } else {
            invokersCopy = (List<Invoker<T>>) obj;
        }
        return invokersCopy;
    } catch (ScriptException e) {
        // 发生异常,忽略路由规则,返回全 `invokers` 集合
        logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
        return invokers;
    }
}
  • 比较易懂,胖友自己看代码注释。

3.2.3 compareTo

1
2
3
4
5
6
7
8
@Override
public int compareTo(Router o) {
    if (o == null || o.getClass() != ScriptRouter.class) {
        return 1;
    }
    ScriptRouter c = (ScriptRouter) o;
    return this.priority == c.priority ? rule.compareTo(c.rule) : (this.priority > c.priority ? 1 : -1);
}
  • 优先,按照 降序“priority”。
  • 其次,按照 升序“rule”。

3.3 MockInvokersSelector

详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(八)之 Mock 实现》

4. 集成 Router 模块

如下图所示,我们可以看到,有二个类,调用 Router#route(List, URL, Invocation) 方法,集成 Router 模块。

集成Router模块

4.1 AbstractDirectory

4.1.1 setRouters

#setRouters(List routers) 方法,设置路由规则们。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void setRouters(List<Router> routers) {
    // copy list // 复制 routers ,因为下面要修改
    routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
    // append url router
    // 拼接 `url` 中,配置的路由规则
    String routerkey = url.getParameter(Constants.ROUTER_KEY);
    if (routerkey != null && routerkey.length() > 0) {
        RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
        routers.add(routerFactory.getRouter(url));
    }
    // append mock invoker selector
    routers.add(new MockInvokersSelector());
    // 排序
    Collections.sort(routers);
    // 赋值给属性
    this.routers = routers;
}
  • 第 3 行:复制重新创建routers数组,因为下面会进行修改。
  • 第 5 至 10 行:添加配置的路由规则url中到routers中。例如:
1
2
3
4
<dubbo:registry id="zk01" address="zookeeper://127.0.0.1:2181">
    <dubbo:parameter key="router" value="file" />
    <dubbo:parameter key="rule" value="/Users/yunai/xxx.js" />
</dubbo:registry>
  • 受限于 XML 对字符的限制,”condition” 或 “script” 类型的路由配置会比较难设置。所以笔者认为,如果是使用 XML 配置路由规则,”file” 类型是比较合适的方式。当然,如果使用 Java API 又或者注解的方式,应该不存在这样的问题。

  • 第 12 行:添加 MockInvokersSelector到routers中。
  • 第 14 行:排序routers。
  • 第 16 行:赋值属性给 AbstractDirectory 。

4.1.2 list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 获得所有 Invoker 集合
    List<Invoker<T>> invokers = doList(invocation);
    // 根据路由规则,筛选 Invoker 集合
    List<Router> localRouters = this.routers; // local reference 本地引用,避免并发问题
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}
  • 第 8 至 20 行:循环匹配的,调用Router#route(invokers, url, invocation)方法,不断路由,筛选Invoker 集合。
    • 第 13 行:判断 true否则只在提供者地址列表变更时预先执行并缓存结果“runtime”为才执行:是否在每次调用时执行路由规则,调用时直接从缓存中获取路由结果。如果用了参数路由,必须设为true,需要注意设置会影响调用的性能,可不填,缺省为flase。

4.2 RegistryDirectory

4.2.1 notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public synchronized void notify(List<URL> urls) {
    // 【省略无关代码】根据 URL 的分类或协议,分组成三个集合 。
    List<URL> invokerUrls = new ArrayList<URL>(); // 服务提供者 URL 集合
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();

    //  【省略无关代码】处理配置规则 URL 集合
    // configurators

    // 处理路由规则 URL 集合
    if (!routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }

    //  【省略无关代码】合并配置规则,到 `directoryUrl` 中,形成 `overrideDirectoryUrl` 变量。
    //  【省略无关代码】处理服务提供者 URL 集合
}
  • 第 12 行:若注册中心通知的routerUrls非空,进行处理routerUrls集合。
  • 第 13 行:调用 转换#toRouters(routerUrls)方法,将路由规则 URL 集合,成对应的 Router 集合。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private List<Router> toRouters(List<URL> urls) {
    List<Router> routers = new ArrayList<Router>();
    if (urls == null || urls.isEmpty()) {
        return routers;
    }
    for (URL url : urls) {
        // 忽略,若是 "empty://" 。一般情况下,所有路由规则被删除时,有且仅有一条协议为 "empty://" 的路由规则 URL
        if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
            continue;
        }
        // 获得 "router"
        String routerType = url.getParameter(Constants.ROUTER_KEY);
        if (routerType != null && routerType.length() > 0) {
            url = url.setProtocol(routerType);
        }
        try {
            // 创建 Router 对象
            Router router = routerFactory.getRouter(url);
            // 添加到返回结果
            if (!routers.contains(router)) {
                routers.add(router);
            }
        } catch (Throwable t) {
            logger.error("convert router url to router error, url: " + url, t);
        }
    }
    return routers;
}
  • 代码易懂,胖友看下注释理解。

  • 第 14 至 16 行:null0「4.1.1 setRouters」routers集合非( 允许集合大小为0),调用#setRouters(routers)方法,设置路由规则集合,即。

4.2.2 toMethodInvokers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
    // 创建新的 `methodInvokerMap`
    Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>();
    // 创建 Invoker 集合
    List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
    //  【省略无关代码】按服务提供者 URL 所声明的 methods 分类,兼容注册中心执行路由过滤掉的 methods
    // 路由全 `invokersList` ,匹配合适的 Invoker 集合
    List<Invoker<T>> newInvokersList = route(invokersList, null);
    // 添加 `newInvokersList` 到 `newMethodInvokerMap` 中,表示该服务提供者的全量 Invoker 集合
    newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
    // 循环,基于每个方法路由,匹配合适的 Invoker 集合
    if (serviceMethods != null && serviceMethods.length > 0) {
        for (String method : serviceMethods) {
            List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
            if (methodInvokers == null || methodInvokers.isEmpty()) {
                methodInvokers = newInvokersList;
            }
            newMethodInvokerMap.put(method, route(methodInvokers, method));
        }
    }
    // 【省略无关代码】循环排序每个方法的 Invoker 集合,并设置为不可变
}
  • 第 8 行:调用 全进行缓存只在提供者地址列表变更时预先执行并缓存结果**#route(invokers, method)方法,路由invokersList,匹配合适的 Invoker 集合,这就是上文提到的”。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) {
    // 创建 Invocation 对象
    Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
    // 获得 Router 数组
    List<Router> routers = getRouters();
    // 根据路由规则,筛选 Invoker 集合
    if (routers != null) {
        for (Router router : routers) {
            if (router.getUrl() != null) {
                invokers = router.route(invokers, getConsumerUrl(), invocation);
            }
        }
    }
    return invokers;
}
  • 主要是调用Router#route(…)方法,路由。

  • 第 11 至 20 行:循环每个方法进行缓存,调用#route(invokers, method)方法,路由的methodInvokers,匹配合适的 Invoker 集合。

本文由作者按照 CC BY 4.0 进行授权