日志

netty源码解解析(4.0)-23 ByteBuf内存管理:分配和释放

 来源    2019-10-09    1  

  ByteBuf内存分配和释放由具体实现负责,抽象类型只定义的内存分配和释放的时机。

  内存分配分两个阶段: 第一阶段,初始化时分配内存。第二阶段: 内存不够用时分配新的内存。ByteBuf抽象层没有定义第一阶段的行为,但定义了第二阶段的方法:

  public abstract ByteBuf capacity(int newCapacity)

  这个方法负责分配一个长度为newCapacity的新内存。

  内存释放的抽象实现在AbstractReferenceCountedByteBuf中实现,这个类实现引用计数,当调用release方法把引用计数变成0时,会调用

  protected abstract void deallocate()

  执行真正的内存释放操作。

内存相关的属性

  ByteBuf定义了两个内存相关的属性:

  capacity: 当前的当前的容量,也就是当前使用的内存大小。使用capacity()方法获得。

  maxCapacity: 最大容量,也就是可以使用的最大内存大小。使用newCapacity()方法获得。

内存分配时机

  AbstractByteBuf定义了内存分配的时机。当writeXX方法被调用的时候,如果如果发现可写空间不足,就调用capacity分配新的内存。下面以writeInt为例详细分析这个过程。

@Override
    public ByteBuf writeInt(int value) {
        ensureWritable0(4);
        _setInt(writerIndex, value);
        writerIndex += 4;
        return this;
    }    


    final void ensureWritable0(int minWritableBytes) {
        ensureAccessible();
        if (minWritableBytes <= writableBytes()) {
            return;
        }

        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }

        // Normalize the current capacity to the power of 2.
        int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

        // Adjust to the new capacity.
        capacity(newCapacity);
    }

  3行: ensureWritable确保当前能写入4Byte的数据。

  11行: 确保当前ByteBuf是可以访问的。防止多线程环境下,ByteBuf内存被释放后读写数据。

  12,13行: 如果内存够用,就此作罢。

  16行:  如果需要内存大于maxCapacity抛出异常。

  23, 26行行: 计算新内存的大小, 调用capacity(int)分配新内存。

  

  重新分配内存之前的一个重要步骤的计算新内存的大小。这个工作由calculateNewCapacity方法完成,它的代码如下:

private int calculateNewCapacity(int minNewCapacity) {
        final int maxCapacity = this.maxCapacity;
        final int threshold = 1048576 * 4; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }

  1行:接受一个最小的新内存参数minNewCapacity。

  3行: 定义一个4MB的阈值常量threshold。

  5,6行: 如果minNewCapacity==threshold,那么新内存大小就是threshold。

  10-17行: 如果minNewCapacity>threshold, 新内存大小是min(maxCapacity, threshold * n)且threshold * n >= minNewCapacity。

  21-26行: 如果minNewCapacity<threshold, 新内存大小是min(maxCapacity, 64 * 2n)且64 * 2n >= minNewCapacity。

内存分配和释放的具体实现

  本章涉及到的内存分配和释放的具体实现只涉及到unpooled类型的ByteBuf,即:

  UnpooledHeapByteBuf

  UnpooledDirectByteBuf

  UnpooledUnsafeHeapByteBuf

  UnpooledUnsafeDirectByteBuf

  这几个具体实现中涉及到的内存分配和释放代码比较简洁,更容易明白ByteBuf内存管理的原理。相比之下,pooled类型的ByteBuf内存分配和释放的代码要复杂很多,会在后面的章节独立分析。

  

  UnpooledHeapByteBuf和UnpooledUnsafeHeapByteBuf实现

   UnpooledHeapByteBuf中,内存分配的实现代码主要集中在capacity(int)和allocateArray()方法中。capacity分配新内存的步骤是

  • 调用allocateArray分配一块新内存。
  • 把旧内存中的实际复制到新内存中。
  • 使用新内存替换旧内存(24行)。
  • 释放掉旧内存(25行)。

   代码如下:

