欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

Netty(十六):写事件处理NioSocketChannel、ChannelOutbondBuffer源码分析

本文继续就Netty处理网络事件相关内容进行展开,重点分析Netty是如何处理写事件的。程序入口还是在NioEventLoop的processSlectedKey(SelectionKey k, AbstractNioChannel ch)。

20200109100115\_1.png

最终将执行AbstractNioChannel$AbstractNioUnsafe 的forceFlush()方法。

1、AbstractNioChannel$AbstractNioUnsafe forceFlush

该方法直接调用其父类AbstractUnsafe的flush0方法:

    protected void flush0() {
                if (inFlush0) {
                    // Avoid re-entrance
                    return;
                }

                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;  // @1
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }

                inFlush0 = true;

                // Mark all pending write requests as failure if the channel is inactive.
                if (!isActive()) { // @2
                    try {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
                        } else {
                            outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }

                try {
                    doWrite(outboundBuffer);    //@3
                } catch (Throwable t) {
                    outboundBuffer.failFlushed(t);
                } finally {
                    inFlush0 = false;
                }
            }

代码@1:Netty封装的写缓存的数据结构,下文重点分析。

代码@2:如果通道没有激活,调用outboundBuffer.failFlushed方法。

代码@3:写入的具体实现,由子类实现,本文重点分析NioSocketChannel,故目光将移动到NioSocketChannel的doWriter方法。

2、NioSocketChannel doWriter方法详解

    @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            for (;;) {
                int size = in.size();
                if (size == 0) {
                    // All written so clear OP_WRITE
                    clearOpWrite();                                      //@1
                    break;
                }
                long writtenBytes = 0;
                boolean done = false;                              
                boolean setOpWrite = false;                  

                // Ensure the pending writes are made of ByteBufs only.
                ByteBuffer[] nioBuffers = in.nioBuffers();                             
                int nioBufferCnt = in.nioBufferCount();
                long expectedWrittenBytes = in.nioBufferSize();
                SocketChannel ch = javaChannel();                               //@2

                // Always us nioBuffers() to workaround data-corruption.
                // See https://github.com/netty/netty/issues/2761
                switch (nioBufferCnt) {
                    case 0:                                                                        //@3
                        // We have something else beside ByteBuffers to write so fallback to normal writes.
                        super.doWrite(in);
                        return;
                    case 1:                                                                              //@4
                        // Only one ByteBuf so use non-gathering write
                        ByteBuffer nioBuffer = nioBuffers[0];
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {       
                            final int localWrittenBytes = ch.write(nioBuffer);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                    default:
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                  //@5
                            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                }

                // Release the fully written buffers, and update the indexes of the partially written buffer.
                in.removeBytes(writtenBytes);      //@6

                if (!done) {
                    // Did not write all buffers completely.
                    incompleteWrite(setOpWrite);      //@7
                    break;
                }
            }
        }

代码@1:ChannelOutboundBuffer,在这里先理解为写缓存区,如果写缓存区待写入的字节数为0,则取消写事件。这是非常有必要的,不然每次select操作,都有可能返回该通道可写,因为该通道的写缓存区未满既可认为可写。那什么时候会重新注册写事件呢?处理读事件后,会重新关注该通道的写事件。

关写clearOpWriter的实现如下:

    protected final void clearOpWrite() {

            final SelectionKey key = selectionKey();

            // Check first if the key is still valid as it may be canceled as part of the deregistration

            // from the EventLoop

            // See https://github.com/netty/netty/issues/2104

            if (!key.isValid()) {

                return;

            }

            final int interestOps = key.interestOps();

            if ((interestOps & SelectionKey.OP_WRITE) != 0) {

                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);

            }

        }

代码@2:解释一下几个局部变量的含义:

  • writtenBytes :调用通道写API后,实际写入字节数
  • done :写缓存区数据是否全部写入通道。
  • setOpWrite : 是否只需要重新关注写事件
  • nioBuffers : java.nio.ByteBuffer数组,该写入缓存区所有的待写入的ByteBuffer。
  • nioBufferCnt : nioBuffers数量
  • expectedWrittenBytes :期待被写入的字节数,写缓冲区总共的可读字节数。
  • ch : 该通道持有的原生NIO通道SocketChannel。

代码@3:如果nioBuffers个数为0,可能是其他内容的数据,比如FileRegion,则调用父类AbstractNioByteChannel的doWriter方法,待深入讲解。

