注册中心(二)之Zookeeper
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
前置阅读文章:
在《注册中心(一)之抽象 API》 中,我们分享的那是相当抽象。因此,在本文中,我们会分享 Dubbo 使用 Zookeeper 作为注册中心的代码,同时也会分享服务暴露和引用时,对注册中心的使用。
下面,我们先来看下 《Dubbo 用户指南 —— zookeeper 注册中心》 文档,内容如下:
流程
流程说明:
- 服务提供者 启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
- 服务消费者 启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
- 监控中心 启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。
- 在图中,我们可以看到 Zookeeper 的节点层级,自上而下是:
- Root 层:根目录,可通过 的 “group” 设置 Zookeeper 的根节点,缺省使用 “dubbo” 。
- Service 层:服务接口全名。
- Type“routes”“configurations” 层:分类。目前除了我们在图中看到的 “providers” ( 服务提供者列表 ) “consumers” ( 服务消费者列表 ) 外,还有 ( 路由规则列表 ) 和 ( 配置规则列表 )。
- URL 层:URL ,根据不同 Type 目录,下面可以是服务提供者 URL 、服务消费者 URL 、路由规则 URL 、配置规则 URL 。
- 实际上 URL 上带有 Type “category” 参数,已经能判断每个 URL 的分类,但是 Zookeeper 是基于节点目录订阅的,所以增加了 层。
- 实际上,服务消费者 启动后,不仅仅订阅了 “providers” 分类,也订阅了 “routes” “configurations” 分类。
2. ZookeeperRegistryFactory
com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory ,实现 AbstractRegistryFactory 抽象类,Zookeeper Registry 工厂。代码如下:
```plain text plain public class ZookeeperRegistryFactory extends AbstractRegistryFactory { /** * Zookeeper 工厂 */ private ZookeeperTransporter zookeeperTransporter; /** * 设置 Zookeeper 工厂 * * 该方法,通过 Dubbo SPI 注入 * * @param zookeeperTransporter Zookeeper 工厂对象 */ public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } @Override public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
1
2
3
4
5
6
7
8
9
10
11
---
# 3. ZookeeperRegistry
[com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry](https://github.com/YunaiV/dubbo/blob/66a1e1b0ef4b01175be148d27fdcf519f4f01b15/dubbo-registry/dubbo-registry-zookeeper/src/main/java/com/alibaba/dubbo/registry/zookeeper/ZookeeperRegistry.java) ,实现 FailbackRegistry 抽象类,Zookeeper Registry 。
## 3.1 属性 + 构造方法
```plain text
plain 1: /** 2: * 默认端口 3: */ 4: private final static int DEFAULT_ZOOKEEPER_PORT = 2181; 5: /** 6: * 默认 Zookeeper 根节点 7: */ 8: private final static String DEFAULT_ROOT = "dubbo"; 9: 10: /** 11: * Zookeeper 根节点 12: */ 13: private final String root; 14: /** 15: * Service 接口全名集合 16: */ 17: private final Set<String> anyServices = new ConcurrentHashSet<String>(); 18: /** 19: * 监听器集合 20: */ 21: private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); 22: /** 23: * Zookeeper 客户端 24: */ 25: private final ZookeeperClient zkClient; 26: 27: public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 28: super(url); 29: if (url.isAnyHost()) { 30: throw new IllegalStateException("registry address == null"); 31: } 32: // 获得 Zookeeper 根节点 33: String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 参数值 34: if (!group.startsWith(Constants.PATH_SEPARATOR)) { 35: group = Constants.PATH_SEPARATOR + group; 36: } 37: this.root = group; 38: // 创建 Zookeeper Client 39: zkClient = zookeeperTransporter.connect(url); 40: // 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。 41: zkClient.addStateListener(new StateListener() { 42: public void stateChanged(int state) { 43: if (state == RECONNECTED) { 44: try { 45: recover(); 46: } catch (Exception e) { 47: logger.error(e.getMessage(), e); 48: } 49: } 50: } 51: }); 52: }
- rootRoot 属性,Zookeeper 根节点,即首图的 层。
- anyServices集合整个Service动态 属性,Service 接口接口全名 。该属性适可用于监控中心,订阅 层。因为,Service 层是 的,可以有不断有新的 Service 服务发布(注意,不是服务实例)。在 #doSubscribe(url, notifyListener) 方法中,会更容易理解。
- zkListeners 属性,监听器集合,建立 NotifyListener 和 ChildListener 的映射关系。
- zkClient 属性,Zookeeper 客户端。
- 构造方法
- 第 28 至 31 行:设置注册中心的 URL 。
- 第 32 至 37 行:设置在 Zookeeper 的根节点,缺省使用 DEFAULT_ROOT 。
- 第 39 行:调用 ZookeeperTransporter#connect(url) 方法,基于 Dubbo SPI Adaptive 机制,根据 url 参数,加载对应的 ZookeeperTransporter 实现类,创建对应的 ZookeeperClient 实现类的对应。
- 第 41 至 51 行:添加 StateListener 对象到 ZookeeperClient 对象中。该监听器,在重连时,在第 45 行的代码,调用 #recover() 方法,进行恢复逻辑,重新发起注册和订阅。
3.2 doRegister
```plain text plain 1: @Override 2: protected void doRegister(URL url) { 3: try { 4: zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 5: } catch (Throwable e) { 6: throw new RpcException(“Failed to register “ + url + “ to zookeeper “ + getUrl() + “, cause: “ + e.getMessage(), e); 7: } 8: }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
---
- 第 4 行:调用
#toUrlPath(url)
方法,获得 URL 的路径。
- 第 4 行:**持久数据**
url.parameters.dynamic
,是否动态数据。若为 false ,该数据为
,当注册方退出时,数据依然保存在注册中心。
- 第 4 行:调用 **URL 层**
ZookeeperClient#create(url, ephemeral)
方法,创建 URL 节点,即我们在首图看到的
。
### 3.2.1 toUrlPath
```plain text
plain /** * 获得 URL 的路径 * * Root + Service + Type + URL * * 被 {@link #doRegister(URL)} 和 {@link #doUnregister(URL)} 调用 * * @param url URL * @return 路径 */ private String toUrlPath(URL url) { return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); }
3.2.2 toCategoryPath
```plain text plain /** * 获得分类路径 * * Root + Service + Type * * @param url URL * @return 分类路径 */ private String toCategoryPath(URL url) { return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); }
1
2
3
4
5
6
7
---
### 3.2.3 toServicePath
```plain text
plain /** * 获得服务路径 * * Root + Type * * @param url URL * @return 服务路径 */ private String toServicePath(URL url) { String name = url.getServiceInterface(); if (Constants.ANY_VALUE.equals(name)) { return toRootPath(); } return toRootDir() + URL.encode(name); }
3.2.4 toRootDir
```plain text plain /** * 获得根目录 * * Root * * @return 路径 */ private String toRootDir() { if (root.equals(Constants.PATH_SEPARATOR)) { return root; } return root + Constants.PATH_SEPARATOR; } /** * Root * * @return 根路径 */ private String toRootPath() { return root; }
1
2
3
4
5
6
7
---
## 3.3 doUnregister
```plain text
plain @Override protected void doUnregister(URL url) { try { zkClient.delete(toUrlPath(url)); } catch (Throwable e) { throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
3.4 doSubscribe
```plain text plain 1: @Override 2: protected void doSubscribe(final URL url, final NotifyListener listener) { 3: try { 4: // 处理所有 Service 层的发起订阅,例如监控中心的订阅 5: if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { 6: String root = toRootPath(); 7: // 获得 url 对应的监听器集合 8: ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); 9: if (listeners == null) { // 不存在,进行创建 10: zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); 11: listeners = zkListeners.get(url); 12: } 13: // 获得 ChildListener 对象 14: ChildListener zkListener = listeners.get(listener); 15: if (zkListener == null) { // 不存在 ChildListener 对象,进行创建 ChildListener 对象 16: listeners.putIfAbsent(listener, new ChildListener() { 17: public void childChanged(String parentPath, List
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
---
- 整个方法分成两部分,分别:
- ============ 第二部分【第 44 至 78 行】 ============
- 处理**指定服务消费者**
Service 层的发起订阅,例如
的订阅。
- 第 47 行:子节点数据数组,即 **Service 层所有 URL**
下的
。
- 第 49 行:循环分类数组。其中,调用
#toCategoriesPath(url)
方法,获得 分类数组。
- 第 51 至 55 行:获得订阅的
url
对应的监听器集合。
- 第 56 至 66 行:获得 **在 URL 层发生变更时新的服务提供者实例**
listener
( NotifyListener ) 对应的 ChildListener 对象。
,会调用
NotifyListener#notify(url, listener, currentChilds)
方法,回调 NotifyListener 的逻辑。酱紫,如果 Service 下增加
( 新的 URL ),服务消费者可创建新的 Invoker 对象,用于调用该服务提供者。
- 第 68 行:创建 **Type持久**
节点。该节点为
节点。
- 第 70 行:向 Zookeeper 的 **Path**
节点,发起订阅。
- 第 72 至 74 行:添加到
urls
中。
- 第 77 行:**首次全量数据获取完成时**
,调用
NotifyListener#notify(url, listener, currentChilds)
方法,回调 NotifyListener 的逻辑。酱紫,服务消费者可创建所有的 Invoker 对象,用于调用服务提供者们。
- 回看【第 77 行】和【第 62 行】,全量 + 增量,仔细理解下。
- ============ 第一部分【第 5 至 43 行】 ============
- 处理**所有监控中心**
Service 层的发起订阅,例如
的订阅
- 第 8 至 12 行:获得订阅的
url
对应的监听器集合。
- 第 13 至 30 行:获得 **在 Service 层发生变更时**
listener
( NotifyListener ) 对应的 ChildListener 对象。
,若是新增 Service 接口全名时(即新增服务),调用
#subscribe(url, listener)
方法,发起该 Service 层的订阅(【第 45 至 78 行】的逻辑)。是否是新增的服务,通过
anyServices
属性来判断。
- 第 32 行:创建 **Service持久**
节点。该节点为
节点。
- 第 34 行:向 Zookeeper 的 **Service**
节点,发起订阅。
- 第 36 至 43 行:**首次全量数据获取完成时**
,循环 Service 接口全名数组,调用
#subscribe(url, listener)
方法,发起该 Service 层的订阅(【第 45 至 78 行】的逻辑)。
友情提示:如果觉得比较绕,或者笔者讲的不清晰,胖友可以进行调试理解。
### 3.4.1 toCategoriesPath
```plain text
plain /** * 获得分类路径数组 * * Root + Service + Type * * @param url URL * @return 分类路径数组 */ private String[] toCategoriesPath(URL url) { // 获得分类数组 String[] categories; if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) { // * 时, categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY, Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY}; } else { categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY}); } // 获得分类路径数组 String[] paths = new String[categories.length]; for (int i = 0; i < categories.length; i++) { paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i]; } return paths; }
3.4.2 toUrlsWithEmpty
```plain text plain /** * 获得 providers 中,和 consumer 匹配的 URL 数组 * * 若不存在匹配,则创建 empty:// 的 URL返回。通过这样的方式,可以处理类似服务提供者为空的情况。 * * @param consumer 用于匹配 URL * @param path 被匹配的 URL 的字符串 * @param providers 匹配的 URL 数组 * @return 匹配的 URL 数组 */ private List
1
2
3
4
5
6
7
8
---
- #toUrlsWithoutEmpty()
方法,代码如下:
```plain text
plain /** * 获得 providers 中,和 consumer 匹配的 URL 数组 * * @param consumer 用于匹配 URL * @param providers 被匹配的 URL 的字符串 * @return 匹配的 URL 数组 */ private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) { List<URL> urls = new ArrayList<URL>(); if (providers != null && !providers.isEmpty()) { for (String provider : providers) { provider = URL.decode(provider); if (provider.contains("://")) { // 是 url URL url = URL.valueOf(provider); // 将字符串转化成 URL if (UrlUtils.isMatch(consumer, url)) { // 匹配 urls.add(url); } } } } return urls; }
3.5 doUnsubscribe
```plain text plain @Override protected void doUnsubscribe(URL url, NotifyListener listener) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners != null) { ChildListener zkListener = listeners.get(listener); if (zkListener != null) { // 向 Zookeeper ,移除订阅 zkClient.removeChildListener(toUrlPath(url), zkListener); } } }
1
2
3
4
5
6
7
---
## 3.6 lookup
```plain text
plain /** * 查询符合条件的已注册数据,与订阅的推模式相对应,这里为拉模式,只返回一次结果。 * * @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @return 已注册信息列表,可能为空,含义同{@link com.alibaba.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。 * @see com.alibaba.dubbo.registry.NotifyListener#notify(List) */ @Override public List<URL> lookup(URL url) { if (url == null) { throw new IllegalArgumentException("lookup url == null"); } try { // 循环分类数组,获得所有的 URL 数组 List<String> providers = new ArrayList<String>(); for (String path : toCategoriesPath(url)) { List<String> children = zkClient.getChildren(path); if (children != null) { providers.addAll(children); } } // 匹配 return toUrlsWithoutEmpty(url, providers); } catch (Throwable e) { throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
3.7 isAvailable
```plain text plain @Override public boolean isAvailable() { return zkClient.isConnected(); }
1
2
3
4
5
6
7
---
## 3.8 destroy
```plain text
plain @Override public void destroy() { super.destroy(); try { zkClient.close(); } catch (Exception e) { logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e); } }
4. 调用
4.1 服务提供者
回头看 《精尽 Dubbo 源码分析 —— 服务暴露(二)之远程暴露(Dubbo)》 的 「3.2.2 export」 小节,我们可以看到:
- 第 14 行:调用 #register(registryUrl, registedProviderUrl) 方法,向注册中心注册服务提供者(自己)。代码如下:
plain text plain public void register(URL registryUrl, URL registedProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); }
4.2 服务消费者
回头看 《精尽 Dubbo 源码分析 —— 服务引用(二)之远程引用(Dubbo)》 的 「3.2.2 doRefer 小节,我们可以看到:
- 第 20 至 25 行:调用 自己 RegistryService#register(url) 方法,向注册中心注册 (服务消费者)。
- 第 35 行:调用 Directory#subscribe(url) 方法,向注册中心订阅服务提供者 + 路由规则 + 配置规则。
- 在该方法中,会循环获得到的服务体用这列表,调用 Protocol#refer(type, url) 方法,创建每个调用服务的 Invoker 对象。
666. 彩蛋
知识星球
嘿嘿,写完 Zookeeper 作为注册中心是否清晰了一些?!

