NIO服务器(二)之Transport层
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(一)之抽象 API》 一文,分享 dubbo-remoting-api 模块, transport 包,网络传输层。
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
涉及的类图如下:
- 白色部分,为通用接口。
- 蓝色部分,为
transport包下的类。 - 整个类图,我们分成六个部分:
- Client
- Server
- Channel
- ChannelHandler
- Codec
- Dispacher
- 从流程上来说,我们分成:
- Server
- 启动
- 关闭
- Client
- 启动
- 关闭
- ChannelHandler
- 处理连接
- 处理断开
- 发送消息
- 接收消息
- 处理异常
- Server
艿艿的旁白:涉及较多类和流程,内容不是很线性,可能分享的比较凌乱,还望胖友谅解。建议,读 2-3 遍,并且做一些调试。
2. AbstractPeer
com.alibaba.dubbo.remoting.transport.AbstractPeer ,实现 Endpoint、ChannelHandler 接口,Peer 抽象类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 通道处理器
*/
private final ChannelHandler handler;
/**
* URL
*/
private volatile URL url;
/**
* 正在关闭
*
* {@link #startClose()}
*/
// closing closed means the process is being closed and close is finished
private volatile boolean closing;
/**
* 关闭完成
*
* {@link #close()}
*/
private volatile boolean closed;
public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
handler属性,通道处理器,通过构造方法传入。实现的 ChannelHandler 的接口方法,直接调用handler的方法,进行执行逻辑处理。url属性,URL,通过构造方法传入。通过该属性,传递 Dubbo 服务引用和服务暴露的配置项。closing属性,正在关闭,调用#startClose()方法,变更。closed属性,关闭完成,调用#close()方法,变更。
发送消息
1
2
3
4
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
sent配置项:true等待消息发出,消息发送失败将抛出异常。false不等待消息发出,将消息放入 IO 队列,即刻返回。- 详细参见:《Dubbo 用户指南 —— 异步调用》
其他方法
胖友点击 AbstractPeer ,再看看所有的方法。
2.1 AbstractEndpint
com.alibaba.dubbo.remoting.transport.AbstractPeer.AbstractEndpint ,实现 Resetable 接口,继承 AbstractPeer 抽象类,端点抽象类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 编解码器
*/
private Codec2 codec;
/**
* 超时时间
*/
private int timeout;
/**
* 连接超时时间
*/
private int connectTimeout;
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url);
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
codec属性,编解码器。在构造方法中,可以看到调用#getChannelCodec(url)方法,基于url参数,加载对应的 Codec 实现对象。代码如下:
1
2
3
4
5
6
7
8
9
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
// 例如,在 DubboProtocol 中,会获得 DubboCodec
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
}
}
- 第 3 行:基于 Dubbo SPI 机制,加载对应的 Codec 实现对象。例如,在 DubboProtocol 中,会获得 DubboCodec 对象。
- 第 6 行:Codec 接口,已经废弃了,目前 Dubbo 项目里,也没有它的拓展实现。
重置属性
#reset(url) 实现方法,使用新的 url 属性,可重置 codec、timeout、connectTimeout 属性。 已经添加了谅解,胖友点击可看。
3. Client
3.1 AbstractClient
com.alibaba.dubbo.remoting.transport.AbstractClient ,实现 Client 接口,继承 AbstractEndpoint 抽象类,客户端抽象类,重点实现了公用的重连逻辑,同时抽象了连接等模板方法,供子类实现。抽象方法如下:
1
2
3
4
5
6
7
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
protected abstract void doConnect() throws Throwable;
protected abstract void doDisConnect() throws Throwable;
protected abstract Channel getChannel();
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 重连定时任务执行器
*/
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
/**
* 发送消息时,若断开,是否重连
*/
private final boolean send_reconnect;
/**
* 重连 warning 的间隔.(waring多少次之后,warning一次) //for test
*/
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
/**
* 关闭超时时间
*/
private final long shutdown_timeout;
/**
* 线程池
*
* 在调用 {@link #wrapChannelHandler(URL, ChannelHandler)} 时,会调用 {@link com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler} 创建
*/
protected volatile ExecutorService executor;
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 从 URL 中,获得重连相关配置项
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
// The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
// 初始化客户端
try {
doOpen();
} catch (Throwable t) {
close(); // 失败,则关闭
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
// 连接服务器
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close(); // 失败,则关闭
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close(); // 失败,则关闭
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
// 获得线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
.get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension()
.remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
reconnectExecutorService属性,重连定时任务执行器。在客户端连接服务端时,会创建后台任务,定时检查连接,若断开,会进行重连。- 第 27 至 31 行:从 URL 中,获得重连相关配置项。
- 第 33 至 41 行:调用 抽象
#doOpen()方法,初始化客户端。若异常,调用#close()方法,进行关闭。 - 第 43 至 63 行:调用 实现
#connect()方法,连接服务器。若异常,调用#close()方法,进行关闭。- 第 51 至 57 行:若是连接失败 RemotingException ,若开启了 启动时检查,则调用
#close()方法,进行关闭。
- 第 51 至 57 行:若是连接失败 RemotingException ,若开启了 启动时检查,则调用
- 第 66 至 69 行:从 DataStore 中,获得线程池。
- DataStore 在 store(dubbo-common 模块,包下实现)。目前的实现比较简单,可以认为是
ConcurrentMap<String, ConcurrentMap<String, Object>>的集合。胖友可以自己看相关实现。 - 此处的线程池,实际就是 《Dubbo 用户指南 —— 线程模型》 线程池 中说的。在 「8. Dispacher」 中,详细解析。
- DataStore 在 store(dubbo-common 模块,包下实现)。目前的实现比较简单,可以认为是
连接服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 连接锁,用于实现发起连接和断开连接互斥,避免并发。
*/
private final Lock connectLock = new ReentrantLock();
protected void connect() throws RemotingException {
// 获得锁
connectLock.lock();
try {
// 已连接,
if (isConnected()) {
return;
}
// 初始化重连线程
initConnectStatusCheckCommand();
// 执行连接
doConnect();
// 连接失败,抛出异常
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
// 连接成功,打印日志
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
// 设置重连次数归零
reconnect_count.set(0);
// 设置未打印过错误日志
reconnect_error_log_flag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
// 释放锁
connectLock.unlock();
}
}
- 第 3 行:获得锁。在连接和断开连接时,通过锁,避免并发冲突。
- 第 5 至 8 行:调用
#isConnected()方法,判断连接状态。若已经连接,就不重复连接。代码如下:
1
2
3
4
5
@Override
public boolean isConnected() {
Channel channel = getChannel();
return channel != null && channel.isConnected();
}
该方法,是因为实现 Channel 接口( Client 实现 Channel 接口 ),所以需要实现的。我们可以看到,实际方法内部,调用的是
channel对象,进行判断。其它实现 Channel 的方法,也是这么处理的,例如 #getAttribute(key) 等方法。- 第 10 行:调用
#initConnectStatusCheckCommand()方法,初始化重连线程。- 方法会复杂一些,不杂糅在这里讲。
- 第 14 至 17 行:连接失败,抛出异常 RemotingException 。
- 第 18 至 25 行:连接成功,打印日志。
- 第 26 至 29 行:设置重连次数归零,打印过错误日志状态为否。下面,我们会看到这些状态字段的变更。
- 第 38 行:释放锁。
初始化重连线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 重连次数
*/
private final AtomicInteger reconnect_count = new AtomicInteger(0);
/**
* 重连时,是否已经打印过错误日志。
*/
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
/**
* 重连执行任务 Future
*/
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
/**
* 最后成功连接时间
*/
// the last successed connected time
private long lastConnectedTime = System.currentTimeMillis();
private synchronized void initConnectStatusCheckCommand() {
//reconnect=false to close reconnect
// 获得获得重连频率,默认开启。
int reconnect = getReconnectParam(getUrl());
// 若开启重连功能,创建重连线程
if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
// 创建 Runnable 对象
Runnable connectStatusCheckCommand = new Runnable() {
public void run() {
try {
// 未连接,重连
if (!isConnected()) {
connect();
// 已连接,记录最后连接时间
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
// 超过一定时间未连接上,才打印异常日志。并且,仅打印一次。默认,15 分钟。
String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
if (!reconnect_error_log_flag.get()) {
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return;
}
}
// 每一定次发现未重连,才打印告警日志。默认,1800 次,1 小时。
if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
logger.warn(errorMsg, t);
}
}
}
};
// 发起定时任务
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
- 第 4 行:调用 #getReconnectParam(url) 方法,获得重连频率。默认开启,2000 毫秒。
- 代码比较简单,胖友自己点击方法查看。
- 第 6 至 38 行:若开启重连功能,创建重连线程。
- 第 8 至 35 行:创建 Runnable 对象。
- 第 11 至 13 行:未连接时,调用
#connect()方法,进行重连。 - 第 14 至 17 行:已连接时,记录最后连接时间。
- 第 18 至 33 行:符合错误告警条件时,打印
error或warn日志。为什么要符合条件才打印呢?之前也和朋友聊起来过,线上因为中间件组件,打印了太多的日志,结果整个 JVM 崩了。特别在网络场景 + 大量”无限”重试的场景,特别容易打出满屏的日志。这块,我们可以学习下。另外,Eureka 在集群同步,也有类似处理。
- 第 11 至 13 行:未连接时,调用
- 第 36 行:发起任务,定时检查,是否需要重连。
- 第 8 至 35 行:创建 Runnable 对象。
reconnect=false to close reconnect,从目前代码上来看,未实现#reset(url)方法,在 URL 的reconnect=false配置项时,关闭重连线程。
发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 未连接时,开启重连功能,则先发起连接
if (send_reconnect && !isConnected()) {
connect();
}
// 发送消息
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
包装通道处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Constants.java
public static final String DEFAULT_CLIENT_THREADPOOL = "cached";
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
/**
* 包装通道处理器
*
* @param url URL
* @param handler 被包装的通道处理器
* @return 包装后的通道处理器
*/
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
// 设置线程名
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
// 设置使用的线程池类型
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
// 包装通道处理器
return ChannelHandlers.wrap(handler, url);
}
- 第 10 行:调用
ExecutorUtil#setThreadName(url, CLIENT_THREAD_POOL_NAME)方法,设置线程名,即URL.threadname=xxx。代码如下:
1
2
3
4
5
6
public static URL setThreadName(URL url, String defaultName) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, defaultName);
name = new StringBuilder(32).append(name).append("-").append(url.getAddress()).toString();
url = url.addParameter(Constants.THREAD_NAME_KEY, name);
return url;
}
注意,线程名中,包含 URL 的地址信息。
- 第 12 行:设置线程类型,即
URL.threadpool=xxx。默认情况下,使用"cached"类型,这个和 Server 是不同的,下面我们会看到。 - 第 14 行:调用 「8. Dispacher」
ChannelHandlers#wrap(handler, url)方法,包装通道处理器。这里我们不细说,在 中,结合解析。 - 这是一个非常关键的方法,在例如 NettyClient 等里,都会调用该方法。
其他方法
如下方法比较简单,艿艿就不重复啰嗦了。
- #disconnect() 方法,断开连接。
- #reconnect() 方法,主动重连。
- #close() 方法,强制关闭。
- #close(timeout) 方法,优雅关闭。
子类类图
3.2 ClientDelegate
com.alibaba.dubbo.remoting.transport.ClientDelegate ,实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 client 属性的方法。
目前 dubbo-rpc-default 模块中,ChannelWrapper 继承了 ClientDelegate 类。但实际上,ChannelWrapper 重新实现了所有的方法,并且,并未复用任何方法。所以,ClientDelegate 目前用途不大。
4. Server
4.1 AbstractServer
com.alibaba.dubbo.remoting.transport.AbstractServer ,实现 Server 接口,继承 AbstractEndpoint 抽象类,服务器抽象类,重点实现了公用的逻辑,同时抽象了开启、关闭等模板方法,供子类实现。抽象方法如下:
1
2
3
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 线程池
*/
ExecutorService executor;
/**
* 服务地址
*/
private InetSocketAddress localAddress;
/**
* 绑定地址
*/
private InetSocketAddress bindAddress;
/**
* 服务器最大可接受连接数
*/
private int accepts;
/**
* 空闲超时时间,单位:毫秒
*/
private int idleTimeout; //600 seconds
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 服务地址
localAddress = getUrl().toInetSocketAddress();
// 绑定地址
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 服务器最大可接受连接数
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
// 空闲超时时间
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
// 开启服务器
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 获得线程池
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
- 第 24 至 36 行:从 URL 中,加载
localAddress、bindAddress、accepts、idleTimeout配置项。比较难理解的,可能是两个地址属性,如下是比例提供的一个例子:
1
2
- 配置项可在 [#reset(url)](https://github.com/YunaiV/dubbo/blob/31b3f1e868ed2d62c97a26b5cd233a921ce2205a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractServer.java#L80-L129) 方法中,重置属性。 - 第 38 至 47 行:调用 `#doOpen()` 方法,开启服务器。 - 第 49 至 52 行:从 [DataStore](https://github.com/YunaiV/dubbo/blob/31b3f1e868ed2d62c97a26b5cd233a921ce2205a/dubbo-common/src/main/java/com/alibaba/dubbo/common/store/DataStore.java) 中,获得线程池。
- `fixme replace this with better method` **官方**说明,在这块实现上,也不是很满意,后面会优化掉。
被客户端连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClosed()) {
logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
// 超过上限,关闭新的链接
Collection<Channel> channels = getChannels();
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close(); // 关闭新的链接
return;
}
// 连接
super.connected(ch);
}
发送消息
1
2
3
4
5
6
7
8
9
10
11
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 获得所有的客户端的通道
Collection<Channel> channels = getChannels();
// 群发消息
for (Channel channel : channels) {
if (channel.isConnected()) {
channel.send(message, sent);
}
}
}
其他方法
如下方法比较简单,艿艿就不重复啰嗦了。
- #disconnect() 方法,断开连接。
- #close() 方法,强制关闭。
- #close(timeout) 方法,优雅关闭。
子类类图
4.2 ServerDelegate
com.alibaba.dubbo.remoting.transport.ServerDelegate ,实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 server 属性的方法。
目前 dubbo-remoting-p2p 模块中,PeerServer 会继承该类,后续再看。
5. Channel
5.1 AbstractChannel
com.alibaba.dubbo.remoting.transport.AbstractChannel ,实现 Channel 接口,实现 AbstractPeer 抽象类,通道抽象类。
发送消息
1
2
3
4
5
6
7
8
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (isClosed()) {
throw new RemotingException(this, "Failed to send message "
+ (message == null ? "" : message.getClass().getName()) + ":" + message
+ ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
}
}
- 具体的发送方法,子类实现。在 AbstractChannel 中,目前只做状态检查。
子类类图
5.2 ChannelDelegate
com.alibaba.dubbo.remoting.transport.ChannelDelegate ,实现 Channel 接口,通道装饰者实现类。在每个实现的方法里,直接调用被装饰的 channel 属性的方法。
目前 Dubbo 中,暂未用到。
7. ChannelHandler
7.1 ChannelHandlerAdapter
com.alibaba.dubbo.remoting.transport.ChannelHandlerAdapter ,实现 ChannelHandler 接口,通道处理器适配器,每个方法为空实现。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void connected(Channel channel) throws RemotingException { }
@Override
public void disconnected(Channel channel) throws RemotingException { }
@Override
public void sent(Channel channel, Object message) { }
@Override
public void received(Channel channel, Object message) throws RemotingException { }
@Override
public void caught(Channel channel, Throwable exception) throws RemotingException { }
子类,可继承它,仅实现想要的方法。
7.2 ChannelHandlerDispatcher
com.alibaba.dubbo.remoting.transport.ChannelHandlerDispatcher ,实现 ChannelHandler 接口,通道处理器调度器。在它内部,有一个通道处理器数组 channelHandlers 属性。
每个实现的方法,都会循环调用 channelHandlers 的方法,例如:
1
2
3
4
5
6
7
8
9
public void received(Channel channel, Object message) {
for (ChannelHandler listener : channelHandlers) {
try {
listener.received(channel, message);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
搜索了下 ChannelHandlerDispatcher 的使用情况,主要用在 dubbo-remoting-p2p 的 AbstractGroup 中。
7.3 ChannelHandlerDelegate
com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate ,实现 ChannelHandler 接口,通道处理器装饰者接口。方法如下:
1
ChannelHandler getHandler();
正如,我们在上文中说道,装饰器模式,在 dubbo-remoting-api 扮演了非常重要的角色,那么最佳演员就是 ChannelHandlerDelegate 们。下面,开始他们的表演。
7.3.1 AbstractChannelHandlerDelegate
com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate ,实现 ChannelHandlerDelegate 接口,通道处理器装饰者抽象实现类。在每个实现的方法里,直接调用被装饰的 handler 属性的方法。
7.3.2 DecodeHandler
com.alibaba.dubbo.remoting.transport.DecodeHandler ,实现 AbstractChannelHandlerDelegate 抽象类,解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。
覆写了 #received(channel, message) 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
- 第 3 至 5 行:当消息是 Decodeable 类型时,调用
#decode(message)方法,解析消息。 - 第 7 至 9 行:当消息是 Request 类型时,调用
#decode(message)方法,解析data属性。 - 第 11 至 13 行:当消息是 Response 类型时,调用
#decode(message)方法,解析result属性。 - 第 15 行:调用
ChannelHandler#received(channel, message)方法,将消息交给委托的handler,继续处理。 胖友是否感受到,装饰器模式的好处:通过组合的方式,实现功能的叠加。
解析消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
((Decodeable) message).decode(); // 解析消息
if (log.isDebugEnabled()) {
log.debug(new StringBuilder(32).append("Decode decodeable message ").append(message.getClass().getName()).toString());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn(new StringBuilder(32).append("Call Decodeable.decode failed: ").append(e.getMessage()).toString(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
- 第 2 至 4 行:当类型是 Decodeable 时,调用
Decodeable#decode()方法,进一步解析。 - 在
dubbo-rpc-default项目中,DecodeableRpcInvocation 和 DecodeableRpcResult 实现 Decodeable 接口,后面我们来分享。
7.3.3 MultiMessageHandler
com.alibaba.dubbo.remoting.transport.MultiMessageHandler,实现 AbstractChannelHandlerDelegate 抽象类,多消息处理器,处理一次性接收到多条消息的情况。
覆写了 #received(channel, message) 方法
1
2
3
4
5
6
7
8
9
10
11
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) { // 多消息
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
- 第 3 至 7 行:当消息是 MultiMessage 类型,即多消息循环,提交给
handler处理。 - 第 8 至 10 行:当单消息直接时,提交给
handler处理。
在下面的文章,我们可以看到 ChannelHandlerDelegate 的组合使用的例子。
8. Dispacher
本小节内容,对应 《Dubbo 用户指南 —— 线程模型》。
简单概括这节,以接收消息举例子,代码如下:
1
2
3
executor.execute(new Runnable() {
handler.received(channel, message)
});
将 ChannelHandler 的具体操作,调度到线程池中,这也是为什么这个模块叫 dispacher 的原因。
8.1 ChannelHandlers
com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers ,通道处理器工厂。在上文 「3.1 AbstractClient」,我们看到 AbstractClient#wrapChannelHandler(url, handler) 方法中,会调用 ChannelHandlers#wrap(url, handler) 方法。实际上,Server 部分也会有这样类似的逻辑,只是代码实现上暂未统一。以 dubbo-remoting-netty4 来举例子:
- NettyClient :
1
2
3
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
- NettyServer :
1
2
3
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) /* 设置线程名到 URL 上 */));
}
无论 Client 还是 Server,都是类似的,将传入的 handler,最终使用 ChannelHandlers 进行一次包装。OK,我们来看看包装通道处理器的具体代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 单例
*/
private static ChannelHandlers INSTANCE = new ChannelHandlers();
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(
new HeartbeatHandler(
ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
)
);
}
- 第 11 至 15 行:在这里,我们就看到了多个 ChannelHandlerDelegate 的组合。包括,第 15 行的,返回
Dispatcher#dispatch(handler, url)方法,实际上也是一个 ChannelHandlerDelegate 对象。
8.2 Dispatcher 实现类
在 Dubbo 中,有多种 Dispatcher 的实现,如下:
FROM 《Dubbo 用户指南 —— 线程模型》
all所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。direct所有消息都不派发到线程池,全部在 IO 线程上直接执行。message只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。execution只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。connection在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
子类类图
8.2.1 AllDispatcher
我们以 all 对应的 AllDispatcher 举例子,代码如下:
1
2
3
4
5
6
7
8
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
在该类的 #dispatch(…) 的方法中,我们可以看到创建 AllChannelHandler 对象,并传入 handler 属性。聪慧如你,已经猜到 AllChannelHandler 也是 ChannelHandlerDelegate 类型。也就是说”线程模型“,也是通过装饰器模式,组合而成。
每个 Dispatcher 实现类,都对应一个 ChannelHandler 实现类。默认未配置的情况下,使用 AllDispatcher 调度。
8.2.2 AllChannelHandler
com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler ,实现 WrappedChannelHandler 抽象类。覆写 #connected(channel) 方法如下:
WrappedChannelHandler 是实现 ChannelHandlerDelegate 的抽象类,下文再看。
1
2
3
4
5
6
7
8
9
@Override
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
}
}
- 创建 ChannelEventRunnable 对象,提交给线程池执行。
- 注意,传入的状态为
ChannelState.CONNECTED。不同的实现方法,对应不同的状态。
8.3 ChannelEventRunnable
com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable ,实现 Runnable 接口。代码比较简单,胖友自己看噢。主要分成三部分:
- 构造方法
- ChannelState
- #run() 方法,简化代码如下:
1
2
3
4
5
6
7
8
9
10
11
@Override
public void run() {
switch (state) {
case CONNECTED: handler.connected(channel); break;
case DISCONNECTED: handler.disconnected(channel); break;
case SENT: handler.sent(channel, message); break;
case RECEIVED: handler.received(channel, message); break;
case CAUGHT: handler.caught(channel, exception); break;
default: logger.warn("unknown state: " + state + ", message is " + message);
}
}
8.4 WrappedChannelHandler
com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler,实现 ChannelHandlerDelegate 接口,包装的 WrappedChannelHandler 实现类。
从目前的实现来看,WrappedChannelHandler 继承 AbstractChannelHandlerDelegate 更合适,因为 #connected(channel) 等,实现的方法都是相同的。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 线程池
*/
protected final ExecutorService executor;
/**
* 通道处理器
*/
protected final ChannelHandler handler;
/**
* URL
*/
protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 创建线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
// 添加线程池到 DataStore 中
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
- 第 19 行:基于 Dubbo SPI Adaptive 机制,创建线程池。
- 第 21 至 27 行:添加线程池到 DataStore 中。这就是上文 AbstractClient 或 AbstractServer 从 DataStore 获得线程池的方式。当然,官方也说了,这种方式不是很优雅,有点奇淫技巧,未来会优化掉。
共享线程池
在 WrappedChannelHandler 中,有一个内置的共享线程池,如下:
1
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));
【TODO 8024】搞不懂,这个设计的意图,先mark留着。
子类类图
9. Codec
9.1 CodecSupport
com.alibaba.dubbo.remoting.transport.CodecSupport,编解码工具类,提供查询 Serialization 的功能。
初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 序列化对象集合
* key:序列化类型编号 {@link Serialization#getContentTypeId()}
*/
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
/**
* 序列化名集合
* key:序列化类型编号 {@link Serialization#getContentTypeId()}
* value: 序列化拓展名
*/
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();
static {
// 基于 Dubbo SPI ,初始化
Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
for (String name : supportedExtensions) {
Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
byte idByte = serialization.getContentTypeId();
if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
logger.error("Serialization extension " + serialization.getClass().getName()
+ " has duplicate id to Serialization extension "
+ ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
+ ", ignore this Serialization extension");
continue;
}
ID_SERIALIZATION_MAP.put(idByte, serialization);
ID_SERIALIZATIONNAME_MAP.put(idByte, name);
}
}
Dubbo 提供了多种序列化方式,此处初始化结果,如下图:
查找 Serialization 对象
1
2
3
4
5
6
7
8
9
10
11
12
public static Serialization getSerialization(URL url, Byte id) throws IOException {
Serialization serialization = getSerializationById(id);
String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); // 默认,hessian2
// 出于安全的目的,针对 JDK 的序列化方式(对应编号为 3、4、7),检查连接到服务器的 URL 和实际传输的数据,协议是否一致。
// https://github.com/apache/incubator-dubbo/issues/1138
// Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
if (serialization == null
|| ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
}
return serialization;
}
在最新的 Dubbo 版本中,已经将 serialization 模块,从 dubbo-common 中,独立成 dubbo-serialization。So,我们后面开一个系列来分享。
9.2 AbstractCodec
com.alibaba.dubbo.remoting.transport.AbstractCodec,实现 Codec2 接口,提供如下公用方法:
- #checkPayload(channel, size) 静态方法,校验消息长度。
- #getSerialization(channel) 方法,获得 Serialization 对象。
- #isClientSide(channel) 方法,是否为客户端侧的通道。
- #isServerSide(channel) 方法,是否为服务端侧的通道。
子类类图
编解码器的实现,通过继承的方式,获得更多的功能。每一个 Codec2 类实现对不同消息的编解码。通过协议头来判断,具体使用哪个编解码逻辑。听起来有点绕,我们来看一段简化 ExchangeCodec 的 #decode(…) 例子:
1
2
3
4
5
6
7
8
9
10
11
12
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
// ... 省略
return super.decode(channel, buffer, readable, header);
}
// ... 省略
return decodeBody(channel, is, header);
}
- 第 2 至 7 行:通过 magic number 判断到,并非信息交易的协议头(Dubbo Exchange),转交给父类 TelnetCodec 处理,一般此时是 Telnet 消息。
- 第 8 至 11 行:通过 magic number 判断到,符合信息交易的协议头(Dubbo Exchange),ExchangeCodec 自己处理。
9.2.1 TransportCodec
com.alibaba.dubbo.remoting.transport.codec.TransportCodec,传输编解码器,使用 Serialization 进行序列化/反序列化,直接编解码。
编码消息
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
// 获得反序列化的 ObjectOutput 对象
OutputStream output = new ChannelBufferOutputStream(buffer);
ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
// 写入 ObjectOutput
encodeData(channel, objectOutput, message);
objectOutput.flushBuffer();
// 释放
if (objectOutput instanceof Cleanable) {
((Cleanable) objectOutput).cleanup();
}
}
- 第 3 至 5 行:获得对应的 Serialization 对象,并创建用于反序列化的 ObjectOutput 对象。不同的 Serialization 实现,对应不同的 ObjectOutput 实现类。这里,我们只要读懂大体流程,详细的,我们后面文章见。
- 第 7 行:调用
#encodeData(channel, objectOutput, message)方法,写入 ObjectOutput。代码如下:
1
2
3
4
5
6
7
protected void encodeData(Channel channel, ObjectOutput output, Object message) throws IOException {
encodeData(output, message);
}
protected void encodeData(ObjectOutput output, Object message) throws IOException {
output.writeObject(message);
}
- 第 9 至 12 行:释放资源。目前,仅有
kryo的 KryoObjectInput、KryoObjectOutput 实现了 Cleanable 接口,需要释放资源。
解码消息
#decode(channel, buffer) 实现方法,和解码消息基本一致,胖友自己查看。
9.3 CodecAdapter
com.alibaba.dubbo.remoting.transport.codec.CodecAdapter,实现 Code2 接口,Codec 适配器,将 Codec 适配成 Codec2。
代码比较简单,胖友自己查看。
666. 彩蛋
代码比较多,如果不熟悉 Netty 等框架的胖友,可能会一脸懵逼的看到文末。建议胖友结合如下的代码:
1
2
3
4
5
6
7
8
9
10
// Netty.java
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline().addLast("decoder", adapter.getDecoder()) // 解码
.addLast("encoder", adapter.getEncoder()) // 解码
.addLast("handler", nettyServerHandler); // 处理器
}
}
在脑补 Debug + IDE Debug,多考虑下。
推荐阅读:








