NIO服务器(六)之Netty4实现
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
在前面的文章,我们已经了解了 dubbo-remoting-api 如何实现 NIO 服务器的抽象 API 层。那么本文来看看,dubbo-remoting-netty4 ,如何将 Netty4 接入实现。
涉及如下类:
友情提示:在当前版本,默认情况下,使用 Netty3 ,如果想配置成 Netty4 ,请参考文档:《Dubbo 用户指南 —— Netty4》
2. NettyTransporter
com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter ,实现 Transporter 接口,基于 Netty4 的网络传输实现类。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class NettyTransporter implements Transporter {
/**
* 拓展名
*/
public static final String NAME = "netty4";
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
- NAME静态 属性,拓展名。
- NettyTransporter 基于 Dubbo SPI 机制加载。
- 创建 NettyServer 和 NettyClient 对象。
3. NettyChannel
io.netty.channel.ChannelFuture.NettyChannel ,实现 AbstractChannel 抽象类,封装 Netty Channel 的通道实现类。
NettyChannel 和 HeaderExchangeChannel 很类似。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 通道集合
*/
private static final ConcurrentMap<io.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
/**
* 通道
*/
private final io.netty.channel.Channel channel;
/**
* 属性集合
*/
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
private NettyChannel(io.netty.channel.Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
- channel装饰器 属性,通道。NettyChannel 是传入 channel 属性的 ,每个实现的方法,都会调用 channel 。
- attributes注意 属性,属性集合。 , setAttribute(…) 等方法,使用的是该属性,而不是 io.netty.channel.Channel 的。
- channelMap静态 属性,通道集合。在实际 Netty Handler 里(例如下面我们会看到的 NettyServerHandler 和 NettyClientHandler),每个方法参数里,传递的是 io.netty.channel.Channel 对象。通过 NettyChannel.channelMap 中,获得对应的 NettyChannel 对象。
- #getOrAddChannel(ch, url, handler)静态 方法,创建 NettyChannel 对象。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static NettyChannel getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
if (ch.isActive()) { // 连接中
ret = channelMap.putIfAbsent(ch, nettyChannel); // 添加到 channelMap
}
if (ret == null) {
ret = nettyChannel;
}
}
return ret;
}
#removeChannelIfDisconnected(ch)静态方法,移除 NettyChannel 对象。代码如下:
1
2
3
4
5
static void removeChannelIfDisconnected(io.netty.channel.Channel ch) {
if (ch != null && !ch.isActive()) { // 未连接
channelMap.remove(ch); // 移除出channelMap
}
}
发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 检查连接状态
super.send(message, sent);
boolean success = true; // 如果没有等待发送成功,默认成功。
int timeout = 0;
try {
// 发送消息
ChannelFuture future = channel.writeAndFlush(message);
// 等待发送成功
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
// 若发生异常,抛出
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
// 发送失败,抛出异常
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
- 第 4 行:调用 #send(message, sent) 方法,检查连接状态。
- 第 6 行: success ,是否执行成功。若不需要等待发送成功( sent = false ) ,默认成功。
- 第 10 行:调用真正的 io.netty.channel.Channel#writeAndFlush(message) 方法,发送消息。
- 第 11 至 15 行:若需要等待发送成功( sent = true ),等待直到成功或超时。
- 第 16 至 20 行:若发生异常,抛出异常。
- 第 26 至 29 行:若发送失败,抛出异常。
关闭通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
@SuppressWarnings("Duplicates")
public void close() {
// 标记关闭
try {
super.close();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
// 移除连接
try {
removeChannelIfDisconnected(channel);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
// 清空属性 attributes
try {
attributes.clear();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
// 关闭真正的通道 channel
try {
if (logger.isInfoEnabled()) {
logger.info("Close netty channel " + channel);
}
channel.close();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
- 注意 ,最后一步才关闭真正的通道,避免中间状态。
其它方法
其它实现方法,比较简单,胖友自己瞅瞅。例如:
1
2
3
4
@Override
public boolean isConnected() {
return !isClosed() && channel.isActive();
}
4. Server
4.1 NettyServer
com.alibaba.dubbo.remoting.transport.netty4.NettyServer ,实现 Server 接口,继承 AbstractServer 抽象类,Netty 服务器实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 通道集合
*/
private Map<String, Channel> channels; // <ip:port, channel>
private ServerBootstrap bootstrap;
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) /* 设置线程名到 URL 上 */));
}
- channels指定地址 属性,连接到服务器的客户端通道集合。笔者在看 NettyChannel 时,在有 NettyChannel.channels ,那么此处的 channels 不是重复了么?答案在
#getChannel(remoteAddress)方法,获得 的 Channel 对象。代码如下:
1
2
3
4
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
return channels.get(NetUtils.toAddressString(remoteAddress));
}
- bootstrap channel bossGroup workerGroup 属性, 不熟悉这几个的胖友,请 Google 一下 Netty 入门噶。
- ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) 代码段,包装 ChannelHandler ,实现 Dubbo 线程模型的功能。
启动服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Override
protected void doOpen() {
// 设置日志工厂
NettyHelper.setNettyLoggerFactory();
// 实例化 ServerBootstrap
bootstrap = new ServerBootstrap();
// 创建线程组
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// 创建 NettyServerHandler 对象
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
// 设置 `channels` 属性
channels = nettyServerHandler.getChannels();
bootstrap
// 设置它的线程组
.group(bossGroup, workerGroup)
// 设置 Channel类型
.channel(NioServerSocketChannel.class) // Server
// 设置可选项
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 设置责任链路
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// 创建 NettyCodecAdapter 对象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 解码
.addLast("encoder", adapter.getEncoder()) // 解码
.addLast("handler", nettyServerHandler); // 处理器
}
});
// 服务器绑定端口监听
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
- 参考:《Netty4.x中文教程系列(六) 从头开始Bootstrap》
- 第 4 行:设置 Netty 的日志工厂,在 「7. 日志处理」 详细解析。
- 第 7 行:实例化 ServerBootstrap 对象。
- 第 9 至 12 行:创建 bossGroup workerGroup 线程组。
- 第 15 行:创建 NettyServerHandler 对象。
- 第 17 行:设置 channels 属性,指向 NettyServerHandler.channels 属性。
- 第 21 行:设置线程组。
- 第 23 行:设置 Channel 类型为 NioServerSocketChannel 。
- 第 24 至 27 行:设置可选项。
- PooledByteBufAllocator.DEFAULT《Netty 调优》 ,对象池,重用缓冲区。参见 。
- 第 29 至 39 行:设置责任链。
- 第 33 行:创建 NettyCodecAdapter 对象。NettyCodecAdapter 在 「6. 2 NettyCodecAdapter」 详细解析。
- 第 35 行:调用 NettyCodecAdapter#getDecoder() 方法,获得解码器,并设置。
- 第 37 行:调用 NettyCodecAdapter#getEncoder() 方法,获得编码器,并设置。
- 第 37 行:设置处理器 handler 。
- 第 41 至 45 行:服务器绑定端口监听,正式启动 啦。
获得所有通道
1
2
3
4
5
6
7
8
9
10
11
public Collection<Channel> getChannels() {
Collection<Channel> chs = new HashSet<Channel>();
for (Channel channel : this.channels.values()) {
if (channel.isConnected()) { // 已连接,返回
chs.add(channel);
} else { // 未连接,移除
channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
}
}
return chs;
}
关闭服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void doClose() {
// 关闭服务器通道
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
// 关闭连接到服务器的客户端通道
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
// 优雅关闭工作组
try {
if (bootstrap != null) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
// 清空连接到服务器的客户端通道
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
- 第 3 至 11 行:关闭服务器通道( io.netty.channel.Channel )。
- 第 12 至 26 行:关闭连接到服务器的客户端通道( com.alibaba.dubbo.remoting.Channel ) 。
- 第 27 至 35 行:优雅关闭工作组。
- 第 36 至 43 行:清空连接到服务器的客户端通道。
4.2 NettyServerHandler
com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyServer 的处理器。
NettyServerHandler 和 HeaderExchangeHandler 类似。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
/**
* Dubbo Channel 集合
*/
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
/**
* URL
*/
private final URL url;
/**
* Dubbo ChannelHandler
*/
private final ChannelHandler handler;
public NettyServerHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
// ... 省略实现方法
}
@io.netty.channel.ChannelHandler.Sharable注解:《netty4中注解Sharable的使用场景?》 FROM Sharable 注解主要是用来标示一个 ChannelHandler 可以被安全地共享,即可以在多个Channel 的 ChannelPipeline 中使用同一个ChannelHandler ,而不必每一个ChannelPipeline 都重新 new 一个新的 ChannelHandler 。- channels 属性,连接到服务器的 Dubbo Channel 集合。
- handler 属性,Dubbo ChannelHandler。NettyServerHandler 对每个事件的处理,会调用 handler 对应的方法。
实现方法
每个实现的方法,处理都比较类似,一般是提交给 handler 做相应的处理。艿艿已经添加了代码注释,胖友可以自己看看。下面以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 交给下一个节点处理
// 芋艿:实际此处不要调用也没关系,因为 NettyServerHandler 没下一个节点。
ctx.fireChannelActive();
// 创建 NettyChannel 对象
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
// 添加到 `channels` 中
if (channel != null) {
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
// 提交给 `handler` 处理器。
handler.connected(channel);
} finally {
// 移除 NettyChannel 对象,若已断开
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
- 《Netty 框架总结「ChannelHandler 及 EventLoop」》
- 第 5 行:调用 此处不要调用也没关系 ChannelHandlerContext#fireChannelActive() 方法,交给下一个节点处理。实际上, ,因为 NettyServerHandler 没下一个节点。
- 第 8 行:调用 NettyChannel#getOrAddChannel(channel, url, handler) 方法,创建 NettyChannel 对象。
- 第 10 至 13 行:添加到 channels 中。
- 第 15 行:调用 ChannelHandler#connected(channel) 方法,处理连接事件。
- 第 16 至 19 行:调用 已断开 NettyChannel#removeChannelIfDisconnected(channel) 方法,移除 NettyChannel 对象,若 。
5. Client
5.1 NettyClient
com.alibaba.dubbo.remoting.transport.netty4.NettyClient ,继承 AbstractNettyClient 抽象类,Netty 客户端实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
// 【TODO 8027】为啥公用
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile io.netty.channel.Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
- nioEventLoopGroup 属性,【TODO 8027】为啥公用
- channelvolatile多线程 属性,通道,有 修饰符。因为客户端可能会断开重连,需要保证 的可见性。
- #wrapChannelHandler(url, handler) 代码段,包装 ChannelHandler ,实现 Dubbo 线程模型的功能。
启动客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
protected void doOpen() {
// 设置日志工厂
NettyHelper.setNettyLoggerFactory();
// 创建 NettyClientHandler 对象
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
// 实例化 ServerBootstrap
bootstrap = new Bootstrap();
bootstrap
// 设置它的线程组
.group(nioEventLoopGroup)
// 设置可选项
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
// 设置 Channel类型
.channel(NioSocketChannel.class);
// 设置连接超时时间
if (getTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
}
// 设置责任链路
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) {
// 创建 NettyCodecAdapter 对象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 解码
.addLast("encoder", adapter.getEncoder()) // 解码
.addLast("handler", nettyClientHandler); // 处理器
}
});
}
- 和 NettyClient#doOpen() 方法类似。我们仅仅说一些差异点。
- 第 7 行:创建 NettyClientHandler 对象。
- 第 13 行:设置线程组,没有 bossGroup 。
- 第 20 行:设置 Channel 类型为 NioSocketChannel 。
- 第 22 至 27 行:设置连接超时时间。
连接服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
@SuppressWarnings("Duplicates")
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 连接服务器
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// 等待连接成功或者超时
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
// 连接成功
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// 关闭老的连接
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// 若 NettyClient 被关闭,关闭连接
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
// 设置新连接
} else {
NettyClient.this.channel = newChannel;
}
}
// 发生异常,抛出 RemotingException 异常
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
// 无结果(连接超时),抛出 RemotingException 异常
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
- 第 6 行:调用 Bootstrap#connect(remoteAddress) 方法,连接服务器。
- 第 9 行:调用 ChannelFuture#awaitUninterruptibly(3000, TimeUnit) 方法,等待连接成功或超时。这里传入 3000 貌似不太正确,应该传入 ChannelOption.CONNECT_TIMEOUT_MILLIS 的实际值。
- 第 10 至 43 行:连接成功。
- 第 14 至 26 行:若存在老的 连接,调用 Channel#close() 方法,进行关闭。
- 第 29 至 38 行:若 NettyClient 被关闭,调用 新的连接 Channel#close() 方法,关闭 。
- 第 39 至 42 行:设置新的连接 到 channel 。
- 第 44 至 47 行:发生异常,抛出 Server RemotingException 异常。
- 第 48 至 54 行:无结果(连接超时),抛出 Client RemotingException 异常。
- 第 55 至 59 行:// 【TODO 8028】为什么不取消 future TODO 可能,和 3000 有关系。< 3000 强制 3000 。以及等待 3000
断开连接
1
2
3
4
5
6
7
8
@Override
protected void doDisConnect() {
try {
NettyChannel.removeChannelIfDisconnected(channel);
} catch (Throwable t) {
logger.warn(t.getMessage());
}
}
关闭连接
1
2
3
4
5
@Override
protected void doClose() throws Throwable {
//can't shutdown nioEventLoopGroup
//nioEventLoopGroup.shutdownGracefully();
}
5.2 NettyClientHandler
com.alibaba.dubbo.remoting.transport.netty4.NettyClientHandler ,实现 io.netty.channel.ChannelDuplexHandler 类,NettyClient 的处理器。
NettyServerHandler 和 HeaderExchangeHandler 类似。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@io.netty.channel.ChannelHandler.Sharable
public class NettyClientHandler extends ChannelDuplexHandler {
/**
* Dubbo URL
*/
private final URL url;
/**
* Dubbo ChannelHandler
*/
private final ChannelHandler handler;
public NettyClientHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
// ... 省略实现方法
}
实现方法
NettyClientHandler 的处理方式,和 NettyServerHandler 大体一致,但是也存在一定的差异,以 #channelActive(ChannelHandlerContext) 方法举例子,代码如下:
1
2
3
4
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
}
- 不同于 NettyServerHandler 的该方法,会提交给 handler 继续处理。因为,客户端不会被连接,无需做连入 Channel 的管理。
其他方法,胖友自己查看。
6. NettyBackedChannelBuffer
com.alibaba.dubbo.remoting.transport.netty4.NettyBackedChannelBuffer ,实现 ChannelBuffer 接口,基于 Netty ByteBuf 的 ChannelBuffer 实现类。
构造方法
1
2
3
4
5
6
private ByteBuf buffer;
public NettyBackedChannelBuffer(ByteBuf buffer) {
Assert.notNull(buffer, "buffer == null");
this.buffer = buffer;
}
工厂
1
2
3
4
5
@Override
//has nothing use
public ChannelBufferFactory factory() {
return null;
}
- 无使用工厂的地方。
- 另外,ByteBuf 默认情况下,容量为 Integer.MAX_VALUE 。
实现方法
每个方法,直接调用 ByteBuf 对应的方法。 ChannelBuffer 是以 ByteBuf 为原型,设计的接口 API 。
7. NettyCodecAdapter
com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter ,Netty 编解码适配器,将 Dubbo 编解码器 适配成 Netty4 的编码器和解码器。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Netty 编码器
*/
private final ChannelHandler encoder = new InternalEncoder();
/**
* Netty 解码器
*/
private final ChannelHandler decoder = new InternalDecoder();
/**
* Dubbo 编解码器
*/
private final Codec2 codec;
/**
* Dubbo URL
*/
private final URL url;
/**
* Dubbo ChannelHandler
*/
private final com.alibaba.dubbo.remoting.ChannelHandler handler;
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
}
7.1 InternalEncoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 创建 NettyBackedChannelBuffer 对象
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
// 获得 NettyChannel 对象
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
// 编码
codec.encode(channel, buffer, msg);
} finally {
// 移除 NettyChannel 对象,若断开连接
NettyChannel.removeChannelIfDisconnected(ch);
}
}
}
- io.netty.handler.codec.MessageToByteEncoder抽象类 ,Netty4 编码器 。
- 代码比较简单,胖友自己看注释。
7.2 InternalDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
// 创建 NettyBackedChannelBuffer 对象
ChannelBuffer message = new NettyBackedChannelBuffer(input);
// 获得 NettyChannel 对象
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// 循环解析,直到结束
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
// 记录当前读进度
saveReaderIndex = message.readerIndex();
// 解码
try {
msg = codec.decode(channel, message);
} catch (IOException e) {
throw e;
}
// 需要更多输入,即消息不完整,标记回原有读进度,并结束
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
// 解码到消息,添加到 `out`
} else {
//is it possible to go here ? 芋艿:不可能,哈哈哈
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
// 移除 NettyChannel 对象,若断开连接
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
- io.netty.handler.codec.ByteToMessageDecoder抽象类 ,Netty4 解码器 。
- 代码比较简单,胖友自己看注释。
8. 日志工厂
在 《Dubbo 用户指南 —— 日志适配》 文档,提到:
自 2.2.1 开始,dubbo 开始内置 log4j、slf4j、jcl、jdk 这些日志框架的适配。
在 《Netty源码笔记 —— Netty日志处理》 文档,我们可以看到 Netty 支持实现自定义的日志工厂。通过这样的方式,我们可以接入 Dubbo 的日志适配。
下面,我们来看看具体的代码实现。
调用 NettyHelper#setNettyLoggerFactory() 方法,设置日志工厂,基于 Dubbo Logger 组件。代码如下:
1
2
3
4
5
6
public static void setNettyLoggerFactory() {
InternalLoggerFactory factory = InternalLoggerFactory.getDefaultFactory();
if (factory == null || !(factory instanceof DubboLoggerFactory)) {
InternalLoggerFactory.setDefaultFactory(new DubboLoggerFactory());
}
}
- 设置 Netty 日志工厂为 DubboLoggerFactory 。
DubboLoggerFactory ,代码如下:
1
2
3
4
5
6
7
8
static class DubboLoggerFactory extends InternalLoggerFactory {
@Override
public InternalLogger newInstance(String name) {
return new DubboLogger(LoggerFactory.getLogger(name));
}
}
- 创建 DubboLogger 对象,并传入 Logger name 对应的 Dubbo 对象,而 Dubbo Logger 的对象,基于 Dubbo SPI 机制加载。
DubboLogger ,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static class DubboLogger extends AbstractInternalLogger {
/**
* 日志组件
*/
private com.alibaba.dubbo.common.logger.Logger logger;
DubboLogger(Logger logger) {
super(logger.getClass().getName());
this.logger = logger;
}
@Override
public void info(String msg) {
if (isInfoEnabled()) {
logger.info(msg);
}
}
// ... 省略类似代码
}
- 在实现的每个方法中,直接调用 logger 对应的方法。
- 在类似
#info(String format, Object… arguments)方法中,需要对日志内容进行格式化,代码如下:
1
2
3
4
5
6
7
@Override
public void info(String format, Object... arguments) {
if (isInfoEnabled()) {
FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments);
logger.info(ft.getMessage(), ft.getThrowable());
}
}
我们看到需要 FormattingTuple 和 MessageFormatter 这两个类,用于格式化。实际上,这两个类是直接从 Netty4 中拷贝出来的两个类,因为它们是 package 修饰的类,在 DubboLogger 中,无法访问到它们,所以进行复制解决。
