NIO服务器(七)之Netty3实现
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(六)之 Netty4 实现》 一文,分享在 dubbo-remoting-netty 中,Netty3 如何接入实现。
因为 Netty3 的接入代码,和 Netty4 基本是一致,主要是一些 Netty 不同版本的 API 差异,所以本文,会相对简介,只重点分享一些差异的地方。
涉及如下类:
类图
友情提示:在当前版本,默认情况下,使用 Netty3 ,如果想配置成 Netty4 ,请参考文档:《Dubbo 用户指南 —— Netty4》
2. NettyTransporter
com.alibaba.dubbo.remoting.transport.netty.NettyTransporter ,和 dubbo-remoting-netty4 一致,省略。
3. NettyChannel
com.alibaba.dubbo.remoting.transport.netty.NettyChannel ,和 dubbo-remoting-netty4 一致,省略。
4. NettyHandler
com.alibaba.dubbo.remoting.transport.netty.NettyHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyServer 和 NettyClient 的处理器,统一使用。这一点,不同于 dubbo-remoting-netty4 ,服务端和服务器使用不同的两个处理器。相比来说,dubbo-remoting-netty4 控制更精细,影响不大。
当然也有一个原因,Dubbo ChannelHandler 基于 Netty3 的 SimpleChannelHandler 为设计原型。因此,在 dubbo-remoting-netty4 中,需要将 DubboHandler 的方法,适配到 Netty4 的 ChannelDuplexHandler 的方法。
NettyHandler 和 HeaderExchangeHandler 类似。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Sharable
public class NettyHandler extends SimpleChannelHandler {
/**
* Dubbo Channel 集合
*/
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
/**
* URL
*/
private final URL url;
/**
* Dubbo ChannelHandler
*/
private final ChannelHandler handler;
public NettyHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
// ... 省略实现方法
}
实现方法
每个实现的方法,调用 handler 对应的方法。以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// 创建 NettyChannel 对象
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// 添加到 `channels` 中
if (channel != null) {
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
}
// 提交给 `handler` 处理器。
handler.connected(channel);
} finally {
// 移除 NettyChannel 对象,若已断开
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
其他方法,胖友自己查看。
5. NettyServer
com.alibaba.dubbo.remoting.transport.netty.NettyServer ,实现 Server 接口,继承 AbstractServer 抽象类,Netty 服务器实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 通道集合
*/
private Map<String, Channel> channels; // <ip:port, channel>
private ServerBootstrap bootstrap;
private org.jboss.netty.channel.Channel channel;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
- 和 dubbo-remoting-netty4 基本一致。
启动服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
protected void doOpen() {
// 设置日志工厂
NettyHelper.setNettyLoggerFactory();
// 创建线程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
// 创建 ChannelFactory 对象
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 实例化 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
// 创建 NettyHandler 对象
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
// 设置 `channels` 属性
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
// 创建 NettyCodecAdapter 对象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder()); // 解码
pipeline.addLast("encoder", adapter.getEncoder()); // 解码
pipeline.addLast("handler", nettyHandler); // 处理器
return pipeline;
}
});
// 服务器绑定端口监听
// bind
channel = bootstrap.bind(getBindAddress());
}
- 和 dubbo-remoting-netty4 基本一致,下面只说一些差异的地方。
- 第 6 至 8 行:创建线程池,不同于 dubbo-remoting-netty4 中,创建线程组 NioEventLoopGroup 。
- 第 11 行:基于 boss 和 worker,创建 ChannelFactory 对象。
- 并未设置 ServerBootstrap 的可选项。
关闭服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void doClose() {
// 关闭服务器通道
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
// 关闭连接到服务器的客户端通道
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && !channels.isEmpty()) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
// 优雅关闭 ServerBootstrap
try {
if (bootstrap != null) {
// release external resource.
bootstrap.releaseExternalResources();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
// 清空连接到服务器的客户端通道
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
- 和 dubbo-remoting-netty4 基本一致,下面只说一些差异的地方。
- 第 27 至 35 行:调用 Bootstrap#releaseExternalResources() 方法,释放 ServerBootstrap 相关的资源。
6. NettyClient
com.alibaba.dubbo.remoting.transport.netty.NettyClient ,继承 AbstractNettyClient 抽象类,Netty 客户端实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
Constants.DEFAULT_IO_THREADS);
private ClientBootstrap bootstrap;
private volatile org.jboss.netty.channel.Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
- channelFactory 属性,【TODO 8027】为啥公用
启动客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
protected void doOpen() {
// 设置日志工厂
NettyHelper.setNettyLoggerFactory();
// 实例化 ServerBootstrap
bootstrap = new ClientBootstrap(channelFactory);
// 设置可选项
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
// 创建 NettyHandler 对象
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
// 设置责任链路
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
// 创建 NettyCodecAdapter 对象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder()); // 解码
pipeline.addLast("encoder", adapter.getEncoder()); // 编码
pipeline.addLast("handler", nettyHandler); // 处理器
return pipeline;
}
});
}
- 和 dubbo-remoting-netty4 基本一致。
连接服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 连接服务器
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// 等待连接成功或者超时
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
// 连接成功
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// 关闭老的连接
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// 若 NettyClient 被关闭,关闭连接
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
// 设置新连接
} else {
NettyClient.this.channel = newChannel;
}
}
// 发生异常,抛出 RemotingException 异常
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
// 无结果(连接超时),抛出 RemotingException 异常
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
// 未连接,取消任务
if (!isConnected()) {
future.cancel();
}
}
}
- 和 dubbo-remoting-netty4 基本一致,下面只说一些差异的地方。
- 第 9 行:调用 ChannelFuture#awaitUninterruptibly(timeout, TimeUnit) 方法,等待连接成功或超时。这里传入的不是 3000 。
- 第 55 至 60 行:最终结果为未连接,调用 ChannelFuture#cancel(true) 方法,取消任务。
关闭连接
1
2
3
4
5
6
7
8
@Override
protected void doClose() throws Throwable {
/*try {
bootstrap.releaseExternalResources();
} catch (Throwable t) {
logger.warn(t.getMessage());
}*/
}
- 因为 静态 channelFactory 是属性,被多个 NettyClient 共用。
7. Buffer
7.1 NettyBackedChannelBuffer
com.alibaba.dubbo.remoting.transport.netty.NettyBackedChannelBuffer ,实现 ChannelBuffer 接口,基于 Netty3 ChannelBuffer 的 ChannelBuffer 实现类。
构造方法
1
2
3
4
5
6
private org.jboss.netty.buffer.ChannelBuffer buffer;
public NettyBackedChannelBuffer(org.jboss.netty.buffer.ChannelBuffer buffer) {
Assert.notNull(buffer, "buffer == null");
this.buffer = buffer;
}
工厂
1
2
3
4
@Override
public ChannelBufferFactory factory() {
return NettyBackedChannelBufferFactory.getInstance();
}
- 对应的工厂是 NettyBackedChannelBufferFactory
实现方法
每个方法,直接调用 Netty3 ChannelBuffer 对应的方法。
7.2 NettyBackedChannelBufferFactory
com.alibaba.dubbo.remoting.transport.netty.NettyBackedChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 NettyBackedChannelBuffer 的工厂。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public ChannelBuffer getBuffer(int capacity) {
return new NettyBackedChannelBuffer(ChannelBuffers.dynamicBuffer(capacity)); // ChannelBuffers 为 `org.jboss.netty.buffer` 包下
}
@Override
public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
// 创建 Netty3 ChannelBuffer 对象
org.jboss.netty.buffer.ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(length); // ChannelBuffers 为 `org.jboss.netty.buffer` 包下
// 写入数据
buffer.writeBytes(array, offset, length);
// 创建 NettyBackedChannelBuffer 对象
return new NettyBackedChannelBuffer(buffer);
}
@Override
public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
return new NettyBackedChannelBuffer(ChannelBuffers.wrappedBuffer(nioBuffer)); // ChannelBuffers 为 `org.jboss.netty.buffer` 包下
}
- 注意,此处的 ChannelBuffers 是 org.jboss.netty.buffer 包下的。
8. NettyCodecAdapter
com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter ,Netty 编解码适配器,将 Dubbo 编解码器 适配成 Netty3 的编码器和解码器。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Netty 编码器
*/
private final ChannelHandler encoder = new InternalEncoder();
/**
* Netty 解码器
*/
private final ChannelHandler decoder = new InternalDecoder();
/**
* Dubbo 编解码器
*/
private final Codec2 codec;
/**
* Dubbo URL
*/
private final URL url;
/**
* 网络读写缓冲区大小
*/
private final int bufferSize;
/**
* Dubbo ChannelHandler
*/
private final com.alibaba.dubbo.remoting.ChannelHandler handler;
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
// 设置 `bufferSize`
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
- bufferSize 属性,网络读写缓冲区大小,默认 8K 。这是 dubbo-remoting-netty4 的 NettyCodecAdapter 所不需要的。用于下面 「8.2 InternalDecoder」 ,消息解码时使用。
8.1 InternalEncoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Sharable
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
// 创建 HeapChannelBuffer 对象
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
// 获得 NettyChannel 对象
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
// 编码
codec.encode(channel, buffer, msg);
} finally {
// 移除 NettyChannel 对象,若断开连接
NettyChannel.removeChannelIfDisconnected(ch);
}
// 返回 Netty ChannelBuffer 对象
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
- org.jboss.netty.handler.codec.oneone.OneToOneEncoder 抽象类,Netty3 编码器。
- 代码比较简单,胖友自己看注释。
8.2 InternalDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
private class InternalDecoder extends SimpleChannelUpstreamHandler {
/**
* 未读完的消息 Buffer
*/
private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
// 跳过非 ChannelBuffer
Object o = event.getMessage();
if (!(o instanceof ChannelBuffer)) {
ctx.sendUpstream(event);
return;
}
// 无可读,跳过
ChannelBuffer input = (ChannelBuffer) o;
int readable = input.readableBytes();
if (readable <= 0) {
return;
}
// 合并 `buffer` + `input` 成 `message`
com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
if (buffer.readable()) { // 有未读完的,需要拼接
if (buffer instanceof DynamicChannelBuffer) {
buffer.writeBytes(input.toByteBuffer());
message = buffer;
} else { // Netty3 ChannelBuffer ,转成 DynamicChannelBuffer 。
int size = buffer.readableBytes() + input.readableBytes();
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
message.writeBytes(buffer, buffer.readableBytes());
message.writeBytes(input.toByteBuffer());
}
} else { // 无未读完的,直接创建
message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(input.toByteBuffer());
}
// 获得 NettyChannel 对象
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
// 循环解析,直到结束
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message);
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
// 解码到消息,触发一条消息
} else {
//is it possible to go here ? 芋艿:不可能,哈哈哈
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
} finally {
// 有剩余可读的,压缩并缓存
if (message.readable()) {
message.discardReadBytes();
buffer = message;
// 无剩余的,设置空 Buffer
} else {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
}
// 移除 NettyChannel 对象,若断开连接
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
ctx.sendUpstream(e);
}
}
- 继承 org.jboss.netty.channel.SimpleChannelUpstreamHandler 类。
- buffer 属性,未读完的消息 Buffer 。在 #messageReceived(ctx, event) 方法中,我们在做拆包粘包的处理过程中,可能收到数据是不完整的。例如,不足以解析成一条 Dubbo Request 。那么,我们就需要将收到的,缓存到 buffer 中。
- 第 10 至 15 行:跳过非 ChannelBuffer 。
- 第 17 至 22 行:跳过无可读的。
- 第 24 至 38 行:合并 两类三种 buffer + input 成 message 。有情况,胖友看下注释。
- 第 41 行:获得 NettyChannel 对象。
- 第 42 至 69 行:循环解析,直到结束。此处,和 dubbo-remoting-netty4 的解码流程,就是一致的了。
- 第 71 至 75 行:有剩余的部分,压缩并缓存到 buffer 中。
- 第 75 至 78 行:完全读完,设置 buffer 为空( EMPTY_BUFFER )。
- 第 80 行:移除 NettyChannel 对象,若断开连接。
9. 日志工厂
和 netty-remoting-netty4 的日志工厂,基本一致。差异点是 DubboLogger ,无需实现类似 #log(format, arguments) 等需要格式化的方法。因此,无需复制 FormattingTuple 、MessageFormatter 类。
