文章

NIO服务器(六)之Netty4实现

NIO服务器(六)之Netty4实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

在前面的文章,我们已经了解了 dubbo-remoting-api 如何实现 NIO 服务器的抽象 API 层。那么本文来看看,dubbo-remoting-netty4 ,如何将 Netty4 接入实现。

涉及如下类:

类图

友情提示:在当前版本,默认情况下,使用 Netty3 ,如果想配置成 Netty4 ,请参考文档:《Dubbo 用户指南 —— Netty4》

2. NettyTransporter

com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter ,实现 Transporter 接口,基于 Netty4 的网络传输实现类。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class NettyTransporter implements Transporter {

    /**
     * 拓展名
     */
    public static final String NAME = "netty4";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

  • NAME静态 属性,拓展名。
  • NettyTransporter 基于 Dubbo SPI 机制加载。
  • 创建 NettyServer 和 NettyClient 对象。

3. NettyChannel

io.netty.channel.ChannelFuture.NettyChannel ,实现 AbstractChannel 抽象类,封装 Netty Channel 的通道实现类。

NettyChannel 和 HeaderExchangeChannel 很类似。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * 通道集合
 */
private static final ConcurrentMap<io.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();

/**
 * 通道
 */
private final io.netty.channel.Channel channel;
/**
 * 属性集合
 */
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();

private NettyChannel(io.netty.channel.Channel channel, URL url, ChannelHandler handler) {
    super(url, handler);
    if (channel == null) {
        throw new IllegalArgumentException("netty channel == null;");
    }
    this.channel = channel;
}

  • channel装饰器 属性,通道。NettyChannel 是传入 channel 属性的 ,每个实现的方法,都会调用 channel 。
  • attributes注意 属性,属性集合。 , setAttribute(…) 等方法,使用的是该属性,而不是 io.netty.channel.Channel 的。
  • channelMap静态 属性,通道集合。在实际 Netty Handler 里(例如下面我们会看到的 NettyServerHandler 和 NettyClientHandler),每个方法参数里,传递的是 io.netty.channel.Channel 对象。通过 NettyChannel.channelMap 中,获得对应的 NettyChannel 对象。
    • #getOrAddChannel(ch, url, handler)静态 方法,创建 NettyChannel 对象。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static NettyChannel getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
        if (ch.isActive()) { // 连接中
            ret = channelMap.putIfAbsent(ch, nettyChannel); // 添加到 channelMap
        }
        if (ret == null) {
            ret = nettyChannel;
        }
    }
    return ret;
}

  • #removeChannelIfDisconnected(ch) 静态方法,移除 NettyChannel 对象。代码如下:
