文章

服务调用(三)之远程调用(HTTP)

服务调用(三)之远程调用(HTTP)

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文,我们分享 http:// 协议的远程调用,主要分成三个部分

  • 服务暴露
  • 服务引用
  • 服务调用

对应项目为 dubbo-rpc-http 。

对应文档为 《Dubbo 用户指南 —— http://》 。定义如下:

基于 HTTP 表单的远程调用协议,采用 Spring 的 HttpInvoker 实现

注意,从定义上我们可以看出,不是我们常规理解的 HTTP 调用,而是 Spring 的 HttpInvoker

本文涉及类图(红圈部分)如下:

类图

2. AbstractProxyProtocol

com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol ,实现 AbstractProtocol 抽象类,Proxy 协议抽象类。为 HttpProtocol 、RestProtocol 等子类,提供公用的服务暴露、服务引用的公用方法,同时定义了如下抽象方法,用于不同子类协议实现类的自定义的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * 执行暴露,并返回取消暴露的回调 Runnable
 *
 * @param impl 服务 Proxy 对象
 * @param type 服务接口
 * @param url URL
 * @param <T> 服务接口
 * @return 消暴露的回调 Runnable
 * @throws RpcException 当发生异常
 */
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;

/**
 * 执行引用,并返回调用远程服务的 Service 对象
 *
 * @param type 服务接口
 * @param url URL
 * @param <T> 服务接口
 * @return 调用远程服务的 Service 对象
 * @throws RpcException 当发生异常
 */
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

2.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * 需要抛出的异常类集合,详见 {@link #reder(Class, URL)} 方法。
 */
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
/**
 * ProxyFactory 对象
 */
private ProxyFactory proxyFactory;

public AbstractProxyProtocol() { }

public AbstractProxyProtocol(Class<?>... exceptions) {
    for (Class<?> exception : exceptions) {
        addRpcException(exception);
    }
}

public void addRpcException(Class<?> exception) {
    this.rpcExceptions.add(exception);
}

  • rpcExceptions 属性,不同协议的远程调用,会抛出的异常是不同的。在 #refer(Class, URL) 方法中,我们会看到对这个属性的使用,理解会更清晰一些。

2.2 export

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Exporter 集合
 *
 * key: 服务键 {@link #serviceKey(URL)} 或 {@link URL#getServiceKey()} 。
 *      不同协议会不同
 */
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

// FROM AbstractProtocol.java
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
    // 获得服务键
    final String uri = serviceKey(invoker.getUrl());
    // 获得 Exporter 对象。若已经暴露,直接返回。
    Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
    if (exporter != null) {
        return exporter;
    }
    // 执行暴露服务
    final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl());
    // 创建 Exporter 对象
    exporter = new AbstractExporter<T>(invoker) {

        @Override
        public void unexport() {
            // 取消暴露
            super.unexport();
            exporterMap.remove(uri);
            // 执行取消暴露的回调
            if (runnable != null) {
                try {
                    runnable.run();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

    };
    // 添加到 Exporter 集合
    exporterMap.put(uri, exporter);
    return exporter;
}

  • 第 5 行:调用 #serviceKey(url) 方法,获得服务键。代码如下:
1
2
3
protected static String serviceKey(URL url) {
    return ProtocolUtils.serviceKey(url);
}

  • 第 6 至 10 行:从 exporterMap 中,获得 Exporter 对象。若已经暴露,直接返回。
  • 第 12 行:调用 ProxyFactory#getProxy(invoker) 方法,获得 Service Proxy 对象。
  • 第 12 行:调用 抽象子类实现 #doExport(impl, type, url) 方法,执行暴露服务。
  • 第 14 至 31 行:创建 Exporter 对象。基于 AbstractExporter 抽象类实现,覆写 #unexport() 方法,代码如下:
    • 第 18 至 20 行:取消暴露。
    • 第 22 至 28 行:调用 Runnable#run() 方法,执行取消暴露的回调方法。
  • 第 33 行:添加到 Exporter 集合。

2.3 refer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Invoker 集合
 */
//TODO SOFEREFENCE
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();

// FROM AbstractProtocol.java
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
    // 执行引用服务
    final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
    // 创建 Invoker 对象
    Invoker<T> invoker = new AbstractInvoker<T>(type, url) {

        @Override
        protected Result doInvoke(Invocation invocation) throws Throwable {
            try {
                // 调用
                Result result = target.invoke(invocation);
                // 若返回结果带有异常,并且需要抛出,则抛出异常。
                Throwable e = result.getException();
                if (e != null) {
                    for (Class<?> rpcException : rpcExceptions) {
                        if (rpcException.isAssignableFrom(e.getClass())) {
                            throw getRpcException(type, url, invocation, e);
                        }
                    }
                }
                return result;
            } catch (RpcException e) {
                // 若是未知异常,获得异常对应的错误码
                if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                    e.setCode(getErrorCode(e.getCause()));
                }
                throw e;
            } catch (Throwable e) {
                // 抛出 RpcException 异常
                throw getRpcException(type, url, invocation, e);
            }
        }

    };
    // 添加到 Invoker 集合。
    invokers.add(invoker);
    return invoker;
}

  • 第 4 行:调用 抽象子类实现 #doRefer(type, url) 方法,执行引用服务。
  • 第 4 行:调用 ProxyFactory#getInvoker(proxy, type, url) 方法,获得 Invoker 对象。
  • 第 6 至 35 行:创建 Invoker 对象。基于 AbstractExporter 抽象类实现,覆写 #doInvoke(invocation) 方法,代码如下:
    • 第 12 行:调用 Invoker#invoke(invocation) 方法,执行 RPC 调用。
    • 第 13 至 21 行:若返回结果带有异常,并且需要抛出( 异常在 rpcExceptions 中),则抛出异常。
    • 第 22 行:返回调用结果。
    • 第 23 至 28 行:若捕捉到 RpcException 异常,调用 #getErrorCode(Throwable) 方法,获得异常对应的错误码。代码如下:
1
2
3
4
5
6
7
8
9
/**
 * 获得异常对应的错误码
 *
 * @param e 异常
 * @return 错误码
 */
protected int getErrorCode(Throwable e) {
    return RpcException.UNKNOWN_EXCEPTION;
}

  • 子类协议实现类,一般会覆写这个方法,实现自己异常的翻译。
  • 第 29 至 32 行:若捕捉到 Throwable 异常,调用 #getRpcException(type, url, invocation, e) 方法,包装成 RpcException 异常,代码如下:
1
2
3
4
5
6
protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) {
    RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: "
            + invocation.getMethodName() + ", cause: " + e.getMessage(), e);
    re.setCode(getErrorCode(e));
    return re;
}

  • 第 37 行:添加到 Invoker 集合。