代码@4,@7:如果只有一个ByteBuffer,则不需要使用nio的 gathering write。

这块的逻辑如下:首先和读事件一样,可以设置一次IO写事件,可以循环调用通道API(socketChannel.wirte的次数),这里主要是为了公平起见,避免一个通道一直在写而其他通道的相关事件得不到及时处理。循环开始调用socketChannel.write方法,

  1. 第一种情况:如果socketChannel.write方法返回0,则表示本次没有写入任何字节,设置setOpwrite=true,此时直接跳出,表示需要注册写事件,下次写事件达到时再处理,此时done=flase;setOpWrite为true;根据@8的incompleteWrite方法会被调用,由于setOpWrite为true,只需要简单的关注写事件。
  2. 第二种情况:expectedWrittenBytes 为0,表示在允许的循环次数内,完成了内容的写入操作,此时设置done为true,不会调用incompleteWrite方法,但会执行代码@1处,取消写事件。
  3. 第三种情况,达到配置允许的最大写次数后,默认为16次,数据还未写完,此时setOpWrite=false,done:false,执行incompleteWrite方法的else分支,放入到任务队列中,等该IO线程处理完其他的key,然后会再运行。在讲解线程模型时http://blog.csdn.net/prestigeding/article/details/64443479,我们应该知道,NioEventLoop会首先执行选择键(select),然后处理建processSelectedKey(),然后会执行runAllTask方法,这里的runAllTask方法就是运行在此处加入的任务,从整个select,然后再执行processSelectedKey,再到runAllTask方法,全部在同一个IO线程中执行,故在Netty中,Channel,IO Handler都是线程安全的。包括这里的ChannelOutboundBuffer,写缓存区。
    protected final void incompleteWrite(boolean setOpWrite) {
            // Did not write completely.
            if (setOpWrite) {
                setOpWrite();
            } else {
                // Schedule flush again later so other tasks can be picked up in the meantime
                Runnable flushTask = this.flushTask;
                if (flushTask == null) {
                    flushTask = this.flushTask = new Runnable() {
                        @Override
                        public void run() {
                            flush();
                        }
                    };
                }
                eventLoop().execute(flushTask);
            }
        }
    protected final void setOpWrite() {
            final SelectionKey key = selectionKey();
            // Check first if the key is still valid as it may be canceled as part of the deregistration
            // from the EventLoop
            // See https://github.com/netty/netty/issues/2104
            if (!key.isValid()) {
                return;
            }
            final int interestOps = key.interestOps();
            if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                key.interestOps(interestOps | SelectionKey.OP_WRITE);
            }
        }

代码@5的处理逻辑与代码@4一样,只是调用了gathering writer api。

代码@6:将已读的字节跳过,其实就是更新内部ByteBuffer的readerIndex。

关于2代码@3,如果nioBuffers为0,则调用AbstractNioByteChannel的doWriter方法,接下来重点关注一下这个方法:

    @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            int writeSpinCount = -1;

            for (;;) {
                Object msg = in.current();       
                if (msg == null) {                               //@1
                    // Wrote all messages.
                    clearOpWrite();
                    break;
                }

                if (msg instanceof ByteBuf) {
                    ByteBuf buf = (ByteBuf) msg;
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) {
                        in.remove();
                        continue;
                    }

                    boolean setOpWrite = false;
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        int localFlushedAmount = doWriteBytes(buf);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }

                        flushedAmount += localFlushedAmount;
                        if (!buf.isReadable()) {
                            done = true;
                            break;
                        }
                    }

                    in.progress(flushedAmount);

                    if (done) {
                        in.remove();
                    } else {
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else if (msg instanceof FileRegion) {      //@1
                    FileRegion region = (FileRegion) msg;
                    boolean done = region.transfered() >= region.count();
                    boolean setOpWrite = false;

                    if (!done) {
                        long flushedAmount = 0;
                        if (writeSpinCount == -1) {
                            writeSpinCount = config().getWriteSpinCount();
                        }

                        for (int i = writeSpinCount - 1; i >= 0; i--) {
                            long localFlushedAmount = doWriteFileRegion(region);
                            if (localFlushedAmount == 0) {
                                setOpWrite = true;
                                break;
                            }

                            flushedAmount += localFlushedAmount;
                            if (region.transfered() >= region.count()) {
                                done = true;
                                break;
                            }
                        }

                        in.progress(flushedAmount);
                    }

                    if (done) {
                        in.remove();
                    } else {
                        incompleteWrite(setOpWrite);
                        break;
                    }
                } else {
                    // Should not reach here.
                    throw new Error();
                }
            }
        }