1
2
3
4
5
static void removeChannelIfDisconnected(io.netty.channel.Channel ch) {
    if (ch != null && !ch.isActive()) { // 未连接
        channelMap.remove(ch); // 移除出channelMap
    }
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void send(Object message, boolean sent) throws RemotingException {
    // 检查连接状态
    super.send(message, sent);

    boolean success = true; // 如果没有等待发送成功,默认成功。
    int timeout = 0;
    try {
        // 发送消息
        ChannelFuture future = channel.writeAndFlush(message);
        // 等待发送成功
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        // 若发生异常,抛出
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    // 发送失败,抛出异常
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

  • 第 4 行:调用 #send(message, sent) 方法,检查连接状态。
  • 第 6 行: success ,是否执行成功。若不需要等待发送成功( sent = false ) ,默认成功。
  • 第 10 行:调用真正的 io.netty.channel.Channel#writeAndFlush(message) 方法,发送消息。
  • 第 11 至 15 行:若需要等待发送成功( sent = true ),等待直到成功或超时。
  • 第 16 至 20 行:若发生异常,抛出异常。
  • 第 26 至 29 行:若发送失败,抛出异常。

关闭通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
@SuppressWarnings("Duplicates")
public void close() {
    // 标记关闭
    try {
        super.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    // 移除连接
    try {
        removeChannelIfDisconnected(channel);
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    // 清空属性 attributes
    try {
        attributes.clear();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    // 关闭真正的通道 channel
    try {
        if (logger.isInfoEnabled()) {
            logger.info("Close netty channel " + channel);
        }
        channel.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
}

  • 注意 ,最后一步才关闭真正的通道,避免中间状态。

其它方法

其它实现方法,比较简单,胖友自己瞅瞅。例如:

1
2
3
4
@Override
public boolean isConnected() {
    return !isClosed() && channel.isActive();
}

4. Server

4.1 NettyServer

com.alibaba.dubbo.remoting.transport.netty4.NettyServer ,实现 Server 接口,继承 AbstractServer 抽象类,Netty 服务器实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * 通道集合
 */
private Map<String, Channel> channels; // <ip:port, channel>

private ServerBootstrap bootstrap;

private io.netty.channel.Channel channel;

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) /* 设置线程名到 URL 上 */));
}

  • channels指定地址 属性,连接到服务器的客户端通道集合。笔者在看 NettyChannel 时,在有 NettyChannel.channels ,那么此处的 channels 不是重复了么?答案在 #getChannel(remoteAddress) 方法,获得 的 Channel 对象。代码如下:
1
2
3
4
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
    return channels.get(NetUtils.toAddressString(remoteAddress));
}

启动服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Override
protected void doOpen() {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();

    // 实例化 ServerBootstrap
    bootstrap = new ServerBootstrap();

    // 创建线程组
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    // 创建 NettyServerHandler 对象
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    // 设置 `channels` 属性
    channels = nettyServerHandler.getChannels();

    bootstrap
            // 设置它的线程组
            .group(bossGroup, workerGroup)
            // 设置 Channel类型
            .channel(NioServerSocketChannel.class) // Server
            // 设置可选项
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            // 设置责任链路
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    // 创建 NettyCodecAdapter 对象
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder()) // 解码
                            .addLast("encoder", adapter.getEncoder())  // 解码
                            .addLast("handler", nettyServerHandler); // 处理器
                }
            });

    // 服务器绑定端口监听
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

  • 参考:《Netty4.x中文教程系列(六) 从头开始Bootstrap》
  • 第 4 行:设置 Netty 的日志工厂,在 「7. 日志处理」 详细解析。
  • 第 7 行:实例化 ServerBootstrap 对象。
  • 第 9 至 12 行:创建 bossGroup workerGroup 线程组。
  • 第 15 行:创建 NettyServerHandler 对象。
  • 第 17 行:设置 channels 属性,指向 NettyServerHandler.channels 属性。
  • 第 21 行:设置线程组。
  • 第 23 行:设置 Channel 类型为 NioServerSocketChannel 。
  • 第 24 至 27 行:设置可选项。
    • PooledByteBufAllocator.DEFAULT《Netty 调优》 ,对象池,重用缓冲区。参见 。
  • 第 29 至 39 行:设置责任链。
    • 第 33 行:创建 NettyCodecAdapter 对象。NettyCodecAdapter 在 「6. 2 NettyCodecAdapter」 详细解析。
    • 第 35 行:调用 NettyCodecAdapter#getDecoder() 方法,获得解码器,并设置。
    • 第 37 行:调用 NettyCodecAdapter#getEncoder() 方法,获得编码器,并设置。
    • 第 37 行:设置处理器 handler 。
  • 第 41 至 45 行:服务器绑定端口监听,正式启动 啦。

获得所有通道

1
2
3
4
5
6
7
8
9
10
11
public Collection<Channel> getChannels() {
    Collection<Channel> chs = new HashSet<Channel>();
    for (Channel channel : this.channels.values()) {
        if (channel.isConnected()) { // 已连接,返回
            chs.add(channel);
        } else { // 未连接,移除
            channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
        }
    }
    return chs;
}

