文章

NIO服务器(六)之Netty4实现

NIO服务器(六)之Netty4实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

在前面的文章,我们已经了解了 dubbo-remoting-api 如何实现 NIO 服务器的抽象 API 层。那么本文来看看,dubbo-remoting-netty4 ,如何将 Netty4 接入实现。

涉及如下类:

类图

友情提示:在当前版本,默认情况下,使用 Netty3 ,如果想配置成 Netty4 ,请参考文档:《Dubbo 用户指南 —— Netty4》

2. NettyTransporter

com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter ,实现 Transporter 接口,基于 Netty4 的网络传输实现类。代码如下:

```plain text plain public class NettyTransporter implements Transporter { /** * 拓展名 */ public static final String NAME = “netty4”; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
---

- NAME**静态**
属性,拓展名。
- NettyTransporter 基于 Dubbo SPI 机制加载。
- 创建 NettyServer 和 NettyClient 对象。

# 3. NettyChannel

io.netty.channel.ChannelFuture.NettyChannel ,实现 AbstractChannel 抽象类,**封装 Netty Channel** 的通道实现类。

NettyChannel 和 HeaderExchangeChannel 很类似。

**构造方法**

```plain text
plain /**  * 通道集合  */ private static final ConcurrentMap<io.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();  /**  * 通道  */ private final io.netty.channel.Channel channel; /**  * 属性集合  */ private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();  private NettyChannel(io.netty.channel.Channel channel, URL url, ChannelHandler handler) {     super(url, handler);     if (channel == null) {         throw new IllegalArgumentException("netty channel == null;");     }     this.channel = channel; }

  • channel装饰器 属性,通道。NettyChannel 是传入 channel 属性的 ,每个实现的方法,都会调用 channel 。
  • attributes注意 属性,属性集合。 , setAttribute(…) 等方法,使用的是该属性,而不是 io.netty.channel.Channel 的。
  • channelMap静态 属性,通道集合。在实际 Netty Handler 里(例如下面我们会看到的 NettyServerHandler 和 NettyClientHandler),每个方法参数里,传递的是 io.netty.channel.Channel 对象。通过 NettyChannel.channelMap 中,获得对应的 NettyChannel 对象。
    • #getOrAddChannel(ch, url, handler)静态 方法,创建 NettyChannel 对象。代码如下:

```plain text plain static NettyChannel getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } NettyChannel ret = channelMap.get(ch); if (ret == null) { NettyChannel nettyChannel = new NettyChannel(ch, url, handler); if (ch.isActive()) { // 连接中 ret = channelMap.putIfAbsent(ch, nettyChannel); // 添加到 channelMap } if (ret == null) { ret = nettyChannel; } } return ret; }

1
2
3
4
5
6
---

```plain text
    * <font style="color:rgb(51, 51, 51);">x</font>
- <font style="color:rgb(51, 51, 51);">#removeChannelIfDisconnected(ch)</font>**<font style="color:rgb(51, 51, 51);">静态</font>**<font style="color:rgb(51, 51, 51);">方法,移除 NettyChannel 对象。代码如下:</font>

```plain text plain static void removeChannelIfDisconnected(io.netty.channel.Channel ch) { if (ch != null && !ch.isActive()) { // 未连接 channelMap.remove(ch); // 移除出channelMap } }

1
2
3
4
5
---

```plain text
    * <font style="color:rgb(51, 51, 51);">x</font>

发送消息

