文章

NIO服务器(七)之Netty3实现

NIO服务器(七)之Netty3实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(六)之 Netty4 实现》 一文,分享在 dubbo-remoting-netty 中,Netty3 如何接入实现。

因为 Netty3 的接入代码,和 Netty4 基本是一致,主要是一些 Netty 不同版本的 API 差异,所以本文,会相对简介,只重点分享一些差异的地方。

涉及如下类:

类图

类图

友情提示:在当前版本,默认情况下,使用 Netty3 ,如果想配置成 Netty4 ,请参考文档:《Dubbo 用户指南 —— Netty4》

2. NettyTransporter

com.alibaba.dubbo.remoting.transport.netty.NettyTransporter ,和 dubbo-remoting-netty4 一致,省略。

3. NettyChannel

com.alibaba.dubbo.remoting.transport.netty.NettyChannel ,和 dubbo-remoting-netty4 一致,省略。

4. NettyHandler

com.alibaba.dubbo.remoting.transport.netty.NettyHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyServer 和 NettyClient 的处理器,统一使用。这一点,不同于 dubbo-remoting-netty4 ,服务端和服务器使用不同的两个处理器。相比来说,dubbo-remoting-netty4 控制更精细,影响不大。

当然也有一个原因,Dubbo ChannelHandler 基于 Netty3 的 SimpleChannelHandler 为设计原型。因此,在 dubbo-remoting-netty4 中,需要将 DubboHandler 的方法,适配到 Netty4 的 ChannelDuplexHandler 的方法。

NettyHandler 和 HeaderExchangeHandler 类似。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Sharable
public class NettyHandler extends SimpleChannelHandler {

    /**
     * Dubbo Channel 集合
     */
    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

    /**
     * URL
     */
    private final URL url;

    /**
     * Dubbo ChannelHandler
     */
    private final ChannelHandler handler;

    public NettyHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

    // ... 省略实现方法
}

实现方法

每个实现的方法,调用 handler 对应的方法。以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    // 创建 NettyChannel 对象
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // 添加到 `channels` 中
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
        }
        // 提交给 `handler` 处理器。
        handler.connected(channel);
    } finally {
        // 移除 NettyChannel 对象,若已断开
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

其他方法,胖友自己查看。

5. NettyServer

com.alibaba.dubbo.remoting.transport.netty.NettyServer ,实现 Server 接口,继承 AbstractServer 抽象类,Netty 服务器实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * 通道集合
 */
private Map<String, Channel> channels; // <ip:port, channel>

private ServerBootstrap bootstrap;

private org.jboss.netty.channel.Channel channel;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

  • 和 dubbo-remoting-netty4 基本一致。

启动服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
protected void doOpen() {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();

    // 创建线程池
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));

    // 创建 ChannelFactory 对象
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    // 实例化 ServerBootstrap
    bootstrap = new ServerBootstrap(channelFactory);

    // 创建 NettyHandler 对象
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    // 设置 `channels` 属性
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            // 创建 NettyCodecAdapter 对象
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder()); // 解码
            pipeline.addLast("encoder", adapter.getEncoder()); // 解码
            pipeline.addLast("handler", nettyHandler); // 处理器
            return pipeline;
        }
    });
    // 服务器绑定端口监听
    // bind
    channel = bootstrap.bind(getBindAddress());
}

  • 和 dubbo-remoting-netty4 基本一致,下面只说一些差异的地方。
  • 第 6 至 8 行:创建线程池,不同于 dubbo-remoting-netty4 中,创建线程组 NioEventLoopGroup 。
  • 第 11 行:基于 boss 和 worker,创建 ChannelFactory 对象。
  • 并未设置 ServerBootstrap 的可选项。

