文章

NIO服务器(一)之抽象API

NIO服务器(一)之抽象API

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

从本小节开始,我们来分享 Dubbo 自己实现的 NIO 服务器,使用在 dubbo://thrift:// 协议上。

在 NIO 框架的选型上,强大的 Java 社区里有 mina、netty、grizzly 等,甚至 netty 提供了 3.x 和 4.x 的版本。那么该咋办呢?

Dubbo 开发团队的选择是:

  • API 层:
    • dubbo-remoting-api
  • 实现层:
    • dubbo-remoting-netty3
    • dubbo-remoting-netty4
    • dubbo-remoting-mina
    • dubbo-remoting-grizzly
    • dubbo-remoting-p2p

再配合上 Dubbo SPI 的机制,使用者可以自定义使用哪一种具体的实现。美滋滋。

2. 一览

还是老样子,笔者习惯性对代码量进行下统计,dubbo-remoting 的代码量如下图:

代码量

WTF !!! dubbo-remoting-api 的代码量竟然近万行

{__/}( • - •)/つ淡定 @胖友

我们来首先看一张图:

FROM 《Dubbo 开发指南 —— 框架设计》

整体设计

红框部分,Protocol => Exchange => Transport => Serialize 的调用顺序。

  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

在笔者初看 dubbo-remoting-api 的代码时,对 exchangetransport 的理解是比较模糊的。简单来说,exchangetransport 之上,构造了 Request,Response 模型,一个请求对应一个响应。这样的方式,才符合我们实际业务开发的需要。当然,即使是 Request,Response 也分成同步和异步返回,重要的是,能够一一映射

胖友如果有兴趣,可以看看:

看完以上知识,我们在回过头看 dubbo-remoting-api 的项目结构就清晰了:

dubbo-remoting-api

  • 最外层:通用接口。
  • buffer 包:缓冲区。
  • exchange 包:信息交换层
  • transporter 包:网络传输层
  • telnet 包:Telnet 命令

如上的每一层/包,我们都会独立一篇文章,进行分享。当然,dubbo-remoting-api 模块,只负责 API 层抽象和部分实现,最终能够真正通信,需要 dubbo-remoting-netty 等等模块来实现。对应的每一个实现模块,我们也是独立一篇文章。

是不是很清晰,美滋滋?

3. 最外层:通用接口

胖友先看看 《Netty4.0学习笔记系列之一:Server与Client的通讯》教程文章。在该文章中,我们可以看到使用 Netty 在四个类:

  • 1、HelloServer :server类,启动Netty server
  • 2、HelloServerInHandler:server的handler,接收客户端消息,并向客户端发送消息
  • 3、HelloClient:client类,建立于Netty server的连接
  • 4、HelloClientIntHandler:client的handler,接收server端的消息,并向服务端发送消息

恰好,和 dubbo-remoting-api 模块,定义的 Server、Client、ChannelHandler 接口对应。我们以 dubbo-remoting-netty4 模块的,举例子。整理如下:

上文dubbo-remoting-apidubbo-remoting-netty4
HelloServerServer 接口NettyServer 实现类
HelloServerInHandler:serverChannelHandler 接口NettyServerHandler 实现类
HelloClientClient 接口NettyClient 实现类
HelloServerInHandlerChannelHandler 接口NettyClientHandler 实现类

因为教程文章,以教程 Demo 为准,实际会有更多需要抽象的,例如:Codec 协议编解码,Dispatcher 消息等分发。胖友再来看看 dubbo:// 的处理流程:

FROM 《Dubbo 用户指南 —— dubbo://》

dubbo://

  • Transporter: mina, netty, grizzy
  • Serialization: dubbo, hessian2, java, json
  • Dispatcher: all, direct, message, execution, connection
  • ThreadPool: fixed, cached

如果这个图读不懂,没关系,下面我们来看看每个接口。后面,胖友可以回过头来理解这个图。

本文涉及的类图如下

类图

4. Endpoint

com.alibaba.dubbo.remoting.Endpoint ,端点接口。方法如下:

  • 属性相关
1
2
3
URL getUrl();
InetSocketAddress getLocalAddress();
ChannelHandler getChannelHandler();

  • 发送消息
1
2
void send(Object message) throws RemotingException;
void send(Object message, boolean sent) throws RemotingException;

  • 关系相关
1
2
3
4
void close();
void close(int timeout);
void startClose();
boolean isClosed();