关闭服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void doClose() {
    // 关闭服务器通道
    try {
        if (channel != null) {
            // unbind.
            channel.close();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    // 关闭连接到服务器的客户端通道
    try {
        Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
        if (channels != null && channels.size() > 0) {
            for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                try {
                    channel.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    // 优雅关闭工作组
    try {
        if (bootstrap != null) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    // 清空连接到服务器的客户端通道
    try {
        if (channels != null) {
            channels.clear();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}

  • 第 3 至 11 行:关闭服务器通道( io.netty.channel.Channel )。
  • 第 12 至 26 行:关闭连接到服务器的客户端通道( com.alibaba.dubbo.remoting.Channel ) 。
  • 第 27 至 35 行:优雅关闭工作组。
  • 第 36 至 43 行:清空连接到服务器的客户端通道。

4.2 NettyServerHandler

com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyServer 的处理器。

NettyServerHandler 和 HeaderExchangeHandler 类似。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {

    /**
     * Dubbo Channel 集合
     */
    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
    /**
     * URL
     */
    private final URL url;
    /**
     * Dubbo ChannelHandler
     */
    private final ChannelHandler handler;

    public NettyServerHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

    // ... 省略实现方法

}

  • @io.netty.channel.ChannelHandler.Sharable 注解:《netty4中注解Sharable的使用场景?》 FROM Sharable 注解主要是用来标示一个 ChannelHandler 可以被安全地共享,即可以在多个Channel 的 ChannelPipeline 中使用同一个ChannelHandler ,而不必每一个ChannelPipeline 都重新 new 一个新的 ChannelHandler 。
  • channels 属性,连接到服务器的 Dubbo Channel 集合。
  • handler 属性,Dubbo ChannelHandler。NettyServerHandler 对每个事件的处理,会调用 handler 对应的方法。

实现方法

每个实现的方法,处理都比较类似,一般是提交给 handler 做相应的处理。艿艿已经添加了代码注释,胖友可以自己看看。下面以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 交给下一个节点处理
    // 芋艿:实际此处不要调用也没关系,因为 NettyServerHandler 没下一个节点。
    ctx.fireChannelActive();

    // 创建 NettyChannel 对象
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 添加到 `channels` 中
        if (channel != null) {
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
        }
        // 提交给 `handler` 处理器。
        handler.connected(channel);
    } finally {
        // 移除 NettyChannel 对象,若已断开
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

  • 《Netty 框架总结「ChannelHandler 及 EventLoop」》
  • 第 5 行:调用 此处不要调用也没关系 ChannelHandlerContext#fireChannelActive() 方法,交给下一个节点处理。实际上, ,因为 NettyServerHandler 没下一个节点。
  • 第 8 行:调用 NettyChannel#getOrAddChannel(channel, url, handler) 方法,创建 NettyChannel 对象。
  • 第 10 至 13 行:添加到 channels 中。
  • 第 15 行:调用 ChannelHandler#connected(channel) 方法,处理连接事件。
  • 第 16 至 19 行:调用 已断开 NettyChannel#removeChannelIfDisconnected(channel) 方法,移除 NettyChannel 对象,若 。

5. Client

5.1 NettyClient

com.alibaba.dubbo.remoting.transport.netty4.NettyClient ,继承 AbstractNettyClient 抽象类,Netty 客户端实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
// 【TODO 8027】为啥公用
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));

private Bootstrap bootstrap;

private volatile io.netty.channel.Channel channel; // volatile, please copy reference to use

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    super(url, wrapChannelHandler(url, handler));
}

启动客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
protected void doOpen() {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();

    // 创建 NettyClientHandler 对象
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);

    // 实例化 ServerBootstrap
    bootstrap = new Bootstrap();
    bootstrap
            // 设置它的线程组
            .group(nioEventLoopGroup)
            // 设置可选项
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            // 设置 Channel类型
            .channel(NioSocketChannel.class);

    // 设置连接超时时间
    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    // 设置责任链路
    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) {
            // 创建 NettyCodecAdapter 对象
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder()) // 解码
                    .addLast("encoder", adapter.getEncoder()) // 解码
                    .addLast("handler", nettyClientHandler); // 处理器
        }
    });
}

  • 和 NettyClient#doOpen() 方法类似。我们仅仅说一些差异点。
  • 第 7 行:创建 NettyClientHandler 对象。
  • 第 13 行:设置线程组,没有 bossGroup 。
  • 第 20 行:设置 Channel 类型为 NioSocketChannel 。
  • 第 22 至 27 行:设置连接超时时间。

