文章

NIO服务器(二)之Transport层

NIO服务器(二)之Transport层

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(一)之抽象 API》 一文,分享 dubbo-remoting-api 模块, transport 包,网络传输层

transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec

涉及的类图如下:

类图

  • 白色部分,为通用接口。
  • 蓝色部分,为 transport 包下的类。
  • 整个类图,我们分成六个部分:
    • Client
    • Server
    • Channel
    • ChannelHandler
    • Codec
    • Dispacher
  • 从流程上来说,我们分成:
    • Server
      • 启动
      • 关闭
    • Client
      • 启动
      • 关闭
    • ChannelHandler
      • 处理连接
      • 处理断开
      • 发送消息
      • 接收消息
      • 处理异常

艿艿的旁白:涉及较多类和流程,内容不是很线性,可能分享的比较凌乱,还望胖友谅解。建议,读 2-3 遍,并且做一些调试。

2. AbstractPeer

com.alibaba.dubbo.remoting.transport.AbstractPeer ,实现 Endpoint、ChannelHandler 接口,Peer 抽象类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 通道处理器
 */
private final ChannelHandler handler;
/**
 * URL
 */
private volatile URL url;
/**
 * 正在关闭
 *
 * {@link #startClose()}
 */
// closing closed means the process is being closed and close is finished
private volatile boolean closing;
/**
 * 关闭完成
 *
 * {@link #close()}
 */
private volatile boolean closed;

public AbstractPeer(URL url, ChannelHandler handler) {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    this.url = url;
    this.handler = handler;
}

  • handler 属性,通道处理器,通过构造方法传入。实现的 ChannelHandler 的接口方法,直接调用 handler 的方法,进行执行逻辑处理。
    • 参见代码:传送门
    • 这种方式在设计模式中被称作 “装饰模式“。在下文中,我们会看到大量的装饰模式的使用。实际上,这也是 dubbo-remoting 抽象 API + 实现最核心的方式之一。
  • url 属性,URL,通过构造方法传入。通过该属性,传递 Dubbo 服务引用和服务暴露的配置项
  • closing 属性,正在关闭,调用 #startClose() 方法,变更。
  • closed 属性,关闭完成,调用 #close() 方法,变更。

发送消息

1
2
3
4
@Override
public void send(Object message) throws RemotingException {
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

其他方法

胖友点击 AbstractPeer ,再看看所有的方法。

2.1 AbstractEndpint

com.alibaba.dubbo.remoting.transport.AbstractPeer.AbstractEndpint ,实现 Resetable 接口,继承 AbstractPeer 抽象类,端点抽象类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * 编解码器
 */
private Codec2 codec;
/**
 * 超时时间
 */
private int timeout;
/**
 * 连接超时时间
 */
private int connectTimeout;

public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler);
    this.codec = getChannelCodec(url);
    this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}

  • codec 属性,编解码器。在构造方法中,可以看到调用 #getChannelCodec(url) 方法,基于 url 参数,加载对应的 Codec 实现对象。代码如下:
1
2
3
4
5
6
7
8
9
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        // 例如,在 DubboProtocol 中,会获得 DubboCodec
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}

  • 第 3 行:基于 Dubbo SPI 机制,加载对应的 Codec 实现对象。例如,在 DubboProtocol 中,会获得 DubboCodec 对象。
  • 第 6 行:Codec 接口,已经废弃了,目前 Dubbo 项目里,也没有它的拓展实现。

重置属性

#reset(url) 实现方法,使用新的 url 属性,可重置 codec、timeout、connectTimeout 属性。 已经添加了谅解,胖友点击可看。

3. Client

3.1 AbstractClient

com.alibaba.dubbo.remoting.transport.AbstractClient ,实现 Client 接口,继承 AbstractEndpoint 抽象类,客户端抽象类,重点实现了公用的重连逻辑,同时抽象了连接等模板方法,供子类实现。抽象方法如下:

1
2
3
4
5
6
7
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;

protected abstract void doConnect() throws Throwable;
protected abstract void doDisConnect() throws Throwable;

protected abstract Channel getChannel();

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * 重连定时任务执行器
 */
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
/**
 * 发送消息时,若断开,是否重连
 */
private final boolean send_reconnect;
/**
 * 重连 warning 的间隔.(waring多少次之后,warning一次) //for test
 */
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
/**
 * 关闭超时时间
 */
