服务调用(八)之远程调用(Redis)
服务调用(八)之远程调用(Redis)
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文,我们分享 redis:// 协议的远程调用,主要分成两个个部分:
服务暴露- 服务引用
- 服务调用
对应项目为 dubbo-rpc-redis 。
对应文档为 《Dubbo 用户指南 —— redis://》 。定义如下:
基于 Redis 实现的 RPC 协议。
简单的说,通过 Dubbo Service 的调用方式,透明化对 Redis 的访问。这样,如果未来希望,修改缓存的解决方案,不用修改代码,而只要修改 Dubbo Service 的配置。就好像,Java JDBC API 有 MySQL JDBC 、Oracle JDBC 等多种实现,只需要修改对应的 JDBC 驱动实现类,就可以连接上不同的数据库。
另外,Dubbo 提供 memcached:// 协议,和 redis:// 对等,差别点在前者使用 Memcached ,后者使用 Redis 。
2. RedisProtocol
com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol ,实现 AbstractProtocol 抽象类,redis:// 协议实现类。
2.1 export
1
2
3
4
@Override
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl());
}
实际访问的就是 Redis Server 实例,因此无需进行 Dubbo 服务暴露。客户端配置引用方式如下:
在客户端使用,注册中心读取:
或者,点对点直连:
1
<dubbo:reference id="store" interface="java.util.Map" url="redis://10.20.153.10:6379" />
2.2 refer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
try {
// 创建 GenericObjectPoolConfig 对象,设置配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
if (url.getParameter("max.idle", 0) > 0)
config.setMaxIdle(url.getParameter("max.idle", 0));
if (url.getParameter("min.idle", 0) > 0)
config.setMinIdle(url.getParameter("min.idle", 0));
if (url.getParameter("max.active", 0) > 0)
config.setMaxTotal(url.getParameter("max.active", 0));
if (url.getParameter("max.total", 0) > 0)
config.setMaxTotal(url.getParameter("max.total", 0));
if (url.getParameter("max.wait", 0) > 0)
config.setMaxWaitMillis(url.getParameter("max.wait", 0));
if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
// 创建 JedisPool 对象
final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
// 处理方法名的映射
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
// 创建 Invoker 对象
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) {
Jedis resource = null;
try {
// 获得 Redis Resource
resource = jedisPool.getResource();
// Redis get 指令
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 获得值
byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes());
if (value == null) {
return new RpcResult();
}
// 反序列化
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
// 返回结果
return new RpcResult(oin.readObject());
// Redis set/put 指令
} else if (set.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 序列化
byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutput value = getSerialization(url).serialize(url, output);
value.writeObject(invocation.getArguments()[1]);
// 设置值
resource.set(key, output.toByteArray());
if (expiry > 1000) {
resource.expire(key, expiry / 1000);
}
// 返回结果
return new RpcResult();
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 删除值
resource.del(String.valueOf(invocation.getArguments()[0]).getBytes());
// 返回结果
return new RpcResult();
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
}
} catch (Throwable t) {
RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
re.setCode(RpcException.TIMEOUT_EXCEPTION);
} else if (t instanceof JedisConnectionException || t instanceof IOException) {
re.setCode(RpcException.NETWORK_EXCEPTION);
} else if (t instanceof JedisDataException) {
re.setCode(RpcException.SERIALIZATION_EXCEPTION);
}
throw re;
} finally {
// 归还 Redis Resource
if (resource != null) {
try {
jedisPool.returnResource(resource);
} catch (Throwable t) {
logger.warn("returnResource error: " + t.getMessage(), t);
}
}
}
}
@Override
public void destroy() {
// 标记销毁
super.destroy();
// 销毁 Redis Pool
try {
jedisPool.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
} catch (Throwable t) {
throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
}
}
- 使用 Jedis 访问 Redis Server 。
- 第 4 至 24 行:创建 GenericObjectPoolConfig 对象,从 Dubbo URL 中读取相关配置。此时,我们可以看到,为什么 Dubbo 的配置类中,有
arguments属性了。可以使用它,实现不同 Protocol 协议的自定义属性。 - 第 25 至 27 行:创建 JedisPool 对象。
- 第 29 至 33 行:处理方法名的映射。如果方法名和 redis 的标准方法名不相同,则需要配置映射关系:
- 当对应的服务接口是 Map
java.util.Map时,对应的 Redis 数据结构为。
- 当对应的服务接口是 Map
- 第 35 至 120 行:创建 Invoker 对象。
2.2.1 doInvoke
- 第 43 行:获得 Redis Resource 对象。
- 第 44 至 57 行:Redis get 指令。
- 第 58 至 74 行:Redis set/put 指令。
- 第 75 至 83 行:Redis delete/remove 指令。
- 第 84 至 85 行:目前其他命令,暂时不支持。
- 第 86 至 95 行:翻译异常成 Dubbo 错误码。
- 第 97 至 105 行:归还 Redis Resource 对象。
2.2.2 destroy
- 第 111 行:调用
super#destroy()方法,标记销毁。 - 第 112 至 117 行:调用
JedisPool#destroy()方法,销毁 Redis Pool 。
666. 彩蛋
没有彩蛋~
本文由作者按照 CC BY 4.0 进行授权