关闭服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void doClose() {
    // 关闭服务器通道
    try {
        if (channel != null) {
            // unbind.
            channel.close();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    // 关闭连接到服务器的客户端通道
    try {
        Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
        if (channels != null && !channels.isEmpty()) {
            for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                try {
                    channel.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    // 优雅关闭 ServerBootstrap
    try {
        if (bootstrap != null) {
            // release external resource.
            bootstrap.releaseExternalResources();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    // 清空连接到服务器的客户端通道
    try {
        if (channels != null) {
            channels.clear();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}

  • 和 dubbo-remoting-netty4 基本一致,下面只说一些差异的地方。
  • 第 27 至 35 行:调用 Bootstrap#releaseExternalResources() 方法,释放 ServerBootstrap 相关的资源。

6. NettyClient

com.alibaba.dubbo.remoting.transport.netty.NettyClient ,继承 AbstractNettyClient 抽象类,Netty 客户端实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
        Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
        Constants.DEFAULT_IO_THREADS);

private ClientBootstrap bootstrap;

private volatile org.jboss.netty.channel.Channel channel; // volatile, please copy reference to use

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}

  • channelFactory 属性,【TODO 8027】为啥公用

启动客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
protected void doOpen() {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();

    // 实例化 ServerBootstrap
    bootstrap = new ClientBootstrap(channelFactory);
    // 设置可选项
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());

    // 创建 NettyHandler 对象
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

    // 设置责任链路
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            // 创建 NettyCodecAdapter 对象
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder()); // 解码
            pipeline.addLast("encoder", adapter.getEncoder()); // 编码
            pipeline.addLast("handler", nettyHandler); // 处理器
            return pipeline;
        }
    });
}

  • 和 dubbo-remoting-netty4 基本一致。

连接服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // 连接服务器
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        // 等待连接成功或者超时
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
        // 连接成功
        if (ret && future.isSuccess()) {
            Channel newChannel = future.getChannel();
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // 关闭老的连接
                // Close old channel
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // 若 NettyClient 被关闭,关闭连接
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                // 设置新连接
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        // 发生异常,抛出 RemotingException 异常
        } else if (future.getCause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
        // 无结果(连接超时),抛出 RemotingException 异常
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        // 未连接,取消任务
        if (!isConnected()) {
            future.cancel();
        }
    }
}

  • 和 dubbo-remoting-netty4 基本一致,下面只说一些差异的地方。
  • 第 9 行:调用 ChannelFuture#awaitUninterruptibly(timeout, TimeUnit) 方法,等待连接成功或超时。这里传入的不是 3000 。
  • 第 55 至 60 行:最终结果为未连接,调用 ChannelFuture#cancel(true) 方法,取消任务。

关闭连接

1
2
3
4
5
6
7
8
@Override
protected void doClose() throws Throwable {
    /*try {
            bootstrap.releaseExternalResources();
        } catch (Throwable t) {
            logger.warn(t.getMessage());
        }*/
}

  • 因为 静态 channelFactory 是属性,被多个 NettyClient 共用。

7. Buffer

7.1 NettyBackedChannelBuffer

com.alibaba.dubbo.remoting.transport.netty.NettyBackedChannelBuffer ,实现 ChannelBuffer 接口,基于 Netty3 ChannelBuffer 的 ChannelBuffer 实现类。

构造方法

1
2
3
4
5
6
private org.jboss.netty.buffer.ChannelBuffer buffer;

public NettyBackedChannelBuffer(org.jboss.netty.buffer.ChannelBuffer buffer) {
    Assert.notNull(buffer, "buffer == null");
    this.buffer = buffer;
}

工厂

1
2
3
4
@Override
public ChannelBufferFactory factory() {
    return NettyBackedChannelBufferFactory.getInstance();
}

  • 对应的工厂是 NettyBackedChannelBufferFactory

实现方法

每个方法,直接调用 Netty3 ChannelBuffer 对应的方法。

7.2 NettyBackedChannelBufferFactory

com.alibaba.dubbo.remoting.transport.netty.NettyBackedChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 NettyBackedChannelBuffer 的工厂。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public ChannelBuffer getBuffer(int capacity) {
    return new NettyBackedChannelBuffer(ChannelBuffers.dynamicBuffer(capacity)); // ChannelBuffers 为 `org.jboss.netty.buffer` 包下
}

@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
    // 创建 Netty3 ChannelBuffer 对象
    org.jboss.netty.buffer.ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(length); // ChannelBuffers 为 `org.jboss.netty.buffer` 包下
    // 写入数据
    buffer.writeBytes(array, offset, length);
    // 创建 NettyBackedChannelBuffer 对象
    return new NettyBackedChannelBuffer(buffer);
}

@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
    return new NettyBackedChannelBuffer(ChannelBuffers.wrappedBuffer(nioBuffer)); // ChannelBuffers 为 `org.jboss.netty.buffer` 包下
}

  • 注意,此处的 ChannelBuffers 是 org.jboss.netty.buffer 包下的。

8. NettyCodecAdapter

com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter ,Netty 编解码适配器,将 Dubbo 编解码器 适配成 Netty3 的编码器和解码器。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Netty 编码器
 */
private final ChannelHandler encoder = new InternalEncoder();
/**
 * Netty 解码器
 */
private final ChannelHandler decoder = new InternalDecoder();

/**
 * Dubbo 编解码器
 */
