欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

Netty(二):ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

首先,我们再看一下 ByteBuf 的类设计图,从中更进一步了解ByteBuf。

2020010910011\_1.png

ByteBuf 继承自 ReferenceCounted,引用计数,也就是说 ByteBuf 的内存回收使用的是引用计数器来实现。

UnpooledHeapByteBuf 是非池化的堆内存实现,而 UnpooledDirectByteBuf 是非池化的堆外内存(直接内存)。非池化的ByteBuf 就是利用完之后就需要销毁,无法重用。

1、UnpooledHeapByteBuf 详解

其继承链:UnpooledHeapByteBuf –> AbstractReferenceCountedByteBuf –> AbstractByteBuf。

1.1 AbstractByteBuf 源码分析

AbstractByteBuf 定义 ByteBuf 的基本属性,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity, 我们知道 ByteBuf 的容量是可以自动扩容的。

AbstractByteBuf 的这两个属性,应该引起我们的注意:

  1. SwappedByteBuf swappedBuf
    这个是大端序列与小端序列的转换。
  2. ResourceLeakDetector leakDetector = new ResourceLeakDetector(ByteBuf.class);
    Netty 用来解决内存泄漏检测机制,下一篇会详细介绍。

这里截取一下 SwappedByteBuf 的源码,采用了典型的装饰模式来设计。

    public class SwappedByteBuf extends ByteBuf {

        private final ByteBuf buf;

        private final ByteOrder order;

        public SwappedByteBuf(ByteBuf buf) {

            if (buf == null) {

                throw new NullPointerException("buf");

            }

            this.buf = buf;

            if (buf.order() == ByteOrder.BIG_ENDIAN) {

                order = ByteOrder.LITTLE_ENDIAN;

            } else {

                order = ByteOrder.BIG_ENDIAN;

            }

        }

        @Override

        public ByteOrder order() {

            return order;

        }

        @Override

        public ByteBuf order(ByteOrder endianness) {

            if (endianness == null) {

                throw new NullPointerException("endianness");

            }

            if (endianness == order) {

                return this;

            }

            return buf;

        }

    }

关于其他 AbstractByteBuf, 该类设计使用了典型的模板模式,对 ByteBuf 提供的类,实现时提供一种模板,然后再提供一个钩子方法,供子类实现,比如_getLong方法,_setLong等方法,由于该类的实现原理不复杂,就不做进一步的源码解读。

1.2 AbstractReferenceCountedByteBuf

该类主要是实现引用计算的常规方法,充分利用 voliate 内存可见性与 CAS 操作完成 refCnt 变量的维护。

其源码实现如下:

    package io.netty.buffer;

    import io.netty.util.IllegalReferenceCountException;
    import io.netty.util.internal.PlatformDependent;

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    /**
     * Abstract base class for {@link ByteBuf} implementations that count references.
     */
    public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

        private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;

        static {
            AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
                    PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
            if (updater == null) {
                updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
            }
            refCntUpdater = updater;
        }

        private volatile int refCnt = 1;

        protected AbstractReferenceCountedByteBuf(int maxCapacity) {
            super(maxCapacity);
        }

        @Override
        public final int refCnt() {
            return refCnt;
        }

        /**
         * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
         */
        protected final void setRefCnt(int refCnt) {
            this.refCnt = refCnt;
        }

        @Override
        public ByteBuf retain() {
            for (;;) {
                int refCnt = this.refCnt;
                if (refCnt == 0) {
                    throw new IllegalReferenceCountException(0, 1);
                }
                if (refCnt == Integer.MAX_VALUE) {
                    throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
                }
                if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
                    break;
                }
            }
            return this;
        }

        @Override
        public ByteBuf retain(int increment) {
            if (increment <= 0) {
                throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
            }

            for (;;) {
                int refCnt = this.refCnt;
                if (refCnt == 0) {
                    throw new IllegalReferenceCountException(0, increment);
                }
                if (refCnt > Integer.MAX_VALUE - increment) {
                    throw new IllegalReferenceCountException(refCnt, increment);
                }
                if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {
                    break;
                }
            }
            return this;
        }

        @Override
        public ByteBuf touch() {
            return this;
        }

        @Override
        public ByteBuf touch(Object hint) {
            return this;
        }

        @Override
        public final boolean release() {
            for (;;) {
                int refCnt = this.refCnt;
                if (refCnt == 0) {
                    throw new IllegalReferenceCountException(0, -1);
                }

                if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                    if (refCnt == 1) {
                        deallocate();
                        return true;
                    }
                    return false;
                }
            }
        }

        @Override
        public final boolean release(int decrement) {
            if (decrement <= 0) {
                throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
            }

            for (;;) {
                int refCnt = this.refCnt;
                if (refCnt < decrement) {
                    throw new IllegalReferenceCountException(refCnt, -decrement);
                }

                if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                    if (refCnt == decrement) {
                        deallocate();
                        return true;
                    }
                    return false;
                }
            }
        }

        /**
         * Called once {@link #refCnt()} is equals 0.
         */
        protected abstract void deallocate();
    }

