文章

NIO服务器(五)之Buffer层

NIO服务器(五)之Buffer层

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(四)之 Exchange 层》 一文,分享 dubbo-remoting-api 模块, buffer 包,Buffer 层

Buffer 在 NIO 框架中,扮演非常重要的角色,基本每个库都提供了自己的 Buffer 实现,例如:

  • Java NIO 的 java.nio.ByteBuffer
  • Mina 的 org.apache.mina.core.buffer.IoBuffer
  • Netty4 的 io.netty.buffer.ByteBuf

在 dubbo-remoting-api 的 buffer 包中,一方面定义了 ChannelBuffer 和 ChannelBufferFactory 的接口,同时提供了多种默认的实现。整体类图如下:

类图

  • 其中,红框部分,是 Netty3 和 Netty4 ,实现的自定义的 ChannelBuffer 和 ChannelBufferFactory 类。

2. ChannelBuffer

com.alibaba.dubbo.remoting.buffer.ChannelBuffer ,实现 Comparable 接口,通道 Buffer 接口。

ChannelBuffer 在接口方法的定义上,主要参考了 Netty 的 ByteBuf 进行设计,所以接口和注释基本一致,本文就不一个一个细讲过去,胖友可以看:

独有的接口方法 #factory() 方法,用于逻辑中,需要创建 ChannelBuffer 的情况。

  • 代码如下:

```plain text plain /** * Returns the factory which creates a {@link ChannelBuffer} whose type and * default {@link java.nio.ByteOrder} are same with this buffer. */ ChannelBufferFactory factory()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
---

- 调用方如下:
![](/assets/images/learning/dubbo/dubbo-nio-server-buffer/7e43836aa5e5b53458be26b838eff209.png)
调用方

## 2.1 AbstractChannelBuffer

[com.alibaba.dubbo.remoting.buffer.AbstractChannelBuffer](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/buffer/AbstractChannelBuffer.java) ,实现 ChannelBuffer 接口,通道 Buffer **抽象类**。

**构造方法**

```plain text
plain /**  * 读取位置  */ private int readerIndex; /**  * 写入位置  */ private int writerIndex; /**  * 标记的读取位置  */ private int markedReaderIndex; /**  * 标记的写入位置  */ private int markedWriterIndex;

FROM 《netty的ByteBuf》

writerIndex 和 readerIndex

  • 初始状态:
  • 当写入5个字节后:

这时,writerIndex 为 5,这时如果开始读取,那么这个 writerIndex 可以作为上面ByteBuffer flip 之后的 limit。

  • 当读取3个字节后:

实现方法

在 AbstractChannelBuffer 实现的方法,都是重载的方法,真正实质的方法,需要子类来实现。以 #getBytes(…) 方法,举例子:

