注册中心(一)之抽象API
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
在 《精尽 Dubbo 源码分析 —— 项目结构一览》「3.5 dubbo-registry」 中,对 dubbo-registry注册中心这个大模块做了大体的介绍。那么从本文开始,分享注册中心的代码实现。
本文分享 dubbo-registry-api 模块,注册中心的抽象 API ,类结构如下图:
整体比较易懂,笔者在这里先不介绍,胖友可以看完本文,回过头看看,自己是不是理解了?!
下面,我们按照从左到右的顺序,逐个分享。
2. RegistryFactory
com.alibaba.dubbo.registry.RegistryFactory ,注册中心工厂接口,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SPI("dubbo")
public interface RegistryFactory {
/**
* 连接注册中心.
* <p>
* 连接注册中心需处理契约:
* 1. 当设置check=false时表示不检查连接,否则在连接不上时抛出异常。
* 2. 支持URL上的username:password权限认证。
* 3. 支持backup=10.20.153.10备选注册中心集群地址。
* 4. 支持file=registry.cache本地磁盘文件缓存。
* 5. 支持timeout=1000请求超时设置。
* 6. 支持session=60000会话超时或过期设置。
*
* @param url 注册中心地址,不允许为空
* @return 注册中心引用,总不返回空
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
- RegistryFactory 是一个 Dubbo SPI 拓展接口。
- #getRegistry(url) 方法,获得注册中心 Registry 对象。
- 注意方法上注释的处理契约 。
- @Adaptive({“protocol”}) 注解,Dubbo SPI 会自动实现 RegistryFactory$Adaptive 类,根据 url.protocol 获得对应的 RegistryFactory 实现类。例如, url.protocol = zookeeper 时,获得 ZookeeperRegistryFactory 实现类。
2.1 AbstractRegistryFactory
com.alibaba.dubbo.registry.support.AbstractRegistryFactory ,实现 RegistryFactory 接口,RegistryFactory 抽象类,实现了 Registry 的容器管理。
2.1.1 属性
1
2
3
4
5
6
7
8
9
// The lock for the acquisition process of the registry
private static final ReentrantLock LOCK = new ReentrantLock();
/**
* Registry 集合
*
* key:{@link URL#toServiceString()}
*/
// Registry Collection Map<RegistryAddress, Registry>
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
- REGISTRIES 静态属性,Registry 集合。
- LOCK 静态属性,锁,用于 #destroyAll() 和 #getRegistry(url) 方法,对 REGISTRIES 访问的竞争。
2.1.2 createRegistry
#createRegistry(url)抽象方法,创建 Registry 对象。代码如下:
1
2
3
4
5
6
7
/**
* 创建 Registry 对象
*
* @param url 注册中心地址
* @return Registry 对象
*/
protected abstract Registry createRegistry(URL url);
子类实现该方法,创建其对应的 Registry 实现类。例如,ZookeeperRegistryFactory 的该方法,创建 ZookeeperRegistry 对象。
2.1.3 getRegistry
#getRegistry(url)实现方法,获得注册中心 Registry 对象。优先从缓存中获取,否则进行创建。
- 实现比较易懂,点击链接查看,有代码注释。
2.1.4 destroyAll
#destroyAll() 方法,销毁所有 Registry 对象。
- 实现比较易懂,点击链接查看,有代码注释。
3. RegistryService
com.alibaba.dubbo.registry.RegistryService ,注册中心服务接口,定义了注册、订阅、查询三种操作方法,如下:
- #register(url)注册 方法, 数据,比如:提供者地址,消费者地址,路由规则,覆盖规则,等数据。
- #unregister(url) 方法,取消注册。
- #subscribe(url, NotifyListener)订阅 方法, 符合条件的已注册数据,当有注册数据变更时自动推送。
- #lookup(url)查询 方法, 符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
ps:注意方法上注释的处理契约。
3.1 Registry
com.alibaba.dubbo.registry.Registry ,注册中心接口。Registry 继承了:
- RegistryService 接口,拥有拥有注册、订阅、查询三种操作方法。
- com.alibaba.dubbo.common.Node 接口,拥有节点相关的方法。
3.2 AbstractRegistry
com.alibaba.dubbo.registry.support.AbstractRegistry ,实现 Registry 接口,Registry 抽象类,实现了如下方法:
- 通用的注册、订阅、查询、通知等方法。
- 持久化注册数据到文件,以 properties 格式存储。应用于,重启时,无法从注册中心加载服务提供者列表等信息时,从该文件中读取。
3.2.1 属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// URL地址分隔符,用于文件缓存中,服务提供者URL分隔
// URL address separator, used in file cache, service provider URL separation
private static final char URL_SEPARATOR = ' ';
// URL地址分隔正则表达式,用于解析文件缓存中服务提供者URL列表
// URL address separated regular expression for parsing the service provider URL list in the file cache
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
/**
* 本地磁盘缓存。
*
* 1. 其中特殊的 key 值 .registies 记录注册中心列表
* 2. 其它均为 {@link #notified} 服务提供者列表
*/
// Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
private final Properties properties = new Properties();
/**
* 注册中心缓存写入执行器。
*
* 线程数=1
*/
// File cache timing writing
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
/**
* 是否同步保存文件
*/
// Is it synchronized to save the file
private final boolean syncSaveFile;
/**
* 数据版本号
*
* {@link #properties}
*/
private final AtomicLong lastCacheChanged = new AtomicLong();
/**
* 已注册 URL 集合。
*
* 注意,注册的 URL 不仅仅可以是服务提供者的,也可以是服务消费者的
*/
private final Set<URL> registered = new ConcurrentHashSet<URL>();
/**
* 订阅 URL 的监听器集合
*
* key:消费者的 URL ,例如消费者的 URL
*/
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 被通知的 URL 集合
*
* key1:消费者的 URL ,例如消费者的 URL ,和 {@link #subscribed} 的键一致
* key2:分类,例如:providers、consumers、routes、configurators。【实际无 consumers ,因为消费者不会去订阅另外的消费者的列表】
* 在 {@link Constants} 中,以 "_CATEGORY" 结尾
*/
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
/**
* 注册中心 URL
*/
private URL registryUrl;
/**
* 本地磁盘缓存文件,缓存注册中心的数据
*/
// Local disk cache file
private File file;
/**
* 是否销毁
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
public AbstractRegistry(URL url) {
setUrl(url);
// Start file save timer
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 获得 `file`
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// 加载本地磁盘缓存文件到内存缓存
loadProperties();
// 通知监听器,URL 变化结果
notify(url.getBackupUrls());
}
- file见代码注释 属性, 。
- properties见代码注释 属性, 。
- syncSaveFile 属性, properties 发生变更时候,是同步还是异步写入 file 。
- registryCacheExecutor见代码注释 属性, 。
- lastCacheChanged见代码注释 属性, 。
- 因为每次写入 file 是全量,而不是增量写入,通过版本号,避免老版本覆盖新版本。
- registered见代码注释 属性, 。
- subscribed见代码注释 属性, 。
- notified见代码注释 属性, 。
- 从数据上,和 分类 properties 比较相似。笔者认为有两点差异:1)数据格式上, notified 根据 做了聚合;2)不从 file 中读取,都是从注册中心读取的数据。
- registryUrl见代码注释 属性, 。
- destroyed见代码注释 属性, 。
- 构造方法见代码注释 , 。
- 第 87 行:调用 #loadProperties() 方法,加载本地磁盘缓存文件到内存缓存。
- 代码比较简单,点击链接查看。
- 第 89 行:// 【TODO 8020】为什么构造方法,要通知,连监听器都没注册
- 第 87 行:调用 #loadProperties() 方法,加载本地磁盘缓存文件到内存缓存。
3.2.2 register && unregister
- #register(url)
- 从实现上,我们可以看出,并未向注册中心发起注册,仅仅是添加到 registered 中,进行状态的维护。实际上,真正的实现在 FailbackRegistry 类中。
- #unregister(url)
- 和 处理方式 #register(url) 的 相同。
3.2.3 subscribe && unsubscribe
- #subscribe(url, listener)
- 和 处理方式 #register(url) 的 相同。
- #unsubscribe(url, listener)
- 和 处理方式 #register(url) 的 相同。
3.2.4 notify
#notify(url, listener, urls) 方法,通知监听器,URL 变化结果。这里我们有两点要注意下:
- 第一,向注册中心发起订阅后,会获取到全量 数据,此时会被调用 #notify(…) 方法,即 Registry 获取到了全量数据。
- 第二,每次注册中心发生变更时,会调用 增量全量 #notify(…) 方法,虽然变化是 ,调用这个方法的调用方,已经进行处理,传入的 urls 依然是 的。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 通知监听器,URL 变化结果。
*
* 数据流向 `urls` => {@link #notified} => {@link #properties} => {@link #file}
*
* @param url 消费者 URL
* @param listener 监听器
* @param urls 通知的 URL 变化结果(全量数据)
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// 将 `urls` 按照 `url.parameter.category` 分类,添加到集合
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
// 获得消费者 URL 对应的在 `notified` 中,通知的 URL 变化结果(全量数据)
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
// 处理通知的 URL 变化结果(全量数据)
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 覆盖到 `notified`
// 当某个分类的数据为空时,会依然有 urls。其中 `urls[0].protocol = empty`,通过这样的方式,处理所有服务提供者为空的情况。
categoryNotified.put(category, categoryList);
// 保存到文件
saveProperties(url);
// 通知监听器
listener.notify(categoryList);
}
}
- 第 25 至 37 行:将 urls 按照 url.parameter.category 分类,添加到集合 result 中。
- 第 28 行:TODO 芋艿
- 这里有一点要注意,每次传入的 全量一个分类 urls 的” “,指的是至少要是 的全量,而不一定是全部数据。
- 第 41 至 46 行:获得消费者 URL 对应的在 notified 中的数据。
- 第 47 至 58 行:按照分类 ,循环处理通知的 URL 变化结果(全量数据)。
- 第 51 至 53 行:将 所有服务提供者为空 result 覆盖到 notified 中。这里又有一点需要注意,当某个分类的数据为空时,会依然有 urls 。其中 urls[0].protocol = empty ,通过这样的方式,处理 的情况。
- 第 55 行:调用 #saveProperties(url) 方法,保存到文件。
- 代码比较简单,点击链接查看。
- 第 57 行:调用 NotifyListener#notify(urls) 方法,通知监听器处理。例如,有新的服务提供者启动时,被通知,创建新的 Invoker 对象。
3.2.5 recover
- #recover()
- 和 处理方式 #register(url) 的 相同。
在注册中心断开,重连成功,调用 #recover() 方法,进行恢复注册和订阅。
3.2.6 destroy
- #destroy()
- 和 处理方式 #register(url) 的 相同。
在 JVM 关闭时,调用 #destroy() 方法,进行取消注册和订阅。
3.3 FailbackRegistry
com.alibaba.dubbo.registry.support.FailbackRegistry ,实现 AbstractRegistry 抽象类,支持失败重试的 Registry 抽象类。
在上文中的代码中,我们可以看到,AbstractRegistry 进行的注册、订阅等操作,更多的是修改状态,而无和注册中心实际的操作。FailbackRegistry 在 AbstractRegistry 的基础上,实现了和注册中心实际的操作,并且支持失败重试的特性。
3.3.1 属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 定时任务执行器
*/
// Scheduled executor service
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
/**
* 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
*/
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
private final ScheduledFuture<?> retryFuture;
/**
* 失败发起注册失败的 URL 集合
*/
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
/**
* 失败取消注册失败的 URL 集合
*/
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
/**
* 失败发起订阅失败的监听器集合
*/
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 失败取消订阅失败的监听器集合
*/
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
/**
* 失败通知通知的 URL 集合
*/
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/**
* 是否销毁
*/
private AtomicBoolean destroyed = new AtomicBoolean(false);
public FailbackRegistry(URL url) {
super(url);
// 重试频率,单位:毫秒
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 创建失败重试定时器
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
- retryExecutor见代码注释 属性, 。
- retryFuture见代码注释 属性, 。
- 第 41 至 51 行,在构造方法中创建该定时器,在其 #run() 方法中,会调用 #retry() 方法,进行重试。
- failedXXX见代码注释 属性, 。
- 每种操作都有一个记录失败的集合。
- destroyed见代码注释 属性, 。
3.3.2 register && unregister
代码比较易懂,点击链接查看。
3.3.3 subscribe && unsubscribe
代码比较易懂,点击链接查看。
3.3.4 notify
代码比较易懂,点击链接查看。
3.3.5 recover
- #recover()完全覆盖父类方法 方法, ( 即不像前面几个方法,会调用父类的方法 ),将需要注册和订阅的 URL 添加到 failedRegistered failedSubscribed 属性中。这样,在 #retry() 方法中,会重试进行连接。
代码比较易懂,点击链接查看。
3.3.6 retry
- #retry() 方法,遍历五个 failedXXX 属性,重试对应的操作。
代码比较易懂,点击链接查看。
3.3.7 destroy
- #destroy() 方法,取消注册和订阅,并关闭定时器。
代码比较易懂,点击链接查看。
4. NotifyListener
com.alibaba.dubbo.registry.NotifyListener ,通知监听器。当收到服务变更通知时触发,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface NotifyListener {
/**
* 当收到服务变更通知时触发。
* <p>
* 通知需处理契约:
* 1. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。
* 2. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。
* 3. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routers, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。
* 4. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。
* 5. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。
*
* @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
*/
void notify(List<URL> urls);
}
- 注意看方法上的注释,特别是全量分类为空顺序 、 、 、 。
NotifyListener 的子类如下图:
5. ProviderConsumerRegTable
com.alibaba.dubbo.registry.support.ProviderConsumerRegTable ,服务提供者和消费者注册表,存储 JVM 进程内自己的服务提供者和消费者的 Invoker 。
该信息用于 Dubbo QOS 使用,例如将 JVM 进程中,自己的服务提供者下线,又或者查询自己的服务提供者和消费者列表。
- 《Dubbo 用户指南 —— 在线运维命令 - QOS》
- 后续会有文章分享 QOS ,本文不多啰嗦。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ProviderConsumerRegTable {
/**
* 服务提供者 Invoker 集合
*
* key:服务提供者 URL 服务键
*/
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
/**
* 服务消费者 Invoker 集合
*
* key:服务消费者 URL 服务键
*/
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
// .... 省略方法
}
- 如下方法,已经添加代码注释,胖友点击查看。
- 服务提供者
- #registerProvider(invoker, registryUrl, providerUrl) 静态方法,注册 Provider Invoker 。
- #getProviderInvoker(serviceUniqueName) 静态静态,获得指定服务键的 Provider Invoker 集合。
- #getProviderWrapper(invoker) 静态方法,获得服务提供者对应的 Invoker Wrapper 对象。
- 服务消费者
- #registerConsumer(invoker, registryUrl, consumerUrl, registryDirectory) 静态方法,注册 Consumer Invoker 。
- #getConsumerInvoker(serviceUniqueName) 静态方法,获得指定服务键的 Consumer Invoker 集合。
5.1 ProviderInvokerWrapper
com.alibaba.dubbo.registry.support.ProviderInvokerWrapper ,实现 Invoker 接口,服务提供者 Invoker Wrapper ,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Invoker 对象
*/
private Invoker<T> invoker;
/**
* 原始 URL
*/
private URL originUrl;
/**
* 注册中心 URL
*/
private URL registryUrl;
/**
* 服务提供者 URL
*/
private URL providerUrl;
/**
* 是否注册
*/
private volatile boolean isReg;
// ... 省略方法
- 相比纯粹的 Invoker 对象,又多了运维命令需要的属性。例如 状态下线服务命令com.alibaba.dubbo.qos.command.impl.Offlinecom.alibaba.dubbo.qos.command.impl.Online isReg 属性,可以在使用 后,标记为 false 。想提前深入了解的胖友,可以看下 和 类。
5.2 ConsumerInvokerWrapper
com.alibaba.dubbo.registry.support.ConsumerInvokerWrapper ,实现 Invoker 接口,服务消费者 Invoker Wrapper ,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Invoker 对象
*/
private Invoker<T> invoker;
/**
* 原始 URL
*/
private URL originUrl;
/**
* 注册中心 URL
*/
private URL registryUrl;
/**
* 消费者 URL
*/
private URL consumerUrl;
/**
* 注册中心 Directory
*/
private RegistryDirectory registryDirectory;
- 相比纯粹的 Invoker 对象,又多了运维命令需要的属性。例如 列出消费者和提供者命令可调用com.alibaba.dubbo.qos.command.impl.Ls registryDirectory 属性,可以在使用 后,输出可消费者 的服务提供者数量 。想提前深入了解的胖友,可以看下 类。
5. integration
不同于上面我们看到的代码,integration 包下是对其他 Dubbo 模块的集成:
- RegistryProtocol ,对 dubbo-rpc-api 的依赖集成。
- RegistryDirectory ,对 dubbo-cluster 的依赖集成。
考虑到超出了本文的范畴,后面涉及到时,单独分享。
666. 彩蛋
对注册中心的使用,不熟悉的胖友,可能理解起来会有点懵。嘿嘿,仿佛为自己写的差,找了一个理由。哈哈哈。
下一篇,我们来结合 Zookeeper ,进一步理解。
另外,胖友也可以多多调试噢。


