文章

服务调用(二)之远程调用(Dubbo)1通信实现

服务调用(二)之远程调用(Dubbo)1通信实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

从本文开始,我们开始分享 dubbo:// 协议的远程调用,主要分成四个部分

  1. 通信实现
  2. 同步调用
  3. 异步调用
  4. 参数回调

本文分享 通信实现 部分。

《精尽 Dubbo 源码解析 —— NIO 服务器》 系列,是本文的前置文章,所以胖友需要先读完这个系列。哈哈哈,当然,也可以凑合看看先。

本文涉及类图如下:

类图

类图

2. Server

《精尽 Dubbo 源码分析 —— 服务引用(二)之远程暴露(Dubbo)》 中,我们看到使用的 Server 实现类是 HeaderExchangeServer

3. Client

《精尽 Dubbo 源码分析 —— 服务引用(二)之远程引用(Dubbo)》 中,我们看到使用的 Client 实现类是 ReferenceCountExchangeClientLazyConnectExchangeClient

4. ExchangeHandler

在 DubboProtocol 中,实现了 ExchangeHandler ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    @Override
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        // ... 省略具体实现
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // ... 省略具体实现
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        this.invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        // ... 省略具体实现
    }

    private void invoke(Channel channel, String methodKey) {
        // ... 省略具体实现
    }

};

这个处理器,负责将请求,转发到对应的 Invoker 对象,执行逻辑,返回结果。当然,本文不细分享,放在 同步调用 一文详细解析。

5. Codec

ExchangeCodec 中,我们看到对 Request 和 Response 的通用解析。但是它是不满足在 dubbo:// 协议中,对 RpcInvocationRpcResult 作为 内容体( Body ) 的编解码的需要的。

另外,在 dubbo:// 协议中,支持 参数回调 的特性,也是需要在编解码做一些特殊逻辑

下面,让我们来一起瞅瞅代码实现吧。

5.1 DubboCountCodec

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec ,实现 Codec2 接口,支持多消息的编解码器。

5.1.1 构造方法

1
2
3
4
/**
 * 编解码器
 */
private DubboCodec codec = new DubboCodec();
  • 在 Dubbo Client 和 Server 创建的过程,我们看到设置了编解码器为 “dubbo”,从而通过 Dubbo SPI 机制,加载到 DubboCountCodec。相关内容如下:
1
2
3
4
5
6
7
8
// DubboProtocol#createServer(...)
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

// DubboProtocol#initClient(...)
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

// META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Codec2
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
  • 实际编解码的逻辑,使用 DubboCodec ,即 codec 属性。

5.1.2 编码

1
2
3
4
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    codec.encode(channel, buffer, msg);
}

5.1.3 解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 记录当前读位置
    int save = buffer.readerIndex();
    // 创建 MultiMessage 对象
    MultiMessage result = MultiMessage.create();
    do {
        // 解码
        Object obj = codec.decode(channel, buffer);
        // 输入不够,重置读进度
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);
            break;
        // 解析到消息
        } else {
            // 添加结果消息
            result.addMessage(obj);
            // 记录消息长度到隐式参数集合,用于 MonitorFilter 监控
            logMessageLength(obj, buffer.readerIndex() - save);
            // 记录当前读位置
            save = buffer.readerIndex();
        }
    } while (true);
    // 需要更多的输入
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    // 返回解析到的消息
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}
  • 包含两块逻辑:1)多消息解析的支持。2)记录每条消息的长度,用于 MonitorFilter 监控。
  • 第 4 行:记录当前读位置,用于下面计算每条消息的长度。
  • 第 6 行:创建 MultiMessage 对象。MultiMessageHandler 支持对它的处理分发。
  • 第 7 至 23 行:循环解析消息,直到结束。
  • 第 9 行:调用 DubboCodec#decode(channel, buffer) 方法,解码。
  • 第 11 至 13 行:字节数组不够,重置读进度,结束解析。
  • 第 15 至 22 行:解析到消息,添加到 result。
    • 第 19 行:调用 #logMessageLength(obj, length) 方法,记录消息长度到隐式参数集合,用于 MonitorFilter 监控。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void logMessageLength(Object result, int bytes) {
    if (bytes <= 0) {
        return;
    }
    if (result instanceof Request) {
        try {
            ((RpcInvocation) ((Request) result).getData()).setAttachment(Constants.INPUT_KEY, String.valueOf(bytes)); // 请求
        } catch (Throwable e) {
            /* ignore */
        }
    } else if (result instanceof Response) {
        try {
            ((RpcResult) ((Response) result).getResult()).setAttachment(Constants.OUTPUT_KEY, String.valueOf(bytes)); // 响应
        } catch (Throwable e) {
            /* ignore */
        }
    }
}
  • 第 21 行:记录当前读位置,用于计算下一条消息的长度。
  • 第 24 至 27 行:需要更多的输入。
  • 第 28 至 32 行:返回结果。