3. HttpProtocol

com.alibaba.dubbo.rpc.protocol.http.HttpProtocol ,实现 AbstractProxyProtocol 抽象类,dubbo:// 协议实现类。

3.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * 默认服务器端口
 */
public static final int DEFAULT_PORT = 80;
/**
 * Http 服务器集合
 *
 * key:ip:port
 */
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
/**
 * Spring HttpInvokerServiceExporter 集合
 *
 * key:path 服务名
 */
private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>();
/**
 * HttpBinder$Adaptive 对象
 */
private HttpBinder httpBinder;

public HttpProtocol() {
    super(RemoteAccessException.class);
}

public void setHttpBinder(HttpBinder httpBinder) {
    this.httpBinder = httpBinder;
}

  • serverMap 属性,HttpServer 集合。键为 ip:port,通过 #getAddr(url) 方法,计算。代码如下:
1
2
3
4
5
6
7
8
// AbstractProxyProtocol.java
protected String getAddr(URL url) {
    String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
    if (url.getParameter(Constants.ANYHOST_KEY, false)) {
        bindIp = Constants.ANYHOST_VALUE;
    }
    return NetUtils.getIpByHost(bindIp) + ":" + url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
}

  • skeletonMap 属性,Spring HttpInvokerServiceExporter 集合。请求处理过程为 HttpServer => DispatcherServlet => InternalHandler => HttpInvokerServiceExporter
  • httpBinder 属性,HttpBinder$Adaptive 对象,通过 #setHttpBinder(httpBinder) 方法,Dubbo SPI 调用设置。
  • rpcExceptions = RemoteAccessException.class