```plain text plain 1: @Override 2: public void send(Object message, boolean sent) throws RemotingException { 3: // 检查连接状态 4: super.send(message, sent); 5: 6: boolean success = true; // 如果没有等待发送成功,默认成功。 7: int timeout = 0; 8: try { 9: // 发送消息 10: ChannelFuture future = channel.writeAndFlush(message); 11: // 等待发送成功 12: if (sent) { 13: timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 14: success = future.await(timeout); 15: } 16: // 若发生异常,抛出 17: Throwable cause = future.cause(); 18: if (cause != null) { 19: throw cause; 20: } 21: } catch (Throwable e) { 22: throw new RemotingException(this, “Failed to send message “ + message + “ to “ + getRemoteAddress() + “, cause: “ + e.getMessage(), e); 23: } 24: 25: // 发送失败,抛出异常 26: if (!success) { 27: throw new RemotingException(this, “Failed to send message “ + message + “ to “ + getRemoteAddress() 28: + “in timeout(“ + timeout + “ms) limit”); 29: } 30: }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
---

- 第 4 行:调用
#send(message, sent)
方法,检查连接状态。
- 第 6 行:
success
,是否执行成功。若不需要等待发送成功(
sent = false
) ,默认成功。
- 第 10 行:调用**真正的**
io.netty.channel.Channel#writeAndFlush(message)
方法,发送消息。
- 第 11 至 15 行:若需要等待发送成功(
sent = true
),等待直到成功或超时。
- 第 16 至 20 行:若发生异常,抛出异常。
- 第 26 至 29 行:若发送失败,抛出异常。

**关闭通道**

```plain text
plain @Override @SuppressWarnings("Duplicates") public void close() {     // 标记关闭     try {         super.close();     } catch (Exception e) {         logger.warn(e.getMessage(), e);     }     // 移除连接     try {         removeChannelIfDisconnected(channel);     } catch (Exception e) {         logger.warn(e.getMessage(), e);     }     // 清空属性 attributes     try {         attributes.clear();     } catch (Exception e) {         logger.warn(e.getMessage(), e);     }     // 关闭真正的通道 channel     try {         if (logger.isInfoEnabled()) {             logger.info("Close netty channel " + channel);         }         channel.close();     } catch (Exception e) {         logger.warn(e.getMessage(), e);     } }

  • 注意 ,最后一步才关闭真正的通道,避免中间状态。

其它方法

其它实现方法,比较简单,胖友自己瞅瞅。例如:

```plain text plain @Override public boolean isConnected() { return !isClosed() && channel.isActive(); }

1
2
3
4
5
6
7
8
9
10
11
12
13
---

# 4. Server

## 4.1 NettyServer

[com.alibaba.dubbo.remoting.transport.netty4.NettyServer](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/com/alibaba/dubbo/remoting/transport/netty4/NettyServer.java) ,实现 Server 接口,继承 AbstractServer 抽象类,Netty 服务器实现类。

**构造方法**

```plain text
plain /**  * 通道集合  */ private Map<String, Channel> channels; // <ip:port, channel>  private ServerBootstrap bootstrap;  private io.netty.channel.Channel channel;  private EventLoopGroup bossGroup; private EventLoopGroup workerGroup;  public NettyServer(URL url, ChannelHandler handler) throws RemotingException {     super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) /* 设置线程名到 URL 上 */)); }

  • channels指定地址 属性,连接到服务器的客户端通道集合。笔者在看 NettyChannel 时,在有 NettyChannel.channels ,那么此处的 channels 不是重复了么?答案在 #getChannel(remoteAddress) 方法,获得 的 Channel 对象。代码如下:

```plain text plain @Override public Channel getChannel(InetSocketAddress remoteAddress) { return channels.get(NetUtils.toAddressString(remoteAddress)); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
---

- bootstrap
channel
bossGroup
workerGroup
属性, 不熟悉这几个的胖友,请 Google 一下 Netty 入门噶。
- ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)
代码段,包装 ChannelHandler ,实现 Dubbo 线程模型的功能。
    - [《精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层》「8. Dispacher」](http://svip.iocoder.cn/Dubbo/remoting-api-transport/?self=)
有详细解析。
    - [《Dubbo 用户指南 —— 线程模型》](http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html)

**启动服务器**

```plain text
plain 1: @Override  2: protected void doOpen() {  3:     // 设置日志工厂  4:     NettyHelper.setNettyLoggerFactory();  5:   6:     // 实例化 ServerBootstrap  7:     bootstrap = new ServerBootstrap();  8:   9:     // 创建线程组 10:     bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); 11:     workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), 12:             new DefaultThreadFactory("NettyServerWorker", true)); 13:  14:     // 创建 NettyServerHandler 对象 15:     final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); 16:     // 设置 `channels` 属性 17:     channels = nettyServerHandler.getChannels(); 18:  19:     bootstrap 20:             // 设置它的线程组 21:             .group(bossGroup, workerGroup) 22:             // 设置 Channel类型 23:             .channel(NioServerSocketChannel.class) // Server 24:             // 设置可选项 25:             .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) 26:             .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) 27:             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 28:             // 设置责任链路 29:             .childHandler(new ChannelInitializer<NioSocketChannel>() { 30:                 @Override 31:                 protected void initChannel(NioSocketChannel ch) { 32:                     // 创建 NettyCodecAdapter 对象 33:                     NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); 34:                     ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug 35:                             .addLast("decoder", adapter.getDecoder()) // 解码 36:                             .addLast("encoder", adapter.getEncoder())  // 解码 37:                             .addLast("handler", nettyServerHandler); // 处理器 38:                 } 39:             }); 40:  41:     // 服务器绑定端口监听 42:     // bind 43:     ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); 44:     channelFuture.syncUninterruptibly(); 45:     channel = channelFuture.channel(); 46: }

  • 参考:《Netty4.x中文教程系列(六) 从头开始Bootstrap》
  • 第 4 行:设置 Netty 的日志工厂,在 「7. 日志处理」 详细解析。
  • 第 7 行:实例化 ServerBootstrap 对象。
  • 第 9 至 12 行:创建 bossGroup workerGroup 线程组。
  • 第 15 行:创建 NettyServerHandler 对象。
  • 第 17 行:设置 channels 属性,指向 NettyServerHandler.channels 属性。
  • 第 21 行:设置线程组。
  • 第 23 行:设置 Channel 类型为 NioServerSocketChannel 。
  • 第 24 至 27 行:设置可选项。
    • PooledByteBufAllocator.DEFAULT《Netty 调优》 ,对象池,重用缓冲区。参见 。
  • 第 29 至 39 行:设置责任链。
    • 第 33 行:创建 NettyCodecAdapter 对象。NettyCodecAdapter 在 「6. 2 NettyCodecAdapter」 详细解析。
    • 第 35 行:调用 NettyCodecAdapter#getDecoder() 方法,获得解码器,并设置。
    • 第 37 行:调用 NettyCodecAdapter#getEncoder() 方法,获得编码器,并设置。
    • 第 37 行:设置处理器 handler 。
  • 第 41 至 45 行:服务器绑定端口监听,正式启动 啦。

获得所有通道

```plain text plain public Collection getChannels() { Collection chs = new HashSet(); for (Channel channel : this.channels.values()) { if (channel.isConnected()) { // 已连接,返回 chs.add(channel); } else { // 未连接,移除 channels.remove(NetUtils.toAddressString(channel.getRemoteAddress())); } } return chs; }

1
2
3
4
5
6
7
---

**关闭服务器**

```plain text
plain 1: @Override  2: protected void doClose() {  3:     // 关闭服务器通道  4:     try {  5:         if (channel != null) {  6:             // unbind.  7:             channel.close();  8:         }  9:     } catch (Throwable e) { 10:         logger.warn(e.getMessage(), e); 11:     } 12:     // 关闭连接到服务器的客户端通道 13:     try { 14:         Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels(); 15:         if (channels != null && channels.size() > 0) { 16:             for (com.alibaba.dubbo.remoting.Channel channel : channels) { 17:                 try { 18:                     channel.close(); 19:                 } catch (Throwable e) { 20:                     logger.warn(e.getMessage(), e); 21:                 } 22:             } 23:         } 24:     } catch (Throwable e) { 25:         logger.warn(e.getMessage(), e); 26:     } 27:     // 优雅关闭工作组 28:     try { 29:         if (bootstrap != null) { 30:             bossGroup.shutdownGracefully(); 31:             workerGroup.shutdownGracefully(); 32:         } 33:     } catch (Throwable e) { 34:         logger.warn(e.getMessage(), e); 35:     } 36:     // 清空连接到服务器的客户端通道 37:     try { 38:         if (channels != null) { 39:             channels.clear(); 40:         } 41:     } catch (Throwable e) { 42:         logger.warn(e.getMessage(), e); 43:     } 44: }

  • 第 3 至 11 行:关闭服务器通道( io.netty.channel.Channel )。
  • 第 12 至 26 行:关闭连接到服务器的客户端通道( com.alibaba.dubbo.remoting.Channel ) 。
  • 第 27 至 35 行:优雅关闭工作组。
  • 第 36 至 43 行:清空连接到服务器的客户端通道。

4.2 NettyServerHandler

com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyServer 的处理器。

NettyServerHandler 和 HeaderExchangeHandler 类似。

构造方法

```plain text plain @io.netty.channel.ChannelHandler.Sharable public class NettyServerHandler extends ChannelDuplexHandler { /** * Dubbo Channel 集合 */ private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel> /** * URL */ private final URL url; /** * Dubbo ChannelHandler */ private final ChannelHandler handler; public NettyServerHandler(URL url, ChannelHandler handler) { if (url == null) { throw new IllegalArgumentException(“url == null”); } if (handler == null) { throw new IllegalArgumentException(“handler == null”); } this.url = url; this.handler = handler; } // … 省略实现方法 }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
---

- [`@io.netty.channel.ChannelHandler.Sharable](http://svip.iocoder.cn/Dubbo/remoting-impl-netty4/mailto:%60@io.netty.channel.ChannelHandler.Sharable)<font style="color:rgb(51, 51, 51);">` 注解:[《netty4中注解Sharable的使用场景?》](https://www.zhihu.com/question/50198921)
FROM
Sharable 注解主要是用来标示一个 ChannelHandler 可以被安全地共享,即可以在多个Channel 的 ChannelPipeline 中使用同一个ChannelHandler ,而不必每一个ChannelPipeline 都重新 new 一个新的 ChannelHandler 。
- channels
属性,连接到服务器的 Dubbo Channel 集合。
- handler
属性,Dubbo ChannelHandler。NettyServerHandler 对每个事件的处理,会调用
handler
对应的方法。

**实现方法**

每个实现的方法,处理都比较类似,一般是提交给 handler 做相应的处理。艿艿已经添加了代码注释,胖友可以自己看看。下面以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:

```plain text
plain 1: @Override  2: public void channelActive(ChannelHandlerContext ctx) throws Exception {  3:     // 交给下一个节点处理  4:     // 芋艿:实际此处不要调用也没关系,因为 NettyServerHandler 没下一个节点。  5:     ctx.fireChannelActive();  6:   7:     // 创建 NettyChannel 对象  8:     NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);  9:     try { 10:         // 添加到 `channels` 中 11:         if (channel != null) { 12:             channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel); 13:         } 14:         // 提交给 `handler` 处理器。 15:         handler.connected(channel); 16:     } finally { 17:         // 移除 NettyChannel 对象,若已断开 18:         NettyChannel.removeChannelIfDisconnected(ctx.channel()); 19:     } 20: }

  • 《Netty 框架总结「ChannelHandler 及 EventLoop」》
  • 第 5 行:调用 此处不要调用也没关系 ChannelHandlerContext#fireChannelActive() 方法,交给下一个节点处理。实际上, ,因为 NettyServerHandler 没下一个节点。
  • 第 8 行:调用 NettyChannel#getOrAddChannel(channel, url, handler) 方法,创建 NettyChannel 对象。
  • 第 10 至 13 行:添加到 channels 中。
  • 第 15 行:调用 ChannelHandler#connected(channel) 方法,处理连接事件。
  • 第 16 至 19 行:调用 已断开 NettyChannel#removeChannelIfDisconnected(channel) 方法,移除 NettyChannel 对象,若 。

5. Client

5.1 NettyClient

com.alibaba.dubbo.remoting.transport.netty4.NettyClient ,继承 AbstractNettyClient 抽象类,Netty 客户端实现类。

构造方法

```plain text plain // 【TODO 8027】为啥公用 private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory(“NettyClientWorker”, true)); private Bootstrap bootstrap; private volatile io.netty.channel.Channel channel; // volatile, please copy reference to use public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { super(url, wrapChannelHandler(url, handler)); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
---

- nioEventLoopGroup
属性,【TODO 8027】为啥公用
- channel**volatile多线程**
属性,通道,有
修饰符。因为客户端可能会断开重连,需要保证
的可见性。
- #wrapChannelHandler(url, handler)
代码段,包装 ChannelHandler ,实现 Dubbo 线程模型的功能。
    - [《精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层》「8. Dispacher」](http://svip.iocoder.cn/Dubbo/remoting-api-transport/?self=)
有详细解析。
    - [《Dubbo 用户指南 —— 线程模型》](http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html)

**启动客户端**

```plain text
plain 1: @Override  2: protected void doOpen() {  3:     // 设置日志工厂  4:     NettyHelper.setNettyLoggerFactory();  5:   6:     // 创建 NettyClientHandler 对象  7:     final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);  8:   9:     // 实例化 ServerBootstrap 10:     bootstrap = new Bootstrap(); 11:     bootstrap 12:             // 设置它的线程组 13:             .group(nioEventLoopGroup) 14:             // 设置可选项 15:             .option(ChannelOption.SO_KEEPALIVE, true) 16:             .option(ChannelOption.TCP_NODELAY, true) 17:             .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 18:             //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) 19:             // 设置 Channel类型 20:             .channel(NioSocketChannel.class); 21:  22:     // 设置连接超时时间 23:     if (getTimeout() < 3000) { 24:         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); 25:     } else { 26:         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()); 27:     } 28:  29:     // 设置责任链路 30:     bootstrap.handler(new ChannelInitializer() { 31:         @Override 32:         protected void initChannel(Channel ch) { 33:             // 创建 NettyCodecAdapter 对象 34:             NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); 35:             ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug 36:                     .addLast("decoder", adapter.getDecoder()) // 解码 37:                     .addLast("encoder", adapter.getEncoder()) // 解码 38:                     .addLast("handler", nettyClientHandler); // 处理器 39:         } 40:     }); 41: }

  • 和 NettyClient#doOpen() 方法类似。我们仅仅说一些差异点。
  • 第 7 行:创建 NettyClientHandler 对象。
  • 第 13 行:设置线程组,没有 bossGroup 。
  • 第 20 行:设置 Channel 类型为 NioSocketChannel 。
  • 第 22 至 27 行:设置连接超时时间。

连接服务器

```plain text plain 1: @Override 2: @SuppressWarnings(“Duplicates”) 3: protected void doConnect() throws Throwable { 4: long start = System.currentTimeMillis(); 5: // 连接服务器 6: ChannelFuture future = bootstrap.connect(getConnectAddress()); 7: try { 8: // 等待连接成功或者超时 9: boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS); 10: // 连接成功 11: if (ret && future.isSuccess()) { 12: Channel newChannel = future.channel(); 13: try { 14: // 关闭老的连接 15: // Close old channel 16: Channel oldChannel = NettyClient.this.channel; // copy reference 17: if (oldChannel != null) { 18: try { 19: if (logger.isInfoEnabled()) { 20: logger.info(“Close old netty channel “ + oldChannel + “ on create new netty channel “ + newChannel); 21: } 22: oldChannel.close(); 23: } finally { 24: NettyChannel.removeChannelIfDisconnected(oldChannel); 25: } 26: } 27: } finally { 28: // 若 NettyClient 被关闭,关闭连接 29: if (NettyClient.this.isClosed()) { 30: try { 31: if (logger.isInfoEnabled()) { 32: logger.info(“Close new netty channel “ + newChannel + “, because the client closed.”); 33: } 34: newChannel.close(); 35: } finally { 36: NettyClient.this.channel = null; 37: NettyChannel.removeChannelIfDisconnected(newChannel); 38: } 39: // 设置新连接 40: } else { 41: NettyClient.this.channel = newChannel; 42: } 43: } 44: // 发生异常,抛出 RemotingException 异常 45: } else if (future.cause() != null) { 46: throw new RemotingException(this, “client(url: “ + getUrl() + “) failed to connect to server “ 47: + getRemoteAddress() + “, error message is:” + future.cause().getMessage(), future.cause()); 48: // 无结果(连接超时),抛出 RemotingException 异常 49: } else { 50: throw new RemotingException(this, “client(url: “ + getUrl() + “) failed to connect to server “ 51: + getRemoteAddress() + “ client-side timeout “ 52: + getConnectTimeout() + “ms (elapsed: “ + (System.currentTimeMillis() - start) + “ms) from netty client “ 53: + NetUtils.getLocalHost() + “ using dubbo version “ + Version.getVersion()); 54: } 55: } finally { 56: if (!isConnected()) { 57: //future.cancel(true); 58: } 59: } 60: }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
---

- 第 6 行:调用
Bootstrap#connect(remoteAddress)
方法,连接服务器。
- 第 9 行:调用
ChannelFuture#awaitUninterruptibly(3000, TimeUnit)
方法,等待连接成功或超时。这里传入
3000
貌似不太正确,应该传入
ChannelOption.CONNECT_TIMEOUT_MILLIS
的实际值。
- 第 10 至 43 行:连接成功。
    - 第 14 至 26 行:若存在**老的**
连接,调用
Channel#close()
方法,进行关闭。
    - 第 29 至 38 行:若 NettyClient 被关闭,调用 **新的连接**
Channel#close()
方法,关闭
。
    - 第 39 至 42 行:设置**新的连接**
到
channel
。
- 第 44 至 47 行:发生异常,抛出 **Server**
RemotingException 异常。
- 第 48 至 54 行:无结果(连接超时),抛出 **Client**
RemotingException 异常。
- 第 55 至 59 行:// 【TODO 8028】为什么不取消 future TODO 可能,和 3000 有关系。< 3000 强制 3000 。以及等待 3000

**断开连接**

```plain text
plain @Override protected void doDisConnect() {     try {         NettyChannel.removeChannelIfDisconnected(channel);     } catch (Throwable t) {         logger.warn(t.getMessage());     } }

关闭连接

```plain text plain @Override protected void doClose() throws Throwable { //can’t shutdown nioEventLoopGroup //nioEventLoopGroup.shutdownGracefully(); }

1
2
3
4
5
6
7
8
9
10
11
12
13
---

## 5.2 NettyClientHandler

[com.alibaba.dubbo.remoting.transport.netty4.NettyClientHandler](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/com/alibaba/dubbo/remoting/transport/netty4/NettyClientHandler.java) ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyClient 的处理器。

NettyServerHandler 和 HeaderExchangeHandler 类似。

**构造方法**

```plain text
plain @io.netty.channel.ChannelHandler.Sharable public class NettyClientHandler extends ChannelDuplexHandler {      /**      * Dubbo URL      */     private final URL url;     /**      * Dubbo ChannelHandler      */     private final ChannelHandler handler;      public NettyClientHandler(URL url, ChannelHandler handler) {         if (url == null) {             throw new IllegalArgumentException("url == null");         }         if (handler == null) {             throw new IllegalArgumentException("handler == null");         }         this.url = url;         this.handler = handler;     }          // ... 省略实现方法 }

实现方法

NettyClientHandler 的处理方式,和 NettyServerHandler 大体一致,但是也存在一定的差异,以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:

```plain text plain @Override public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
---

- 不同于 NettyServerHandler 的该方法,会提交给
handler
继续处理。因为,客户端不会被连接,无需做连入 Channel 的管理。

其他方法,胖友自己查看。

# 6. NettyBackedChannelBuffer

[com.alibaba.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/com/alibaba/dubbo/remoting/transport/netty4/NettyBackedChannelBuffer.java) ,实现 ChannelBuffer 接口,基于 **Netty ByteBuf** 的 ChannelBuffer 实现类。

**构造方法**

```plain text
plain private ByteBuf buffer;  public NettyBackedChannelBuffer(ByteBuf buffer) {     Assert.notNull(buffer, "buffer == null");     this.buffer = buffer; }

工厂

```plain text plain @Override //has nothing use public ChannelBufferFactory factory() { return null; }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
---

- 无使用工厂的地方。
- 另外,ByteBuf 默认情况下,容量为 Integer.MAX_VALUE 。

**实现方法**

每个方法,直接调用 ByteBuf 对应的方法。 ChannelBuffer 是以 ByteBuf 为**原型**,设计的接口 API 。

# 7. NettyCodecAdapter

[com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/com/alibaba/dubbo/remoting/transport/netty4/NettyCodecAdapter.java) ,Netty 编解码**适配器**,将 **Dubbo 编解码器** 适配成 Netty4 的编码器和解码器。

**构造方法**

```plain text
plain /**  * Netty 编码器  */ private final ChannelHandler encoder = new InternalEncoder(); /**  * Netty 解码器  */ private final ChannelHandler decoder = new InternalDecoder();  /**  * Dubbo 编解码器  */ private final Codec2 codec; /**  * Dubbo URL  */ private final URL url; /**  * Dubbo ChannelHandler  */ private final com.alibaba.dubbo.remoting.ChannelHandler handler;  public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {     this.codec = codec;     this.url = url;     this.handler = handler; }

7.1 InternalEncoder

```plain text plain private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { // 创建 NettyBackedChannelBuffer 对象 com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); // 获得 NettyChannel 对象 Channel ch = ctx.channel(); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { // 编码 codec.encode(channel, buffer, msg); } finally { // 移除 NettyChannel 对象,若断开连接 NettyChannel.removeChannelIfDisconnected(ch); } } }

1
2
3
4
5
6
7
8
9
10
11
12
---

- io.netty.handler.codec.MessageToByteEncoder**抽象类**
,Netty4 编码器
。
- 代码比较简单,胖友自己看注释。

## 7.2 InternalDecoder

```plain text
plain private class InternalDecoder extends ByteToMessageDecoder {      @Override     protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {         // 创建 NettyBackedChannelBuffer 对象         ChannelBuffer message = new NettyBackedChannelBuffer(input);         // 获得 NettyChannel 对象         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);         // 循环解析,直到结束         Object msg;         int saveReaderIndex;         try {             // decode object.             do {                 // 记录当前读进度                 saveReaderIndex = message.readerIndex();                 // 解码                 try {                     msg = codec.decode(channel, message);                 } catch (IOException e) {                     throw e;                 }                 // 需要更多输入,即消息不完整,标记回原有读进度,并结束                 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {                     message.readerIndex(saveReaderIndex);                     break;                 // 解码到消息,添加到 `out`                 } else {                     //is it possible to go here ? 芋艿:不可能,哈哈哈                     if (saveReaderIndex == message.readerIndex()) {                         throw new IOException("Decode without read data.");                     }                     if (msg != null) {                         out.add(msg);                     }                 }             } while (message.readable());         } finally {             // 移除 NettyChannel 对象,若断开连接             NettyChannel.removeChannelIfDisconnected(ctx.channel());         }     } }

  • io.netty.handler.codec.ByteToMessageDecoder抽象类 ,Netty4 解码器 。
  • 代码比较简单,胖友自己看注释。

8. 日志工厂

《Dubbo 用户指南 —— 日志适配》 文档,提到:

自 2.2.1 开始,dubbo 开始内置 log4j、slf4j、jcl、jdk 这些日志框架的适配。

《Netty源码笔记 —— Netty日志处理》 文档,我们可以看到 Netty 支持实现自定义的日志工厂。通过这样的方式,我们可以接入 Dubbo 的日志适配

下面,我们来看看具体的代码实现。

调用 NettyHelper#setNettyLoggerFactory() 方法,设置日志工厂,基于 Dubbo Logger 组件。代码如下:

```plain text plain public static void setNettyLoggerFactory() { InternalLoggerFactory factory = InternalLoggerFactory.getDefaultFactory(); if (factory == null || !(factory instanceof DubboLoggerFactory)) { InternalLoggerFactory.setDefaultFactory(new DubboLoggerFactory()); } }

1
2
3
4
5
6
7
8
9
10
11
---

- 设置 Netty 日志工厂为 DubboLoggerFactory 。

---

[DubboLoggerFactory](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-netty4/src/main/java/com/alibaba/dubbo/remoting/transport/netty4/logging/NettyHelper.java#L27-L32) ,代码如下:

```plain text
plain static class DubboLoggerFactory extends InternalLoggerFactory {      @Override     public InternalLogger newInstance(String name) {         return new DubboLogger(LoggerFactory.getLogger(name));     }  }

  • 创建 DubboLogger 对象,并传入 Logger name 对应的 Dubbo 对象,而 Dubbo Logger 的对象,基于 Dubbo SPI 机制加载。

DubboLogger ,代码如下:

```plain text plain static class DubboLogger extends AbstractInternalLogger { /** * 日志组件 */ private com.alibaba.dubbo.common.logger.Logger logger; DubboLogger(Logger logger) { super(logger.getClass().getName()); this.logger = logger; } @Override public void info(String msg) { if (isInfoEnabled()) { logger.info(msg); } } // … 省略类似代码 }

1
2
3
4
5
6
7
8
9
10
11
12
---

- 在实现的每个方法中,直接调用
logger
对应的方法。
- 在类似
#info(String format, Object… arguments)
方法中,需要对日志内容进行格式化,代码如下:

```plain text
plain @Override public void info(String format, Object... arguments) {     if (isInfoEnabled()) {         FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);         logger.info(ft.getMessage(), ft.getThrowable());     } }

```plain text

  • 我们看到需要

    FormattingTupleMessageFormatter 这两个类,用于格式化。实际上,这两个类是直接从 Netty4 中拷贝出来的两个类,因为它们是 package 修饰的类,在 DubboLogger 中,无法访问到它们,所以进行复制解决。 ```

本文由作者按照 CC BY 4.0 进行授权