该类,我们只需要了解,当一个ByteBuf 被引用的次数为 0 时,dealocate() 方法将被调用,该方法就是具体回收 ByteBuf 的操作,由具体的子类去实现。

1.3 UnpooledHeapByteBuf 与 UnpooledDirectByteBuf

首先该类的内部结构如下:

2020010910011\_2.png

2020010910011\_3.png

对于非池化的 UnpooledByteBuf,内部就是使用 array 来存储数据,相对简单,所以源码分析,我还是侧重于UnpooledDirectByteBuf。重点关注如下两个方面:

  1. 容量的扩容
  2. 内存的分配

1.3.1 capacity(int newCapacity)

    public ByteBuf capacity(int newCapacity) {
            ensureAccessible();     // @1 
            if (newCapacity < 0 || newCapacity > maxCapacity()) {
                throw new IllegalArgumentException("newCapacity: " + newCapacity);
            }

            int readerIndex = readerIndex();
            int writerIndex = writerIndex();

            int oldCapacity = capacity;
            if (newCapacity > oldCapacity) {   // @2
                ByteBuffer oldBuffer = buffer;
                ByteBuffer newBuffer = allocateDirect(newCapacity);  //@21
                oldBuffer.position(0).limit(oldBuffer.capacity());          //@22
                newBuffer.position(0).limit(oldBuffer.capacity());        //@23
                newBuffer.put(oldBuffer);                                            //@24
                newBuffer.clear();                                                         //@25
                setByteBuffer(newBuffer);                                            //@26
            } else if (newCapacity < oldCapacity) { //@3
                ByteBuffer oldBuffer = buffer;
                ByteBuffer newBuffer = allocateDirect(newCapacity);
                if (readerIndex < newCapacity) {
                    if (writerIndex > newCapacity) {
                        writerIndex(writerIndex = newCapacity);
                    }
                    oldBuffer.position(readerIndex).limit(writerIndex);
                    newBuffer.position(readerIndex).limit(writerIndex);
                    newBuffer.put(oldBuffer);
                    newBuffer.clear();
                } else {
                    setIndex(newCapacity, newCapacity);
                }
                setByteBuffer(newBuffer);
            }
            return this;
        }

代码@1,检测一下访问性,可达性,就是引用数必须大于0,否则该 ByteBuf 的内部空间已经被回收了(堆外内存)。

代码@2,扩容操作,思路新建一个缓存区,然后将原先缓存区的数据全部写入到新的缓存区,然后释放旧的缓存区。

代码@21、22,申请一个直接缓存区,然后将原缓冲区的 postion 设置为0,将 limit 设置为 capacity, 处于释放状态(从缓存区读)。

代码@23,将新缓存区的 postion,limit 属性设置为0,老缓存区 limit。