private final long shutdown_timeout;
/**
 * 线程池
 *
 * 在调用 {@link #wrapChannelHandler(URL, ChannelHandler)} 时,会调用 {@link com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler} 创建
 */
protected volatile ExecutorService executor;

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    // 从 URL 中,获得重连相关配置项
    send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
    // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
    reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

    // 初始化客户端
    try {
        doOpen();
    } catch (Throwable t) {
        close(); // 失败,则关闭
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }

    // 连接服务器
    try {
        // connect.
        connect();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException t) {
        if (url.getParameter(Constants.CHECK_KEY, true)) {
            close(); // 失败,则关闭
            throw t;
        } else {
            logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
        }
    } catch (Throwable t) {
        close(); // 失败,则关闭
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }

    // 获得线程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
            .get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
            .remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

  • reconnectExecutorService 属性,重连定时任务执行器。在客户端连接服务端时,会创建后台任务,定时检查连接,若断开,会进行重连。
  • 第 27 至 31 行:从 URL 中,获得重连相关配置项
  • 第 33 至 41 行:调用 抽象 #doOpen() 方法,初始化客户端。若异常,调用 #close() 方法,进行关闭。
  • 第 43 至 63 行:调用 实现 #connect() 方法,连接服务器。若异常,调用 #close() 方法,进行关闭。
    • 第 51 至 57 行:若是连接失败 RemotingException ,若开启了 启动时检查,则调用 #close() 方法,进行关闭。
  • 第 66 至 69 行:从 DataStore 中,获得线程池。
    • DataStore 在 store(dubbo-common 模块,包下实现)。目前的实现比较简单,可以认为是 ConcurrentMap<String, ConcurrentMap<String, Object>> 的集合。胖友可以自己看相关实现。
    • 此处的线程池,实际就是 《Dubbo 用户指南 —— 线程模型》 线程池 中说的。在 「8. Dispacher」 中,详细解析。

连接服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * 连接锁,用于实现发起连接和断开连接互斥,避免并发。
 */
private final Lock connectLock = new ReentrantLock();

protected void connect() throws RemotingException {
    // 获得锁
    connectLock.lock();
    try {
        // 已连接,
        if (isConnected()) {
            return;
        }
        // 初始化重连线程
        initConnectStatusCheckCommand();
        // 执行连接
        doConnect();
        // 连接失败,抛出异常
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        // 连接成功,打印日志
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        // 设置重连次数归零
        reconnect_count.set(0);
        // 设置未打印过错误日志
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        // 释放锁
        connectLock.unlock();
    }
}

  • 第 3 行:获得锁。在连接和断开连接时,通过锁,避免并发冲突。
  • 第 5 至 8 行:调用 #isConnected() 方法,判断连接状态。若已经连接,就不重复连接。代码如下:
1
2
3
4
5
@Override
public boolean isConnected() {
    Channel channel = getChannel();
    return channel != null && channel.isConnected();
}

  • 该方法,是因为实现 Channel 接口( Client 实现 Channel 接口 ),所以需要实现的。我们可以看到,实际方法内部,调用的是 channel 对象,进行判断。其它实现 Channel 的方法,也是这么处理的,例如 #getAttribute(key) 等方法。

  • 第 10 行:调用 #initConnectStatusCheckCommand() 方法,初始化重连线程
    • 方法会复杂一些,不杂糅在这里讲。
  • 第 14 至 17 行:连接失败,抛出异常 RemotingException 。
  • 第 18 至 25 行:连接成功,打印日志。
  • 第 26 至 29 行:设置重连次数归零,打印过错误日志状态为否。下面,我们会看到这些状态字段的变更。
  • 第 38 行:释放锁。

初始化重连线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
 * 重连次数
 */
private final AtomicInteger reconnect_count = new AtomicInteger(0);
/**
 * 重连时,是否已经打印过错误日志。
 */
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
/**
 * 重连执行任务 Future
 */
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
/**
 * 最后成功连接时间
 */
// the last successed connected time
private long lastConnectedTime = System.currentTimeMillis();