5.2 DubboCodec

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec ,实现 Codec2 接口,继承 ExchangeCodec 类,Dubbo 编解码器实现类。

5.2.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 协议名
 */
public static final String NAME = "dubbo";
/**
 * 协议版本
 */
public static final String DUBBO_VERSION = Version.getVersion(DubboCodec.class, Version.getVersion());

/**
 * 响应 - 异常
 */
public static final byte RESPONSE_WITH_EXCEPTION = 0;
/**
 * 响应 - 正常(空返回)
 */
public static final byte RESPONSE_VALUE = 1;
/**
 * 响应 - 正常(有返回)
 */
public static final byte RESPONSE_NULL_VALUE = 2;

/**
 * 方法参数 - 空(参数)
 */
public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
/**
 * 方法参数 - 空(类型)
 */
public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0];

5.2.2 编码内容体

5.2.2.1 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    // 写入 `dubbo` `path` `version`
    out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    // 写入方法、方法签名、方法参数集合
    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null) {
        for (int i = 0; i < args.length; i++) {
            out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
        }
    }

    // 写入隐式传参集合
    out.writeObject(inv.getAttachments());
}
  • 胖友看下代码注释。
  • 编码 RpcInvocation 对象,写入需要编码的字段。
  • 对应的解码,在 DecodeableRpcInvocation 中。
  • 第 16 行:调用 CallbackServiceCodec#encodeInvocationArgument(…) 方法,编码参数。主要用于 参数回调 功能,后面的文章,详细解析。

5.2.2.2 响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
    Result result = (Result) data;

    Throwable th = result.getException();
    // 正常
    if (th == null) {
        Object ret = result.getValue();
        // 空返回
        if (ret == null) {
            out.writeByte(RESPONSE_NULL_VALUE);
        // 有返回
        } else {
            out.writeByte(RESPONSE_VALUE);
            out.writeObject(ret);
        }
    // 异常
    } else {
        out.writeByte(RESPONSE_WITH_EXCEPTION);
        out.writeObject(th);
    }
}
  • 胖友看下代码注释。
  • 编码 Result 对象,写入需要编码的字段。
  • 对应的解码,在 DecodeableRpcResult 中。

5.2.3 解码内容体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2];
    // 获得 Serialization 对象
    byte proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // 获得请求||响应编号
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    // 解析响应
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        // ... 省略代码
        return res;
    // 解析请求
    } else {
        // decode request.
        Request req = new Request(id);
        // ... 省略代码
        return req;
    }
}
  • 第 4 至 6 行:调用 CodeSupport#getSerialization(url, proto) 方法,获得 Serialization 对象,用于下面反序列化内容体的每个字段。
  • 第 9 行:获得请求或响应的编号。
  • 第 10 至 15 行:解析响应( Response )。
  • 第 16 至 22 行:解析请求( Request )。

5.2.3.1 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// decode response.
Response res = new Response(id);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
    res.setEvent(Response.HEARTBEAT_EVENT);
}
// 设置状态
// get status.
byte status = header[3];
res.setStatus(status);
// 正常响应状态
if (status == Response.OK) {
    try {
        Object data;
        // 解码心跳事件
        if (res.isHeartbeat()) {
            data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
        // 解码其它事件
        } else if (res.isEvent()) {
            data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
        // 解码普通响应
        } else {
            DecodeableRpcResult result;
            // 在通信框架(例如,Netty)的 IO 程程,解码
            if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
                result.decode();
            // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
            } else {
                result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
            }
            data = result;
        }
        // 设置结果
        res.setResult(data);
    } catch (Throwable t) {
        if (log.isWarnEnabled()) {
            log.warn("Decode response failed: " + t.getMessage(), t);
        }
        res.setStatus(Response.CLIENT_ERROR);
        res.setErrorMessage(StringUtils.toString(t));
    }
