本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文,我们分享 http:// 协议的远程调用,主要分成三个部分:
对应项目为 dubbo-rpc-http 。
对应文档为 《Dubbo 用户指南 —— http://》 。定义如下:
基于 HTTP 表单的远程调用协议,采用 Spring 的 HttpInvoker 实现
注意,从定义上我们可以看出,不是我们常规理解的 HTTP 调用,而是 Spring 的 HttpInvoker 。
本文涉及类图(红圈部分)如下:
2. AbstractProxyProtocol
com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol ,实现 AbstractProtocol 抽象类,Proxy 协议抽象类。为 HttpProtocol 、RestProtocol 等子类,提供公用的服务暴露、服务引用的公用方法,同时定义了如下抽象方法,用于不同子类协议实现类的自定义的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| /**
* 执行暴露,并返回取消暴露的回调 Runnable
*
* @param impl 服务 Proxy 对象
* @param type 服务接口
* @param url URL
* @param <T> 服务接口
* @return 消暴露的回调 Runnable
* @throws RpcException 当发生异常
*/
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
/**
* 执行引用,并返回调用远程服务的 Service 对象
*
* @param type 服务接口
* @param url URL
* @param <T> 服务接口
* @return 调用远程服务的 Service 对象
* @throws RpcException 当发生异常
*/
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;
|
2.1 构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| /**
* 需要抛出的异常类集合,详见 {@link #reder(Class, URL)} 方法。
*/
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
/**
* ProxyFactory 对象
*/
private ProxyFactory proxyFactory;
public AbstractProxyProtocol() { }
public AbstractProxyProtocol(Class<?>... exceptions) {
for (Class<?> exception : exceptions) {
addRpcException(exception);
}
}
public void addRpcException(Class<?> exception) {
this.rpcExceptions.add(exception);
}
|
rpcExceptions 属性,不同协议的远程调用,会抛出的异常是不同的。在 #refer(Class, URL) 方法中,我们会看到对这个属性的使用,理解会更清晰一些。
2.2 export
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| /**
* Exporter 集合
*
* key: 服务键 {@link #serviceKey(URL)} 或 {@link URL#getServiceKey()} 。
* 不同协议会不同
*/
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
// FROM AbstractProtocol.java
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
// 获得服务键
final String uri = serviceKey(invoker.getUrl());
// 获得 Exporter 对象。若已经暴露,直接返回。
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
return exporter;
}
// 执行暴露服务
final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
// 创建 Exporter 对象
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
// 取消暴露
super.unexport();
exporterMap.remove(uri);
// 执行取消暴露的回调
if (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
// 添加到 Exporter 集合
exporterMap.put(uri, exporter);
return exporter;
}
|
- 第 5 行:调用
#serviceKey(url) 方法,获得服务键。代码如下:
1
2
3
| protected static String serviceKey(URL url) {
return ProtocolUtils.serviceKey(url);
}
|
- 第 6 至 10 行:从
exporterMap 中,获得 Exporter 对象。若已经暴露,直接返回。 - 第 12 行:调用
ProxyFactory#getProxy(invoker) 方法,获得 Service Proxy 对象。 - 第 12 行:调用 抽象子类实现
#doExport(impl, type, url) 方法,执行暴露服务。 - 第 14 至 31 行:创建 Exporter 对象。基于 AbstractExporter 抽象类实现,覆写
#unexport() 方法,代码如下:- 第 18 至 20 行:取消暴露。
- 第 22 至 28 行:调用
Runnable#run() 方法,执行取消暴露的回调方法。
- 第 33 行:添加到 Exporter 集合。
2.3 refer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| /**
* Invoker 集合
*/
//TODO SOFEREFENCE
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
// FROM AbstractProtocol.java
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
// 执行引用服务
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
// 创建 Invoker 对象
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
// 调用
Result result = target.invoke(invocation);
// 若返回结果带有异常,并且需要抛出,则抛出异常。
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
// 若是未知异常,获得异常对应的错误码
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
// 抛出 RpcException 异常
throw getRpcException(type, url, invocation, e);
}
}
};
// 添加到 Invoker 集合。
invokers.add(invoker);
return invoker;
}
|
- 第 4 行:调用 抽象子类实现
#doRefer(type, url) 方法,执行引用服务。 - 第 4 行:调用
ProxyFactory#getInvoker(proxy, type, url) 方法,获得 Invoker 对象。 - 第 6 至 35 行:创建 Invoker 对象。基于 AbstractExporter 抽象类实现,覆写
#doInvoke(invocation) 方法,代码如下:- 第 12 行:调用
Invoker#invoke(invocation) 方法,执行 RPC 调用。 - 第 13 至 21 行:若返回结果带有异常,并且需要抛出( 异常在
rpcExceptions 中),则抛出异常。 - 第 22 行:返回调用结果。
- 第 23 至 28 行:若捕捉到 RpcException 异常,调用
#getErrorCode(Throwable) 方法,获得异常对应的错误码。代码如下:
1
2
3
4
5
6
7
8
9
| /**
* 获得异常对应的错误码
*
* @param e 异常
* @return 错误码
*/
protected int getErrorCode(Throwable e) {
return RpcException.UNKNOWN_EXCEPTION;
}
|
- 子类协议实现类,一般会覆写这个方法,实现自己异常的翻译。
- 第 29 至 32 行:若捕捉到 Throwable 异常,调用
#getRpcException(type, url, invocation, e) 方法,包装成 RpcException 异常,代码如下:
1
2
3
4
5
6
| protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: "
+ invocation.getMethodName() + ", cause: " + e.getMessage(), e);
re.setCode(getErrorCode(e));
return re;
}
|
3. HttpProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol ,实现 AbstractProxyProtocol 抽象类,dubbo:// 协议实现类。
3.1 构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| /**
* 默认服务器端口
*/
public static final int DEFAULT_PORT = 80;
/**
* Http 服务器集合
*
* key:ip:port
*/
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
/**
* Spring HttpInvokerServiceExporter 集合
*
* key:path 服务名
*/
private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>();
/**
* HttpBinder$Adaptive 对象
*/
private HttpBinder httpBinder;
public HttpProtocol() {
super(RemoteAccessException.class);
}
public void setHttpBinder(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
|
serverMap 属性,HttpServer 集合。键为 ip:port,通过 #getAddr(url) 方法,计算。代码如下:
1
2
3
4
5
6
7
8
| // AbstractProxyProtocol.java
protected String getAddr(URL url) {
String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (url.getParameter(Constants.ANYHOST_KEY, false)) {
bindIp = Constants.ANYHOST_VALUE;
}
return NetUtils.getIpByHost(bindIp) + ":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
}
|
skeletonMap 属性,Spring HttpInvokerServiceExporter 集合。请求处理过程为 HttpServer => DispatcherServlet => InternalHandler => HttpInvokerServiceExporter。httpBinder 属性,HttpBinder$Adaptive 对象,通过 #setHttpBinder(httpBinder) 方法,Dubbo SPI 调用设置。rpcExceptions = RemoteAccessException.class。
3.2 doExport
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| @Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
// 获得服务器地址
String addr = getAddr(url);
// 获得 HttpServer 对象。若不存在,进行创建。
HttpServer server = serverMap.get(addr);
if (server == null) {
server = httpBinder.bind(url, new InternalHandler()); // InternalHandler
serverMap.put(addr, server);
}
// 创建 HttpInvokerServiceExporter 对象
final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
httpServiceExporter.setServiceInterface(type);
httpServiceExporter.setService(impl);
try {
httpServiceExporter.afterPropertiesSet();
} catch (Exception e) {
throw new RpcException(e.getMessage(), e);
}
// 添加到 skeletonMap 中
final String path = url.getAbsolutePath();
skeletonMap.put(path, httpServiceExporter);
// 返回取消暴露的回调 Runnable
return new Runnable() {
public void run() {
skeletonMap.remove(path);
}
};
}
|
- 基于 通信服务器 dubbo-remoting-http 项目,作为服务端。
- 第 4 行:调用
#getAddr(url) 方法,获得服务器地址。 - 第 5 至 10 行:从
serverMap 中,获得 HttpServer 对象。若不存在,调用 HttpBinder#bind(url, handler) 方法,创建 HttpServer 对象。此处使用的 InternalHandler ,下文详细解析。 - 第 11 至 19 行:创建 HttpInvokerServiceExporter 对象。
- 第 20 至 22 行:添加到
skeletonMap 集合中。 - 第 23 至 28 行:返回取消暴露的回调 Runnable 对象。
3.2.1 InternalHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| private class InternalHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws ServletException {
String uri = request.getRequestURI();
// 获得 HttpInvokerServiceExporter 对象
HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
// 必须是 POST 请求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
// 执行调用
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
skeleton.handleRequest(request, response);
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
|
3.3 doRefer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| @Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
// 创建 HttpInvokerProxyFactoryBean 对象
final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
httpProxyFactoryBean.setServiceUrl(url.toIdentityString());
httpProxyFactoryBean.setServiceInterface(serviceType);
// 创建执行器 SimpleHttpInvokerRequestExecutor 对象
String client = url.getParameter(Constants.CLIENT_KEY);
if (client == null || client.length() == 0 || "simple".equals(client)) {
SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
protected void prepareConnection(HttpURLConnection con,
int contentLength) throws IOException {
super.prepareConnection(con, contentLength);
con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
}
};
httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
// 创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象
} else if ("commons".equals(client)) {
HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
} else {
throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
}
httpProxyFactoryBean.afterPropertiesSet();
// 返回 HttpInvokerProxyFactoryBean 对象
return (T) httpProxyFactoryBean.getObject();
}
|
- 基于 HttpClient ,作为通信客户端。
- 第 4 至 7 行:创建 HttpInvokerProxyFactoryBean 对象。
- 第 9 至 27 行:获得
client 配置项,根据该配置项,创建对应的执行器。- “simple”:第 10 至 19 行:创建执行器 SimpleHttpInvokerRequestExecutor 对象。
- “commons”:第 20 至 24 行:创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象。
- 两者的差异点在于使用的 HttpClient 不同,前者使用 JDK HttpClient ,后者使用 Apache HttpClient。
- 第 30 行:返回 HttpInvokerProxyFactoryBean 对象。
- 具体 RPC 调用的实现,在父类
#refer() 方法里。
3.3.1 getErrorCode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| @Override
@SuppressWarnings("Duplicates")
protected int getErrorCode(Throwable e) {
if (e instanceof RemoteAccessException) {
e = e.getCause();
}
if (e != null) {
Class<?> cls = e.getClass();
if (SocketTimeoutException.class.equals(cls)) {
return RpcException.TIMEOUT_EXCEPTION;
} else if (IOException.class.isAssignableFrom(cls)) {
return RpcException.NETWORK_EXCEPTION;
} else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
return RpcException.SERIALIZATION_EXCEPTION;
}
}
return super.getErrorCode(e);
}
|
666. 彩蛋
来自清明节,一边食物中毒,一边更新。