在这里,相关的处理逻辑与NioSocketChannel的doWriter方法类似,只是这里处理的数据是ChannelOutboundBuffer的current字段。

关于ChannelOutboudBuffer相关的API,在下文中会重点梳理,目前我感兴趣的是FileRegion这个类,因为我看到了transfered这样的字眼,零拷贝的核心实现FileChannel的tranferTo和tranferFrom方法。

FileRegion在Netty中的用户为:io.netty.channel.epoll.AbstractEpollStreamChannel。java中真正实现零拷贝的是:FileChannel的 public abstract long transferTo(long position, long count,WritableByteChannel target)。

接下来将讲解本节另外一个重点:ChannelOutboundBuffer的实现,写缓存区。

3、ChannelOutboundBuffer 源码解读

3.1 ChannelOutboundBuffer概述

    /**

     * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending

     * outbound write requests.

     * <p>

     * All methods must be called by a transport implementation from an I/O thread, except the following ones:

     * <ul>

     * <li>{@link #size()} and {@link #isEmpty()}</li>

     * <li>{@link #isWritable()}</li>

     * <li>{@link #getUserDefinedWritability(int)} and {@link #setUserDefinedWritability(int, boolean)}</li>

     * </ul>

     * </p>

     */

该类实现一个预先写缓存区,就是先将写请求放入该缓冲区,然后调用flush一次写入通道。

Netty整个网络读写实现技巧:

ChannelOutboundBuffer,写缓存区,调用netty通道的writer方法,并不会产生真正的IO写请求,只是将需要写入的数据加入到ChannelOutboundBuffer中(内部数据结构为Entry,链表形式),刚加入的Entry是未被刷新的,在没有调用outboundBuffer.addFlush方法之前(也就是直接调用flush0),这些Entry是不会被写入通道(java.nio.SocketChannel)的。特别要注意的是NioEventLoop在处理写事件的时候,是直接调用Unsafe的flush0方法,所以在业务线程在调用netty通道API的时候,调用完write方法后,一定要调用flush方法,否则数据不会写入到通道,造成响应没任何反应。

20200109100115\_2.png3.2 重要属性说明

    private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
            @Override
            protected ByteBuffer[] initialValue() throws Exception {
                return new ByteBuffer[1024];
            }
        };  

        private final Channel channel;

        // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
        //
        // The Entry that is the first in the linked-list structure that was flushed
        private Entry flushedEntry;
        // The Entry which is the first unflushed in the linked-list structure
        private Entry unflushedEntry;
        // The Entry which represents the tail of the buffer
        private Entry tailEntry;
        // The number of flushed entries that are not written yet
        private int flushed;

        private int nioBufferCount;
        private long nioBufferSize;

        private boolean inFail;

        private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER;

        @SuppressWarnings("UnusedDeclaration")
        private volatile long totalPendingSize;

        private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER;

        @SuppressWarnings("UnusedDeclaration")
        private volatile int unwritable;

        private volatile Runnable fireChannelWritabilityChangedTask;
  • NIO_BUFFERS:线程本地变量,该线程的java.nio.Bytebuffer数组,用来存放写入消息的。
  • Channel channel,特定的通道,每个通道只会有一个ChannelOutboundBuffer。
  • flushedEntry,表示第一个被刷新的Entry,在写入时,从该Entry开始写。
  • unflushedEntry,未刷新链表中第一个节点。
  • flushed,待写入的entry个数,这个数据代表在执行一次真正的flush,将会有多少个entry中的内容会被写入到通道。
  • nioBufferCount,ChannelOutboundBuffer中被刷新的entry个数。
  • nioBufferSize,ChannelOutBoundBuffer中被刷新内容的总字节数。
  • totalPendingSize,预写入的总字节数,这个值不能超过配置的阔值。
  • unwritable 是否不可写,如果预写入的数据超过配置的最大阔值,则将该值设置为true,表示通道不可写。

3.3 核心API研读

