文章

NIO服务器(五)之Buffer层

NIO服务器(五)之Buffer层

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(四)之 Exchange 层》 一文,分享 dubbo-remoting-api 模块, buffer 包,Buffer 层

Buffer 在 NIO 框架中,扮演非常重要的角色,基本每个库都提供了自己的 Buffer 实现,例如:

  • Java NIO 的 java.nio.ByteBuffer
  • Mina 的 org.apache.mina.core.buffer.IoBuffer
  • Netty4 的 io.netty.buffer.ByteBuf

在 dubbo-remoting-api 的 buffer 包中,一方面定义了 ChannelBuffer 和 ChannelBufferFactory 的接口,同时提供了多种默认的实现。整体类图如下:

Buffer类图

  • 其中,红框部分,是 Netty3 和 Netty4 ,实现的自定义的 ChannelBuffer 和 ChannelBufferFactory 类。

2. ChannelBuffer

com.alibaba.dubbo.remoting.buffer.ChannelBuffer ,实现 Comparable 接口,通道 Buffer 接口。

ChannelBuffer 在接口方法的定义上,主要参考了 Netty 的 ByteBuf 进行设计,所以接口和注释基本一致,本文就不一个一个细讲过去,胖友可以看:

独有的接口方法 #factory() 方法,用于逻辑中,需要创建 ChannelBuffer 的情况。

  • 代码如下:
1
2
3
4
5
/**
 * Returns the factory which creates a {@link ChannelBuffer} whose type and
 * default {@link java.nio.ByteOrder} are same with this buffer.
 */
ChannelBufferFactory factory();

  • 调用方如下:

factory方法调用方

2.1 AbstractChannelBuffer

com.alibaba.dubbo.remoting.buffer.AbstractChannelBuffer ,实现 ChannelBuffer 接口,通道 Buffer 抽象类

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * 读取位置
 */
private int readerIndex;
/**
 * 写入位置
 */
private int writerIndex;
/**
 * 标记的读取位置
 */
private int markedReaderIndex;
/**
 * 标记的写入位置
 */
private int markedWriterIndex;

FROM 《netty的ByteBuf》

writerIndexreaderIndex

  • 初始状态:

初始状态

  • 当写入5个字节后:

写入5个字节后

这时,writerIndex 为 5,这时如果开始读取,那么这个 writerIndex 可以作为上面ByteBuffer flip 之后的 limit。

  • 当读取3个字节后:

读取3个字节后

实现方法

在 AbstractChannelBuffer 实现的方法,都是重载的方法,真正实质的方法,需要子类来实现。以 #getBytes(…) 方法,举例子:

1
2
3
4
5
6
7
8
@Override
public void getBytes(int index, ChannelBuffer dst, int length) {
    if (length > dst.writableBytes()) {
        throw new IndexOutOfBoundsException();
    }
    getBytes(index, dst, dst.writerIndex(), length);
    dst.writerIndex(dst.writerIndex() + length);
}

  • 方法中调用的 实质实现 #getBytes(index, ds, dstIndex, length) 方法,并未实现。涉及到字节数组的方法,都是如此。

如下是所有未实现的方法:

未实现方法

2.2 ByteBufferBackedChannelBuffer

com.alibaba.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer ,实现 AbstractChannelBuffer 抽象类,基于 java.nio.ByteBuffer 的 Buffer 实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * buffer
 * java.nio.ByteBuffer
 */
private final ByteBuffer buffer;
/**
 * 容量
 */
private final int capacity;

public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    // buffer
    this.buffer = buffer.slice();
    // 容量
    capacity = buffer.remaining();
    // 设置 `writerIndex`
    writerIndex(capacity);
}

public ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {
    // buffer
    this.buffer = buffer.buffer;
    // 容量
    capacity = buffer.capacity;
    // 设置 `writerIndex` `readerIndex`
    setIndex(buffer.readerIndex(), buffer.writerIndex());
}

工厂

1
2
3
4
5
6
7
8
@Override
public ChannelBufferFactory factory() {
    if (buffer.isDirect()) {
        return DirectChannelBufferFactory.getInstance();
    } else {
        return HeapChannelBufferFactory.getInstance();
    }
}

  • 对应的工厂是 DirectChannelBufferFactory 或 HeapChannelBufferFactory。

实现方法

胖友,自己查看。

2.3 HeapChannelBuffer

com.alibaba.dubbo.remoting.buffer.HeapChannelBuffer ,实现 AbstractChannelBuffer 抽象类,基于字节数组的 Buffer 实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * The underlying heap byte array that this buffer is wrapping.
 * 字节数组
 */
protected final byte[] array;

public HeapChannelBuffer(int length) {
    this(new byte[length], 0, 0);
}

public HeapChannelBuffer(byte[] array) {
    this(array, 0, array.length);
}

protected HeapChannelBuffer(byte[] array, int readerIndex, int writerIndex) {
    if (array == null) {
        throw new NullPointerException("array");
    }
    this.array = array;
    setIndex(readerIndex, writerIndex);
}