private synchronized void initConnectStatusCheckCommand() {
    //reconnect=false to close reconnect
    // 获得获得重连频率,默认开启。
    int reconnect = getReconnectParam(getUrl());
    // 若开启重连功能,创建重连线程
    if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
        // 创建 Runnable 对象
        Runnable connectStatusCheckCommand = new Runnable() {
            public void run() {
                try {
                    // 未连接,重连
                    if (!isConnected()) {
                        connect();
                    // 已连接,记录最后连接时间
                    } else {
                        lastConnectedTime = System.currentTimeMillis();
                    }
                } catch (Throwable t) {
                    // 超过一定时间未连接上,才打印异常日志。并且,仅打印一次。默认,15 分钟。
                    String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                    // wait registry sync provider list
                    if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                        if (!reconnect_error_log_flag.get()) {
                            reconnect_error_log_flag.set(true);
                            logger.error(errorMsg, t);
                            return;
                        }
                    }
                    // 每一定次发现未重连,才打印告警日志。默认,1800 次,1 小时。
                    if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                        logger.warn(errorMsg, t);
                    }
                }
            }
        };
        // 发起定时任务
        reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
    }
}

  • 第 4 行:调用 #getReconnectParam(url) 方法,获得重连频率。默认开启,2000 毫秒。
    • 代码比较简单,胖友自己点击方法查看。
  • 第 6 至 38 行:若开启重连功能,创建重连线程。
    • 第 8 至 35 行:创建 Runnable 对象。
      • 第 11 至 13 行:未连接时,调用 #connect() 方法,进行重连。
      • 第 14 至 17 行:已连接时,记录最后连接时间。
      • 第 18 至 33 行:符合错误告警条件时,打印 errorwarn 日志。为什么要符合条件才打印呢?之前也和朋友聊起来过,线上因为中间件组件,打印了太多的日志,结果整个 JVM 崩了。特别在网络场景 + 大量”无限”重试的场景,特别容易打出满屏的日志。这块,我们可以学习下。另外,Eureka 在集群同步,也有类似处理。
    • 第 36 行:发起任务,定时检查,是否需要重连。
  • reconnect=false to close reconnect,从目前代码上来看,未实现 #reset(url) 方法,在 URL 的 reconnect=false 配置项时,关闭重连线程。

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void send(Object message, boolean sent) throws RemotingException {
    // 未连接时,开启重连功能,则先发起连接
    if (send_reconnect && !isConnected()) {
        connect();
    }
    // 发送消息
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

包装通道处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Constants.java
public static final String DEFAULT_CLIENT_THREADPOOL = "cached";

protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";

/**
 * 包装通道处理器
 *
 * @param url URL
 * @param handler 被包装的通道处理器
 * @return 包装后的通道处理器
 */
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    // 设置线程名
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    // 设置使用的线程池类型
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    // 包装通道处理器
    return ChannelHandlers.wrap(handler, url);
}

  • 第 10 行:调用 ExecutorUtil#setThreadName(url, CLIENT_THREAD_POOL_NAME) 方法,设置线程名,即 URL.threadname=xxx。代码如下:
1
2
3
4
5
6
public static URL setThreadName(URL url, String defaultName) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, defaultName);
    name = new StringBuilder(32).append(name).append("-").append(url.getAddress()).toString();
    url = url.addParameter(Constants.THREAD_NAME_KEY, name);
    return url;
}

  • 注意,线程名中,包含 URL 的地址信息

  • 第 12 行:设置线程类型,即 URL.threadpool=xxx。默认情况下,使用 "cached" 类型,这个和 Server 是不同的,下面我们会看到。
  • 第 14 行:调用 「8. Dispacher」 ChannelHandlers#wrap(handler, url) 方法,包装通道处理器。这里我们不细说,在 中,结合解析。
  • 这是一个非常关键的方法,在例如 NettyClient 等里,都会调用该方法。

其他方法

如下方法比较简单,艿艿就不重复啰嗦了。

子类类图

Client子类类图

3.2 ClientDelegate

com.alibaba.dubbo.remoting.transport.ClientDelegate ,实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 client 属性的方法。

目前 dubbo-rpc-default 模块中,ChannelWrapper 继承了 ClientDelegate 类。但实际上,ChannelWrapper 重新实现了所有的方法,并且,并未复用任何方法。所以,ClientDelegate 目前用途不大。

4. Server

4.1 AbstractServer

com.alibaba.dubbo.remoting.transport.AbstractServer ,实现 Server 接口,继承 AbstractEndpoint 抽象类,服务器抽象类,重点实现了公用的逻辑,同时抽象了开启、关闭等模板方法,供子类实现。抽象方法如下:

1
2
3
protected abstract void doOpen() throws Throwable;