连接服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
@SuppressWarnings("Duplicates")
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // 连接服务器
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        // 等待连接成功或者超时
        boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
        // 连接成功
        if (ret && future.isSuccess()) {
            Channel newChannel = future.channel();
            try {
                // 关闭老的连接
                // Close old channel
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // 若 NettyClient 被关闭,关闭连接
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                // 设置新连接
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        // 发生异常,抛出 RemotingException 异常
        } else if (future.cause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
        // 无结果(连接超时),抛出 RemotingException 异常
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        if (!isConnected()) {
            //future.cancel(true);
        }
    }
}

  • 第 6 行:调用 Bootstrap#connect(remoteAddress) 方法,连接服务器。
  • 第 9 行:调用 ChannelFuture#awaitUninterruptibly(3000, TimeUnit) 方法,等待连接成功或超时。这里传入 3000 貌似不太正确,应该传入 ChannelOption.CONNECT_TIMEOUT_MILLIS 的实际值。
  • 第 10 至 43 行:连接成功。
    • 第 14 至 26 行:若存在老的 连接,调用 Channel#close() 方法,进行关闭。
    • 第 29 至 38 行:若 NettyClient 被关闭,调用 新的连接 Channel#close() 方法,关闭 。
    • 第 39 至 42 行:设置新的连接 到 channel 。
  • 第 44 至 47 行:发生异常,抛出 Server RemotingException 异常。
  • 第 48 至 54 行:无结果(连接超时),抛出 Client RemotingException 异常。
  • 第 55 至 59 行:// 【TODO 8028】为什么不取消 future TODO 可能,和 3000 有关系。< 3000 强制 3000 。以及等待 3000

断开连接

1
2
3
4
5
6
7
8
@Override
protected void doDisConnect() {
    try {
        NettyChannel.removeChannelIfDisconnected(channel);
    } catch (Throwable t) {
        logger.warn(t.getMessage());
    }
}

关闭连接

1
2
3
4
5
@Override
protected void doClose() throws Throwable {
    //can't shutdown nioEventLoopGroup
    //nioEventLoopGroup.shutdownGracefully();
}

5.2 NettyClientHandler

com.alibaba.dubbo.remoting.transport.netty4.NettyClientHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyClient 的处理器。

NettyServerHandler 和 HeaderExchangeHandler 类似。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@io.netty.channel.ChannelHandler.Sharable
public class NettyClientHandler extends ChannelDuplexHandler {

    /**
     * Dubbo URL
     */
    private final URL url;
    /**
     * Dubbo ChannelHandler
     */
    private final ChannelHandler handler;

    public NettyClientHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

    // ... 省略实现方法
}

实现方法

NettyClientHandler 的处理方式,和 NettyServerHandler 大体一致,但是也存在一定的差异,以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:

1
2
3
4
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
}

  • 不同于 NettyServerHandler 的该方法,会提交给 handler 继续处理。因为,客户端不会被连接,无需做连入 Channel 的管理。

其他方法,胖友自己查看。

6. NettyBackedChannelBuffer

com.alibaba.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer ,实现 ChannelBuffer 接口,基于 Netty ByteBuf 的 ChannelBuffer 实现类。

构造方法

1
2
3
4
5
6
private ByteBuf buffer;

public NettyBackedChannelBuffer(ByteBuf buffer) {
    Assert.notNull(buffer, "buffer == null");
    this.buffer = buffer;
}

工厂

1
2
3
4
5
@Override
//has nothing use
public ChannelBufferFactory factory() {
    return null;
}

  • 无使用工厂的地方。
  • 另外,ByteBuf 默认情况下,容量为 Integer.MAX_VALUE 。

实现方法

每个方法,直接调用 ByteBuf 对应的方法。 ChannelBuffer 是以 ByteBuf 为原型,设计的接口 API 。

7. NettyCodecAdapter

com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter ,Netty 编解码适配器,将 Dubbo 编解码器 适配成 Netty4 的编码器和解码器。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Netty 编码器
 */
private final ChannelHandler encoder = new InternalEncoder();
/**
 * Netty 解码器
 */
private final ChannelHandler decoder = new InternalDecoder();

/**
 * Dubbo 编解码器
 */
private final Codec2 codec;
/**
 * Dubbo URL
 */
private final URL url;
/**
 * Dubbo ChannelHandler
 */