3.2 doExport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
    // 获得服务器地址
    String addr = getAddr(url);
    // 获得 HttpServer 对象。若不存在,进行创建。
    HttpServer server = serverMap.get(addr);
    if (server == null) {
        server = httpBinder.bind(url, new InternalHandler()); // InternalHandler
        serverMap.put(addr, server);
    }
    // 创建 HttpInvokerServiceExporter 对象
    final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
    httpServiceExporter.setServiceInterface(type);
    httpServiceExporter.setService(impl);
    try {
        httpServiceExporter.afterPropertiesSet();
    } catch (Exception e) {
        throw new RpcException(e.getMessage(), e);
    }
    // 添加到 skeletonMap 中
    final String path = url.getAbsolutePath();
    skeletonMap.put(path, httpServiceExporter);
    // 返回取消暴露的回调 Runnable
    return new Runnable() {
        public void run() {
            skeletonMap.remove(path);
        }
    };
}

  • 基于 通信服务器 dubbo-remoting-http 项目,作为服务端。
  • 第 4 行:调用 #getAddr(url) 方法,获得服务器地址。
  • 第 5 至 10 行:从 serverMap 中,获得 HttpServer 对象。若不存在,调用 HttpBinder#bind(url, handler) 方法,创建 HttpServer 对象。此处使用的 InternalHandler ,下文详细解析。
  • 第 11 至 19 行:创建 HttpInvokerServiceExporter 对象。
  • 第 20 至 22 行:添加到 skeletonMap 集合中。
  • 第 23 至 28 行:返回取消暴露的回调 Runnable 对象。

3.2.1 InternalHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private class InternalHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response) throws ServletException {
        String uri = request.getRequestURI();
        // 获得 HttpInvokerServiceExporter 对象
        HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
        // 必须是 POST 请求
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            response.setStatus(500);
        // 执行调用
        } else {
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
            try {
                skeleton.handleRequest(request, response);
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }

}

3.3 doRefer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
    // 创建 HttpInvokerProxyFactoryBean 对象
    final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
    httpProxyFactoryBean.setServiceUrl(url.toIdentityString());
    httpProxyFactoryBean.setServiceInterface(serviceType);
    // 创建执行器 SimpleHttpInvokerRequestExecutor 对象
    String client = url.getParameter(Constants.CLIENT_KEY);
    if (client == null || client.length() == 0 || "simple".equals(client)) {
        SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
            protected void prepareConnection(HttpURLConnection con,
                                             int contentLength) throws IOException {
                super.prepareConnection(con, contentLength);
                con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
                con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
            }
        };
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    // 创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象
    } else if ("commons".equals(client)) {
        HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
        httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else {
        throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
    }
    httpProxyFactoryBean.afterPropertiesSet();
    // 返回 HttpInvokerProxyFactoryBean 对象
    return (T) httpProxyFactoryBean.getObject();
}

  • 基于 HttpClient ,作为通信客户端
  • 第 4 至 7 行:创建 HttpInvokerProxyFactoryBean 对象。
  • 第 9 至 27 行:获得 client 配置项,根据该配置项,创建对应的执行器。
    • “simple”:第 10 至 19 行:创建执行器 SimpleHttpInvokerRequestExecutor 对象。
    • “commons”:第 20 至 24 行:创建执行器 HttpComponentsHttpInvokerRequestExecutor 对象。
    • 两者的差异点在于使用的 HttpClient 不同,前者使用 JDK HttpClient ,后者使用 Apache HttpClient。
  • 第 30 行:返回 HttpInvokerProxyFactoryBean 对象。
  • 具体 RPC 调用的实现,在父类 #refer() 方法里。

3.3.1 getErrorCode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
@SuppressWarnings("Duplicates")
protected int getErrorCode(Throwable e) {
    if (e instanceof RemoteAccessException) {
        e = e.getCause();
    }
    if (e != null) {
        Class<?> cls = e.getClass();
        if (SocketTimeoutException.class.equals(cls)) {
            return RpcException.TIMEOUT_EXCEPTION;
        } else if (IOException.class.isAssignableFrom(cls)) {
            return RpcException.NETWORK_EXCEPTION;
        } else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
            return RpcException.SERIALIZATION_EXCEPTION;
        }
    }
    return super.getErrorCode(e);
}

  • 将异常,翻译成 Dubbo 异常码。

666. 彩蛋

知识星球

来自清明节,一边食物中毒,一边更新。

本文由作者按照 CC BY 4.0 进行授权