protected abstract void doClose() throws Throwable;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * 线程池
 */
ExecutorService executor;
/**
 * 服务地址
 */
private InetSocketAddress localAddress;
/**
 * 绑定地址
 */
private InetSocketAddress bindAddress;
/**
 * 服务器最大可接受连接数
 */
private int accepts;
/**
 * 空闲超时时间,单位:毫秒
 */
private int idleTimeout; //600 seconds

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    // 服务地址
    localAddress = getUrl().toInetSocketAddress();
    // 绑定地址
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = NetUtils.ANYHOST;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 服务器最大可接受连接数
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    // 空闲超时时间
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

    // 开启服务器
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }

    // 获得线程池
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

  • 第 24 至 36 行:从 URL 中,加载 localAddressbindAddressacceptsidleTimeout 配置项。比较难理解的,可能是两个地址属性,如下是比例提供的一个例子:

地址配置示例

1
2
- 配置项可在 [#reset(url)](https://github.com/YunaiV/dubbo/blob/31b3f1e868ed2d62c97a26b5cd233a921ce2205a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractServer.java#L80-L129) 方法中,重置属性。 - 第 38 至 47 行:调用 `#doOpen()` 方法,开启服务器。 - 第 49 至 52 行:从 [DataStore](https://github.com/YunaiV/dubbo/blob/31b3f1e868ed2d62c97a26b5cd233a921ce2205a/dubbo-common/src/main/java/com/alibaba/dubbo/common/store/DataStore.java) 中,获得线程池。
- `fixme replace this with better method` **官方**说明,在这块实现上,也不是很满意,后面会优化掉。

被客户端连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void connected(Channel ch) throws RemotingException {
    // If the server has entered the shutdown process, reject any new connection
    if (this.isClosing() || this.isClosed()) {
        logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
        ch.close();
        return;
    }

    // 超过上限,关闭新的链接
    Collection<Channel> channels = getChannels();
    if (accepts > 0 && channels.size() > accepts) {
        logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
        ch.close(); // 关闭新的链接
        return;
    }
    // 连接
    super.connected(ch);
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
@Override
public void send(Object message, boolean sent) throws RemotingException {
    // 获得所有的客户端的通道
    Collection<Channel> channels = getChannels();
    // 群发消息
    for (Channel channel : channels) {
        if (channel.isConnected()) {
            channel.send(message, sent);
        }
    }
}

其他方法

如下方法比较简单,艿艿就不重复啰嗦了。

子类类图

Server子类类图

4.2 ServerDelegate

com.alibaba.dubbo.remoting.transport.ServerDelegate ,实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 server 属性的方法。

目前 dubbo-remoting-p2p 模块中,PeerServer 会继承该类,后续再看。

5. Channel

5.1 AbstractChannel

com.alibaba.dubbo.remoting.transport.AbstractChannel ,实现 Channel 接口,实现 AbstractPeer 抽象类,通道抽象类。

发送消息

1
2
3
4
5
6
7
8
@Override
public void send(Object message, boolean sent) throws RemotingException {
    if (isClosed()) {
        throw new RemotingException(this, "Failed to send message "
                + (message == null ? "" : message.getClass().getName()) + ":" + message
                + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
    }
}

  • 具体的发送方法,子类实现。在 AbstractChannel 中,目前只做状态检查。

子类类图

Channel子类类图

5.2 ChannelDelegate

com.alibaba.dubbo.remoting.transport.ChannelDelegate ,实现 Channel 接口,通道装饰者实现类。在每个实现的方法里,直接调用被装饰的 channel 属性的方法。

目前 Dubbo 中,暂未用到。

7. ChannelHandler

7.1 ChannelHandlerAdapter

com.alibaba.dubbo.remoting.transport.ChannelHandlerAdapter ,实现 ChannelHandler 接口,通道处理器适配器,每个方法为空实现。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void connected(Channel channel) throws RemotingException { }

@Override
public void disconnected(Channel channel) throws RemotingException { }

@Override
public void sent(Channel channel, Object message) { }

@Override
public void received(Channel channel, Object message) throws RemotingException { }

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException { }

子类,可继承它,仅实现想要的方法。

7.2 ChannelHandlerDispatcher

com.alibaba.dubbo.remoting.transport.ChannelHandlerDispatcher ,实现 ChannelHandler 接口,通道处理器调度器。在它内部,有一个通道处理器数组 channelHandlers 属性。

每个实现的方法,都会循环调用 channelHandlers 的方法,例如:

1
2
3
4
5
6
7
8
9
public void received(Channel channel, Object message) {
    for (ChannelHandler listener : channelHandlers) {
        try {
            listener.received(channel, message);
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}

搜索了下 ChannelHandlerDispatcher 的使用情况,主要用在 dubbo-remoting-p2p 的 AbstractGroup 中。

7.3 ChannelHandlerDelegate

com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate ,实现 ChannelHandler 接口,通道处理器装饰者接口。方法如下:

1
ChannelHandler getHandler();

正如,我们在上文中说道,装饰器模式,在 dubbo-remoting-api 扮演了非常重要的角色,那么最佳演员就是 ChannelHandlerDelegate 们。下面,开始他们的表演。

7.3.1 AbstractChannelHandlerDelegate

com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate ,实现 ChannelHandlerDelegate 接口,通道处理器装饰者抽象实现类。在每个实现的方法里,直接调用被装饰的 handler 属性的方法。

7.3.2 DecodeHandler

com.alibaba.dubbo.remoting.transport.DecodeHandler ,实现 AbstractChannelHandlerDelegate 抽象类,解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。

覆写了 #received(channel, message) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }

    if (message instanceof Request) {
        decode(((Request) message).getData());
    }

    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }

    handler.received(channel, message);
}

  • 第 3 至 5 行:当消息是 Decodeable 类型时,调用 #decode(message) 方法,解析消息。
  • 第 7 至 9 行:当消息是 Request 类型时,调用 #decode(message) 方法,解析 data 属性。
  • 第 11 至 13 行:当消息是 Response 类型时,调用 #decode(message) 方法,解析 result 属性。
  • 第 15 行:调用 ChannelHandler#received(channel, message) 方法,将消息交给委托的 handler,继续处理。 胖友是否感受到,装饰器模式的好处:通过组合的方式,实现功能的叠加。

解析消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void decode(Object message) {
    if (message != null && message instanceof Decodeable) {
        try {
            ((Decodeable) message).decode(); // 解析消息
            if (log.isDebugEnabled()) {
                log.debug(new StringBuilder(32).append("Decode decodeable message ").append(message.getClass().getName()).toString());
            }
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn(new StringBuilder(32).append("Call Decodeable.decode failed: ").append(e.getMessage()).toString(), e);
            }
        } // ~ end of catch
    } // ~ end of if
} // ~ end of method decode

  • 第 2 至 4 行:当类型是 Decodeable 时,调用 Decodeable#decode() 方法,进一步解析。
  • dubbo-rpc-default 项目中,DecodeableRpcInvocation 和 DecodeableRpcResult 实现 Decodeable 接口,后面我们来分享。

7.3.3 MultiMessageHandler

com.alibaba.dubbo.remoting.transport.MultiMessageHandler,实现 AbstractChannelHandlerDelegate 抽象类,多消息处理器,处理一次性接收到多条消息的情况。

覆写了 #received(channel, message) 方法

1
2
3
4
5
6
7
8
9
10
11
@Override
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) { // 多消息
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        handler.received(channel, message);
    }
}

  • 第 3 至 7 行:当消息是 MultiMessage 类型,即多消息循环,提交给 handler 处理。
  • 第 8 至 10 行:当单消息直接时,提交给 handler 处理。