代码@24,将原缓冲区写入到新的缓存区,然后将缓存区置的 position 设置为0,limt 设置为 capacity,其实这里设置position,capacity 的意义不大,因为 ByteBuf 并不会利用内部的 ByteBuffe r的 limit,postion 属性,而是使用readerIndex, wriateIndex。

代码@26,关联新的 ByteBuffer, 并释放原缓存区的空间。

代码@3,压缩缓存区。实现思路是新建一个缓存区,如果 readerIndex 大于新建的 ByteBuffer 的 capacity,则无需将旧的缓存区内容写入到新的缓存区中。如果 readerIndex 小于新 capacity,那需要将 readerIndex 至( Math.min(writerIndex, newCapacity) )直接的内容写入到新的缓存,然后释放旧的缓存区。

我们在重点关注一下 setByteBuffer(newBuffer) 方法,该方法还负责销毁原先的 ByteBuffer。

    private void setByteBuffer(ByteBuffer buffer) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }

            this.buffer = buffer;
            tmpNioBuf = null;
            capacity = buffer.remaining();
        }

释放原先的内存。

1.3.2 内存分配

Netty 在为内存的分配,单独封装,相关类图:

2020010910011\_4.png

目前,先关注 UnpooledByteBufAllocator,对象池的ByteBuf在后续章节中重点关注。

结合原代码,有如下两个方法引起了我的注意:

  1. 容量扩容规则(容量增长规则)calculateNewCapacity 方法。
  2. 直接内存的分配。newDirectBuffer 方法。

1.3.2.1 calculateNewCapacity

    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
            if (minNewCapacity < 0) {
                throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
            }
            if (minNewCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                        minNewCapacity, maxCapacity));
            }
            final int threshold = 1048576 * 4; // 4 MiB page

            if (minNewCapacity == threshold) {
                return threshold;
            }

            // If over threshold, do not double but just increase by threshold.
            if (minNewCapacity > threshold) {               //@1
                int newCapacity = minNewCapacity / threshold * threshold;
                if (newCapacity > maxCapacity - threshold) {
                    newCapacity = maxCapacity;
                } else {
                    newCapacity += threshold;
                }
                return newCapacity;
            }

            // Not over threshold. Double up to 4 MiB, starting from 64.
            int newCapacity = 64;c   // @2
            while (newCapacity < minNewCapacity) {
                newCapacity <<= 1;
            }

            return Math.min(newCapacity, maxCapacity);
        }
  • minNewCapacity:本次需要申请的最小内存。
  • macCapacity:最大总内存申请值。

代码@1,如果最小需要的内存超过设置的 threshold(阔值的话),则循环,每次增加threshold,然后看是否达到本次申请目标。

代码@2,如果需要申请的内存小于阔值,则以64个字节以2的幂增长。这里体现了内存扩容时的一个优化点。

1.3.2.2 newDirectBuffer 方法

    @Override
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            ByteBuf buf;
            if (PlatformDependent.hasUnsafe()) {
                buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }

            return toLeakAwareBuffer(buf);
        }

该方法中,除了见到申请一个直接内存外,还将该 buf 变成一个可感知的对象。toLeakAwareBuffer 方法,用于该对象被引用的情况,因为 UnpooledDirectByteBuf 是一个聚合对象,内部维护了一个 java.nio.ByteBuffer 的直接对外内存空间,在什么是释放UnpooledDirectByteBuf 中的堆外内存呢?在 UnpooledDirectByteBuf 被java垃圾回收的时候,应该于此同时需要释放指向的堆外内存,但堆外内存不受JVM GC的管理,所以我们只有感知到UnpooledDirectByteBuf被JVM虚拟机回收后,手动去释放堆外内存,大家想想都知道,我们可以通过JAVA提供的引用机制,来实现跟踪垃圾回收器的收集工作,虚引用的作用来了,下一篇,我将会以这个为入口点,重点分析 Netty 堆外内存如何管理,也就是内存泄露检测等方面的课题。


来源:https://blog.csdn.net/prestigeding/article/details/53977445

赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 博客站点 » Netty(二):ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