@Override
    public ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        int oldCapacity = array.length;
        byte[] oldArray = array;
        if (newCapacity > oldCapacity) {
            byte[] newArray = allocateArray(newCapacity);
            System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
            setArray(newArray);
            freeArray(oldArray);
        } else if (newCapacity < oldCapacity) {
            byte[] newArray = allocateArray(newCapacity);
            int readerIndex = readerIndex();
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setArray(newArray);
            freeArray(oldArray);
        }
        return this;
    }

  capacity中复制旧内存数据到新内存中的时候分两种情况(newCapacity,oldCapacity分别是新旧内存的大小):

  • newCapacity>oldCapacity,这种情况比较简单,直接复制就好了(第8行)。不影响readerIndex和writerIndex。
  • newCapacity<oldCapacity,  这种情况比较复杂。capacity尽量把可读的数据复制新内存中。
    • 如果readerIndex<newCapacity且writerIndex<newCapacity。可读数据会全部转移到新内存中。readerIndex和writerIndex保持不变。
    • 如果readerIndex<newCapacity且writeIndex>newCapacity。可端数据会部分转移的新内存中,会丢失部分可读数据。readerIndex不变,writerIndex变成newCapacity。
    • 如果readerIndex>newCapacity,数据全部丢失,readerIndex和writerIndex都会变成newCapacity。

   allocateArray方法负责分配一块新内存,它的实现是new byte[]。freeArray方法负责释放内存,这个方法是个空方法。

  UnpooledUnsafeHeapByteBuf是UnloopedHeadpByteBuf的直接子类,在内存管理上的差别是allocateArray的实现,UnpooledUnsafeHeapByteBuf的实现是:  

@Override
    byte[] allocateArray(int initialCapacity) {
        return PlatformDependent.allocateUninitializedArray(initialCapacity);
    }

  UnpooledDirectByteBuf和UnpooledUnsafeDirectByteBuf实现

  UnpooledDirectByteBuf类使用DirectByteBuffer作为内存,使用了DirectByteBuffer的能力来实现ByteBuf接口。allocateDirect和freeDirect方法负责分配和释放DirectByteBuffer。capacity(int)方法和UnloopedHeapByteBuf类似,使用allocateDirect创建一个新的DirectByteBuffer, 把旧内存数据复制到新内存中,然后使用新内存替换旧内存,最后调用freeDirect方法释放掉旧的DirectByteBuffer。

protected ByteBuffer allocateDirect(int initialCapacity) {
        return ByteBuffer.allocateDirect(initialCapacity);
    }

    protected void freeDirect(ByteBuffer buffer) {
        PlatformDependent.freeDirectBuffer(buffer);
    }

  @Override
    public ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        int readerIndex = readerIndex();
        int writerIndex = writerIndex();

        int oldCapacity = capacity;
        if (newCapacity > oldCapacity) {
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);
            oldBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.put(oldBuffer);
            newBuffer.clear();
            setByteBuffer(newBuffer);
        } else if (newCapacity < oldCapacity) {
            ByteBuffer oldBuffer = buffer;
            ByteBuffer newBuffer = allocateDirect(newCapacity);
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                oldBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.put(oldBuffer);
                newBuffer.clear();
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setByteBuffer(newBuffer);
        }
        return this;
    }

  对比UnloopedHeapByteBuf的capacity(int)方法,发现这两个实现非常类似,也分两种情况处理:

  • 18-24行,newCapacity > oldCapacity的情况。
  • 26-39行, newCapacity < oldCapacity的情况。

  两种情况对readerIndex和writerIndex的影响也一样,不同的是数据复制时的具体实现。  

  UnpooledUnsafeDirectByteBuf和UnpooledDirectByteBuf同属于AbstractReferenceCountedByteBuf的派生类,它们之间没有继承关系。但内存分配和释放实现是一样的,不同的地方是内存I/O。UnpooledUnsafeDirectByteBuf使用UnsafeByteBufUtil类之间读写DirectByteBuffer的内存,没有使用DirectByteBuffer的I/O能力。

  