在下面的文章,我们可以看到 ChannelHandlerDelegate 的组合使用的例子

8. Dispacher

本小节内容,对应 《Dubbo 用户指南 —— 线程模型》

简单概括这节,以接收消息举例子,代码如下:

1
2
3
executor.execute(new Runnable() {
    handler.received(channel, message)
});

将 ChannelHandler 的具体操作,调度到线程池中,这也是为什么这个模块叫 dispacher 的原因。

8.1 ChannelHandlers

com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers ,通道处理器工厂。在上文 「3.1 AbstractClient」,我们看到 AbstractClient#wrapChannelHandler(url, handler) 方法中,会调用 ChannelHandlers#wrap(url, handler) 方法。实际上,Server 部分也会有这样类似的逻辑,只是代码实现上暂未统一。以 dubbo-remoting-netty4 来举例子:

  • NettyClient :
1
2
3
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}

  • NettyServer :
1
2
3
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) /* 设置线程名到 URL 上 */));
}

无论 Client 还是 Server,都是类似的,将传入的 handler,最终使用 ChannelHandlers 进行一次包装。OK,我们来看看包装通道处理器的具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * 单例
 */
private static ChannelHandlers INSTANCE = new ChannelHandlers();

public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(
            new HeartbeatHandler(
                    ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
            )
    );
}

  • 第 11 至 15 行:在这里,我们就看到了多个 ChannelHandlerDelegate 的组合。包括,第 15 行的,返回 Dispatcher#dispatch(handler, url) 方法,实际上也是一个 ChannelHandlerDelegate 对象。