工厂

1
2
3
4
@Override
public ChannelBufferFactory factory() {
    return HeapChannelBufferFactory.getInstance();
}

  • 对应的工厂是 HeapChannelBufferFactory。

实现方法

胖友,自己查看。

2.4 DynamicChannelBuffer

com.alibaba.dubbo.remoting.buffer.DynamicChannelBuffer ,实现 AbstractChannelBuffer 抽象类,基于动态的 Buffer 实现类。或者说,基于传入的 ChannelBufferFactory 的 Buffer 实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * 工厂
 */
private final ChannelBufferFactory factory;
/**
 * Buffer
 */
private ChannelBuffer buffer;

public DynamicChannelBuffer(int estimatedLength) {
    this(estimatedLength, HeapChannelBufferFactory.getInstance()); // 默认 HeapChannelBufferFactory
}

public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
    if (estimatedLength < 0) {
        throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
    }
    if (factory == null) {
        throw new NullPointerException("factory");
    }
    // 设置 `factory`
    this.factory = factory;
    // 创建 `buffer`
    buffer = factory.getBuffer(estimatedLength);
}

工厂

1
2
3
4
@Override
public ChannelBufferFactory factory() {
    return factory;
}

实现方法

每个方法,直接调用 buffer 对应的方法。

3. ChannelBuffers

com.alibaba.dubbo.remoting.buffer.ChannelBuffers ,Buffer 工具类,提供创建、比较 ChannelBuffer 等公用方法。如下图所示:

ChannelBuffers工具类

4. ChannelBufferFactory

com.alibaba.dubbo.remoting.buffer.ChannelBufferFactory通道 Buffer 工厂接口。方法如下:

1
2
3
ChannelBuffer getBuffer(int capacity);
ChannelBuffer getBuffer(byte[] array, int offset, int length);
ChannelBuffer getBuffer(ByteBuffer nioBuffer); // java.nio.ByteBuffer

4.1 DirectChannelBufferFactory

com.alibaba.dubbo.remoting.buffer.DirectChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 DirectChannelBuffer 的工厂。

实现比较简单,代码已经加注释,胖友自己查看。

4.2 HeapChannelBufferFactory

com.alibaba.dubbo.remoting.buffer.HeapChannelBufferFactory ,实现 ChannelBufferFactory 接口,创建 HeapChannelBufferFactory 的工厂。

实现比较简单,代码已经加注释,胖友自己查看。

5. IO

实际 IO 操作,是基于 InputStream 和 OutputStream ,例如我们在前文看到的 Serialization 序列化和反序列化,方法如下:

1
2
3
4
// ... 省略其他方法
ObjectOutput serialize(URL url, OutputStream output) throws IOException;

ObjectInput deserialize(URL url, InputStream input) throws IOException;

所以,我们需要将 ChannelBuffer 进行装饰。


另外,我们在回过头来看看 Codec 和 Codec2 接口,方法如下:

1
2
3
4
5
6
7
8
9
// Codec.java
void encode(Channel channel, OutputStream output, Object message) throws IOException;
// Codec2.java
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

// Codec.java
Object decode(Channel channel, InputStream input) throws IOException;
// Codec2.java
Object decode(Channel channel, ChannelBuffer buffer) throws IOException

一个变化点,就是将 OutputStream 和 InputStream ,替换成了 ChannelBuffer ,更好的以 ChannelBuffer 为核心,与其他框架整合。

5.1 ChannelBufferInputStream

com.alibaba.dubbo.remoting.buffer.ChannelBufferInputStream ,实现 InputStream 接口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ChannelBufferInputStream extends InputStream {

    private final ChannelBuffer buffer;
    /**
     * 开始位置
     */
    private final int startIndex;
    /**
     * 结束位置
     */
    private final int endIndex;

    public ChannelBufferInputStream(ChannelBuffer buffer) {
        this(buffer, buffer.readableBytes());
    }

    public ChannelBufferInputStream(ChannelBuffer buffer, int length) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        if (length < 0) {
            throw new IllegalArgumentException("length: " + length);
        }
        if (length > buffer.readableBytes()) {
            throw new IndexOutOfBoundsException();
        }

        this.buffer = buffer;
        startIndex = buffer.readerIndex();
        endIndex = startIndex + length;
        buffer.markReaderIndex();
    }

    // ... 省略从 buffer 读取的方法,胖友,自己查看。
}

5.2 ChannelBufferOutputStream

com.alibaba.dubbo.remoting.buffer.ChannelBufferOutputStream ,实现 OutputStream 接口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ChannelBufferOutputStream extends OutputStream {

    private final ChannelBuffer buffer;
    /**
     * 开始位置
     */
    private final int startIndex;

    public ChannelBufferOutputStream(ChannelBuffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.buffer = buffer;
        startIndex = buffer.writerIndex();
    }

    // ... 省略向 buffer 写入的方法,胖友,自己查看。
}

本文由作者按照 CC BY 4.0 进行授权