Endpoint ,从中文上解释来说是,”端点“。从字面上来看,不太容易理解。在 dubbo-remoting-api 中,一个 Client 或 Server ,都是一个 Endpoint 。 不同系统的,Endpoint 代表的会略有差距,例如 SpringMVC 中,一个请求 Restful URL 也可以是一个 Endpoint ,胖友可以 Google 查询,理解更多。

4.1 Channel

com.alibaba.dubbo.remoting.Channel ,继承 Endpoint 接口,通道接口。方法如下:

  • 连接相关
1
2
InetSocketAddress getRemoteAddress();
boolean isConnected();

  • 属性相关
1
2
3
4
boolean hasAttribute(String key);
Object getAttribute(String key);
void setAttribute(String key, Object value);
void removeAttribute(String key);

和 Netty Channel 一致,通讯的载体。在后面的文章,我们会看到在 dubbo-remoting-netty4 项目中,NettyChannel 是 Dubbo Channel 的实现,内部有真正的 Netty Channel 属性,用于通讯

4.2 Client

com.alibaba.dubbo.remoting.Client ,实现 Endpoint 和 Channel 和 Resetable 接口,客户端接口。方法如下:

1
2
// 重连
void reconnect() throws RemotingException;

4.3 Server

com.alibaba.dubbo.remoting.Server ,继承 Endpoint 和 Resetable 接口,服务器接口。方法如下:

1
2
3
4
5
// 是否绑定本地端口,提供服务。即,是否启动成功,可连接,接收消息等。
boolean isBound();
// 获得连接上服务器的通道(客户端)们
Collection<Channel> getChannels();
Channel getChannel(InetSocketAddress remoteAddress);

4.3.1 Resetable

com.alibaba.dubbo.common.Resetable ,可重置接口。方法如下:

1
void reset(URL url);

Server 实现 Resetable 接口,在实现 #reset(url) 方法,用于根据新传入的 url 属性,重置自己内部的一些属性,例如 AbstractServer#reset(url) 方法。

5. ChannelHandler

com.alibaba.dubbo.remoting.ChannelHandler ,通道处理器接口。方法如下:

1
2
3
4
5
6
7
void connected(Channel channel) throws RemotingException;
void disconnected(Channel channel) throws RemotingException;

void sent(Channel channel, Object message) throws RemotingException;
void received(Channel channel, Object message) throws RemotingException;

void caught(Channel channel, Throwable exception) throws RemotingException;

和 Netty ChannelHandler 一致,负责 Channel 中的逻辑处理。在后面的文章,我们会看到在 dubbo-remoting-netty4 项目中,NettyServerHandler 是 Netty ChannelHandler 的实现,内部调用 Netty ChannelHandler 的方法,进行逻辑处理。

6. Transporter

com.alibaba.dubbo.remoting.Transporter ,网络传输接口。方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SPI("netty")
public interface Transporter {

    /**
     * Bind a server.
     *
     * 绑定一个服务器
     *
     * @param url     server url
     * @param handler 通道处理器
     * @return server 服务器
     * @throws RemotingException 当绑定发生异常时
     * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, Receiver, ChannelHandler)
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * Connect to a server.
     *
     * 连接一个服务器,即创建一个客户端
     *
     * @param url     server url 服务器地址
     * @param handler 通道处理器
     * @return client 客户端
     * @throws RemotingException 当连接发生异常时
     * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, Receiver, ChannelListener)
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

  • @SPI("netty") 拓展点注解,Dubbo SPI,默认为 "netty"。注意,此处的 netty 对应的是 netty3 ,因为 Dubbo 项目在开发时,netty4 并未发布。配置方式见 《Dubbo 用户指南 —— Netty4》 文档。
  • @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Server 实现,使用 URL.server 或 URL.transporter 属性。
  • @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Client 实现,使用 URL.client 或 URL.transporter 属性。

6.1 Transporters

com.alibaba.dubbo.remoting.Transporters ,Transporter 门面类。

友情提示:Facade 设计模式,参见 《 设计模式(九)外观模式Facade(结构型)》 文章。

#bind(String url, ChannelHandler… handler)静态方法,绑定一个服务器。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static Server bind(String url, ChannelHandler... handler) throws RemotingException {
    return bind(URL.valueOf(url), handler);
}

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    // 创建 handler
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 创建 Server 对象
    return getTransporter().bind(url, handler);
}

  • 和 Transporter#bind(url, handler) 方法,对应。
  • 第 12 至 18 行:创建 ChannelHandlerDispatcher handler。若 handlers 是多个,使用进行封装。在 ChannelHandlerDispatcher 中,会循环调用 handlers ,对应的方法。
  • 第 20 行:调用 #getTransporter() 方法,基于 Dubbo SPI 机制,获得 Transporter$Adaptive 对象。代码如下:
1
2
3
public static Transporter getTransporter() {
    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}

  • 第 20 行:调用 Transporter 实现对象 NettyTransporter Transporter#bind(url, handler) 方法,在 Transporter$Adaptive 对象中,会根据 url 参数,获得对应的(例如,NettyServer),从而创建对应的 Server 对象(例如,)。

另外,还有一个 #connect(url, handler)静态方法,连接一个服务器,即创建一个客户端。 和上面方法类似,胖友自己看咯。

7. Codec2

com.alibaba.dubbo.remoting.Codec2 ,编解码器接口。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * 编码
 *
 * @param channel 通道
 * @param buffer Buffer
 * @param message 消息
 * @throws IOException 当编码发生异常时
 */
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

/**
 * 解码
 *
 * @param channel 通道
 * @param buffer Buffer
 * @return 消息
 * @throws IOException 当解码发生异常时
 */
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;