8.2 Dispatcher 实现类

在 Dubbo 中,有多种 Dispatcher 的实现,如下:

FROM 《Dubbo 用户指南 —— 线程模型》

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

子类类图

Dispatcher子类类图

8.2.1 AllDispatcher

我们以 all 对应的 AllDispatcher 举例子,代码如下:

1
2
3
4
5
6
7
8
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

在该类的 #dispatch(…) 的方法中,我们可以看到创建 AllChannelHandler 对象,并传入 handler 属性。聪慧如你,已经猜到 AllChannelHandler 也是 ChannelHandlerDelegate 类型。也就是说”线程模型“,也是通过装饰器模式,组合而成。

每个 Dispatcher 实现类,都对应一个 ChannelHandler 实现类。默认未配置的情况下,使用 AllDispatcher 调度。

8.2.2 AllChannelHandler

com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler ,实现 WrappedChannelHandler 抽象类。覆写 #connected(channel) 方法如下:

WrappedChannelHandler 是实现 ChannelHandlerDelegate 的抽象类,下文再看。

1
2
3
4
5
6
7
8
9
@Override
public void connected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    }
}

  • 创建 ChannelEventRunnable 对象,提交给线程池执行。
  • 注意,传入的状态为 ChannelState.CONNECTED。不同的实现方法,对应不同的状态。

8.3 ChannelEventRunnable

com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable ,实现 Runnable 接口。代码比较简单,胖友自己看噢。主要分成三部分:

1
2
3
4
5
6
7
8
9
10
11
@Override
public void run() {
    switch (state) {
        case CONNECTED: handler.connected(channel); break;
        case DISCONNECTED: handler.disconnected(channel); break;
        case SENT: handler.sent(channel, message); break;
        case RECEIVED: handler.received(channel, message); break;
        case CAUGHT: handler.caught(channel, exception); break;
        default: logger.warn("unknown state: " + state + ", message is " + message);
    }
}

8.4 WrappedChannelHandler

com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler,实现 ChannelHandlerDelegate 接口,包装的 WrappedChannelHandler 实现类。

从目前的实现来看,WrappedChannelHandler 继承 AbstractChannelHandlerDelegate 更合适,因为 #connected(channel) 等,实现的方法都是相同的。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * 线程池
 */
protected final ExecutorService executor;
/**
 * 通道处理器
 */
protected final ChannelHandler handler;
/**
 * URL
 */
protected final URL url;

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;

    // 创建线程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    // 添加线程池到 DataStore 中
    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
        componentKey = Constants.CONSUMER_SIDE;
    }
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

  • 第 19 行:基于 Dubbo SPI Adaptive 机制,创建线程池。
  • 第 21 至 27 行:添加线程池到 DataStore 中。这就是上文 AbstractClient 或 AbstractServer 从 DataStore 获得线程池的方式。当然,官方也说了,这种方式不是很优雅,有点奇淫技巧,未来会优化掉。

共享线程池

在 WrappedChannelHandler 中,有一个内置的共享线程池,如下:

1
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

【TODO 8024】搞不懂,这个设计的意图,先mark留着。

子类类图

WrappedChannelHandler子类类图

9. Codec

9.1 CodecSupport

com.alibaba.dubbo.remoting.transport.CodecSupport,编解码工具类,提供查询 Serialization 的功能。

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * 序列化对象集合
 * key:序列化类型编号 {@link Serialization#getContentTypeId()}
 */
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
/**
 * 序列化名集合
 * key:序列化类型编号 {@link Serialization#getContentTypeId()}
 * value: 序列化拓展名
 */
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();

static {
    // 基于 Dubbo SPI ,初始化
    Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
    for (String name : supportedExtensions) {
        Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
        byte idByte = serialization.getContentTypeId();
        if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
            logger.error("Serialization extension " + serialization.getClass().getName()
                    + " has duplicate id to Serialization extension "
                    + ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
                    + ", ignore this Serialization extension");
            continue;
        }
        ID_SERIALIZATION_MAP.put(idByte, serialization);
        ID_SERIALIZATIONNAME_MAP.put(idByte, name);
    }
}

