注册中心(三)之Redis
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
前置阅读文章:
我们先来看下 《Dubbo 用户指南 —— Redis 注册中心》 文档,内容如下:
基于 Redis 实现的注册中心。
使用 Redis 的 Key/Map 结构存储数据结构:
- 主 Key 为服务名和类型
- Map 中的 Key 为 URL 地址
- Map 中的 Value 为过期时间,用于判断脏数据,脏数据由监控中心删除
横向来看,和基于 Zookeeper 实现的注册中心,也是分成 Root、Service、Type、URL 四层。
- 使用 Redis Map 的数据结构,聚合相同服务和类型 (Root + Service + Type)。
- 不使用 Redis 的自动过期机制,而是通过监控中心,实现过期机制。因为,Redis Key 自动过期时,不存在相应的事件通知。
- 服务提供者和消费者,定时延长其注册的 URL 地址的过期时间。
使用 Redis 的 Publish/Subscribe 事件通知数据变更:
- 通过事件的值区分事件类型:
register、unregister - 普通消费者直接订阅指定服务提供者的 Key,只会收到指定服务的变更事件
- 监控中心通过 psubscribe 功能订阅
/dubbo/*,会收到所有服务的变更事件 - 服务实例的启动或关闭,会写入或删除对应的 Redis Map 中,并发起对应的
register、unregister事件,从而保证实时性。 - 通过监控中心,轮询 Key 过期,保证最终一致性。未正常关闭的服务实例的 URL 的删除,并发起对应的
unregister事件。
调用过程:
【一】服务提供方
- 1、服务提供方启动时,向
Key:/dubbo/com.foo.BarService/providers下,添加当前提供者的地址 - 2、并向
Channel:/dubbo/com.foo.BarService/providers发送register事件
【二】服务消费方
- 3、服务消费方启动时,从
Channel:/dubbo/com.foo.BarService/providers订阅register和unregister事件 - 4、并向
Key:/dubbo/com.foo.BarService/providers下,添加当前消费者的地址
服务消费方收到 register 和 unregister 事件后,从 Key:/dubbo/com.foo.BarService/providers 下获取提供者地址列表。
【三】服务监控中心
- 5、服务监控中心启动时,从
Channel:/dubbo/*订阅register和unregister,以及subscribe和unsubscribe事件 - 6、服务监控中心收到
register和unregister事件后,从Key:/dubbo/com.foo.BarService/providers下获取提供者地址列表 - 7、服务监控中心收到
subscribe和unsubscribe事件后,从Key:/dubbo/com.foo.BarService/consumers下获取消费者地址列表
本文涉及仅有 RedisRegistry 一个类,类图如下:
2. RedisRegistry
com.alibaba.dubbo.registry.redis.RedisRegistry,实现 FailbackRegistry 抽象类,基于 Redis 实现的注册中心实现类。
2.1 构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
* 默认端口
*/
private static final int DEFAULT_REDIS_PORT = 6379;
/**
* 默认 Redis 根节点
*/
private final static String DEFAULT_ROOT = "dubbo";
/**
* Redis Key 过期机制执行器
*/
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));
/**
* Redis Key 过期机制 Future
*/
private final ScheduledFuture<?> expireFuture;
/**
* Redis 根节点
*/
private final String root;
/**
* JedisPool 集合
*
* key:ip:port
*/
private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();
/**
* 通知器集合
*
* key:Root + Service,例如 `/dubbo/com.alibaba.dubbo.demo.DemoService`
*/
private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();
/**
* 重连周期,单位:毫秒
*/
private final int reconnectPeriod;
/**
* 过期周期,单位:毫秒
*/
private final int expirePeriod;
/**
* 是否监控中心
*
* 用于判断脏数据,脏数据由监控中心删除 {@link #clean(Jedis)}
*/
private volatile boolean admin = false;
/**
* 是否复制模式
*/
private boolean replicate;
public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 创建 GenericObjectPoolConfig 对象
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
if (url.getParameter("max.idle", 0) > 0)
config.setMaxIdle(url.getParameter("max.idle", 0));
if (url.getParameter("min.idle", 0) > 0)
config.setMinIdle(url.getParameter("min.idle", 0));
if (url.getParameter("max.active", 0) > 0)
config.setMaxTotal(url.getParameter("max.active", 0));
if (url.getParameter("max.total", 0) > 0)
config.setMaxTotal(url.getParameter("max.total", 0));
if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0)
config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
// 是否复制模式
String cluster = url.getParameter("cluster", "failover");
if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);
// 解析
List<String> addresses = new ArrayList<String>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (backups != null && backups.length > 0) {
addresses.addAll(Arrays.asList(backups));
}
// 创建 JedisPool 对象
String password = url.getPassword();
for (String address : addresses) {
int i = address.indexOf(':');
String host;
int port;
if (i > 0) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = DEFAULT_REDIS_PORT;
}
if (StringUtils.isEmpty(password)) { // 无密码连接
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));
} else { // 有密码连接
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), password));
}
}
// 解析重连周期
this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
// 获得 Redis 根节点
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) { // 头 `/`
group = Constants.PATH_SEPARATOR + group;
}
if (!group.endsWith(Constants.PATH_SEPARATOR)) { // 尾 `/`
group = group + Constants.PATH_SEPARATOR;
}
this.root = group;
// 创建实现 Redis Key 过期机制的任务
this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
jedisPools属性,JedisPool 集合,其中键为ip:port。在【第 64 至 84 行】和【第 93 至 99 行】和【第 101 至 121 行】初始化。root属性,Redis 根节点,即首图的 Root 层。在【第 126 至 134 行】初始化。replicate属性,是否复制模式。在【第 86 至 90 行】文档说明如下:
可通过 <dubbo:registry cluster="failover"/> 设置 redis 集群策略,缺省为 failover:
failover:只写入和读取任意一台,失败时重试另一台,需要服务器端自行配置数据同步。replicate:在客户端同时写入所有服务器,只读取单台,服务器端不需要同步,注册中心集群增大,性能压力也会更大。notifiers属性,通知器集合,其中键为 Root + Service。Notifier,用于 Redis Publish/Subscribe 机制中的订阅,实时监听数据的变化。reconnectPeriod属性,重连周期,单位:毫秒。在【第 124 行】初始化。用于订阅发生 Redis 连接异常时,Notifiersleep,等待重连上。expireExecutor属性,Redis Key 过期机制执行器。expirePeriod属性,Redis Key 过期周期,单位:毫秒。在【第 137 行】初始化。expireFuture属性,Redis Key 过期机制任务的 Future。在【第 138 至 146 行】初始化。- 该任务主要有两个逻辑:1)延长未过期的 Key;2)删除过期的 Key。
- 任务间隔为
expirePeriod的一半,避免过于频繁,对 Redis 的压力过大;同时,避免过于不频繁,每次执行时,都过期了。
admin属性,是否监控中心,在#clean(Jedis)方法,看到具体的使用。
2.2 doRegister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public void doRegister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
// 计算过期时间
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
boolean success = false;
RpcException exception = null;
// 向 Redis 注册
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
try {
// 写入 Redis Map 键
jedis.hset(key, value, expire); // 注意,过期时间,作为 Map 的值
// 发布 Redis 注册事件
jedis.publish(key, Constants.REGISTER);
success = true;
// 如果服务器端已同步数据,只需写入单台机器
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 处理异常
if (exception != null) {
if (success) { // 虽然发生异常,但是结果成功
logger.warn(exception.getMessage(), exception);
} else { // 最终未成功
throw exception;
}
}
}
- 第 3 行:调用
#toCategoryPath(url)方法,获得分类路径作为 Key。 - 第 4 行:调用
URL#toFullString()方法,获得URL 字符串作为 Value。 - 第 6 行:计算过期时间,当前时间 +
expirePeriod。 - 第 9 至 30 行:向 Redis 注册。
- 第 16 行:调用
Jedis#hset(key, value, expire)方法,写入 Redis Map 中。注意,过期时间,作为 Map 的值。 - 第 18 行:调用
Jedis#publish(channel, message)方法,发布register事件。这样订阅该 Key 的服务消费者和监控中心,就会实时从 Redis 读取该服务的最新数据。 - 第 21 至 23 行:如果非
replicate,意味着 Redis 服务器端已同步数据,只需写入单台机器。因此,结束循环。否则,满足replicate,向所有 Redis 写入。
- 第 16 行:调用
- 第 31 至 38 行:处理异常。这块代码胖友自己看下,注意下
exception和success赋值的地方。这块的打印告警日志的处理方式,也适用于多次重试某个操作,结果发生异常,但是最终成功。例如,HTTP 请求远程服务。
2.1.1 toCategoryPath
1
2
3
4
5
6
7
8
9
10
11
/**
* 获得分类路径
*
* Root + Service + Type
*
* @param url URL
* @return 分类路径
*/
private String toCategoryPath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}
2.3 doUnregister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Override
public void doUnregister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
// 向 Redis 注册
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
try {
// 删除 Redis Map 键
jedis.hdel(key, value);
// 发布 Redis 取消注册事件
jedis.publish(key, Constants.UNREGISTER);
success = true;
// 如果服务器端已同步数据,只需写入单台机器
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 处理异常
if (exception != null) {
if (success) { // 虽然发生异常,但是结果成功
logger.warn(exception.getMessage(), exception);
} else { // 最终未成功
throw exception;
}
}
}
当服务消费者或服务提供者,关闭时,会调用 #doUnregister(url) 方法,取消注册。在该方法中,会删除对应 Map 中的键 + 实时发布 unregister 事件,从而通知订阅者们。因此,正常情况下,就无需监控中心,做脏数据删除的工作。
代码比较简单,和 #doRegister() 方法,逻辑相反。
2.4 doSubscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
// 获得服务路径,例如:`/dubbo/com.alibaba.dubbo.demo.DemoService`
String service = toServicePath(url);
// 获得通知器 Notifier 对象
Notifier notifier = notifiers.get(service);
// 不存在,则创建 Notifier 对象
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
if (notifier == newNotifier) { // 保证并发的情况下,有且仅有一个启动
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
// 循环 `jedisPools`,仅向一个 Redis 发起订阅
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
try {
// 处理所有 Service 层的发起订阅,例如监控中心的订阅
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
// 获得分类层集合,例如:`/dubbo/com.alibaba.dubbo.demo.DemoService/providers`
Set<String> keys = jedis.keys(service);
if (keys != null && !keys.isEmpty()) {
// 按照服务聚合 URL 集合
Map<String, Set<String>> serviceKeys = new HashMap<String, Set<String>>(); // Key:Root + Service;Value:URL。
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.get(serviceKey);
if (sk == null) {
sk = new HashSet<String>();
serviceKeys.put(serviceKey, sk);
}
sk.add(key);
}
// 循环 serviceKeys,按照每个 Service 层的发起通知
for (Set<String> sk : serviceKeys.values()) {
doNotify(jedis, sk, url, Collections.singletonList(listener));
}
}
// 处理指定 Service 层的发起通知
} else {
doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
}
// 标记成功
success = true;
// 结束,仅仅从一台服务器读取数据
break; // Just read one server's data
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 处理异常
if (exception != null) {
if (success) { // 虽然发生异常,但是结果成功
logger.warn(exception.getMessage(), exception);
} else { // 最终未成功
throw exception;
}
}
}
【第一步】Notifier 部分
- 第 4 行:调用
#toServicePath(url)方法,获得服务路径,例如/dubbo/com.alibaba.dubbo.demo.DemoService。 - 第 6 行:获得通知器 Notifier 对象。
- 第 8 至 15 行:若不存在,则创建 Notifier 对象,并调用
Notifier#start()方法。
【第二步】获取初始化数据,并进行通知
- 第 19 行:循环
jedisPools,向 Redis 发起订阅,直到一个成功。我们会看到代码,分成两个部分。
【第二部分】第 46 至 49 行,适用服务提供者和服务消费者
处理指定 Service 层的初始化数据:
- 第 48 行:调用
Jedis#keys(pattern)方法,获得指定 Service 层下的所有 URL 们。例如/dubbo/com.alibaba.dubbo.demo.DemoService/*。 - 第 48 行:调用
#doNotify(jedis, keys, url, listeners)方法,通知监听器,初始的数据。
【第一部分】第 25 至 45 行,适用注册中心
处理所有 Service 层的初始化数据:
- 第 26 行:标记
admin = true。因为,只有注册中心,才清理脏数据。 - 第 28 行:调用
Jedis#keys(pattern)方法,获得所有 Service 层下的所有 URL 们。例如/dubbo/*。 - 第 31 至 39 行:按照服务聚合 URL 集合。
- 第 42 至 44 行:循环
serviceKeys,调用#doNotify(jedis, keys, url, listeners)方法,按照每个 Service 层类似,通知监听器,初始的数据。此处,就和【第 48 行】一样了。
另外,订阅动作(【第一步】)一定要在获取初始化数据(【第二步】)之前。如果反过来,可能获取数据完后,处理的过程中,有数据的变更,我们就无法收到 register、unregister 的事件。
2.4.1 toServicePath
1
2
3
4
5
6
7
8
9
10
11
/**
* 获得服务路径
*
* Root + Type
*
* @param url URL
* @return 服务路径
*/
private String toServicePath(URL url) {
return root + url.getServiceInterface();
}
2.4.2 toServicePath
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 获得服务路径,主要截掉多余的部分
*
* Root + Type
*
* @param categoryPath 分类路径
* @return 服务路径
*/
private String toServicePath(String categoryPath) {
int i;
if (categoryPath.startsWith(root)) {
i = categoryPath.indexOf(Constants.PATH_SEPARATOR, root.length());
} else {
i = categoryPath.indexOf(Constants.PATH_SEPARATOR);
}
return i > 0 ? categoryPath.substring(0, i) : categoryPath;
}
2.5 doUnsubscribe
1
2
3
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
}
此处目前并未实现,艿艿觉得,此处应该增加取消向 Redis 的订阅(Subscribe)。在 ZookeeperRegistry 的该方法中,是移除了对应的监听器。
2.6 doNotify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// @params key 分类数组,例如:`/dubbo/com.alibaba.dubbo.demo.DemoService/providers`
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(getSubscribed()).entrySet()) {
doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<NotifyListener>(entry.getValue()));
}
}
// @params keys 分类数组,元素例如:`/dubbo/com.alibaba.dubbo.demo.DemoService/providers`
private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty() || listeners == null || listeners.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<URL>();
List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0])); // 分类数组
String consumerService = url.getServiceInterface(); // 服务接口
// 循环分类层,例如:`/dubbo/com.alibaba.dubbo.demo.DemoService/providers`
for (String key : keys) {
// 若服务不匹配,返回
if (!Constants.ANY_VALUE.equals(consumerService)) {
String providerService = toServiceName(key);
if (!providerService.equals(consumerService)) {
continue;
}
}
// 若订阅的不包含该分类,返回
String category = toCategoryName(key);
if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
continue;
}
// 获得所有 URL 数组
List<URL> urls = new ArrayList<URL>();
Map<String, String> values = jedis.hgetAll(key);
if (values != null && values.size() > 0) {
for (Map.Entry<String, String> entry : values.entrySet()) {
URL u = URL.valueOf(entry.getKey());
if (!u.getParameter(Constants.DYNAMIC_KEY, true) // 非动态节点,因为动态节点,不受过期的限制
|| Long.parseLong(entry.getValue()) >= now) { // 未过期
if (UrlUtils.isMatch(url, u)) {
urls.add(u);
}
}
}
}
// 若不存在匹配,则创建 `empty://` 的 URL返回,用于清空该服务的该分类。
if (urls.isEmpty()) {
urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
.setAddress(Constants.ANYHOST_VALUE)
.setPath(toServiceName(key))
.addParameter(Constants.CATEGORY_KEY, category));
}
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
}
}
if (result.isEmpty()) {
return;
}
// 全量数据获取完成时,调用 `super#notify(...)` 方法,回调 NotifyListener
for (NotifyListener listener : listeners) {
super.notify(url, listener, result);
}
}
两个重载的 #doNotify(…) 方法,主要差异点在前者少 url 和 listeners 方法参数。所以:
- 第 3 行:我们可以看到调用
#getSubscribed()方法,获得所有监听器。代码如下:
1
2
3
4
5
6
/**
* 订阅 URL 的监听器集合
*
* key:订阅者的 URL,例如消费者的 URL
*/
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
第 3 至 5 行:循环调用
#doNotify(jedis, keys, url, listeners)方法,进行通知。但是呢?这样一来,通知的事件(key)和监听器未必匹配,因此在【第 20 至 30 行】的代码,进行匹配。- 第 15 行:获得分类层(Category),即分类数组。在 《精尽 Dubbo 源码分析 —— 注册中心(二)之 Zookeeper》「4. 调用」 小节中,我们也看到,不同角色关注不同的分类数据。
- 服务消费者,关注
providers、configurations、routes。 - 服务提供者,关注
consumers。 - 监控中心,关注所有。
- 服务消费者,关注
- 第 18 行:循环分类层,即每个元素为 Root + Service + Type,例如
/dubbo/com.alibaba.dubbo.demo.DemoService/providers。 - 第 32 至 44 行:调用
Jedis#hgetAll(key)方法,获得所有 URL 数组。并且,获取完成后,会过滤掉已过期的动态节点。 - 第 45 至 51 行:若不存在匹配,则创建
empty://的 URL返回,用于清空该服务的该分类。 - 第 52 行:添加到
result中。 - 第 60 至 63 行:全量数据获取完成时,调用
super#notify(…)方法,回调 NotifyListener。该方法,在 《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》 有详细解析。
2.6.1 toServiceName
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 获得服务名,从服务路径上
*
* Service
*
* @param categoryPath 服务路径
* @return 服务名
*/
private String toServiceName(String categoryPath) {
String servicePath = toServicePath(categoryPath);
return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
}
2.6.2 toCategoryName
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 获得分类名,从分类路径上
*
* Type
*
* @param categoryPath 分类路径
* @return 分类名
*/
private String toCategoryName(String categoryPath) {
int i = categoryPath.lastIndexOf(Constants.PATH_SEPARATOR);
return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
}
2.7 deferExpired
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void deferExpired() {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
Jedis jedis = jedisPool.getResource();
try {
// 循环已注册的 URL 集合
for (URL url : new HashSet<URL>(getRegistered())) {
// 动态节点
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
// 获得分类路径
String key = toCategoryPath(url);
// 写入 Redis Map 中
if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
// 发布 `register` 事件。
jedis.publish(key, Constants.REGISTER);
}
}
}
// 监控中心负责删除过期脏数据
if (admin) {
clean(jedis);
}
// 如果服务器端已同步数据,只需写入单台机器
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
被 expireExecutor 中的定时调用,整体逻辑类似 #doRegister() 方法。
- 第 8 行:调用
#getRegistered()方法,获得已注册的 URL 集合。代码如下:
1
2
3
4
5
6
/**
* 已注册 URL 集合。
*
* 注意,注册的 URL 不仅仅可以是服务提供者的,也可以是服务消费者的
*/
private final Set<URL> registered = new ConcurrentHashSet<URL>();
- 第 8 行:循环 URL 集合。
- 第 10 行:判断是否为动态节点,只有动态节点需要延长过期时间。
- 第 14 行:调用
Jedis#hset(key, value, expire)方法,写入 Redis Map 中。注意,过期时间,作为 Map 的值。 - 第 16 行:若【第 14 行】写入返回的值为 1,说明 Map 中该键对应的值不存在(例如,多写 Redis 节点时,有个节点写入失败),发布
register事件。 - 第 21 至 23 行:若是注册中心 (
admin = true) 时,调用#clean(Jedis)方法,清理过期脏数据。 - 第 25 至 27 行:如果服务器端已同步数据,只需写入单台机器。
2.7.1 clean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void clean(Jedis jedis) {
// 获得所有服务
Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
if (keys != null && !keys.isEmpty()) {
for (String key : keys) {
// 获得所有 URL
Map<String, String> values = jedis.hgetAll(key);
if (values != null && values.size() > 0) {
boolean delete = false;
long now = System.currentTimeMillis();
for (Map.Entry<String, String> entry : values.entrySet()) {
URL url = URL.valueOf(entry.getKey());
// 动态节点
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
long expire = Long.parseLong(entry.getValue());
// 已经过期
if (expire < now) {
jedis.hdel(key, entry.getKey());
delete = true;
if (logger.isWarnEnabled()) {
logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
}
}
}
}
// 若删除成功,发布 `unregister` 事件
if (delete) {
jedis.publish(key, Constants.UNREGISTER);
}
}
}
}
}
整体逻辑类似 #doUnregister() 方法。胖友自己看方法的注释哈。
2.8 isAvailable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public boolean isAvailable() {
for (JedisPool jedisPool : jedisPools.values()) {
try {
Jedis jedis = jedisPool.getResource();
try {
if (jedis.isConnected()) { // 至少一个 Redis 节点可用
return true; // At least one single machine is available.
}
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable ignored) {
}
}
return false;
}
2.9 destroy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void destroy() {
// 父类关闭
super.destroy();
// 关闭定时任务
try {
expireFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
// 关闭通知器
try {
for (Notifier notifier : notifiers.values()) {
notifier.shutdown();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
// 关闭连接池
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
jedisPool.destroy();
} catch (Throwable t) {
logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
3. Notifier
Notifier 是 RedisRegistry 的内部类。
Notifier,继承 Thread 类,负责向 Redis 发起订阅逻辑。
3.1 构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 服务名 Root + Service
*/
private final String service;
/**
* Jedis
*/
private volatile Jedis jedis;
/**
* 是否首次
*/
private volatile boolean first = true;
/**
* 是否运行中
*/
private volatile boolean running = true;
/**
* 连接次数随机数
*/
private volatile int connectRandom;
/**
* 需要忽略连接的次数
*/
private final AtomicInteger connectSkip = new AtomicInteger();
/**
* 已经忽略连接的次数
*/
private final AtomicInteger connectSkiped = new AtomicInteger();
/**
* 随机
*/
private final Random random = new Random();
service属性,服务名 Root + Service。first属性,是否首次。在#run()方法中,查看。
【第 13 至 32 行】的属性,相当于重连策略,用于和 Redis 断开时,忽略一定次数和 Redis 的连接,避免空跑。涉及方法如下:
#isSkip() 方法,判断是否忽略本次对 Redis 的连接。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private boolean isSkip() {
// 获得需要忽略连接的总次数。如果超过 10,则加上一个 10 以内的随机数。
int skip = connectSkip.get(); // Growth of skipping times
if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
if (connectRandom == 0) {
connectRandom = random.nextInt(10);
}
skip = 10 + connectRandom;
}
// 自增忽略次数。若忽略次数不够,则继续忽略。
if (connectSkiped.getAndIncrement() < skip) { // Check the number of skipping times
return true;
}
// 增加需要忽略的次数
connectSkip.incrementAndGet();
// 重置已忽略次数和随机数
connectSkiped.set(0);
connectRandom = 0;
return false;
}
- 第 2 至 9 行:获得需要忽略连接的总次数。如果超过 10,则加上一个 10 以内的随机数。思路是,连接失败的次数越多,每一轮加大需要忽略的总次数,并且带有一定的随机性。
- 第 10 至 13 行:自增忽略次数。若忽略次数不够,则继续忽略,即返回
true。 - 第 15 行:增加需要忽略的次数。也就是说,下一轮,不考虑随机数,会多一次。如下是一次模拟:
第一轮:
connectSkip: 0; connectSkiped: 0
第二轮:
connectSkip: 1; connectSkiped: 0connectSkip: 1; connectSkiped: 1
第三轮:
connectSkip: 2; connectSkiped: 0connectSkip: 2; connectSkiped: 1connectSkip: 2; connectSkiped: 2
当超过十轮后,增加随机数。
- 第 16 至 18 行:重置已忽略次数和随机数。
#resetSkip() 方法,重置忽略连接的信息。代码如下:
1
2
3
4
5
6
7
private void resetSkip() {
// 重置需要连接的次数
connectSkip.set(0);
// 重置已忽略次数和随机数
connectSkiped.set(0);
connectRandom = 0;
}
3.2 run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public void run() {
while (running) {
try {
// 是否跳过本次 Redis 连接
if (!isSkip()) {
try {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
jedis = jedisPool.getResource();
try {
// 监控中心
if (service.endsWith(Constants.ANY_VALUE)) {
if (!first) {
first = false;
Set<String> keys = jedis.keys(service);
if (keys != null && !keys.isEmpty()) {
for (String s : keys) {
doNotify(jedis, s);
}
}
resetSkip();
}
// 批订阅
jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
// 服务提供者或消费者
} else {
if (!first) {
first = false;
doNotify(jedis, service);
resetSkip();
}
// 批订阅
jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
}
break;
} finally {
jedisPool.returnBrokenResource(jedis);
}
} catch (Throwable t) { // Retry another server
logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
// If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
- 第 3 行:循环执行,直到关闭。
- 第 6 行:调用
#isSkip()方法,判断是否跳过本次 Redis 连接。But,即使跳过,也没有执行类似 sleep 的逻辑,有点奇怪。这样,会导致实际即使跳过,也会快速向 Redis 发起订阅。【TODO 8032】Redis 重连逻辑 - 第 8 行:循环连接池,发起订阅,直到一个成功。
【第一种情况】第 14 至 26 行:监控中心
- 第 15 至 24 行:目前这块代码有一些问题?!初始时
first=true,那么这块代码永远无法执行到。笔者猜测这块的意图是,在 ZookeeperRegistry 中,可以实现对连接状态的监听,从而实现断开重连成功后,从 Zookeeper 获取到最新的数据。代码如下:
1
2
3
4
5
6
7
8
9
10
11
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
而 JedisPool 中,不提供这样的连接监控机制。那么如果订阅 Redis 发生了异常,我们可以认为 Redis 连接断开了,需要重新发起订阅,并且需要重新从 Redis 中获取到最新的数据。
那么此处的代码可以这样改:
- 第 16 行:
first = true; - 第 42 行:增加
first = false; - 第 26 行:调用
Jedis#psubscribe(JedisPubSub jedisPubSub, String... patterns)方法,订阅所有 Service 层。
【第二种情况】第 27 至 36 行:服务提供者或消费者
- 第 29 至 34 行:和【第 15 至 24 行】类似。
- 第 35 行:调用
Jedis#psubscribe(JedisPubSub jedisPubSub, String... patterns)方法,订阅指定 Service 层。
======================================================
- 第 37 至 40 行:无法执行到,因为
Jedis#psubscribe(JedisPubSub jedisPubSub, String... patterns)方法,是阻塞的。这也是为什么 Notifier 是一个 Thread 的原因。 - 第 41 至 45 行:发生异常,说明 Redis 连接断开了。因此,调用
#sleep(millis)方法,等待 Redis 重连成功。通过这样的方式,避免执行,占用大量的 CPU 资源。
3.3 shutdown
1
2
3
4
5
6
7
8
9
10
public void shutdown() {
try {
// 停止运行
running = false;
// Jedis 断开连接
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
4. NotifySub
NotifySub 是 RedisRegistry 的内部类。
NotifySub,实现 redis.clients.jedis.JedisPubSub 抽象类,通知订阅实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private class NotifySub extends JedisPubSub {
private final JedisPool jedisPool;
public NotifySub(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
try {
doNotify(jedis, key);
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
logger.error(t.getMessage(), t);
}
}
}
@Override
public void onPMessage(String pattern, String key, String msg) {
onMessage(key, msg);
}
@Override
public void onSubscribe(String key, int num) {
}
@Override
public void onPSubscribe(String pattern, int num) {
}
@Override
public void onUnsubscribe(String key, int num) {
}
@Override
public void onPUnsubscribe(String pattern, int num) {
}
}
实现了 #onMessage(key, msg) 和 #onPMessage(pattern, key, msg) 方法,收到 register、unregister 事件,调用 #doNotify(jedis, key) 方法,通知监听器,数据变化,从而实现实时更新。
5. 可靠性
FROM 《Dubbo 用户指南 —— Redis 注册中心》
阿里内部并没有采用 Redis 做为注册中心,而是使用自己实现的基于数据库的注册中心,即:Redis 注册中心并没有在阿里内部长时间运行的可靠性保障,此 Redis 桥接实现只为开源版本提供,其可靠性依赖于 Redis 本身的可靠性。
FROM 《Dubbo 用户指南 —— 成熟度》
Redis注册中心
- Maturity:Stable
- Strength:支持基于客户端双写的集群方式,性能高
- Problem:要求服务器时间同步,用于检查心跳过期脏数据
- Advise:可用于生产环境
做个小笔记,Redis 主从复制的情况下,从节点的订阅(Subscribe),可以收到主节点的发布(Publish)。做这个笔记的原因是,原来担心 “failover” 模式下,Redis 主节点挂了,如果订阅从节点,会不会出现,Redis 主节点恢复后,收不到在其上的发布事件。
666. 彩蛋
看了 Redis 的注册中心的实现,收获还是蛮大的。原来的一直纠结,如何解决 Redis 自动过期,怎么监听到。原来,自己的思路错了!!!