  • @SPI("netty") 拓展点注解,Dubbo SPI。
  • @Adaptive({Constants.CODEC_KEY}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Codec2 实现,使用 URL.codec 属性。

另外,解码过程中,需要解决 TCP 拆包、粘包的场景,因此解码结果如下:

1
2
3
4
5
6
7
8
9
10
11
// Codec2.java
enum DecodeResult {
    /**
     * 需要更多输入
     */
    NEED_MORE_INPUT,
    /**
     * 忽略一些输入
     */
    SKIP_SOME_INPUT
}

7.1 Codec

com.alibaba.dubbo.remoting.Codec老的编解码器接口,被 Codec2 取代。

通过 CodecAdapter ,将 Codec 适配成 Codec2 。

7.2 Decodeable

com.alibaba.dubbo.remoting.Decodeable ,可解码的接口。方法如下:

1
2
// 解码
void decode() throws Exception;

8. Dispatcher

com.alibaba.dubbo.remoting.Dispatcher ,调度器接口。方法如下:

1
2
3
4
5
6
7
8
@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

  • @SPI(AllDispatcher.NAME) 拓展点注解,Dubbo SPI,默认为 "all"
  • @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 ChanelHander 实现,使用 URL.dispatcher 属性。
  • 为什么传入的 AllChannelHandler handler 参数,创建返回的还是 ChannelHandler 对象呢?感兴趣的胖友,可以提前看下和 《Dubbo 用户指南 —— 线程模型》 文章,后面见。

9. RemotingException

com.alibaba.dubbo.remoting.RemotingException ,实现 Exception 类,dubbo-remoting-api 的基础异常。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RemotingException extends Exception {

    /**
     * 本地地址
     */
    private InetSocketAddress localAddress;
    /**
     * 远程地址
     */
    private InetSocketAddress remoteAddress;

    // ... 省略方法
}

9.1 ExecutionException

com.alibaba.dubbo.remoting.ExecutionException ,实现 RemotingException 类,执行异常。代码如下:

1
2
3
4
5
6
7
8
9
public class ExecutionException extends RemotingException {

    /**
     * 请求
     */
    private final Object request;

    // ... 省略方法
}

9.2 TimeoutException

com.alibaba.dubbo.remoting.TimeoutException ,实现 RemotingException 类,超时异常。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TimeoutException extends RemotingException {

    /**
     * 客户端
     */
    public static final int CLIENT_SIDE = 0;
    /**
     * 服务端
     */
    public static final int SERVER_SIDE = 1;

    /**
     * 阶段
     */
    private final int phase;

    // ... 省略方法
}

666. 彩蛋

dubbo-remoting-api 模块,涉及到大量的封装,如果不太理解的胖友,建议多多调试。

毕业那年,也尝试过想,封装一个通用的 API 层,屏蔽 Netty、Mina 差异的细节,写的比较戳。现在看到 Dubbo 实现的方式,Get 新姿势了。

本文由作者按照 CC BY 4.0 进行授权