
Netty (12): Source Code Analysis of How Netty's Decoders and Encoders Work

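Non-blocking I/O over a TCP byte stream has a particular problem: half packets. A single read may return only part of a message, or several messages glued together. To hide this from application code, Netty provides decoders and encoders, which are woven into the pipeline as ordinary Handlers. This article focuses on how the message decoder ByteToMessageDecoder is implemented. Its job is to split the incoming byte stream into individual, complete client request messages.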

1. Overview of ByteToMessageDecoder

    /**
     * A {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
     * other Message type.
     *
     * For example here is an implementation which reads all readable bytes from
     * the input {@link ByteBuf} and create a new {@link ByteBuf}.
     *
     * <pre>
     *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
     *         {@code @Override}
     *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
     *                 throws {@link Exception} {
     *             out.add(in.readBytes(in.readableBytes()));
     *         }
     *     }
     * </pre>
     *
     * <h3>Frame detection</h3>
     * <p>
     * Generally frame detection should be handled earlier in the pipeline by adding a
     * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
     * or {@link LineBasedFrameDecoder}.
     * <p>
     * If a custom frame decoder is required, then one needs to be careful when implementing
     * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
     * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
     * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
     * <p>
     * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
     * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
     * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
     * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
     * <h3>Pitfalls</h3>
     * <p>
     * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
     * annotated with {@link @Sharable}.
     * <p>
     * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
     * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
     * to avoid leaking memory.
     */
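  • When implementing a custom decoder, first check whether enough bytes have arrived to form a complete request. If they have not, return without changing the cumulation buffer's readerIndex, and never touch its writerIndex. Peek at the data with absolute-index methods such as getInt(index), using an index based on readerIndex.
  • Release ByteBufs promptly to avoid memory leaks. For example, a buffer returned by readBytes(int) must be released or added to out.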
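To make the first rule concrete, here is a minimal stdlib-only sketch of the check a custom frame decoder performs. It is a model, not Netty code. java.nio.ByteBuffer stands in for ByteBuf, and its position() plays the role of readerIndex. The wire format (a 4-byte big-endian length followed by the payload) and the class name LengthPrefixedFrameCheck are assumptions chosen for illustration. The absolute getInt(in.position()) peeks at the length without consuming it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical frame check; assumed format: 4-byte big-endian length + payload.
final class LengthPrefixedFrameCheck {
    private static final int HEADER = 4;

    // Returns the payload if a whole frame is buffered; otherwise null, position untouched.
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < HEADER) {
            return null;                        // length field not complete yet
        }
        // Absolute read relative to the current position: peeks, does not consume.
        // (Netty analogue: in.getInt(in.readerIndex()), never in.getInt(0).)
        int length = in.getInt(in.position());
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length: " + length);
        }
        if (in.remaining() - HEADER < length) {
            return null;                        // payload incomplete: wait for more bytes
        }
        in.getInt();                            // now consume the length field...
        byte[] payload = new byte[length];
        in.get(payload);                        // ...and the payload
        return payload;
    }
}
```

In a real ByteToMessageDecoder subclass, the same check would sit in decode() and return without adding anything to out when the frame is incomplete. For this particular wire format, Netty already ships LengthFieldBasedFrameDecoder.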

2. ByteToMessageDecoder source code analysis

2.1 How the cumulator works

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {  //@1
            ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()  //@2
                    || cumulation.refCnt() > 1) {  //@3
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain().
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());  //@4
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);  //@5
            in.release();           //@6
            return buffer;
        }
    };

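Code @1: the parameters are the allocator used to obtain memory, the cumulation buffer holding the bytes received so far, and the buffer produced by the current read.

Code @2: first check whether the cumulation buffer can take the readable bytes of the new ByteBuf. If writerIndex + in.readableBytes() would exceed maxCapacity, the buffer cannot grow far enough by itself. It is therefore expanded manually, by replacing it with a newly allocated, larger buffer.

Code @3: the buffer is also replaced when its reference count is greater than 1. This happens when someone else still holds it, for example through slice().retain() or duplicate().retain(), as the source comment notes. In that case, modifying it in place could affect the other holder.

Code @4: expand (replace) the cumulation buffer.

Code @5: write the newly read bytes into the cumulation buffer.

Code @6: release the input buffer in, whose bytes have just been copied.

expandCumulation, called at @4, is implemented as follows: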

    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }

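A key point: expansion never grows the existing buffer in place. It allocates a new cumulation buffer, copies the old buffer's readable bytes into it (bytes already consumed are not copied) and releases the old buffer. The old buffer may still be referenced elsewhere (the refCnt > 1 case), so switching to a fresh buffer keeps ownership clear. That makes release tracking easier and helps avoid memory leaks.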
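The cumulation policy can be modelled with plain arrays, which makes it easy to see when the buffer is replaced rather than appended to. This is a hypothetical sketch, not Netty's API. CumulationModel, its fields and the simulated refCnt are invented for illustration. The release of the incoming buffer (@6) has no counterpart here because a byte[] is not reference-counted.

```java
import java.util.Arrays;

// Hypothetical array-based model of MERGE_CUMULATOR and expandCumulation.
// refCnt is a plain field here; real ByteBufs are reference-counted objects.
final class CumulationModel {
    byte[] data;
    int readerIndex;
    int writerIndex;
    final int maxCapacity;
    int refCnt = 1;

    CumulationModel(int initialCapacity, int maxCapacity) {
        this.data = new byte[initialCapacity];
        this.maxCapacity = maxCapacity;
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    // Appends like ByteBuf.writeBytes: grows the backing array, but never past maxCapacity.
    void writeBytes(byte[] src) {
        int needed = writerIndex + src.length;
        if (needed > maxCapacity) {
            throw new IndexOutOfBoundsException("exceeds maxCapacity " + maxCapacity);
        }
        if (needed > data.length) {
            int newCapacity = (int) Math.min((long) maxCapacity, Math.max((long) needed, 2L * data.length));
            data = Arrays.copyOf(data, newCapacity);
        }
        System.arraycopy(src, 0, data, writerIndex, src.length);
        writerIndex = needed;
    }

    // Mirrors cumulate(): replace the buffer if it has no room left (@2) or is shared (@3).
    static CumulationModel cumulate(CumulationModel cumulation, byte[] in) {
        CumulationModel buffer;
        if (cumulation.writerIndex > cumulation.maxCapacity - in.length || cumulation.refCnt > 1) {
            buffer = expandCumulation(cumulation, in.length);   // @4
        } else {
            buffer = cumulation;                                // append in place
        }
        buffer.writeBytes(in);                                  // @5
        return buffer;
    }

    // Mirrors expandCumulation(): a new buffer sized for old readable bytes + incoming bytes.
    static CumulationModel expandCumulation(CumulationModel old, int readable) {
        // Allocator buffers get a very large default maxCapacity; Integer.MAX_VALUE models that.
        CumulationModel fresh = new CumulationModel(old.readableBytes() + readable, Integer.MAX_VALUE);
        // Only unread bytes are copied, so already-consumed bytes are dropped.
        fresh.writeBytes(Arrays.copyOfRange(old.data, old.readerIndex, old.writerIndex));
        old.refCnt--;                                           // oldCumulation.release()
        return fresh;
    }
}
```

In this model, a buffer with capacity 4 that holds 3 bytes, of which 2 have been consumed, cannot accept 2 more. It is replaced by a 3-byte buffer holding only the unread byte plus the new ones.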

2.2 Event handling

ByteToMessageDecoder is an inbound handler in Netty. The events it mainly handles are handlerRemoved, channelRead, channelReadComplete and channelInactive.

2.2.1 The handlerRemoved event

Called when the handler is removed from the ChannelPipeline.

    @Override
    public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = internalBuffer();
        int readable = buf.readableBytes();
        if (readable > 0) {
            ByteBuf bytes = buf.readBytes(readable);
            buf.release();
            ctx.fireChannelRead(bytes);
            ctx.fireChannelReadComplete();
        } else {
            buf.release();
        }
        cumulation = null;
        handlerRemoved0(ctx);
    }

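The idea is as follows. If the internal cumulation buffer still has readable bytes, they are not discarded. Instead they are forwarded downstream as a raw ByteBuf, followed by channelReadComplete. The internal buffer is then released and set to null. Finally, the hook handlerRemoved0(ctx) is called so that subclasses can do their own cleanup.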
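Here is a small model of the same behaviour, written without Netty so the control flow is easy to follow. RemovalModel is hypothetical. A byte array with a reader index stands in for the cumulation ByteBuf, and the downstream list records what ctx.fireChannelRead would receive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of handlerRemoved(): leftover bytes are forwarded, not dropped.
class RemovalModel {
    private byte[] cumulation;                           // null == nothing accumulated
    private int readerIndex;
    final List<byte[]> downstream = new ArrayList<>();   // what fireChannelRead would receive
    boolean readCompleteFired;

    RemovalModel(byte[] cumulation, int readerIndex) {
        this.cumulation = cumulation;
        this.readerIndex = readerIndex;
    }

    final void handlerRemoved() {
        // internalBuffer() yields an empty buffer when cumulation is null
        int readable = cumulation == null ? 0 : cumulation.length - readerIndex;
        if (readable > 0) {
            // copy the unread tail (like buf.readBytes(readable)) and pass it on
            downstream.add(Arrays.copyOfRange(cumulation, readerIndex, cumulation.length));
            readCompleteFired = true;                    // fireChannelReadComplete()
        }
        cumulation = null;                               // buffer released and forgotten
        handlerRemoved0();                               // hook for subclasses
    }

    protected void handlerRemoved0() {
        // default: nothing to clean up
    }
}
```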

2.2.2 The channelRead event

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {  //@1
            RecyclableArrayList out = RecyclableArrayList.newInstance();  //@2
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;  //@3
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);  //@4
                }
                callDecode(ctx, cumulation, out);  //@5
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {  //@6
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();

                for (int i = 0; i < size; i++) {  //@7
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();  //@8
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

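Code @1: if the message is a ByteBuf, decode it. Otherwise forward it unchanged to the next handler.

Code @2: create a list in which each element will be one decoded, complete client request. In the Netty 5 implementation analysed here, a ChannelHandler does not have to deal with concurrent calls for a given channel, because all events of a Channel are executed by the thread of the EventLoop that the Channel is bound to. The list comes from a thread-local object pool (Recycler), so ArrayList instances are reused. Using a thread-local pool here is safe because each Channel's events always run on that one thread, even though the EventLoopGroup itself may contain many threads. The execution details of Netty's threading model deserve a closer look, so after this article I will return to the earlier Netty threading-model post and describe SingleThreadEventExecutor's execution logic in more detail.

Code @3, @4: if the cumulation buffer is null, this is the first read to decode, so the buffer just read becomes the cumulation buffer directly. Otherwise the newly read bytes are merged into the cumulation buffer, as explained above.

Code @5: the actual decoding logic, covered in detail below.

Code @6: if the cumulation buffer is not null but has no readable bytes left, release it and set it to null.

Code @7: forward the decoded client requests (messages) to the next handler, one at a time.

Code @8: recycle the list back into the object pool. There is in fact a potential resource leak here. If ctx.fireChannelRead throws inside the loop at @7, out.recycle() is never reached, and the messages not yet forwarded are never released. Compare channelInactive below, which wraps the same steps in a nested try/finally so that the list is recycled in all cases.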
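The channelRead lifecycle (first read, merge, decode, release when drained, fire) can be traced with a compact model. It is a sketch under stated assumptions. ChannelReadModel is hypothetical, byte arrays replace ByteBufs, and the decode step assumes fixed-size 3-byte frames (the behaviour FixedLengthFrameDecoder provides) only so that something concrete gets decoded. The object pool at @2 is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of channelRead(). Assumption: 3-byte fixed-length frames.
final class ChannelReadModel {
    static final int FRAME = 3;
    byte[] cumulation;                               // null == nothing buffered
    int readerIndex;
    final List<byte[]> fired = new ArrayList<>();    // what fireChannelRead would receive

    void channelRead(byte[] data) {
        List<byte[]> out = new ArrayList<>();        // @2 (Netty takes this from a pool)
        if (cumulation == null) {                    // @3 first read: adopt the buffer as-is
            cumulation = data;
            readerIndex = 0;
        } else {                                     // @4 merge unread bytes with new bytes
            int unread = cumulation.length - readerIndex;
            byte[] merged = new byte[unread + data.length];
            System.arraycopy(cumulation, readerIndex, merged, 0, unread);
            System.arraycopy(data, 0, merged, unread, data.length);
            cumulation = merged;
            readerIndex = 0;
        }
        // @5 decode as many complete frames as are buffered
        while (cumulation.length - readerIndex >= FRAME) {
            out.add(Arrays.copyOfRange(cumulation, readerIndex, readerIndex + FRAME));
            readerIndex += FRAME;
        }
        if (readerIndex == cumulation.length) {      // @6 fully drained: release it
            cumulation = null;
            readerIndex = 0;
        }
        fired.addAll(out);                           // @7 forward each decoded message
    }
}
```

For example, reads of "AB", "CDEFG" and "HI" fire nothing, then "ABC" and "DEF", then "GHI". After the last read, the cumulation is null again.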

Next, let's look closely at the callDecode method:

    /**
     * Called once data should be decoded from the given {@link ByteBuf}. This method will call
     * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
     *
     * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in            the {@link ByteBuf} from which to read data
     * @param out           the {@link List} to which decoded messages should be added
     */
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {  //@1
                int outSize = out.size();  //@2
                int oldInputLength = in.readableBytes();  //@3
                decode(ctx, in, out);  //@4

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {  //@5
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {  //@6
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

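Parameters:

  • ChannelHandlerContext ctx: the handler context.
  • ByteBuf in: the current cumulation buffer.
  • List<Object> out: the list of decoded messages.

Code @1: loop while the cumulation buffer is readable.

Code @2: the number of messages decoded so far, used to tell whether this call to decode produced a message.

Code @3: the number of readable bytes in the cumulation buffer, also used to tell whether decoding made progress.

Code @4: call the concrete decoding logic (the implementation of the codec protocol), which subclasses provide. This shows that ByteToMessageDecoder is designed around the Template Method pattern.

Code @5: if no message was decoded (the cumulation buffer does not yet contain a complete request) and the number of readable bytes is unchanged, stop decoding for now and wait for more data. If bytes were consumed without producing a message (for example, skipped), the loop continues.

Code @6: if a message was decoded but the number of readable bytes did not change, throw a DecoderException, because decode() produced a message without reading anything.

From @5 and @6 we can draw the following important conclusions, which deserve special attention when implementing your own decoder:

  1. If no message can be decoded from the cumulation buffer yet, do not modify its readerIndex, and never modify its writerIndex. Otherwise the partially received bytes are consumed and lost to the next decode attempt.
  2. If a message is decoded, advance the readerIndex past the bytes that were consumed.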
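The contract enforced by @5 and @6 can be checked with a stdlib-only model of the loop. CallDecodeModel and its Input cursor are hypothetical. IllegalStateException stands in for Netty's DecoderException, and the ctx.isRemoved() check is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of callDecode(): loop over decode() and enforce its contract.
final class CallDecodeModel {
    /** Byte array plus a read cursor, standing in for a ByteBuf's readerIndex. */
    static final class Input {
        final byte[] bytes;
        int readerIndex;

        Input(byte[] bytes) {
            this.bytes = bytes;
        }

        int readableBytes() {
            return bytes.length - readerIndex;
        }
    }

    /** The template method a concrete decoder implements. */
    interface Decoder {
        void decode(Input in, List<Object> out);
    }

    static void callDecode(Decoder decoder, Input in, List<Object> out, boolean singleDecode) {
        while (in.readableBytes() > 0) {                  // @1
            int outSize = out.size();                     // @2
            int oldInputLength = in.readableBytes();      // @3
            decoder.decode(in, out);                      // @4

            if (outSize == out.size()) {                  // @5 no message produced
                if (oldInputLength == in.readableBytes()) {
                    break;                                // nothing consumed: wait for more data
                }
                continue;                                 // bytes skipped or discarded: try again
            }
            if (oldInputLength == in.readableBytes()) {   // @6 message without consuming input
                throw new IllegalStateException("decode() did not read anything but decoded a message.");
            }
            if (singleDecode) {
                break;
            }
        }
    }
}
```

The break branch of @5 is the normal "not enough data yet" exit. The continue branch lets a decoder skip bytes without producing output. A decoder that emits a message without advancing readerIndex would otherwise loop forever, which is why @6 throws.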

2.2.3 The channelInactive event

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {
            if (cumulation != null) {
                callDecode(ctx, cumulation, out);
                decodeLast(ctx, cumulation, out);
            } else {
                decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                if (cumulation != null) {
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                for (int i = 0; i < size; i++) {
                    ctx.fireChannelRead(out.get(i));
                }
                if (size > 0) {
                    // Something was read, call fireChannelReadComplete()
                    ctx.fireChannelReadComplete();
                }
                ctx.fireChannelInactive();
            } finally {
                // recycle in all cases
                out.recycle();
            }
        }
    }

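channelInactive is fired when the channel becomes inactive. If the cumulation buffer still holds data, that data is decoded and the resulting messages are passed downstream as channelRead events. channelReadComplete follows if anything was produced, and then channelInactive is fired. A new method, decodeLast, is also called here. It is the last decode attempt made when the channel becomes inactive, and subclasses can override it.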
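To illustrate what a decodeLast override might be used for, the hypothetical model below decodes newline-terminated lines. When the channel goes inactive, it also emits an unterminated trailing line. Flushing the tail is an assumption made for this sketch, not what the base class does: in ByteToMessageDecoder, decodeLast by default simply runs decode once more. Strings replace ByteBufs for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical line decoder showing where decodeLast() fits into channelInactive().
class LineDecoderWithLast {
    private final StringBuilder cumulation = new StringBuilder();
    final List<String> fired = new ArrayList<>();    // what fireChannelRead would receive

    void channelRead(String chunk) {
        cumulation.append(chunk);
        decode(fired);
    }

    // Emits every complete '\n'-terminated line; leaves a partial line buffered.
    void decode(List<String> out) {
        int eol;
        while ((eol = cumulation.indexOf("\n")) >= 0) {
            out.add(cumulation.substring(0, eol));
            cumulation.delete(0, eol + 1);
        }
    }

    // Base-class default is one more decode pass; flushing the tail is this sketch's choice.
    void decodeLast(List<String> out) {
        decode(out);
        if (cumulation.length() > 0) {
            out.add(cumulation.toString());
            cumulation.setLength(0);
        }
    }

    void channelInactive() {
        List<String> out = new ArrayList<>();
        decode(out);          // callDecode(ctx, cumulation, out)
        decodeLast(out);      // decodeLast(ctx, cumulation, out)
        fired.addAll(out);    // fire each message, then channelInactive downstream
    }
}
```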

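That concludes the analysis of how ByteToMessageDecoder works. ByteToMessageDecoder can be seen as a standard decoder template provided by Netty (a textbook use of the Template Method pattern), on which users can build decoders for their own private protocols. To help with writing custom decoders, the following sections analyse some of the decoders that Netty provides.

3. LineBasedFrameDecoder implementation

LineBasedFrameDecoder splits frames on line delimiters (\n or \r\n).

Here is its decode implementation: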

    /**
     * Create a frame out of the {@link ByteBuf} and return it.
     *
     * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param   buffer          the {@link ByteBuf} from which to read data
     * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
     *                          be created.
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);  //@1
        if (!discarding) {  //@2
            if (eol >= 0) {  //@3
                final ByteBuf frame;
                final int length = eol - buffer.readerIndex();  //@4
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;  //@5

                if (length > maxLength) {  //@6
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                if (stripDelimiter) {  //@7
                    frame = buffer.readSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readSlice(length + delimLength);
                }

                return frame.retain();  //@8
            } else {  //@9
                final int length = buffer.readableBytes();
                if (length > maxLength) {  //@10
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else {
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                discardedBytes = buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }
            return null;
        }
    }

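Code @1: search the cumulation buffer for an end-of-line marker (\n or \r\n). findEndOfLine, whose source is not listed here, returns the index of the delimiter's first byte, or -1 if no delimiter is found.

Code @2: check whether the decoder is in discarding mode. It enters this mode when more than maxLength bytes have been buffered without a line delimiter being found. Everything up to the next delimiter is then thrown away.

Code @3: if a line delimiter was found, the cumulation buffer contains at least one complete frame (request), so decode it.

Code @4: compute the frame length, which is eol minus the cumulation buffer's current readerIndex.

Code @5: compute the delimiter length: 2 bytes for \r\n, 1 byte for \n.

Code @6: if the frame is longer than allowed, set the cumulation buffer's readerIndex to eol plus the delimiter length, so the oversized frame is skipped and the next decode starts after it. Then call fail(), which fires an exceptionCaught event with a TooLongFrameException.

Code @7: read one frame from the cumulation buffer, with or without the delimiter depending on stripDelimiter (when stripping, the frame does not contain the delimiter). readSlice is used, so the frame shares the cumulation buffer's memory instead of copying it.

Code @8: increment the frame's reference count with retain() and return it for the downstream handlers. The slice shares memory with the cumulation buffer, which ByteToMessageDecoder will eventually release, so the extra reference keeps the frame valid until the downstream handler releases it.

Code @9, @10: if no delimiter was found and the buffered length already exceeds maxLength, discard all readable bytes and enter discarding mode. With failFast, the failure is reported immediately. Otherwise it is reported only once the end of the oversized line is found.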
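The model below is a stdlib-only port of the decode logic shown above, useful for stepping through the branches. Its findEndOfLine is an assumption consistent with the getByte(eol) == '\r' check at @5. It returns the index of the '\r' of a \r\n pair, or of a lone '\n'. LineFrameModel, its Input cursor and the failures counter (standing in for fail(), which fires exceptionCaught) are hypothetical. Frames are returned as ASCII Strings instead of retained slices, and discardedBytes is omitted because it only feeds the error message.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stdlib-only port of LineBasedFrameDecoder.decode() as listed above.
final class LineFrameModel {
    /** ASCII bytes plus a read cursor, standing in for the cumulation ByteBuf. */
    static final class Input {
        final byte[] bytes;
        int readerIndex;

        Input(String s) {
            bytes = s.getBytes(StandardCharsets.US_ASCII);
        }

        int writerIndex() {
            return bytes.length;
        }
    }

    private final int maxLength;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discarding;
    int failures;   // counts fail(...) calls, which fire exceptionCaught in Netty

    LineFrameModel(int maxLength, boolean stripDelimiter, boolean failFast) {
        this.maxLength = maxLength;
        this.stripDelimiter = stripDelimiter;
        this.failFast = failFast;
    }

    // Index of '\n', or of the preceding '\r' for "\r\n"; -1 when no line end is buffered.
    static int findEndOfLine(Input in) {
        for (int i = in.readerIndex; i < in.writerIndex(); i++) {
            if (in.bytes[i] == '\n') {
                return (i > in.readerIndex && in.bytes[i - 1] == '\r') ? i - 1 : i;
            }
        }
        return -1;
    }

    // Returns one frame, or null if none is available yet (or it was dropped).
    String decode(Input in) {
        int eol = findEndOfLine(in);                               // @1
        if (!discarding) {                                         // @2
            if (eol >= 0) {                                        // @3
                int length = eol - in.readerIndex;                 // @4
                int delimLength = in.bytes[eol] == '\r' ? 2 : 1;   // @5
                if (length > maxLength) {                          // @6 skip the oversized line
                    in.readerIndex = eol + delimLength;
                    failures++;
                    return null;
                }
                int frameLength = stripDelimiter ? length : length + delimLength;  // @7
                String frame = new String(in.bytes, in.readerIndex, frameLength, StandardCharsets.US_ASCII);
                in.readerIndex = eol + delimLength;                // consume frame and delimiter
                return frame;
            }
            if (in.writerIndex() - in.readerIndex > maxLength) {   // @9, @10 too long, no delimiter
                in.readerIndex = in.writerIndex();                 // drop everything buffered
                discarding = true;
                if (failFast) {
                    failures++;
                }
            }
            return null;
        }
        if (eol >= 0) {                                            // end of the dropped line found
            in.readerIndex = eol + (in.bytes[eol] == '\r' ? 2 : 1);
            discarding = false;
            if (!failFast) {
                failures++;
            }
        } else {
            in.readerIndex = in.writerIndex();                     // still inside it: keep dropping
        }
        return null;
    }
}
```

With maxLength 8, the input "hi\r\nthere\nx" yields "hi", then "there", then null, with the trailing "x" left buffered until its delimiter arrives.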

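4. How the encoder works (MessageToByteEncoder)

The previous sections covered how Netty's decoders work. The problem they solve is TCP packet sticking and splitting: extracting individual client requests from the incoming byte stream. Decoders face the input side and parse requests (the input stream). Encoders face the output side. They lay out outgoing messages according to the agreed protocol so that the receiver can parse them. The previous section covered LineBasedFrameDecoder, where requests end with \n or \r\n. Do we then need a matching LineBasedFrameEncoder? No. If the receiving side decodes with LineBasedFrameDecoder, the messages sent to it must already end with \n or \r\n, otherwise its decoder cannot do its job. Producing messages that follow the agreed format is exactly what the sending side is responsible for. Encoders are mainly needed when implementing a custom protocol, and the key method to implement is encode. The source of MessageToByteEncoder is given below. Since it is straightforward, it is not explained in detail: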

    package io.netty.handler.codec;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.ChannelPromise;
    import io.netty.util.ReferenceCountUtil;
    import io.netty.util.internal.TypeParameterMatcher;

    /**
     * {@link ChannelHandlerAdapter} which encodes message in a stream-like fashion from one message to an
     * {@link ByteBuf}.
     *
     *
     * Example implementation which encodes {@link Integer}s to a {@link ByteBuf}.
     *
     * <pre>
     *     public class IntegerEncoder extends {@link MessageToByteEncoder}<{@link Integer}> {
     *         {@code @Override}
     *         public void encode({@link ChannelHandlerContext} ctx, {@link Integer} msg, {@link ByteBuf} out)
     *                 throws {@link Exception} {
     *             out.writeInt(msg);
     *         }
     *     }
     * </pre>
     */
    public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter {

        private final TypeParameterMatcher matcher;
        private final boolean preferDirect;

        /**
         * @see {@link #MessageToByteEncoder(boolean)} with {@code true} as boolean parameter.
         */
        protected MessageToByteEncoder() {
            this(true);
        }

        /**
         * @see {@link #MessageToByteEncoder(Class, boolean)} with {@code true} as boolean value.
         */
        protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
            this(outboundMessageType, true);
        }

        /**
         * Create a new instance which will try to detect the types to match out of the type parameter of the class.
         *
         * @param preferDirect          {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
         *                              the encoded messages. If {@code false} is used it will allocate a heap
         *                              {@link ByteBuf}, which is backed by an byte array.
         */
        protected MessageToByteEncoder(boolean preferDirect) {
            matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
            this.preferDirect = preferDirect;
        }

        /**
         * Create a new instance
         *
         * @param outboundMessageType   The tpye of messages to match
         * @param preferDirect          {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
         *                              the encoded messages. If {@code false} is used it will allocate a heap
         *                              {@link ByteBuf}, which is backed by an byte array.
         */
        protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
            matcher = TypeParameterMatcher.get(outboundMessageType);
            this.preferDirect = preferDirect;
        }

        /**
         * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
         * {@link ChannelHandler} in the {@link ChannelPipeline}.
         */
        public boolean acceptOutboundMessage(Object msg) throws Exception {
            return matcher.match(msg);
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf buf = null;
            try {
                if (acceptOutboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I cast = (I) msg;
                    buf = allocateBuffer(ctx, cast, preferDirect);
                    try {
                        encode(ctx, cast, buf);
                    } finally {
                        ReferenceCountUtil.release(cast);
                    }

                    if (buf.isReadable()) {
                        ctx.write(buf, promise);
                    } else {
                        buf.release();
                        ctx.write(Unpooled.EMPTY_BUFFER, promise);
                    }
                    buf = null;
                } else {
                    ctx.write(msg, promise);
                }
            } catch (EncoderException e) {
                throw e;
            } catch (Throwable e) {
                throw new EncoderException(e);
            } finally {
                if (buf != null) {
                    buf.release();
                }
            }
        }

        /**
         * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
         * Sub-classes may override this method to returna {@link ByteBuf} with a perfect matching {@code initialCapacity}.
         */
        protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                                   boolean preferDirect) throws Exception {
            if (preferDirect) {
                return ctx.alloc().ioBuffer();
            } else {
                return ctx.alloc().heapBuffer();
            }
        }

        /**
         * Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled
         * by this encoder.
         *
         * @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
         * @param msg           the message to encode
         * @param out           the {@link ByteBuf} into which the encoded message will be written
         * @throws Exception    is thrown if an error accour
         */
        protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
    }
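To tie the listing back to the LineBasedFrameDecoder discussion, here is a hypothetical stdlib-only model of write(). Class.isInstance plays the role of TypeParameterMatcher, a StringBuilder replaces the allocated ByteBuf, and the written list records what ctx.write would receive. The encode step, LineTerminator.encode, is an invented example of the only encoding a LineBasedFrameDecoder peer needs: a line delimiter after each message. Releasing the message and completing the promise are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical model of MessageToByteEncoder.write(): encode matching types, pass the rest on.
final class EncoderWriteModel<I> {
    private final Class<I> type;                        // plays the role of TypeParameterMatcher
    private final BiConsumer<I, StringBuilder> encoder; // plays the role of encode(ctx, msg, out)
    final List<Object> written = new ArrayList<>();     // what ctx.write would receive

    EncoderWriteModel(Class<I> type, BiConsumer<I, StringBuilder> encoder) {
        this.type = type;
        this.encoder = encoder;
    }

    boolean acceptOutboundMessage(Object msg) {
        return type.isInstance(msg);
    }

    void write(Object msg) {
        if (acceptOutboundMessage(msg)) {
            StringBuilder buf = new StringBuilder();    // allocateBuffer(...)
            encoder.accept(type.cast(msg), buf);        // encode(ctx, cast, buf)
            // Even an empty result is written (Netty writes Unpooled.EMPTY_BUFFER),
            // so the write still travels down the pipeline with its promise.
            written.add(buf.toString());
        } else {
            written.add(msg);                           // not ours: ctx.write(msg, promise)
        }
    }
}

// Hypothetical encode step for a LineBasedFrameDecoder peer: terminate each message.
final class LineTerminator {
    static void encode(String msg, StringBuilder out) {
        if (msg.indexOf('\n') >= 0 || msg.indexOf('\r') >= 0) {
            throw new IllegalArgumentException("embedded line delimiter would split the frame");
        }
        out.append(msg).append('\n');
    }
}
```

This sketch rejects messages that already contain \n or \r because the peer would otherwise receive one message as two frames. A real encoder could escape them instead.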

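Summary:

Because non-blocking I/O works on a byte stream, a single read or write is not guaranteed to carry exactly one complete request. Consider a client that sends three requests over one channel. Across several reads, the server is only guaranteed to receive the bytes of the whole request sequence in the correct order. It is not guaranteed that each read contains exactly one request. For example, for the three requests [ABC] [CD] [EFG], the server's first read might return [AB] and its second [CCDEFG]. The request data therefore has to be decoded. If less than one full request has arrived, the bytes are accumulated, and after every read the newly read bytes are merged with what was accumulated before and parsing is attempted again. This is why ByteToMessageDecoder exists. It is designed with the Template Method pattern, and its handling of the cumulation buffer is its highlight.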


Source: https://blog.csdn.net/prestigeding/article/details/53977445

Copyright belongs to the original author; please contact the author before reprinting in any form.