```plain text plain @Override public void getBytes(int index, ChannelBuffer dst, int length) { if (length > dst.writableBytes()) { throw new IndexOutOfBoundsException(); } getBytes(index, dst, dst.writerIndex(), length); dst.writerIndex(dst.writerIndex() + length); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
---

- 方法中调用的 **为啥呢实质实现形式**
#getBytes(index, ds, dstIndex, length)
方法,并未实现。
?
的方法,涉及到字节数组的
。

如下是所有

**未实现**

的方法:

![](/assets/images/learning/dubbo/dubbo-nio-server-buffer/4ae0e76b793e3eb45cedcd11ea5b77e9.png)

未实现方法

## 2.2 ByteBufferBackedChannelBuffer

[com.alibaba.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/buffer/ByteBufferBackedChannelBuffer.java) ,实现 AbstractChannelBuffer 抽象类,基于 **java.nio.ByteBuffer** 的 Buffer 实现类。

**构造方法**

```plain text
plain /**  * buffer  * java.nio.ByteBuffer  */ private final ByteBuffer buffer; /**  * 容量  */ private final int capacity;  public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {     if (buffer == null) {         throw new NullPointerException("buffer");     }     // buffer     this.buffer = buffer.slice();     // 容量     capacity = buffer.remaining();     // 设置 `writerIndex`     writerIndex(capacity); }  public ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {     // buffer     this.buffer = buffer.buffer;     // 容量     capacity = buffer.capacity;     // 设置 `writerIndex` `readerIndex`     setIndex(buffer.readerIndex(), buffer.writerIndex()); }

工厂

```plain text plain @Override public ChannelBufferFactory factory() { if (buffer.isDirect()) { return DirectChannelBufferFactory.getInstance(); } else { return HeapChannelBufferFactory.getInstance(); } }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
---

- 对应的工厂是 DirectChannelBufferFactory 或 HeapChannelBufferFactory 。

**实现方法**

胖友,自己查看。

## 2.3 HeapChannelBuffer

[com.alibaba.dubbo.remoting.buffer.HeapChannelBuffer](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/buffer/HeapChannelBuffer.java) ,实现 AbstractChannelBuffer 抽象类,基于**字节数组**的 Buffer 实现类。

**构造方法**

```plain text
plain /**  * The underlying heap byte array that this buffer is wrapping.  * 字节数组  */ protected final byte[] array;  public HeapChannelBuffer(int length) {     this(new byte[length], 0, 0); }  public HeapChannelBuffer(byte[] array) {     this(array, 0, array.length); }  protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {     if (array == null) {         throw new NullPointerException("array");     }     this.array = array;     setIndex(readerIndex, writerIndex); }

工厂

```plain text plain @Override public ChannelBufferFactory factory() { return HeapChannelBufferFactory.getInstance(); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
---

- 对应的工厂是 HeapChannelBufferFactory 。

**实现方法**

胖友,自己查看。

## 2.4 DynamicChannelBuffer

[com.alibaba.dubbo.remoting.buffer.DynamicChannelBuffer](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/buffer/DynamicChannelBuffer.java) ,实现 AbstractChannelBuffer 抽象类,基于**动态**的 Buffer 实现类。或者说,基于传入的 ChannelBufferFactory 的 Buffer 实现类。

**构造方法**

```plain text
plain /**  * 工厂  */ private final ChannelBufferFactory factory; /**  * Buffer  */ private ChannelBuffer buffer;  public DynamicChannelBuffer(int estimatedLength) {     this(estimatedLength, HeapChannelBufferFactory.getInstance()); // 默认 HeapChannelBufferFactory }  public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {     if (estimatedLength < 0) {         throw new IllegalArgumentException("estimatedLength: " + estimatedLength);     }     if (factory == null) {         throw new NullPointerException("factory");     }     // 设置 `factory`     this.factory = factory;     // 创建 `buffer`     buffer = factory.getBuffer(estimatedLength); }

工厂

```plain text plain @Override public ChannelBufferFactory factory() { return factory; }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
---

**实现方法**

每个方法,直接调用 buffer 对应的方法。

# 3. ChannelBuffers

[com.alibaba.dubbo.remoting.buffer.ChannelBuffers](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/buffer/ChannelBuffers.java)

,Buffer

**工具类**

,提供创建、比较 ChannelBuffer 等公用方法。如下图所示:

![](/assets/images/learning/dubbo/dubbo-nio-server-buffer/edccf2f8d577f7bca943034e840ef26e.png)

ChannelBuffers

# 4. ChannelBufferFactory

com.alibaba.dubbo.remoting.buffer.ChannelBufferFactory ,**通道 Buffer 工厂**接口。方法如下:

```plain text
plain ChannelBuffer getBuffer(int capacity); ChannelBuffer getBuffer(byte[] array, int offset, int length); ChannelBuffer getBuffer(ByteBuffer nioBuffer); // java.nio.ByteBuffer

4.1 DirectChannelBufferFactory

com.alibaba.dubbo.remoting.buffer.DirectChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 DirectChannelBuffer 的工厂。

实现比较简单,代码已经加注释,胖友自己查看。

4.2 HeapChannelBufferFactory

com.alibaba.dubbo.remoting.buffer.HeapChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 HeapChannelBufferFactory 的工厂。

实现比较简单,代码已经加注释,胖友自己查看。

5. IO

实际 IO 操作,是基于 InputStream 和 OutputStream ,例如我们在前文看到的 Serialization 序列化和反序列化,方法如下:

```plain text plain // … 省略其他方法 ObjectOutput serialize(URL url, OutputStream output) throws IOException; ObjectInput deserialize(URL url, InputStream input) throws IOException;

1
2
3
4
5
6
7
8
9
10
11
---

所以,我们需要将 ChannelBuffer 进行装饰。

---

另外,我们在回过头来看看 Codec 和 Codec2 接口,方法如下:

```plain text
plain // Codec.java void encode(Channel channel, OutputStream output, Object message) throws IOException; // Codec2.java void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;  // Codec.java Object decode(Channel channel, InputStream input) throws IOException; // Codec2.java Object decode(Channel channel, ChannelBuffer buffer) throws IOException

一个变化点,就是将 OutputStream 和 InputStream ,替换成了 ChannelBuffer ,更好的以 ChannelBuffer 为核心,与其他框架整合。

5.1 ChannelBufferInputStream

com.alibaba.dubbo.remoting.buffer.ChannelBufferInputStream ,实现 InputStream 接口,代码如下:

```plain text plain public class ChannelBufferInputStream extends InputStream { private final ChannelBuffer buffer; /** * 开始位置 */ private final int startIndex; /** * 结束位置 */ private final int endIndex; public ChannelBufferInputStream(ChannelBuffer buffer) { this(buffer, buffer.readableBytes()); } public ChannelBufferInputStream(ChannelBuffer buffer, int length) { if (buffer == null) { throw new NullPointerException(“buffer”); } if (length < 0) { throw new IllegalArgumentException(“length: “ + length); } if (length > buffer.readableBytes()) { throw new IndexOutOfBoundsException(); } this.buffer = buffer; startIndex = buffer.readerIndex(); endIndex = startIndex + length; buffer.markReaderIndex(); } // … 省略从 buffer 读取的方法,胖友,自己查看。 }

1
2
3
4
5
6
7
8
9
---

## 5.2 ChannelBufferOutputStream

[com.alibaba.dubbo.remoting.buffer.ChannelBufferOutputStream](https://github.com/YunaiV/dubbo/blob/master/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/buffer/ChannelBufferOutputStream.java) ,实现 OutputStream 接口,代码如下:

```plain text
plain public class ChannelBufferOutputStream extends OutputStream {      private final ChannelBuffer buffer;     /**      * 开始位置      */     private final int startIndex;      public ChannelBufferOutputStream(ChannelBuffer buffer) {         if (buffer == null) {             throw new NullPointerException("buffer");         }         this.buffer = buffer;         startIndex = buffer.writerIndex();     }      // ... 省略向 buffer 写入的方法,胖友,自己查看。  }

本文由作者按照 CC BY 4.0 进行授权