netty源码解解析(4.0)-22 ByteBuf的I/O
日志    ByteBuf的I/O主要解决的问题有两个: 管理readerIndex和writerIndex.这个在在AbstractByteBuf中解决. 从内存中读写数据.ByteBuf的不同实现主要 ...
netty源码解解析(4.0)-21 ByteBuf的设计原理
日志    io.netty.buffer包中是netty ByteBuf的实现.ByteBuf是一个二进制缓冲区的抽象接口,它的功能有: 可以随机访问.顺序访问. 支持基本数据类型(byte, shor ...
netty源码解解析(4.0)-24 ByteBuf基于内存池的内存管理
日志io.netty.buffer.PooledByteBuf<T>使用内存池中的一块内存作为自己的数据内存,这个块内存是PoolChunk<T>的一部分.PooledByteBu ...
netty源码解解析(4.0)-17 ChannelHandler: IdleStateHandler实现
日志  io.netty.handler.timeout.IdleStateHandler功能是监测Channel上read, write或者这两者的空闲状态.当Channel超过了指定的空闲时间时,这个 ...
netty源码解解析(4.0)-14 Channel NIO实现:读取数据
日志 本章分析Nio Channel的数据读取功能的实现. Channel读取数据需要Channel和ChannelHandler配合使用,netty设计数据读取功能包括三个要素:Channel, Eve ...
netty源码解解析(4.0)-20 ChannelHandler: 自己实现一个自定义协议的服务器和客户端
日志本章不会直接分析Netty源码,而是通过使用Netty的能力实现一个自定义协议的服务器和客户端.通过这样的实践,可以更深刻地理解Netty的相关代码,同时可以了解,在设计实现自定义协议的过程中需要解决 ...
netty源码解解析(4.0)-19 ChannelHandler: codec--常用编解码实现
日志数据包编解码过程中主要的工作就是:在编码过程中进行序列化,在解码过程中从Byte流中分离出数据包然后反序列化.在MessageToByteEncoder中,已经解决了序列化之后的问题,ByteToMe ...
netty源码解解析(4.0)-16 ChannelHandler概览
日志本章开始分析ChannelHandler实现代码.ChannelHandler是netty为开发者提供的实现定制业务的主要接口,开发者在使用netty时,最主要的工作就是实现自己的ChannelHan ...
netty源码解解析(4.0)-15 Channel NIO实现:写数据
日志写数据是NIO Channel实现的另一个比较复杂的功能.每一个channel都有一个outboundBuffer,这是一个输出缓冲区.当调用channel的write方法写数据时,这个数据被一系列C ...
netty源码解解析(4.0)-13 Channel NIO实现: 关闭和清理
日志Channel提供了3个方法用来实现关闭清理功能:disconnect,close,deregister.本章重点分析这个3个方法的功能的NIO实现. disconnect实现: 断开连接 disco ...
netty源码解解析(4.0)-12 Channel NIO实现:channel初始化
日志创建一个channel实例,并把它register到eventLoopGroup中之后,这个channel然后处于inactive状态,仍然是不可用的.只有在bind或connect方法调用成功之后才 ...
netty源码解解析(4.0)-11 Channel NIO实现-概览
日志    结构设计 Channel的NIO实现位于io.netty.channel.nio包和io.netty.channel.socket.nio包中,其中io.netty.channel.nio是抽 ...
netty源码解解析(4.0)-10 ChannelPipleline的默认实现--事件传递及处理
日志事件触发.传递.处理是DefaultChannelPipleline实现的另一个核心能力.在前面在章节中粗略地讲过了事件的处理流程,本章将会详细地分析其中的所有关键细节.这些关键点包括: 事件触发接口 ...
netty源码解解析(4.0)-9 ChannelPipleline的默认实现-链表管理
日志io.netty.channel.DefaultChannelPipeline implements ChannelPipleline DefaultChannelPiple给出了ChannelPip ...
netty源码解解析(4.0)-18 ChannelHandler: codec--编解码框架
日志编解码框架和一些常用的实现位于io.netty.handler.codec包中. 编解码框架包含两部分:Byte流和特定类型数据之间的编解码,也叫序列化和反序列化.不类型数据之间的转换. 下图是编解码 ...
《Linux源码情景分析》--2.1 Linux内存管理的基本框架
日志2.1 Linux内存管理的基本框架 ​ Linux内核的设计要考虑在各种不同的CPU上的实现,还要考虑64位CPU,所以不能仅仅针对i386结构来设计它的映射机制,要以一种假象的.虚拟的CPU和MM ...
深入netty源码解析之一数据结构
日志Netty是一个异步事件驱动的网络应用框架,它适用于高性能协议的服务端和客户端的快速开发和维护.其架构如下所示: 其核心分为三部分,     最低层为支持零拷贝功能的自定义Byte buffer:   ...
Netty源码解析—客户端启动
日志Netty源码解析-客户端启动 Bootstrap示例 public final class EchoClient { static final boolean SSL = System.getPro ...
Netty源码解析---服务端启动
日志Netty源码解析---服务端启动 一个简单的服务端代码: public class SimpleServer { public static void main(String[] args) { N ...