从AbstractNioByteBufChannel、AbstractChannel.AbstractUnsafe的flush、flush0的处理流程来看,ChannelOutboudBuffer的API被调用的顺序分别为

  1. isEmpty
  2. nioBuffers(),nioBufferCount(),nioBufferSize()
  3. removeBytes(int)

接下来会按照上述方法进行讲解,当然在讲解的过程中会引出一些关键的方法,比如 addMessage、addFlush方法等。

3.3.1 isEmpty()

    public boolean isEmpty() {

            return flushed == 0;

    }

在NioSocketChannel的doWriter方法中,如果isEmpty返回true,直接结束本次写入操作,更加准确的是结束本次flush操作。flushed该字段代表的当时待写入的Entry,如果为0,表示没有待flush的Entry,但不代表ChannelOutboundBuffer中没有Entry存在,比如调用Channel.writer方法,会往ChannelOutboundBuffer增加Entry,但在没有调用addFlush方法之前,ChannelOutboundBuffer中的flushed 字段的值不会增加。既然有增加,肯定就会有减少,应该在removeBytes方法中,下文再做详解。

3.1.2 nioBuffers(),nioBufferCount(),nioBuferSize()

本方法,调用一次nioBuffers,会同步更新nioBufferCount与nioBufferSize属性。

nioBuffers,只处理消息类型是ByteBuf的情况。

    /**
         * Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
         * {@link #nioBufferCount()} and {@link #nioBufferSize()} will return the number of NIO buffers in the returned
         * array and the total number of readable bytes of the NIO buffers respectively.
         * <p>
         * Note that the returned array is reused and thus should not escape
         * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
         * Refer to {@link NioSocketChannel#doWrite(ChannelOutboundBuffer)} for an example.
         * </p>
         */
        public ByteBuffer[] nioBuffers() {
            long nioBufferSize = 0;
            int nioBufferCount = 0;
            final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);      //@1
            Entry entry = flushedEntry;
            while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {   //@2
                if (!entry.cancelled) {                                                                   //@3
                    ByteBuf buf = (ByteBuf) entry.msg;
                    final int readerIndex = buf.readerIndex();
                    final int readableBytes = buf.writerIndex() - readerIndex;

                    if (readableBytes > 0) {
                        nioBufferSize += readableBytes;
                        int count = entry.count;
                        if (count == -1) {
                            //noinspection ConstantValueVariableUse
                            entry.count = count = buf.nioBufferCount();
                        }
                        int neededSpace = nioBufferCount + count;
                        if (neededSpace > nioBuffers.length) {
                            nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                            NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                        }
                        if (count == 1) {   //@4
                            ByteBuffer nioBuf = entry.buf;
                            if (nioBuf == null) {
                                // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                                // derived buffer
                                entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                            }
                            nioBuffers[nioBufferCount ++] = nioBuf;
                        } else {             //@5
                            ByteBuffer[] nioBufs = entry.bufs;
                            if (nioBufs == null) {
                                // cached ByteBuffers as they may be expensive to create in terms
                                // of Object allocation
                                entry.bufs = nioBufs = buf.nioBuffers();
                            }
                            nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
                        }
                    }
                }
                entry = entry.next;
            }
            this.nioBufferCount = nioBufferCount;
            this.nioBufferSize = nioBufferSize;

            return nioBuffers;
        }

该方法的实现不难,主要逻辑,就是将已经flush过的Entry中的java.nio.ByteBuffer提取出来,次方法并不会改变flushedEntry,unFlushedEntry,flushed等相关属性,只是获取。

代码@1:获取线程本地变量中存放的nioBuffers[]数组,用来存放被刷新的Entry中的ByteBuffer。

代码@2:循环,知道Entry未被刷新,并且内部消息是ByteBuf

代码@3:判断该Entry是否被取消。

代码@4、代码@5:就是从Entry的msg中得到ByteBuf信息,可能一个ByteBuf里面包含了多个CompositeByteBuf()。然后放入到nioBuffers中。并返回。

不难看出,nioBuffers方法,返回的是被刷新后还未写入的Entry,那么这些Entry是什么时候被加入到ChannelOutbondBuffer中的呢?请看addMessage,addFlush方法

