集群容错(三)之Directory实现
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码解析 —— 集群容错(二)之 Cluster 实现》 一文,分享 dubbo-cluster 模块, directory 包,各种 Directory 实现类。
Directory ,中文直译为目录,代表了多个 Invoker ,可以把它看成 List 。但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更。
Directory 子类如下图:
Directory 子类
我们看到有两个实现类:
- StaticDirectory ,静态静态 Directory 实现类,从命名上看出它是 的 List 。
- RegistryDirectory ,基于注册中心动态动态 的 Directory 实现类,从命名上看出它是 的,会根据注册中心的推送变更 List 。
2. Directory
com.alibaba.dubbo.rpc.cluster.Directory ,继承 Node 接口,Directory 接口。代码如下:
```plain text plain public interface Directory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
---
- 定义了**两个类型Invoker 集合**
接口方法,分别返回服务的
和
。
- 一个 Directory **只对应**
一个服务类型。
# 3. AbstractDirectory
com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory ,实现 Directory 接口,Directory 抽象实现类,实现了公用的**路由规则( Router )**的逻辑。
## 3.1 构造方法
```plain text
plain /** * 是否已经销毁 */ private volatile boolean destroyed = false; /** * 注册中心 URL */ private final URL url; /** * 消费者 URL * * 若未显示调用 {@link #AbstractDirectory(URL, URL, List)} 构造方法,consumerUrl 等于 {@link #url} */ private volatile URL consumerUrl; /** * Router 数组 */ private volatile List<Router> routers; public AbstractDirectory(URL url) { this(url, null); } public AbstractDirectory(URL url, List<Router> routers) { this(url, url, routers); } public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) { if (url == null) { throw new IllegalArgumentException("url == null"); } this.url = url; this.consumerUrl = consumerUrl; // 设置 Router 数组 setRouters(routers); }
- consumerUrl 字段,认真看下注释和构造方法。
- 调用 #setRouters(routers) 方法,初始化并设置 Router 数组。
3.2 setRouters
#setRouters(routers) 方法,初始化并设置 Router 数组。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》 中。
3.3 list
#list(Invocation)实现方法,获得所有服务 Invoker 集合。代码如下:
```plain text plain 1: @Override 2: public List<Invoker
1
2
3
4
5
6
7
8
9
---
- 第 7 行:调用 **抽象**
#doList(invocation)
方法,获得所有 Invoker 集合。代码如下:
```plain text
plain protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
- 第 9 至 20 行:根据路由规则( Router )《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》 ,进一步筛选合适的 Invoker 集合。详细解析,见 。
4. RegistryDirectory
com.alibaba.dubbo.registry.integration.RegistryDirectory ,实现 NotifyListener 接口,实现 AbstractDirectory 抽象类,基于注册中心的 Directory 实现类。
- RegistryDirectory 在 dubbo-registry 模块, integration 包下,是 Dubbo 注册中心模块集成 Directory 的实现类。
- RegistryDirectory 作为一个 NotifyListener ,订阅监听 注册中心( Registry ) 的数据,实现对变更的 。
4.1 构造方法
RegistryDirectory 的字段有 17 个,比较多,所以胖友请耐心。
```plain text plain // ========== Dubbo SPI Adaptive 对象 BEGIN ========== /** * Cluster$Adaptive 对象 */ private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); /** * RouterFactory$Adaptive 对象 */ private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension(); /** * ConfiguratorFactory$Adaptive 对象 */ private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension(); // ========== 服务消费者相关 BEGIN ========== /** * 服务类型,例如:com.alibaba.dubbo.demo.DemoService */ private final Class
1
2
3
4
5
6
7
8
9
10
11
12
13
---
- 分成**五类**
变量。胖友自己看注释。
- 如果不理解,可以结合下面的具体方法的使用。 当然也可以给我留言,因为确实变量有点多和复杂。
## 4.2 subscribe
#subscribe(URL) 方法,向**注册中心**发起订阅。代码如下:
```plain text
plain public void subscribe(URL url) { // 设置消费者 URL setConsumerUrl(url); // 向注册中心,发起订阅 registry.subscribe(url, this); }
- 调用父 #setConsumerUrl(url) 方法,设置 consumerUrl 消费者 URL 。
- 调用 Registry#subscribe(url, NotifyListener) 方法,向注册中心,发起订阅。
服务消费者,再引用服务时,会创建 RegistryDirectory 对象,并发起1)服务提供者 + 2)路由规则 + 3)配置规则的数据订阅。如下图:
doRefer
- 对应为 RegistryProtocol#doRefer(Cluster, Registry, Class type, URL url) 方法。
4.3 notify
在注册中心( Registry )发现数据发生变化时,会通知对应的 NotifyListener 们。如下图:
notify
- 对应为 AbstractRegistry#notify(URL url, NotifyListener, List urls) 方法。
- 因为 RegistryDirectory 作为一个 NotifyListener ,向注册中心( Registry )发起了订阅,所以此时会被通知。注意,是按照分类循环通知的一次只有一类 URL ,也就是说, 。
#notify(List urls)实现方法,代码如下:
```plain text plain 1: @Override 2: public synchronized void notify(List
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
---
- **注意同步**
,这是一个
的方法。
- 第 3 至 19 行:根据 URL 的**分类三个1)服务提供者 + 2)路由规则 + 3)配置规则**
或协议,分成组
集合:
。
- 第 20 至 24 行:非空,调用 **配置规则**[「4.3.1 toConfigurators」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
#toConfigurators(configuratorUrls)
方法,处理
URL 集合。详细解析,见
。
- 第 25 至 32 行:非空,调用 **路由规则**[「4.3.2 toRouters」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
#toRouters(routerUrls)
方法,处理
URL 集合。详细解析,见
。
- 若转换到 **父**
routers
非空,调用
#setRouters(routers)
方法,设置路由规则。
- 第 33 至 41 行:合并配置规则,到 [《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》](http://svip.iocoder.cn/Dubbo/cluster-6-impl-configurator?self=)[「4.1.2 mergeUrl」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
directoryUrl
中,形成
overrideDirectoryUrl
变量。详细解析,见
的
。
- 第 43 行:调用 **服务提供者**[「4.3.3 refreshInvoker」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
#refreshInvoker(invokerUrls)
方法,处理
URL 集合。详细解析,见
。
### 4.3.1 toConfigurators
详细解析,见 [《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》](http://svip.iocoder.cn/Dubbo/cluster-6-impl-configurator?self=) 的 [「4.1.1 toConfigurators」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#) 。
### 4.3.2 toRouters
详细解析,见 [《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》](http://svip.iocoder.cn/Dubbo/cluster-7-impl-router/?self=) 。
## 4.7 内部类
### 4.7.1 InvokerDelegate
InvokerDelegate ,实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper 类,Invoker 代理类,主要用于存储**注册中心下发的 url 地址**( providerUrl ),用于重新重新 refer 时能够根据 providerURL queryMap overrideMap 重新组装。 代码如下:
老艿艿:目前貌似没看到这块逻辑噢
```plain text
plain private static class InvokerDelegate<T> extends InvokerWrapper<T> { /** * 服务提供者 URL * * 未经过配置合并 */ private URL providerUrl; public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) { super(invoker, url); this.providerUrl = providerUrl; } public URL getProviderUrl() { return providerUrl; } }
4.7.2 InvokerComparator
InvokerComparator ,实现 Comparator 接口,Invoker 排序器实现类,根据 URL 升序 。代码如下:
```plain text plain private static class InvokerComparator implements Comparator<Invoker<?» { /** * 单例 */ private static final InvokerComparator comparator = new InvokerComparator(); private InvokerComparator() { } public static InvokerComparator getComparator() { return comparator; } @Override public int compare(Invoker<?> o1, Invoker<?> o2) { return o1.getUrl().toString().compareTo(o2.getUrl().toString()); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---
### 4.3.3 refreshInvoker
#refreshInvoker(List invokerUrls) 方法,官方注释其如下:
根据 invokerURL 列表转换为 invoker 列表。转换规则如下:
3. 如果 url 已经被转换为 invoker ,则不在重新引用,直接从缓存中获取,注意如果 url 中任何一个参数变更也会重新引用
4. 如果传入的 invoker 列表不为空,则表示最新的 invoker 列表
5. 如果传入的 invokerUrl 列表是空,则表示只是下发的 override 规则或 route 规则,需要重新交叉对比,决定是否需要重新引用。
- 是不是看起来有点点懵逼?淡定,我们来看看代码。
```plain text
plain 1: private void refreshInvoker(List<URL> invokerUrls) { 2: if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null 3: && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { 4: // 设置禁止访问 5: this.forbidden = true; // Forbid to access 6: // methodInvokerMap 置空 7: this.methodInvokerMap = null; // Set the method invoker map to null 8: // 销毁所有 Invoker 集合 9: destroyAllInvokers(); // Close all invokers 10: } else { 11: // 设置允许访问 12: this.forbidden = false; // Allow to access 13: // 引用老的 urlInvokerMap 14: Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference 15: // 传入的 invokerUrls 为空,说明是路由规则或配置规则发生改变,此时 invokerUrls 是空的,直接使用 cachedInvokerUrls 。 16: if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { 17: invokerUrls.addAll(this.cachedInvokerUrls); 18: // 传入的 invokerUrls 非空,更新 cachedInvokerUrls 。 19: } else { 20: this.cachedInvokerUrls = new HashSet<URL>(); 21: this.cachedInvokerUrls.addAll(invokerUrls); //Cached invoker urls, convenient for comparison //缓存invokerUrls列表,便于交叉对比 22: } 23: // 忽略,若无 invokerUrls 24: if (invokerUrls.isEmpty()) { 25: return; 26: } 27: // 将传入的 invokerUrls ,转成新的 urlInvokerMap 28: Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map 29: // 转换出新的 methodInvokerMap 30: Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map 31: // state change 32: // If the calculation is wrong, it is not processed. 如果计算错误,则不进行处理. 33: if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { 34: logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); 35: return; 36: } 37: // 若服务引用多 group ,则按照 method + group 聚合 Invoker 集合 38: this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; 39: this.urlInvokerMap = newUrlInvokerMap; 40: // 销毁不再使用的 Invoker 集合 41: try { 42: destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker 43: } catch (Exception e) { 44: logger.warn("destroyUnusedInvokers error. ", e); 45: } 46: } 47: }
- ========== 第一部分 ==========
- 第 2 至 3 行:当 1下线 invokerUrls 集合大小为 ,并且协议为 empty:// ,说明所有服务提供者都已经 。若注册中心为 Zookeeper ,可参见 ZookeeperRegistry#toUrlsWithEmpty(URL consumer, String path, List providers) 方法。
- 第 5 行:设置禁止 访问,因为没有服务提供者了。
- 第 7 行: methodInvokerMap 置空。
- 第 9 行:调用 「4.3.3.5 destroyAllInvokers」 #destroyAllInvokers() 方法,销毁所有服务提供者 Invoker 集合。详细解析,见 。
- ========== 第二部分 ==========
- 第 12 行:设置允许 访问,因为有服务提供者了。
- 第 15 至 17 行:传入的 说明是路由规则或配置规则发生改变 invokerUrls 为空, ,此时 invokerUrls 是空的,直接使用 cachedInvokerUrls 。对应官方注释【第 3 点】(部分,不包括“需要重新交叉对比,决定是否需要重新引用”)。
- 第 18 至 22 行:传入的 新的 invokerUrls 非空,更新 cachedInvokerUrls 。考虑到并发的问题,更新的方式为创建 HashSet 。对应官方注释【第 2 点】。
- 为什么【第 15 至 17 行】不需要更新 呢?因为 invokerUrls 为空,直接使用 cachedInvokerUrls ,相当于进行了“更新”。
- 第 23 至 26 行:忽略,若无 invokerUrls 。出现情况为,初始是按照 configurators => routers => providers ,所以前两个会出现这个情况。关于这一点,胖友可以调试感受下。
- 第 28 行:调用 新的「4.3.3.1 toInvokers」 #toInvokers(List urls) 方法,将传入的 invokerUrls ,转换成 urlInvokerMap 。详细解析,见 。
- 第 30 行:调用 新的「4.3.3.2 toMethodInvokers」 #toMethodInvokers(newUrlInvokerMap) 方法,将 urlInvokerMap 转成与方法的映射关系,即 methodInvokerMap 。详细解析,见 。
- 第 31 至 36 行:如果计算错误,则不进行处理。一般来说,是防御性编程。
- 第 38 行:若服务引用多method + group「4.3.3.3 toMethodInvokers」 group ,则调用 #toMergeMethodInvokerMap(newMethodInvokerMap) 方法,按照 聚合 Invoker 集合。详细解析,见 。
- 第 39 行:赋值 urlInvokerMap 属性。
- 第 40 至 45 行:调用 销毁「4.3.3.4 toMethodInvokers」 #destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap) 方法, 不再使用的 Invoker 集合。详细解析,见 。
4.3.3.1 toInvokers
#toInvokers(List urls) 方法,
```plain text plain 1: private Map<String, Invoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
---
- 第 3 行:
newUrlInvokerMap
变量,新的
urlInvokerMap
字段,后面会赋值给它。
- 第 4 至 7 行:若
urls
为空,直接返回,防御性编程。
- 第 9 行:**已初始化**
keys
变量,
的服务器提供 URL 集合,即服务提供者 URL 已经处理。
- 第 11 行:获得引用服务的协议。一般情况下,我们不会设置
配置项。
- 第 13 行:**循环**
urls
集合,转成 Invoker 集合。
- 协议处理相关
- 第 14 至 28 行:如果 reference 端配置了 protocol ,则**只选择匹配**
的 protocol 。
- 第 29 至 32 行:**忽略**
,若为
empty://
协议。
- 第 33 至 38 行:**忽略**
,若应用程序不支持该协议。
- 第 40 行:调用 **合并**[「4.3.3.1 mergeUrl」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
#mergeUrl(providerUrl)
方法,
URL 参数。详细解析,见
。
- 第 41 至 47 行:**忽略**
,通过
keys
判断已经初始化。
- 若未初始化,添加到
keys
中。
- 第 48 至 75 行:“创建”服务 Invoker 对象。
- 第 50 至 51 行:获得
url
对应在
localUrlInvokerMap
缓存的 Invoker 对象。
- 第 52 至 72 行:不在缓存中,需要重新 refer 引用,创建 Invoker 对象。
- 第 54 至 60 行:通过配置项
enable
和
disable
判断,服务是否开启。
- 第 61 至 65 行: 若开启,创建 Invoker 对象。
- 【重要】**第 64 行:调用 Protocol$Adaptive#refer(serviceType, url) 方法,引用服务,创建服务提供者 Invoker 对象**[《精尽 Dubbo 源码解析 —— 服务引用》](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
。详细解析,在
已经有了。
- 第 64 行:创建 InvokerDelegate 对象。详细解析,见 [「4.7.1 InvokerDelegate」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#)
。
- 第 73 至 75 行:在缓存中,直接使用缓存的 Invoker 对象,添加到
newUrlInvokerMap
中。
- 第 78 行:清空
keys
。
- 第 79 行:返回结果
newUrlInvokerMap
。
### 4.3.3.1.1 mergeUrl
#mergeUrl(providerUrl) 方法,合并 URL 参数,**优先级**为配置规则 > 服务消费者配置 > 服务提供者配置。代码如下:
```plain text
plain 1: private URL mergeUrl(URL providerUrl) { 2: // 合并消费端参数 3: providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters 4: 5: // 合并配置规则 6: List<Configurator> localConfigurators = this.configurators; // local reference 7: if (localConfigurators != null && !localConfigurators.isEmpty()) { 8: for (Configurator configurator : localConfigurators) { 9: providerUrl = configurator.configure(providerUrl); 10: } 11: } 12: 13: // 不检查连接是否成功,总是创建 Invoker ! 14: providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker! 15: 16: // The combination of directoryUrl and override is at the end of notify, which can't be handled here 17: // 仅合并提供者参数,因为 directoryUrl 与 override 合并是在 notify 的最后,这里不能够处理 18: this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters // 合并提供者参数 19: 20: // 【忽略】因为是对 1.0 版本的兼容 21: if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0) 22: && "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0 23: //fix by tony.chenl DUBBO-44 24: String path = directoryUrl.getParameter(Constants.INTERFACE_KEY); 25: if (path != null) { 26: int i = path.indexOf('/'); 27: if (i >= 0) { 28: path = path.substring(i + 1); 29: } 30: i = path.lastIndexOf(':'); 31: if (i >= 0) { 32: path = path.substring(0, i); 33: } 34: providerUrl = providerUrl.setPath(path); 35: } 36: } 37: 38: // 返回服务提供者 URL 39: return providerUrl; 40: }
- 【重要「6. ClusterUtils」 】第 3 行:调用 ClusterUtils#mergeUrl(providerUrl, queryMap) 方法,合并服务消费者配置到 providerUrl 。详细解析,见 。
- 第 5 至 11 行:合并配置规则《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 到 providerUrl 中。详细解析,见
- 第 14 行:设置 providerUrl 不检查连接是否成功,总是创建 Invoker !
- 第 18 行:仅合并提供者参数。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》
- 第 20 至 36 行:【忽略 】因为是对 1.0 版本的兼容。
4.3.3.2 toMethodInvokers
#toMethodInvokers(Map<String, Invoker> invokersMap) 方法,将 invokersMap 转成与方法的映射关系。代码如下:
```plain text plain 1: private Map<String, List<Invoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
---
- 第 3 行:
newMethodInvokerMap
变量,新的
methodInvokerMap
字段,后面会赋值给它。
- 第 5 行:创建 Invoker 集合。在【第 29 行】,我们可以看到,实际就是
invokersMap
的值的集合。
- 第 8 至 31 行:按照方法名为**维度对应的 Invoker 集合**
( KEY ) ,聚合
到
newMethodInvokerMap
中。
- 第 33 行:路由全 [《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》](http://svip.iocoder.cn/Dubbo/cluster-7-impl-router/?self=)
invokersList
,匹配合适的 Invoker 集合。详细解析,见
。
- 第 35 行:添加 **全量**
newInvokersList
到
newMethodInvokerMap
中,表示该服务提供者的
Invoker 集合。
- 第 36 至 45 行:**循环**[《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》](http://svip.iocoder.cn/Dubbo/cluster-7-impl-router/?self=)
,基于每个方法路由,匹配合适的 Invoker 集合。详细解析,见
。
- 第 46 至 53 行:循环**排序不可变**
每个方法的 Invoker 集合,并设置为
。
### 4.3.3.3 toMergeMethodInvokerMap
#toMergeMethodInvokerMap(Map<String, List<Invoker>> methodMap) ,按照 **method + group** 聚合 Invoker 集合。代码如下:
```plain text
plain 1: private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) { 2: Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>(); 3: // 循环方法,按照 method + group 聚合 Invoker 集合 4: for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) { 5: String method = entry.getKey(); 6: List<Invoker<T>> invokers = entry.getValue(); 7: // 按照 Group 聚合 Invoker 集合的结果。其中,KEY:group VALUE:Invoker 集合。 8: Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>(); 9: // 循环 Invoker 集合,按照 group 聚合 Invoker 集合 10: for (Invoker<T> invoker : invokers) { 11: String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, ""); 12: List<Invoker<T>> groupInvokers = groupMap.get(group); 13: if (groupInvokers == null) { 14: groupInvokers = new ArrayList<Invoker<T>>(); 15: groupMap.put(group, groupInvokers); 16: } 17: groupInvokers.add(invoker); 18: } 19: // 大小为 1,使用第一个 20: if (groupMap.size() == 1) { 21: result.put(method, groupMap.values().iterator().next()); 22: // 大于 1,将每个 Group 的 Invoker 集合,创建成 Cluster Invoker 对象。 23: } else if (groupMap.size() > 1) { 24: List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>(); 25: for (List<Invoker<T>> groupList : groupMap.values()) { 26: groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList))); 27: } 28: result.put(method, groupInvokers); 29: // 大小为 0 ,使用原有值 30: } else { 31: result.put(method, invokers); 32: } 33: } 34: return result; 35: }
- 第 2 行:新的 result 属性, methodInvokerMap 字段,后面会赋值给它。
- 第 3 终 33 行:循环method + group ,按照 聚合 Invoker 集合。
- 第 8 行: 按照 Group 聚合 Invoker 集合的结果。其中,KEYVALUE :group , :Invoker 集合。
- 第 9 至 18 行:循环group Invoker 集合,按照 聚合 Invoker 集合。
- ========== 结果 groupMap 处理 ==========
- 第 19 至 21 行:若数量为 1 ,使用第一个。
- 第 29 至 32 行:若数量为 0 ,使用原有值 等价 invokers 。实际上,和【第 19 至 21 行】 。
- 第 22 至 28 行:若数量大于每个 1 ,循环 Group 的 Invoker 集合,调用 Cluster$Adaptive#join(Directory) 方法,创建对应的 Cluster Invoker 对象。
- 我们发现,此处创建 StaticDirectory「5. StaticDirectory」 对象。详细解析,见 。
那么,引用多个服务分组有什么用呢?为什么要按照 group 进行聚合,直接调用不可以么?让我们来打开 ProtocolRegistry#refer(Class type, URL url) 方法,如下图所示:
refer
- 当引用多个服务分组时,会自动分组聚合 使用到 的特性。那么之后 MergeableCluster 会怎么做呢?详细解析,见后文 。
4.3.3.4 destroyUnusedInvokers
#destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap) 方法,销毁不再使用的 Invoker 集合。代码如下:
```plain text plain private void destroyUnusedInvokers(Map<String, Invoker
1
2
3
4
5
6
7
8
9
---
### 4.3.3.5 destroyAllInvokers
#destroyAllInvokers() 方法,销毁所有服务提供者 Invoker 。代码如下:
```plain text
plain private void destroyAllInvokers() { Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference 本地引用,避免并发问题 if (localUrlInvokerMap != null) { // 循环 urlInvokerMap ,销毁所有服务提供者 Invoker for (Invoker<T> invoker : new ArrayList<Invoker<T>>(localUrlInvokerMap.values())) { try { invoker.destroy(); } catch (Throwable t) { logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t); } } // urlInvokerMap 清空 localUrlInvokerMap.clear(); } // methodInvokerMap 置空 methodInvokerMap = null; }
4.4 doList
#doList(Invocation)实现方法,获得对应的 Invoker 集合。代码如下:
```plain text plain 1: @Override 2: public List<Invoker
1
2
3
4
5
6
7
8
9
10
11
---
- 通过四种方式,从
methodInvokerMap
中,获得对应的 Invoker 集合。
- 第一种,可根据**第一个参数**
枚举路由。这是个非常小众的场景,胖友不必理解。例子如下:
```plain text
plain // DemoService 接口定义 public interface DemoService { void hello(String name); void hello01(String name); void hello02(String name); } // 消费者调用 DemoService demoService = (DemoService) context.getBean("demoService"); demoService.hello("01");
```plain text
- 通过这样的方式,调用到的服务提供者的 DemoServiceImpl#hello01(name) 方法。
- 如果使用该
特性,注意避免出现无关的几个方法,例如 #hello(name) 和 #hello01(name) 是毫无关系的两个方法,而我真的想调用 #hello(name) 方法,结果调用到了 #hello01(name) 方法。
- 如下是 Dubbo Commiter
诣极 的解惑,非常感谢。动态的方法名本身就是接口中已经定义的举个例子吧借口定义了 method, method1,method2, 如果我发起rpc调用method(1, 2, 3), 这个时候会去查找方法method1的invokers, 如果我这个时候发起rpc method(2, 1, 3), 这个时候会去查找方法method2的invokers, 然后调用invokers的method方法
- 另外,经过沟通,【第 19 行】的 "." 是个 BUG ,方法里不能包含该字符,因此,笔者改成了【第 20 行】,去掉了 "." 进行测试。
```
- 另外,经过沟通,【第 19 行】的 "." 是个 BUG ,方法里不能包含该字符,因此,笔者改成了【第 20 行】,去掉了 "." 进行测试。
- 第二种,根据方法名 获得 Invoker 集合。一般情况下,都能匹配到。
- 第三种,使用全量 Invoker 集合。例如, #$echo(name) 回声方法。
- 第四种,使用 methodInvokerMap 第一个 Invoker 集合。防御性编程。
4.5 isAvailable
```plain text plain @Override public boolean isAvailable() { // 若已销毁,返回不可用 if (isDestroyed()) { return false; } // 任意一个 Invoker 可用,则返回可用 Map<String, Invoker
1
2
3
4
5
6
7
---
## 4.6 destroy
```plain text
plain @Override public void destroy() { if (isDestroyed()) { return; } // 取消订阅 // unsubscribe. try { if (getConsumerUrl() != null && registry != null && registry.isAvailable()) { registry.unsubscribe(getConsumerUrl(), this); } } catch (Throwable t) { logger.warn("unexpeced error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t); } // 标记已经销毁 super.destroy(); // must be executed after unsubscribing // 销毁所有 Invoker try { destroyAllInvokers(); } catch (Throwable t) { logger.warn("Failed to destroy service " + serviceKey, t); } }
5. StaticDirectory
com.alibaba.dubbo.rpc.cluster.directory.StaticDirectory ,实现 AbstractDirectory 抽象类,静态 Directory 实现类。逻辑比较简单,将传入的 invokers 集合,封装成静态的 Directory 对象。代码如下:
```plain text plain public class StaticDirectory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
---
- 代码比较易懂,胖友自己看下。
---
除了在 [「4.3.3.3 toMergeMethodInvokerMap」](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory/#) 方法中,使用到了 StaticDirectory 对象。我们来看看 ReferenceConfig#createProxy(Map<String, String> map) 的使用,代码如下图:

createProxy
- 第 522 至 527 行:当 **有注册中心仅调用第一个可用的 Invoker 对象**
registryURL
非空时,意味着
,使用
cluster=available
集群方式,并调用
Cluster$Adaptive#join(StaticDirectory)
方法,创建对应的 Cluster Invoker 对象。这意味着,服务调用时,因为使用的是
cluster=available
,
。下面,我们来做一个 YY :
- 目前我们有 A , B 两个机房,分别对应 zk01 集群,zk02 集群。这两个 zk 集群**不互通**
。
- A , B 机房,分别部署了 **User 服务提供者**
,仅注册到自己机房的 zk 集群。
- A , B 机房,部署了对应的 **User 服务消费User 服务提供者**
,那么如果我们希望优先调用本机房。当本机房
全挂的情况下,使用另外一个机房,该如何配置呢?
```plain text
plain // A 机房 <dubbo:reference interface="com.alibaba.dubbo.demo.UserService" registry="zk01,zk02" /> // B 机房 <dubbo:reference interface="com.alibaba.dubbo.demo.UserService" registry="zk02,zk01" />
```plain text * 即在 "registry" 配置项中,将自己的 zk 集群放在前面。 * 当然,大多数情况下,很少会出现一个机房服务提供者全挂,zk 集群还存活着。
1
2
3
4
5
6
7
# 6. ClusterUtils
com.alibaba.dubbo.rpc.cluster.support.ClusterUtils ,Cluster 工具类。代码如下:
```plain text
plain 1: public class ClusterUtils { 2: 3: private ClusterUtils() { 4: } 5: 6: public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) { 7: // 合并配置 Map 结果 8: Map<String, String> map = new HashMap<String, String>(); 9: // 远程配置 Map 结果 10: Map<String, String> remoteMap = remoteUrl.getParameters(); 11: 12: // 添加 `remoteMap` 到 `map` 中,并移除不必要的配置 13: if (remoteMap != null && remoteMap.size() > 0) { 14: map.putAll(remoteMap); 15: 16: // Remove configurations from provider, some items should be affected by provider. 线程池配置不使用提供者的 17: map.remove(Constants.THREAD_NAME_KEY); 18: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY); 19: 20: map.remove(Constants.THREADPOOL_KEY); 21: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADPOOL_KEY); 22: 23: map.remove(Constants.CORE_THREADS_KEY); 24: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.CORE_THREADS_KEY); 25: 26: map.remove(Constants.THREADS_KEY); 27: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADS_KEY); 28: 29: map.remove(Constants.QUEUES_KEY); 30: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.QUEUES_KEY); 31: 32: map.remove(Constants.ALIVE_KEY); 33: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.ALIVE_KEY); 34: 35: map.remove(Constants.TRANSPORTER_KEY); 36: map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY); 37: } 38: // 添加 `localMap` 到 `map` 中 39: if (localMap != null && localMap.size() > 0) { 40: map.putAll(localMap); 41: } 42: 43: // 添加指定的 `remoteMap` 的配置项到 `map` 中,因为上面被 `localMap` 覆盖了。 44: if (remoteMap != null && remoteMap.size() > 0) { 45: // Use version passed from provider side 46: String dubbo = remoteMap.get(Constants.DUBBO_VERSION_KEY); 47: if (dubbo != null && dubbo.length() > 0) { 48: map.put(Constants.DUBBO_VERSION_KEY, dubbo); 49: } 50: String version = remoteMap.get(Constants.VERSION_KEY); 51: if (version != null && version.length() > 0) { 52: map.put(Constants.VERSION_KEY, version); 53: } 54: String group = remoteMap.get(Constants.GROUP_KEY); 55: if (group != null && group.length() > 0) { 56: map.put(Constants.GROUP_KEY, group); 57: } 58: String methods = remoteMap.get(Constants.METHODS_KEY); 59: if (methods != null && methods.length() > 0) { 60: map.put(Constants.METHODS_KEY, methods); 61: } 62: // Reserve timestamp of provider url. 保留 provider 的启动 timestamp 63: String remoteTimestamp = remoteMap.get(Constants.TIMESTAMP_KEY); 64: if (remoteTimestamp != null && remoteTimestamp.length() > 0) { 65: map.put(Constants.REMOTE_TIMESTAMP_KEY, remoteMap.get(Constants.TIMESTAMP_KEY)); 66: } 67: // Combine filters and listeners on Provider and Consumer 合并 filter 和 listener 68: String remoteFilter = remoteMap.get(Constants.REFERENCE_FILTER_KEY); 69: String localFilter = localMap.get(Constants.REFERENCE_FILTER_KEY); 70: if (remoteFilter != null && remoteFilter.length() > 0 71: && localFilter != null && localFilter.length() > 0) { 72: localMap.put(Constants.REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter); 73: } 74: String remoteListener = remoteMap.get(Constants.INVOKER_LISTENER_KEY); 75: String localListener = localMap.get(Constants.INVOKER_LISTENER_KEY); 76: if (remoteListener != null && remoteListener.length() > 0 77: && localListener != null && localListener.length() > 0) { 78: localMap.put(Constants.INVOKER_LISTENER_KEY, remoteListener + "," + localListener); 79: } 80: } 81: 82: // 清空原有配置,使用合并的配置覆盖 83: return remoteUrl.clearParameters().addParameters(map); 84: } 85: 86: }
- 将 合并前者指定 localMap 和 remoteUrl.parameters 成 map ,大多数以 为主【第 12 至 41 行】,部分 以后者为主【第 43 至 80 行】。
- 将合并的 覆盖 map 的结果, 设置到 remoteUrl 中。
666. 彩蛋
比想象中,长好多的一篇博客,原本预期会短蛮多的。
顺便吐槽下,中间碰到一些困惑,网络上搜了一圈,都没解释到很多细节的点的源码解析文章,真的是。哎~~~