private final com.alibaba.dubbo.remoting.ChannelHandler handler;

public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
}

7.1 InternalEncoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private class InternalEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        // 创建 NettyBackedChannelBuffer 对象
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
        // 获得 NettyChannel 对象
        Channel ch = ctx.channel();
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            // 编码
            codec.encode(channel, buffer, msg);
        } finally {
            // 移除 NettyChannel 对象,若断开连接
            NettyChannel.removeChannelIfDisconnected(ch);
        }
    }

}

  • io.netty.handler.codec.MessageToByteEncoder抽象类 ,Netty4 编码器 。
  • 代码比较简单,胖友自己看注释。

7.2 InternalDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        // 创建 NettyBackedChannelBuffer 对象
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        // 获得 NettyChannel 对象
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        // 循环解析,直到结束
        Object msg;
        int saveReaderIndex;
        try {
            // decode object.
            do {
                // 记录当前读进度
                saveReaderIndex = message.readerIndex();
                // 解码
                try {
                    msg = codec.decode(channel, message);
                } catch (IOException e) {
                    throw e;
                }
                // 需要更多输入,即消息不完整,标记回原有读进度,并结束
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                // 解码到消息,添加到 `out`
                } else {
                    //is it possible to go here ? 芋艿:不可能,哈哈哈
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        } finally {
            // 移除 NettyChannel 对象,若断开连接
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}

  • io.netty.handler.codec.ByteToMessageDecoder抽象类 ,Netty4 解码器 。
  • 代码比较简单,胖友自己看注释。

8. 日志工厂

《Dubbo 用户指南 —— 日志适配》 文档,提到:

自 2.2.1 开始,dubbo 开始内置 log4j、slf4j、jcl、jdk 这些日志框架的适配。

《Netty源码笔记 —— Netty日志处理》 文档,我们可以看到 Netty 支持实现自定义的日志工厂。通过这样的方式,我们可以接入 Dubbo 的日志适配

下面,我们来看看具体的代码实现。

调用 NettyHelper#setNettyLoggerFactory() 方法,设置日志工厂,基于 Dubbo Logger 组件。代码如下:

1
2
3
4
5
6
public static void setNettyLoggerFactory() {
    InternalLoggerFactory factory = InternalLoggerFactory.getDefaultFactory();
    if (factory == null || !(factory instanceof DubboLoggerFactory)) {
        InternalLoggerFactory.setDefaultFactory(new DubboLoggerFactory());
    }
}

  • 设置 Netty 日志工厂为 DubboLoggerFactory 。

DubboLoggerFactory ,代码如下:

1
2
3
4
5
6
7
8
static class DubboLoggerFactory extends InternalLoggerFactory {

    @Override
    public InternalLogger newInstance(String name) {
        return new DubboLogger(LoggerFactory.getLogger(name));
    }

}

  • 创建 DubboLogger 对象,并传入 Logger name 对应的 Dubbo 对象,而 Dubbo Logger 的对象,基于 Dubbo SPI 机制加载。

DubboLogger ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static class DubboLogger extends AbstractInternalLogger {

    /**
     * 日志组件
     */
    private com.alibaba.dubbo.common.logger.Logger logger;

    DubboLogger(Logger logger) {
        super(logger.getClass().getName());
        this.logger = logger;
    }

    @Override
    public void info(String msg) {
        if (isInfoEnabled()) {
            logger.info(msg);
        }
    }

    // ... 省略类似代码
}

  • 在实现的每个方法中,直接调用 logger 对应的方法。
  • 在类似 #info(String format, Object… arguments) 方法中,需要对日志内容进行格式化,代码如下:
1
2
3
4
5
6
7
@Override
public void info(String format, Object... arguments) {
    if (isInfoEnabled()) {
        FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
        logger.info(ft.getMessage(), ft.getThrowable());
    }
}

我们看到需要 FormattingTupleMessageFormatter 这两个类,用于格式化。实际上,这两个类是直接从 Netty4 中拷贝出来的两个类,因为它们是 package 修饰的类,在 DubboLogger 中,无法访问到它们,所以进行复制解决。

本文由作者按照 CC BY 4.0 进行授权