3.1.3 addMessage方法详解

    /**
         * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
         * the message was written.
         */
        public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);    // @1
            if (tailEntry == null) {     
                flushedEntry = null;
                tailEntry = entry;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
                tailEntry = entry;
            }
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }                                               //@2

            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            incrementPendingOutboundBytes(size, false);      //@3
        }

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }

            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);    //@4
            if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
                setUnwritable(invokeLater);
            }
        }
    private void setUnwritable(boolean invokeLater) {        
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue | 1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {  //@5
                    if (oldValue == 0 && newValue != 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }
    private void fireChannelWritabilityChanged(boolean invokeLater) { //@6
            final ChannelPipeline pipeline = channel.pipeline();
            if (invokeLater) {
                Runnable task = fireChannelWritabilityChangedTask;
                if (task == null) {
                    fireChannelWritabilityChangedTask = task = new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelWritabilityChanged();
                        }
                    };
                }
                channel.eventLoop().execute(task);
            } else {
                pipeline.fireChannelWritabilityChanged();
            }
        }

代码@1:ChannelOutboundBuffer内部使用Entry(链表节点)来存放待写入的内容,比如ByteBuf,然后初始化size,totalSize两个属性。由于Entry结构比较简单,在这不做过多讲解。

代码@2:将新增加的Entry放入到队列,如果尾部节点为空,则设在尾部节点为新增节点,flushedEntry设在为空,次数unFlushedEntry也为空。如果尾部节点不为空,将尾部节点用临时变量存储,然后当前节点设置为尾部节点,原尾部节点的next指向新增的节点,如果unFlushedEntry为空,则设在unFlushedEntry为当前节点。

代码@3,4,5,6:每增加一个Entry,就要更新一下ChannelOutboundBuffer中已经预写入的字节总数,看是否超过了允许的最大预写字节数,默认值为64 * 1024,64k。如果超过了允许预写入字节数,则会改变通道的是否可写状态,并且触发pipeline.fireChannelWritabilityChanged()事件。

3.1.4 addFlush方法

该方法,ChannelOutboundBuffer中未被刷新的Entry,进行刷新,已便通道在执行真正的写操作时能将预先存放在写缓存区的字节写入底层通道(java.nio.SocketChannel)

    /**
         * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
         * and so you will be able to handle them.
         */
        public void addFlush() {
            // There is no need to process all entries if there was already a flush before and no new messages
            // where added in the meantime.
            //
            // See https://github.com/netty/netty/issues/2577
            Entry entry = unflushedEntry; // @1
            if (entry != null) {
                if (flushedEntry == null) {    //@2
                    // there is no flushedEntry yet, so start with the entry
                    flushedEntry = entry;
                }
                do { //@3
                    flushed ++;
                    if (!entry.promise.setUncancellable()) {
                        // Was cancelled so make sure we free up memory and notify about the freed bytes
                        int pending = entry.cancel();
                        decrementPendingOutboundBytes(pending, false);
                    }
                    entry = entry.next;
                } while (entry != null);

                // All flushed so reset unflushedEntry
                unflushedEntry = null;  //@4
            }
        }

代码@1:从第一个未被刷新的Entry开始。如果第一个未被刷新的Entry为空,则结束本次刷新。

代码@2:如果已被刷新的Entry为空,则设在为第一个未被刷新的Entry,然后开始循环遍历整个链表,直到Entry为空,也就是一次刷新,会刷新到链表的尾部。

代码@3:开始循环刷新entry,flushed每刷新一个增加1,这样ChannelOutboundBuffer的isEmpty将返回false,相关Unsafe的flush方法,才会调用真正的通道写操作(NioSocketChannel.doWrite方法)。如果Entry被取消,如果entry.promise.setUncancelable返回false,则需要取消该Entry,并减少预写入空间大小,会触发写状态的改变。

代码@4:将unflushedEntry设置为空,表示刷新完毕,这样在下次有写入事件发生后调用addMessage时,unflushedEntry将被设置为新增的Entry。

综合上面看,addFlush方法,主要是修改flushed,unflushedEntry属性,方便在nioBuffer()方法中返回相关的ByteBuf。

在这里,我们也可以知道,那在什么时候会减少flushed,将flushedEntry设置为空呢?我想肯定在removeBytes方法中。将刷新后的Entry写入缓存区后,需要从ChannelOutboundBuffer中移除。

3.1.5 removeBytes(int)方法