// 异常响应状态
} else {
    res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
  • 胖友看下代码注释。我们重点讲下可能比较绕的地方。
  • 第 21 至 33 行:解码普通响应。我们可以看到代码分成【第 25 至 27 行】【第 28 至 31 行】两段
    • 相同点,使用 DecodeableRpcResult 解码。前者,比较好理解,【第 27 行】已经调用;后者,在 DecodeHandler 中,才最终调用 DecodeableRpcResult#decode() 方法。
    • 差异点,使用哪个线程解码。前者,还是比较好理解,当前线程,即通信框架(例如,Netty)的 IO 程程。后者,Dubbo ThreadPool 线程中。
    • decode.in.io 配置项,目前在 Dubbo 文档中,并未说明,应该是性能调优,具体笔者还没测试过。嘿嘿。

5.2.3.2 响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
// 是否需要响应
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
    req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
    Object data;
    // 解码心跳事件
    if (req.isHeartbeat()) {
        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
    // 解码其它事件
    } else if (req.isEvent()) {
        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
    // 解码普通请求
    } else {
        // 在通信框架(例如,Netty)的 IO 线程程,解码
        DecodeableRpcInvocation inv;
        if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
            inv = new DecodeableRpcInvocation(channel, req, is, proto);
            inv.decode();
        // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
        } else {
            inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
        }
        data = inv;
    }
    req.setData(data);
} catch (Throwable t) {
    if (log.isWarnEnabled()) {
        log.warn("Decode request failed: " + t.getMessage(), t);
    }
    // bad request
    req.setBroken(true);
    req.setData(t);
}
return req;
  • 「5.2.3.1 请求」 类似,差异点在使用 DecodeableRpcInvocation
  • 胖友看下代码注释。

5.3 DecodeableRpcInvocation

com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation ,实现 Codec 和 Decodeable 接口,继承 RpcInvocation 类,可解码的 RpcInvocation 实现类。

当服务消费者,调用服务提供者,前者编码的 RpcInvocation 对象,后者解码成 DecodeableRpcInvocation 对象。

从目前的代码实现来看,Codec 接口,可不实现。

5.3.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * 通道
 */
private Channel channel;
/**
 * Serialization 类型编号
 */
private byte serializationType;
/**
 * 输入流
 */
private InputStream inputStream;
/**
 * 请求
 */
private Request request;
/**
 * 是否已经解码完成
 */
private volatile boolean hasDecoded;

5.3.2 解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Override
public void decode() {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
            }
            request.setBroken(true);
            request.setData(e);
        } finally {
            hasDecoded = true;
        }
    }
}

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType).deserialize(channel.getUrl(), input);

    // 解码 `dubbo` `path` `version`
    setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
    setAttachment(Constants.PATH_KEY, in.readUTF());
    setAttachment(Constants.VERSION_KEY, in.readUTF());

    // 解码方法、方法签名、方法参数集合
    setMethodName(in.readUTF());
    try {
        Object[] args;
        Class<?>[] pts;
        String desc = in.readUTF();
        if (desc.length() == 0) {
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {
            pts = ReflectUtils.desc2classArray(desc);
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        setParameterTypes(pts);

        // 解码隐式传参集合
        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            attachment.putAll(map);
            setAttachments(attachment);
        }

        // 进一步解码方法参数,主要为了参数返回
        // decode argument ,may be callback
        for (int i = 0; i < args.length; i++) {
            args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
        }
        setArguments(args);
    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}
  • 胖友看下代码注释。

5.4 DecodeableRpcResult

和 DecodeableRpcInvocation 一致。

com.alibaba.dubbo.rpc.protocol.dubbo.DecodeableRpcResult ,实现 Codec 和 Decodeable 接口,继承 RpcResult 类,可解码的 RpcResult 实现类。

当服务提供者者,返回服务消费者调用结果,前者编码的 RpcResult 对象,后者解码成 DecodeableRpcResult 对象。

从目前的代码实现来看,Codec 接口,可不实现。

5.4.1 构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * 通道
 */
private Channel channel;
/**
 * Serialization 类型编号
 */
private byte serializationType;
/**
 * 输入流
 */
private InputStream inputStream;
/**
 * 请求
 */
private Response response;
/**
 * Invocation 对象
 */
private Invocation invocation;
/**
 * 是否已经解码完成
 */
private volatile boolean hasDecoded;

5.4.2 解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public void decode() {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc result failed: " + e.getMessage(), e);
            }
            response.setStatus(Response.CLIENT_ERROR);
            response.setErrorMessage(StringUtils.toString(e));
        } finally {
            hasDecoded = true;
        }
    }
}

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType).deserialize(channel.getUrl(), input);

    // 读取标记位
    byte flag = in.readByte();
    switch (flag) {
        case DubboCodec.RESPONSE_NULL_VALUE: // 无返回值
            break;
        case DubboCodec.RESPONSE_VALUE: // 有返回值
            try {
                Type[] returnType = RpcUtils.getReturnTypes(invocation);
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                // 返回结果:Type[]{method.getReturnType(), method.getGenericReturnType()}
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION: // 异常
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                }
                setException((Throwable) obj);
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
    }
    if (in instanceof Cleanable) {
        ((Cleanable) in).cleanup();
    }
    return this;
}
  • 胖友看下代码注释。

666. 彩蛋

知识星球

知识星球

清明节,扫代码第一波。

本文由作者按照 CC BY 4.0 进行授权