private final Codec2 codec;
/**
 * Dubbo URL
 */
private final URL url;
/**
 * 网络读写缓冲区大小
 */
private final int bufferSize;
/**
 * Dubbo ChannelHandler
 */
private final com.alibaba.dubbo.remoting.ChannelHandler handler;

public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
    // 设置 `bufferSize`
    int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
    this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}

  • bufferSize 属性,网络读写缓冲区大小,默认 8K 。这是 dubbo-remoting-netty4 的 NettyCodecAdapter 所不需要的。用于下面 「8.2 InternalDecoder」 ,消息解码时使用。

8.1 InternalEncoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Sharable
private class InternalEncoder extends OneToOneEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
        // 创建 HeapChannelBuffer 对象
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        // 获得 NettyChannel 对象
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            // 编码
            codec.encode(channel, buffer, msg);
        } finally {
            // 移除 NettyChannel 对象,若断开连接
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        // 返回 Netty ChannelBuffer 对象
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }

}

  • org.jboss.netty.handler.codec.oneone.OneToOneEncoder 抽象类,Netty3 编码器。
  • 代码比较简单,胖友自己看注释。

8.2 InternalDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
private class InternalDecoder extends SimpleChannelUpstreamHandler {

    /**
     * 未读完的消息 Buffer
     */
    private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
        // 跳过非 ChannelBuffer
        Object o = event.getMessage();
        if (!(o instanceof ChannelBuffer)) {
            ctx.sendUpstream(event);
            return;
        }

        // 无可读,跳过
        ChannelBuffer input = (ChannelBuffer) o;
        int readable = input.readableBytes();
        if (readable <= 0) {
            return;
        }

        // 合并 `buffer` + `input` 成 `message`
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
        if (buffer.readable()) { // 有未读完的,需要拼接
            if (buffer instanceof DynamicChannelBuffer) {
                buffer.writeBytes(input.toByteBuffer());
                message = buffer;
            } else { // Netty3 ChannelBuffer ,转成 DynamicChannelBuffer 。
                int size = buffer.readableBytes() + input.readableBytes();
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
                message.writeBytes(buffer, buffer.readableBytes());
                message.writeBytes(input.toByteBuffer());
            }
        } else { // 无未读完的,直接创建
            message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(input.toByteBuffer());
        }

        // 获得 NettyChannel 对象
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        // 循环解析,直到结束
        Object msg;
        int saveReaderIndex;
        try {
            // decode object.
            do {
                saveReaderIndex = message.readerIndex();
                try {
                    msg = codec.decode(channel, message);
                } catch (IOException e) {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    throw e;
                }
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                // 解码到消息,触发一条消息
                } else {
                    //is it possible to go here ? 芋艿:不可能,哈哈哈
                    if (saveReaderIndex == message.readerIndex()) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                        Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                    }
                }
            } while (message.readable());
        } finally {
            // 有剩余可读的,压缩并缓存
            if (message.readable()) {
                message.discardReadBytes();
                buffer = message;
            // 无剩余的,设置空 Buffer
            } else {
                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
            }
            // 移除 NettyChannel 对象,若断开连接
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        ctx.sendUpstream(e);
    }
}

  • 继承 org.jboss.netty.channel.SimpleChannelUpstreamHandler 类。
  • buffer 属性,未读完的消息 Buffer 。在 #messageReceived(ctx, event) 方法中,我们在做拆包粘包的处理过程中,可能收到数据是不完整的。例如,不足以解析成一条 Dubbo Request 。那么,我们就需要将收到的,缓存到 buffer 中。
  • 第 10 至 15 行:跳过非 ChannelBuffer 。
  • 第 17 至 22 行:跳过无可读的。
  • 第 24 至 38 行:合并 两类三种 buffer + input 成 message 。有情况,胖友看下注释。
  • 第 41 行:获得 NettyChannel 对象。
  • 第 42 至 69 行:循环解析,直到结束。此处,和 dubbo-remoting-netty4 的解码流程,就是一致的了。
  • 第 71 至 75 行:有剩余的部分,压缩并缓存到 buffer 中。
  • 第 75 至 78 行:完全读完,设置 buffer 为空( EMPTY_BUFFER )。
  • 第 80 行:移除 NettyChannel 对象,若断开连接。

9. 日志工厂

和 netty-remoting-netty4 的日志工厂,基本一致。差异点是 DubboLogger ,无需实现类似 #log(format, arguments) 等需要格式化的方法。因此,无需复制 FormattingTuple 、MessageFormatter 类。

本文由作者按照 CC BY 4.0 进行授权