该方法要做的就是,根据写入的具体字节数,从第一个flushedEntry开始,根据flusheEntry里面ByteBuf的可读字节数,与总共写入的字节数进行比对,如果总写入的字节数大于ByteBuf的可读字节数,则可以直接移除该flushedEntry,然后继续循环,否则修改ByteBuf的readerIndex到合适的位置,然后结束本次removeBytes。

    /**
         * Removes the fully written entries and update the reader index of the partially written entry.
         * This operation assumes all messages in this buffer is {@link ByteBuf}.
         */
        public void removeBytes(long writtenBytes) {
            for (;;) {
                Object msg = current();            // @1
                if (!(msg instanceof ByteBuf)) {
                    assert writtenBytes == 0;
                    break;
                }

                final ByteBuf buf = (ByteBuf) msg;
                final int readerIndex = buf.readerIndex();
                final int readableBytes = buf.writerIndex() - readerIndex;

                if (readableBytes <= writtenBytes) {   // @2
                    if (writtenBytes != 0) {
                        progress(readableBytes);
                        writtenBytes -= readableBytes; 
                    }
                    remove();      //@3
                } else { // readableBytes > writtenBytes      //@4
                    if (writtenBytes != 0) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);    
                        progress(writtenBytes);   //@5
                    }
                    break;
                }
            }
        }

    public boolean remove() {
            Entry e = flushedEntry;
            if (e == null) {
                return false;
            }
            Object msg = e.msg;

            ChannelPromise promise = e.promise;
            int size = e.pendingSize;

            removeEntry(e);      // @6

            if (!e.cancelled) {
                // only release message, notify and decrement if it was not canceled before.
                ReferenceCountUtil.safeRelease(msg);
                safeSuccess(promise);
                decrementPendingOutboundBytes(size, false);
            }

            // recycle the entry
            e.recycle();

            return true;
        }

    private void removeEntry(Entry e) {
            if (-- flushed == 0) {
                // processed everything
                flushedEntry = null;
                if (e == tailEntry) {
                    tailEntry = null;
                    unflushedEntry = null;
                }
            } else {
                flushedEntry = e.next;
            }
        }

代码@1:获取第一个flushedEntry所代表的msg。

代码@2:如果当前flushedEntry代表的msg,ByteBuf的可读字节数小于总写入字节数,说明该ByteBuf被完整的写入,则调用remove方法,将该Entry从链表中移除,并回收(Entry)的管理使用了线程对象池。

代码@3:从链表中将flushedEntry移除,并将flushedEntry设置为它的next,移除的Entry将被回收利用(线程本地对象池Recyer)。

代码@4:如果ByteBuf可读字节数大于总写入数,则说明没有将ByteBuf的全部内容写入通道,此时,需要设置readerIndex,然后跳出removeBytes即可。

代码@5:汇报进度,如果完成写入,许可会得到通知,并触发相关事件。

代码@6:移除一个Entry,同时将flushed减1。

总结:NioSocketChannel的写事件处理的流程为,如果写缓存区中没有可写字节,则取消写事件的监听。如果写缓存区的数据不是以ByteBuf存在的,则直接调用父类(AbstractNioByteChannel的doWriter方法,里面有FileRegion的相关处理,如果消息类型是ByteBuf,并且写缓存区不为空(是否为空的判断标准不是缓存区是否有ByteBuf,而是是否有被刷新(待写入)的字节,也就是如果addFlush方法没有被调用,默认会任务该预先缓存区为空),如果不为空,则调用nioBuffers方法获取待写入的ByteBuf,如果有多个ByteBuf,则使用NIO的 scathing特性进行写入,得到最终的写入字节数,然后调用removeBtyes移除写入的字节信息。

然后重点分析了Netty写预先缓存区的实现原理:

ChannelOutboundBuffer,写缓存区,调用netty通道的writer方法,并不会产生真正的IO写请求,只是将需要写入的数据加入到ChannelOutboundBuffer中(内部数据结构为Entry,链表形式),刚加入的Entry是未被刷新的,在没有调用outboundBuffer.addFlush方法之前(也就是直接调用flush0),这些Entry是不会被写入通道(java.nio.SocketChannel)的。特别要注意的是NioEventLoop在处理写事件的时候,是直接调用Unsafe的flush0方法,所以在业务线程在调用netty通道API的时候,调用完write方法后,一定要调用flush方法,否则数据不会写入到通道,造成响应没任何反应。


来源:https://blog.csdn.net/prestigeding/article/details/53977445

赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 » Netty(十六):写事件处理NioSocketChannel、ChannelOutbondBuffer源码分析

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