服务暴露(二)之远程暴露(Dubbo)
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
在 《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》 一文中,我们已经分享了本地暴露服务。在本文中,我们来分享远程暴露服务。在 Dubbo 中提供多种协议( Protocol ) 的实现,大体流程一致,本文以 Dubbo Protocol 为例子,这也是 Dubbo 的默认协议。
如果不熟悉该协议的同学,可以先看看 《Dubbo 使用指南 —— dubbo://》,简单了解即可。
特性
缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 remoting 交互。
- 连接个数:单连接
- 连接方式:长连接
- 传输协议:TCP
- 传输方式:NIO 异步传输
- 序列化:Hessian 二进制序列化
- 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。
- 适用场景:常规远程服务方法调用
相比本地暴露,远程暴露会多做如下几件事情:
- 启动通信服务器,绑定服务端口,提供远程调用。
- 向注册中心注册服务提供者,提供服务消费者从注册中心发现服务。
2. 远程暴露
远程暴露服务的顺序图如下:
在 #doExportUrlsFor1Protocol(protocolConfig, registryURLs) 方法中,涉及远程暴露服务的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 服务远程暴露
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 获得监控中心 URL
URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
} else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html 。主要用于开发测试环境使用。
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
}
- 第 30 至 41 行:大体和【第 7 至 29 行】逻辑相同。差别在于,当配置注册中心为
"N/A"时,表示即使远程暴露服务,也不向注册中心注册。这种方式用于被服务消费者直连服务提供者,参见 《Dubbo 用户指南 —— 直连提供者》 文档。在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。 - 第 8 行:循环祖册中心 URL 数组
registryURLs。 - 第 10 行:
"dynamic"配置项,服务是否动态注册。如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。 - 第 12 行:调用
#loadMonitor(registryURL)方法,获得监控中心 URL。- 在 「2.1 loadMonitor」 小节,详细解析。
- 第 13 至 15 行:调用 URL#addParameterAndEncoded(key, value) 方法,将监控中心的 URL 作为
"monitor"参数添加到服务提供者的 URL 中,并且需要编码。通过这样的方式,服务提供者的 URL 中,包含了监控中心的配置。 - 第 20 行:调用 URL#addParameterAndEncoded(key, value) 方法,将服务体用这的 URL 作为
"export"参数添加到注册中心的 URL 中。通过这样的方式,注册中心的 URL 中,包含了服务提供者的配置。 - 第 20 行:调用
ProxyFactory#getInvoker(proxy, type, url)方法,创建 Invoker 对象。该 Invoker 对象,执行#invoke(invocation)方法时,内部会调用 Service 对象(ref)对应的调用方法。- 详细的实现,后面单独写文章分享。
- 为什么传递的是注册中心的 URL 呢?下文会详细解析。
- 第 23 行:创建 com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker 对象。该对象在 Invoker 对象的基础上,增加了当前服务提供者 ServiceConfig 对象。
- 第 26 行:调用
Protocol#export(invoker)方法,暴露服务。- 此处 Dubbo SPI 自适应的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,
invoker传入后,根据invoker.url自动获得对应 Protocol 拓展实现为 DubboProtocol。 - 实际上,Protocol 有两个 Wrapper 拓展实现类:ProtocolFilterWrapper、ProtocolListenerWrapper。所以,
#export(…)方法的调用顺序是:- Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
- =>
- Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
- 也就是说,这一条大的调用链,包含两条小的调用链。原因是:
- 首先,传入的是注册中心的 URL,通过 Protocol$Adaptive 获取到的是 RegistryProtocol 对象。
- 其次,RegistryProtocol 会在其
#export(…)方法中,使用服务提供者的 URL ( 即注册中心的 URL 的export参数值),再次调用 Protocol$Adaptive 获取到的是 DubboProtocol 对象,进行服务暴露。
- 为什么是这样的顺序?通过这样的顺序,可以实现类似 AOP 的效果,在本地服务器启动完成后,再向注册中心注册。伪代码如下:
- 此处 Dubbo SPI 自适应的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,
1
2
3
4
5
6
RegistryProtocol#export(...) {
// 1. 启动本地服务器
DubboProtocol#export(...);
// 2. 向注册中心注册。
}
1
2
- 这也是为什么上文提到的 "为什么传递的是**注册中心的 URL** 呢?" 的原因。
- 如果无法理解,在 [「3. Protocol」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 在解析代码,进一步理顺。
- 第 28 行:添加到
exporters集合中。
2.1 loadMonitor
友情提示,监控中心不是本文的重点,简单分享下该方法的逻辑。可直接跳过。
#loadMonitor(registryURL) 方法,加载监控中心 com.alibaba.dubbo.common.URL 数组。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 加载监控中心 URL
*
* @param registryURL 注册中心 URL
* @return 监控中心 URL
*/
protected URL loadMonitor(URL registryURL) {
// 从 属性配置 中加载配置到 MonitorConfig 对象。
if (monitor == null) {
String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) {
return null;
}
monitor = new MonitorConfig();
if (monitorAddress != null && monitorAddress.length() > 0) {
monitor.setAddress(monitorAddress);
}
if (monitorProtocol != null && monitorProtocol.length() > 0) {
monitor.setProtocol(monitorProtocol);
}
}
appendProperties(monitor);
// 添加 `interface` `dubbo` `timestamp` `pid` 到 `map` 集合中
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// 将 MonitorConfig ,添加到 `map` 集合中。
appendParameters(map, monitor);
// 获得地址
String address = monitor.getAddress();
String sysaddress = System.getProperty("dubbo.monitor.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
// 直连监控中心服务器地址
if (ConfigUtils.isNotEmpty(address)) {
// 若不存在 `protocol` 参数,默认 "dubbo" 添加到 `map` 集合中。
if (!map.containsKey(Constants.PROTOCOL_KEY)) {
if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) {
map.put(Constants.PROTOCOL_KEY, "logstat");
} else {
map.put(Constants.PROTOCOL_KEY, "dubbo");
}
}
// 解析地址,创建 Dubbo URL 对象。
return UrlUtils.parseURL(address, map);
// 从注册中心发现监控中心地址
} else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) {
return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map));
}
return null;
}
- 第 8 至 24 行:从属性配置中,加载配置到 MonitorConfig 对象。
- 第 25 至 32 行:添加
interface、dubbo、timestamp、pid到map集合中。 - 第 34 行:调用
#appendParameters(map, config)方法,将 MonitorConfig,添加到map集合中。 - 第 35 至 40 行:获得监控中心的地址。
- 第 42 行:当
address非空时,直连监控中心服务器地址的情况。- 第 43 至 50 行:若不存在
protocol参数,缺省默认为 “dubbo”,并添加到map集合中。- 第 44 至 55 行:可以忽略。因为,
logstat这个拓展实现已经不存在。
- 第 44 至 55 行:可以忽略。因为,
- 第 52 行:调用 UrlUtils#parseURL(address, map) 方法,解析
address,创建 Dubbo URL 对象。- 已经添加了代码注释,胖友点击链接查看。
- 第 43 至 50 行:若不存在
- 第 54 至 56 行:当
protocol = registry时,并且注册中心 URL 非空时,从注册中心发现监控中心地址。以registryURL为基础,创建 URL:- protocol = dubbo
- parameters.protocol = registry
- parameters.refer = map
- 第 57 行:无注册中心,返回空。
- ps:后续会有文章,详细分享。
3. Protocol
本文涉及的 Protocol 类图如下:
3.1 ProtocolFilterWrapper
接 《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「 3.2 ProtocolFilterWrapper」 小节。
#export(invoker) 方法,代码如下:
1
2
3
4
5
6
7
8
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 建立带有 Filter 过滤链的 Invoker ,再暴露服务。
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
- 第 2 至 5 行:当
invoker.url.protocl = registry,注册中心的 URL,无需创建 Filter 过滤链。 - 第 7 行:调用
#buildInvokerChain(invoker, key, group)方法,创建带有 Filter 过滤链的 Invoker 对象。 - 第 7 行:调用
protocol#export(invoker)方法,继续暴露服务。 - 在 RegistryProtocol 中,会调用 第 7 行
DubboProtocol#export(…)方法时,会走【第 7 行】的流程。
3.2 RegistryProtocol
com.alibaba.dubbo.registry.integration.RegistryProtocol,实现 Protocol 接口,注册中心协议实现类。
3.2.1 属性
属性相关,代码如下:
友情提示,仅包含本文涉及的属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// ... 省略部分和本文无关的属性。
/**
* 单例。在 Dubbo SPI 中,被初始化,有且仅有一次。
*/
private static RegistryProtocol INSTANCE;
/**
* 绑定关系集合。
*
* key:服务 Dubbo URL
*/
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
// 用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露
// providerurl <--> exporter
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
/**
* Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。
*/
private Protocol protocol;
/**
* RegistryFactory 自适应拓展实现类,通过 Dubbo SPI 自动注入。
*/
private RegistryFactory registryFactory;
public RegistryProtocol() {
INSTANCE = this;
}
public static RegistryProtocol getRegistryProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load
}
return INSTANCE;
}
INSTANCE静态属性,单例。通过 Dubbo SPI 加载创建,有且仅有一次。#getRegistryProtocol()静态方法,获得单例。
bounds属性,绑定关系集合。其中,Key 为服务提供者 URL。protocol属性,Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。registryFactory属性,自适应拓展实现类,通过 Dubbo SPI 自动注入。- 用于创建注册中心 Registry 对象。
3.2.2 export
本文涉及的 #export(invoker) 方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 暴露服务
// export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 获得注册中心 URL
URL registryUrl = getRegistryUrl(originInvoker);
// 获得注册中心对象
// registry provider
final Registry registry = getRegistry(originInvoker);
// 获得服务提供者 URL
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
// 向本地注册表,注册服务提供者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
// 向注册中心注册服务提供者(自己)
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // 标记向本地注册表的注册服务提供者,已经注册
}
// 使用 OverrideListener 对象,订阅配置规则
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
- 第 4 行:调用
#doLocalExport(invoker)方法,暴露服务。 - 第 7 行:调用
#getRegistryUrl(originInvoker)方法,获得注册中心 URL。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 获得注册中心 URL
*
* @param originInvoker 原始 Invoker
* @return URL
*/
private URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // protocol
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryUrl;
}
1
- 该过程是我们在 [《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「2.1 loadRegistries」](http://svip.iocoder.cn/Dubbo/service-export-remote-dubbo/#) 的那张图的反向流程,即**红线部分**:
- 第 11 行:获得注册中心对象。
- 第 14 行:调用
#getRegistedProviderUrl(originInvoker)方法,获得服务提供者 URL。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
// 从注册中心的 export 参数中,获得服务提供者的 URL
URL providerUrl = getProviderUrl(originInvoker);
//The address you see at the registry
return providerUrl.removeParameters(getFilteredKeys(providerUrl)) // 移除 .hide 为前缀的参数
.removeParameter(Constants.MONITOR_KEY) // monitor
.removeParameter(Constants.BIND_IP_KEY) // bind.ip
.removeParameter(Constants.BIND_PORT_KEY) // bind.port
.removeParameter(QOS_ENABLE) // qos.enable
.removeParameter(QOS_PORT) // qos.port
.removeParameter(ACCEPT_FOREIGN_IP); // qos.accept.foreign.ip
}
private URL getProviderUrl(final Invoker<?> origininvoker) {
String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); // export
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
}
return URL.valueOf(export);
}
private static String[] getFilteredKeys(URL url) {
Map<String, String> params = url.getParameters();
if (params != null && !params.isEmpty()) {
List<String> filteredKeys = new ArrayList<String>();
for (Map.Entry<String, String> entry : params.entrySet()) {
if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
filteredKeys.add(entry.getKey());
}
}
return filteredKeys.toArray(new String[filteredKeys.size()]);
} else {
return new String[]{};
}
}
1
2
- **重点**,从注册中心的 URL 中获得 `export` 参数对应的值,即服务提供者的 URL。
- 移除**多余**的参数。因为,这些参数注册到注册中心没有实际的用途。
- 第 17 行:配置项
register,服务提供者是否注册到配置中心。 - 第 20 行:调用
ProviderConsumerRegTable#registerProvider(invoker, registryUrl)方法,向本地注册表,注册服务提供者。 - 第 24 行:调用
#register(registryUrl, registedProviderUrl)方法,向注册中心注册服务提供者(自己)。代码如下:
1
2
3
4
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
1
- 调用 `RegistryService#register(url)` 方法,在 [《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》「3. RegistryService」](http://svip.iocoder.cn/Dubbo/registry-api/?self=),有详细解析。
- 第 25 行:标记向本地注册表的注册服务提供者,已经注册。
- 第 28 至 34 行:使用 OverrideListener 对象,订阅配置规则。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 中。
- 第 36 行:创建 DestroyableExporter 对象。
3.2.3 doLocalExport
#doLocalExport() 方法,暴露服务。此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 暴露服务。
*
* 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。
*
* @param originInvoker 原始 Invoker
* @param <T> 泛型
* @return Exporter 对象
*/
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 获得在 `bounds` 中的缓存 Key
String key = getCacheKey(originInvoker);
// 从 `bounds` 获得,是不是已经暴露过服务
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
// 未暴露过,进行暴露服务
if (exporter == null) {
// 创建 Invoker Delegate 对象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 暴露服务,创建 Exporter 对象
// 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
// 添加到 `bounds`
bounds.put(key, exporter);
}
}
}
return exporter;
}
- 第 13 行:调用
#getCacheKey(originInvoker)方法,获得在bounds中的缓存 Key。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
/**
* Get the key cached in bounds by invoker
*
* 获 取invoker 在 bounds中 缓存的key
*
* @param originInvoker 原始 Invoker
* @return url 字符串
*/
private String getCacheKey(final Invoker<?> originInvoker) {
URL providerUrl = getProviderUrl(originInvoker);
return providerUrl.removeParameters("dynamic", "enabled").toFullString();
}
- 第 14 至 18 行:从
bounds中,获得已经暴露过的 ExporterChangeableWrapper 对象。 - 第 20 至 28 行:未暴露过,进行暴露服务。
- 第 22 行:调用
#getProviderUrl(originInvoker)方法,获得服务提供者的 URL。 - 第 22 行:创建 com.alibaba.dubbo.registry.integration.RegistryProtocol.InvokerDelegete 对象。
- InvokerDelegete 实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper 类,主要增加了
#getInvoker()方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。因为,可能会存在InvokerDelegete.invoker也是 InvokerDelegete 类型的情况。
- InvokerDelegete 实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper 类,主要增加了
- 第 25 行:调用
DubboProtocol#export(invoker)方法,暴露服务,返回 Exporter 对象。- 在 「4.3 DubboProtocol」 中,详细分享。
- 若此若是其他协议,若调用对应协议的
XXXProtocol#export(invoker)方法。
- 第 25 行:使用【创建的 Exporter 对象】+【绑定
originInvoker】,创建 ExporterChangeableWrapper 对象。这样,originInvoker就和 Exporter 对象,形成了绑定的关系。- ExporterChangeableWrapper 在 「4.1 ExporterChangeableWrapper」 看详细代码。
- 第 27 行:添加到
bounds。
- 第 22 行:调用
3.3 DubboProtocol
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol,实现 AbstractProtocol 抽象类,Dubbo 协议实现类。
3.3.1 属性
属性相关,代码如下:
友情提示,仅包含本文涉及的属性。
1
2
3
4
5
6
7
8
// ... 省略部分和本文无关的属性。
/**
* 通信服务器集合
*
* key: 服务器地址。格式为:host:port
*/
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
serverMap属性,通信服务器集合。其中,Key 为服务器地址,格式为host:port。
3.3.2 export
本文涉及的 #export(invoker) 方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 创建 DubboExporter 对象,并添加到 `exporterMap`。
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// TODO 【8033 参数回调】
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 初始化序列化优化器
optimizeSerialization(url);
return exporter;
}
- 第 6 行:调用 #serviceKey(url) 方法,获得服务键。该方法从父类继承而来。
- 第 7 行:创建 DubboExporter 对象。
- 在 「4.3 DubboExporter」 详细解析。
- 第 8 行:添加到 exporterMap 中。该属性从父类继承而来。
- 第 10 至 24 行:TODO 【8033 参数回调】
- 第 27 行:调用
#openServer(url)方法,启动服务器。 - 第 30 行:调用
#optimizeSerialization(url)方法,初始化序列化优化器。在 《精尽 Dubbo 源码分析 —— 序列化(一)之总体实现》 中,详细解析。
3.3.3 openServer
友情提示:本小节的内容,胖友先看过 《精尽 Dubbo 源码分析 —— NIO 服务器》 所有的文章。
#openServer(url) 方法,启动通信服务器。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 通信客户端集合
*
* key: 服务器地址。格式为:host:port
*/
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
/**
* 启动服务器
*
* @param url URL
*/
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
- 第 8 行:获得服务器地址。
- 第 10 行:配置项
isserver,可以暴露一个仅当前 JVM 可调用的服务。目前该配置项已经不存在。 - 第 12 行:从
serverMap获得对应服务器地址已存在的通信服务器。即,不重复创建。 - 第 13 至 14 行:通信服务器不存在,调用
#createServer(url)方法,创建服务器。 - 第 15 至 18 行:通信服务器已存在,调用
Server#reset(url)方法,重置服务器的属性。- 为什么会存在呢?因为键是
host:port,那么例如,多个 Service 共用同一个 Protocol,服务器是同一个对象。
- 为什么会存在呢?因为键是
3.3.4 createServer
#createServer(url) 方法,创建并启动通信服务器。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private ExchangeServer createServer(URL url) {
// 默认开启 server 关闭时发送 READ_ONLY 事件
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// 默认开启 heartbeat
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 校验 Server 的 Dubbo SPI 拓展是否存在
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 设置编解码器为 `"Dubbo"`
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 启动服务器
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 校验 Client 的 Dubbo SPI 拓展是否存在
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
- 第 4 行:默认开启 Server 关闭时,发送 READ_ONLY 事件。
- 第 7 行:默认开启心跳功能。
- 第 9 至 13 行:校验配置的 Server 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。
- 第 16 行:设置编解码器为
"Dubbo"协议,即 DubboCountCodec。 - 第 18 至 24 行:调用
Exchangers#bind(url, handler)方法,启动服务器。具体requestHanlder属性值的实现,我们放在 Dubbo 协议下的 RPC 的文章里分享。requestHanlder属性的简化代码如下:
1
2
3
4
5
6
7
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
// .... 省略具体实现代码
return invoker.invoke(inv);
}
}
- 第 26 至 33 行:校验配置的 Client 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。默认情况下,未配置,所以不会校验。
- 第 34 行:返回通信服务器。
4. Exporter
本文涉及的 Exporter 类图如下:
4.1 ExporterChangeableWrapper
com.alibaba.dubbo.registry.integration.RegistryProtocol.ExporterChangeableWrapper,实现 Exporter 接口,Exporter 可变的包装器。
- 建立【返回的 Exporter】与【Protocol export 出的 Exporter】的对应关系。
- 在 override 时可以进行关系修改。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private class ExporterChangeableWrapper<T> implements Exporter<T> {
/**
* 原 Invoker 对象
*/
private final Invoker<T> originInvoker;
/**
* 暴露的 Exporter 对象
*/
private Exporter<T> exporter;
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}
public Invoker<T> getOriginInvoker() {
return originInvoker;
}
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}
public void unexport() {
String key = getCacheKey(this.originInvoker);
// 移除出 `bounds`
bounds.remove(key);
// 取消暴露
exporter.unexport();
}
}
4.2 DestroyableExporter
com.alibaba.dubbo.registry.integration.RegistryProtocol.DestroyableExporter,实现 Exporter 接口,可销毁的 Exporter 实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private class ExporterChangeableWrapper<T> implements Exporter<T> {
/**
* 原 Invoker 对象
*/
private final Invoker<T> originInvoker;
/**
* 暴露的 Exporter 对象
*/
private Exporter<T> exporter;
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}
public Invoker<T> getOriginInvoker() {
return originInvoker;
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
// 可以重新设置 Exporter 对象
public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}
@Override
public void unexport() {
String key = getCacheKey(this.originInvoker);
// 移除出 `bounds`
bounds.remove(key);
// 取消暴露
exporter.unexport();
}
}
- Exporter 代理原始的对应关系,建立
Invoker与Protocol#export(Invoker invoker)方法返回的 Exporter 的对应关系。 - 在配置规则发生变化时,可调用
#setExporter(Exporter)方法,修改对应关系。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》。
4.3 DubboExporter
com.alibaba.dubbo.rpc.protocol.dubbo.DubboExporter,实现 AbstractExporter 抽象类,Dubbo Exporter 实现类。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DubboExporter<T> extends AbstractExporter<T> {
/**
* 服务键
*/
private final String key;
/**
* Exporter 集合
*
* key: 服务键
*
* 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
}
@Override
public void unexport() {
// 取消暴露
super.unexport();
// 移除
exporterMap.remove(key);
}
}
key属性,服务键。#exporterMap属性,Exporter 集合。在上文DubboProtocol#export(invoker)方法中,我们可以看到,该属性就是AbstractProtocol.exporterMap属性。- 构造方法发起,暴露,将自己添加到
exporterMap中。 #unexport()取消方法,暴露,将自己移除出exporterMap中。
- 构造方法发起,暴露,将自己添加到
666. 彩蛋
写的有点神情恍惚。
一度以为自己在老家。
夜,02:02 ,睡觉….