Dubbo 提供了多种序列化方式,此处初始化结果,如下图:

序列化方式集合

查找 Serialization 对象

1
2
3
4
5
6
7
8
9
10
11
12
public static Serialization getSerialization(URL url, Byte id) throws IOException {
    Serialization serialization = getSerializationById(id);
    String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); // 默认,hessian2
    // 出于安全的目的,针对 JDK 的序列化方式(对应编号为 3、4、7),检查连接到服务器的 URL 和实际传输的数据,协议是否一致。
    // https://github.com/apache/incubator-dubbo/issues/1138
    // Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
    if (serialization == null
            || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
        throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
    }
    return serialization;
}

在最新的 Dubbo 版本中,已经将 serialization 模块,从 dubbo-common 中,独立成 dubbo-serialization。So,我们后面开一个系列来分享。

9.2 AbstractCodec

com.alibaba.dubbo.remoting.transport.AbstractCodec,实现 Codec2 接口,提供如下公用方法:

子类类图

Codec子类类图

编解码器的实现,通过继承的方式,获得更多的功能。每一个 Codec2 类实现对不同消息的编解码。通过协议头来判断,具体使用哪个编解码逻辑。听起来有点绕,我们来看一段简化 ExchangeCodec 的 #decode(…) 例子:

1
2
3
4
5
6
7
8
9
10
11
12
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        // ... 省略
        return super.decode(channel, buffer, readable, header);
    }

    // ... 省略

    return decodeBody(channel, is, header);
}

  • 第 2 至 7 行:通过 magic number 判断到,并非信息交易的协议头(Dubbo Exchange),转交给父类 TelnetCodec 处理,一般此时是 Telnet 消息。
  • 第 8 至 11 行:通过 magic number 判断到,符合信息交易的协议头(Dubbo Exchange),ExchangeCodec 自己处理。

9.2.1 TransportCodec

com.alibaba.dubbo.remoting.transport.codec.TransportCodec,传输编解码器,使用 Serialization 进行序列化/反序列化,直接编解码。

编码消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
    // 获得反序列化的 ObjectOutput 对象
    OutputStream output = new ChannelBufferOutputStream(buffer);
    ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
    // 写入 ObjectOutput
    encodeData(channel, objectOutput, message);
    objectOutput.flushBuffer();
    // 释放
    if (objectOutput instanceof Cleanable) {
        ((Cleanable) objectOutput).cleanup();
    }
}

  • 第 3 至 5 行:获得对应的 Serialization 对象,并创建用于反序列化的 ObjectOutput 对象。不同的 Serialization 实现,对应不同的 ObjectOutput 实现类。这里,我们只要读懂大体流程,详细的,我们后面文章见。
  • 第 7 行:调用 #encodeData(channel, objectOutput, message) 方法,写入 ObjectOutput。代码如下:
1
2
3
4
5
6
7
protected void encodeData(Channel channel, ObjectOutput output, Object message) throws IOException {
    encodeData(output, message);
}

protected void encodeData(ObjectOutput output, Object message) throws IOException {
    output.writeObject(message);
}

  • 第 9 至 12 行:释放资源。目前,仅有 kryo 的 KryoObjectInput、KryoObjectOutput 实现了 Cleanable 接口,需要释放资源。

解码消息

#decode(channel, buffer) 实现方法,和解码消息基本一致,胖友自己查看。

9.3 CodecAdapter

com.alibaba.dubbo.remoting.transport.codec.CodecAdapter,实现 Code2 接口,Codec 适配器,将 Codec 适配成 Codec2。

代码比较简单,胖友自己查看。

666. 彩蛋

代码比较多,如果不熟悉 Netty 等框架的胖友,可能会一脸懵逼的看到文末。建议胖友结合如下的代码:

1
2
3
4
5
6
7
8
9
10
// Netty.java
new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
        ch.pipeline().addLast("decoder", adapter.getDecoder()) // 解码
                    .addLast("encoder", adapter.getEncoder())  // 解码
                    .addLast("handler", nettyServerHandler); // 处理器
    }
}

在脑补 Debug + IDE Debug,多考虑下。

推荐阅读:

本文由作者按照 CC BY 4.0 进行授权