NIO服务器(五)之Buffer层
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(四)之 Exchange 层》 一文,分享 dubbo-remoting-api 模块, buffer 包,Buffer 层。
Buffer 在 NIO 框架中,扮演非常重要的角色,基本每个库都提供了自己的 Buffer 实现,例如:
- Java NIO 的
java.nio.ByteBuffer - Mina 的
org.apache.mina.core.buffer.IoBuffer - Netty4 的
io.netty.buffer.ByteBuf
在 dubbo-remoting-api 的 buffer 包中,一方面定义了 ChannelBuffer 和 ChannelBufferFactory 的接口,同时提供了多种默认的实现。整体类图如下:
- 其中,红框部分,是 Netty3 和 Netty4 ,实现的自定义的 ChannelBuffer 和 ChannelBufferFactory 类。
2. ChannelBuffer
com.alibaba.dubbo.remoting.buffer.ChannelBuffer ,实现 Comparable 接口,通道 Buffer 接口。
ChannelBuffer 在接口方法的定义上,主要参考了 Netty 的 ByteBuf 进行设计,所以接口和注释基本一致,本文就不一个一个细讲过去,胖友可以看:
独有的接口方法 #factory() 方法,用于逻辑中,需要创建 ChannelBuffer 的情况。
- 代码如下:
1
2
3
4
5
/**
* Returns the factory which creates a {@link ChannelBuffer} whose type and
* default {@link java.nio.ByteOrder} are same with this buffer.
*/
ChannelBufferFactory factory();
- 调用方如下:
2.1 AbstractChannelBuffer
com.alibaba.dubbo.remoting.buffer.AbstractChannelBuffer ,实现 ChannelBuffer 接口,通道 Buffer 抽象类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 读取位置
*/
private int readerIndex;
/**
* 写入位置
*/
private int writerIndex;
/**
* 标记的读取位置
*/
private int markedReaderIndex;
/**
* 标记的写入位置
*/
private int markedWriterIndex;
FROM 《netty的ByteBuf》
writerIndex 和 readerIndex
- 初始状态:
- 当写入5个字节后:
这时,writerIndex 为 5,这时如果开始读取,那么这个 writerIndex 可以作为上面ByteBuffer flip 之后的 limit。
- 当读取3个字节后:
实现方法
在 AbstractChannelBuffer 实现的方法,都是重载的方法,真正实质的方法,需要子类来实现。以 #getBytes(…) 方法,举例子:
1
2
3
4
5
6
7
8
@Override
public void getBytes(int index, ChannelBuffer dst, int length) {
if (length > dst.writableBytes()) {
throw new IndexOutOfBoundsException();
}
getBytes(index, dst, dst.writerIndex(), length);
dst.writerIndex(dst.writerIndex() + length);
}
- 方法中调用的 实质实现
#getBytes(index, ds, dstIndex, length)方法,并未实现。涉及到字节数组的方法,都是如此。
如下是所有未实现的方法:
2.2 ByteBufferBackedChannelBuffer
com.alibaba.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer ,实现 AbstractChannelBuffer 抽象类,基于 java.nio.ByteBuffer 的 Buffer 实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* buffer
* java.nio.ByteBuffer
*/
private final ByteBuffer buffer;
/**
* 容量
*/
private final int capacity;
public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
// buffer
this.buffer = buffer.slice();
// 容量
capacity = buffer.remaining();
// 设置 `writerIndex`
writerIndex(capacity);
}
public ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {
// buffer
this.buffer = buffer.buffer;
// 容量
capacity = buffer.capacity;
// 设置 `writerIndex` `readerIndex`
setIndex(buffer.readerIndex(), buffer.writerIndex());
}
工厂
1
2
3
4
5
6
7
8
@Override
public ChannelBufferFactory factory() {
if (buffer.isDirect()) {
return DirectChannelBufferFactory.getInstance();
} else {
return HeapChannelBufferFactory.getInstance();
}
}
- 对应的工厂是 DirectChannelBufferFactory 或 HeapChannelBufferFactory。
实现方法
胖友,自己查看。
2.3 HeapChannelBuffer
com.alibaba.dubbo.remoting.buffer.HeapChannelBuffer ,实现 AbstractChannelBuffer 抽象类,基于字节数组的 Buffer 实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* The underlying heap byte array that this buffer is wrapping.
* 字节数组
*/
protected final byte[] array;
public HeapChannelBuffer(int length) {
this(new byte[length], 0, 0);
}
public HeapChannelBuffer(byte[] array) {
this(array, 0, array.length);
}
protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {
if (array == null) {
throw new NullPointerException("array");
}
this.array = array;
setIndex(readerIndex, writerIndex);
}
工厂
1
2
3
4
@Override
public ChannelBufferFactory factory() {
return HeapChannelBufferFactory.getInstance();
}
- 对应的工厂是 HeapChannelBufferFactory。
实现方法
胖友,自己查看。
2.4 DynamicChannelBuffer
com.alibaba.dubbo.remoting.buffer.DynamicChannelBuffer ,实现 AbstractChannelBuffer 抽象类,基于动态的 Buffer 实现类。或者说,基于传入的 ChannelBufferFactory 的 Buffer 实现类。
构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 工厂
*/
private final ChannelBufferFactory factory;
/**
* Buffer
*/
private ChannelBuffer buffer;
public DynamicChannelBuffer(int estimatedLength) {
this(estimatedLength, HeapChannelBufferFactory.getInstance()); // 默认 HeapChannelBufferFactory
}
public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
if (estimatedLength < 0) {
throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
}
if (factory == null) {
throw new NullPointerException("factory");
}
// 设置 `factory`
this.factory = factory;
// 创建 `buffer`
buffer = factory.getBuffer(estimatedLength);
}
工厂
1
2
3
4
@Override
public ChannelBufferFactory factory() {
return factory;
}
实现方法
每个方法,直接调用 buffer 对应的方法。
3. ChannelBuffers
com.alibaba.dubbo.remoting.buffer.ChannelBuffers ,Buffer 工具类,提供创建、比较 ChannelBuffer 等公用方法。如下图所示:
4. ChannelBufferFactory
com.alibaba.dubbo.remoting.buffer.ChannelBufferFactory ,通道 Buffer 工厂接口。方法如下:
1
2
3
ChannelBuffer getBuffer(int capacity);
ChannelBuffer getBuffer(byte[] array, int offset, int length);
ChannelBuffer getBuffer(ByteBuffer nioBuffer); // java.nio.ByteBuffer
4.1 DirectChannelBufferFactory
com.alibaba.dubbo.remoting.buffer.DirectChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 DirectChannelBuffer 的工厂。
实现比较简单,代码已经加注释,胖友自己查看。
4.2 HeapChannelBufferFactory
com.alibaba.dubbo.remoting.buffer.HeapChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 HeapChannelBufferFactory 的工厂。
实现比较简单,代码已经加注释,胖友自己查看。
5. IO
实际 IO 操作,是基于 InputStream 和 OutputStream ,例如我们在前文看到的 Serialization 序列化和反序列化,方法如下:
1
2
3
4
// ... 省略其他方法
ObjectOutput serialize(URL url, OutputStream output) throws IOException;
ObjectInput deserialize(URL url, InputStream input) throws IOException;
所以,我们需要将 ChannelBuffer 进行装饰。
另外,我们在回过头来看看 Codec 和 Codec2 接口,方法如下:
1
2
3
4
5
6
7
8
9
// Codec.java
void encode(Channel channel, OutputStream output, Object message) throws IOException;
// Codec2.java
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
// Codec.java
Object decode(Channel channel, InputStream input) throws IOException;
// Codec2.java
Object decode(Channel channel, ChannelBuffer buffer) throws IOException
一个变化点,就是将 OutputStream 和 InputStream ,替换成了 ChannelBuffer ,更好的以 ChannelBuffer 为核心,与其他框架整合。
5.1 ChannelBufferInputStream
com.alibaba.dubbo.remoting.buffer.ChannelBufferInputStream ,实现 InputStream 接口,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ChannelBufferInputStream extends InputStream {
private final ChannelBuffer buffer;
/**
* 开始位置
*/
private final int startIndex;
/**
* 结束位置
*/
private final int endIndex;
public ChannelBufferInputStream(ChannelBuffer buffer) {
this(buffer, buffer.readableBytes());
}
public ChannelBufferInputStream(ChannelBuffer buffer, int length) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
if (length < 0) {
throw new IllegalArgumentException("length: " + length);
}
if (length > buffer.readableBytes()) {
throw new IndexOutOfBoundsException();
}
this.buffer = buffer;
startIndex = buffer.readerIndex();
endIndex = startIndex + length;
buffer.markReaderIndex();
}
// ... 省略从 buffer 读取的方法,胖友,自己查看。
}
5.2 ChannelBufferOutputStream
com.alibaba.dubbo.remoting.buffer.ChannelBufferOutputStream ,实现 OutputStream 接口,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ChannelBufferOutputStream extends OutputStream {
private final ChannelBuffer buffer;
/**
* 开始位置
*/
private final int startIndex;
public ChannelBufferOutputStream(ChannelBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
this.buffer = buffer;
startIndex = buffer.writerIndex();
}
// ... 省略向 buffer 写入的方法,胖友,自己查看。
}






