欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

Netty(十四):再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor

1、线程模型总结

Netty线程模型基于主从Reactor模型;Channel会绑定一个线程模型(EventLoopGroup),与该通道的读,写等事件都在一个EventLoopGroup中执行,避免了Handler执行的线程安全问题。

线程模型前置篇:

  1. Nio实现Reactor模式
  2. 图说netty线程模型

2、源码分析NioEventLoopGroup初始化流程

2.1 NioEventLoopGroup构造方法

    /**
         * Create a new instance that uses twice as many {@link EventLoop}s as there processors/cores
         * available, as well as the default {@link Executor} and the {@link SelectorProvider} which
         * is returned by {@link SelectorProvider#provider()}.
         *
         * @see io.netty.util.concurrent.DefaultExecutorServiceFactory
         */
        public NioEventLoopGroup() {
            this(0);
        }

        /**
         * Create a new instance that uses the default {@link Executor} and the {@link SelectorProvider} which
         * is returned by {@link SelectorProvider#provider()}.
         *
         * @see io.netty.util.concurrent.DefaultExecutorServiceFactory
         *
         * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
         *                      If {@code executor} is {@code null} this number will also be the parallelism
         *                      requested from the default executor. It is generally advised for the number
         *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
         */
        public NioEventLoopGroup(int nEventLoops) {
            this(nEventLoops, (Executor) null);
        }

        /**
         * Create a new instance that uses the the {@link SelectorProvider} which is returned by
         * {@link SelectorProvider#provider()}.
         *
         * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
         *                      If {@code executor} is {@code null} this number will also be the parallelism
         *                      requested from the default executor. It is generally advised for the number
         *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
         *                      {@code executor} to lie very close together.
         * @param executor   the {@link Executor} to use, or {@code null} if the default should be used.
         */
        public NioEventLoopGroup(int nEventLoops, Executor executor) {
            this(nEventLoops, executor, SelectorProvider.provider());
        }

        /**
         * Create a new instance that uses the the {@link SelectorProvider} which is returned by
         * {@link SelectorProvider#provider()}.
         *
         * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
         *                      If {@code executor} is {@code null} this number will also be the parallelism
         *                      requested from the default executor. It is generally advised for the number
         *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
         *                      {@code executor} to lie very close together.
         * @param executorServiceFactory   the {@link ExecutorServiceFactory} to use, or {@code null} if the default
         *                                 should be used.
         */
        public NioEventLoopGroup(int nEventLoops, ExecutorServiceFactory executorServiceFactory) {
            this(nEventLoops, executorServiceFactory, SelectorProvider.provider());
        }

        /**
         * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
         *                      If {@code executor} is {@code null} this number will also be the parallelism
         *                      requested from the default executor. It is generally advised for the number
         *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
         *                      {@code executor} to lie very close together.
         * @param executor  the {@link Executor} to use, or {@code null} if the default should be used.
         * @param selectorProvider  the {@link SelectorProvider} to use. This value must not be {@code null}.
         */
        public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
            super(nEventLoops, executor, selectorProvider);
        }

        /**
         * @param nEventLoops   the number of {@link EventLoop}s that will be used by this instance.
         *                      If {@code executor} is {@code null} this number will also be the parallelism
         *                      requested from the default executor. It is generally advised for the number
         *                      of {@link EventLoop}s and the number of {@link Thread}s used by the
         *                      {@code executor} to lie very close together.
         * @param executorServiceFactory   the {@link ExecutorServiceFactory} to use, or {@code null} if the
         *                                 default should be used.
         * @param selectorProvider  the {@link SelectorProvider} to use. This value must not be {@code null}.
         */
        public NioEventLoopGroup(
                int nEventLoops, ExecutorServiceFactory executorServiceFactory, final SelectorProvider selectorProvider) {
            super(nEventLoops, executorServiceFactory, selectorProvider);
        }

重点关注构造方法如下参数:

  • int nEventLoops
    EventLoop个数。
  • Executor executor
    任务执行器。
  • SelectorProvider selectorProvider
    Nio Selector。

并且在使用 EventLoopGroup boosGroup = new EventLoopGroup();时,在未调用其父类构造时,nEventLoops为0, executor为null,selectorProvider为特定平台下的Selector实现类。

2.2 进入到直接父类MultithreadEventLoopGroup构造方法

    /**
         * @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)}
         */
        protected MultithreadEventLoopGroup(int nEventLoops, Executor executor, Object... args) {
            super(nEventLoops == 0 ? DEFAULT_EVENT_LOOP_THREADS : nEventLoops, executor, args);
        }

该构造方法,只是初始化nEventLoops参数,如果为0,则使用CPU可用核心数的2倍。

2.3 进入到父类MultithreadEventExecutorGroup中,这里是具体初始化的地方

    private MultithreadEventExecutorGroup(int nEventExecutors,
                                              Executor executor,
                                              boolean shutdownExecutor,
                                              Object... args) {
            if (nEventExecutors <= 0) {
                throw new IllegalArgumentException(
                        String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));
            }

            if (executor == null) {
                executor = newDefaultExecutorService(nEventExecutors); // @1
                shutdownExecutor = true;
            }

            children = new EventExecutor[nEventExecutors];                //@2
            if (isPowerOfTwo(children.length)) {                                     //@3
                chooser = new PowerOfTwoEventExecutorChooser();
            } else {
                chooser = new GenericEventExecutorChooser();
            }

            for (int i = 0; i < nEventExecutors; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);                  //@4
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {                                       //@5
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }

                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }

            final boolean shutdownExecutor0 = shutdownExecutor;
            final Executor executor0 = executor;
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {      //@6
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                        if (shutdownExecutor0) {
                            // This cast is correct because shutdownExecutor0 is only try if
                            // executor0 is of type ExecutorService.
                            ((ExecutorService) executor0).shutdown();
                        }
                    }
                }
            };

            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }

            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }

代码@1:创建该线程模型的线程执行器,此处返回的是io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors。

代码@2:开始创建该组的工作EventExecutor数组,其中真正存放的实例由代码@4中创建。

代码@3:创建从线程组(EventExecutor)执行器中选择一个执行。

代码@4:创建一个具体的执行器、有具体的线程模型(EventLoopGroup)子类实现。这里是下一步要重点关注的对象,NioEventLoopGroup实例化的对象为NioEventLoop:

20200109100113\_1.png

代码@5:如果创建执行器失败,则关闭资源。

代码@6:添加相关事件通知处理器。

执行到这里,NioEventLoopGroup的初始化完成,目前可以得出如下结论:

  • 1个NioEventLoopGroup可以有多个 NioEventLoop,轮流接受客户端请求;
  • 同一个NioEventLoopGroup中的NioEventLoop共同持有一个Executor,这个Executor是何许人也(io.netty.util.internal.chmv8.ForkJoinPool,并发度为 nEventExecutors)。

接下来将重点探究NioEventLoop的类层次结构与其实现。

3、NioEventLoop初探

NioEventLoop继承自SingleThreadEventLoop,从字面上讲,NioEventLoopGroup的每一个NioEventLoop是一个线程的事件循环器,而每一个NioEventLoop中的执行器(EventExecutor)是一个并发度为nEventLoops的ForkJoinPool。
NioEventLoop的构造方法如下:

20200109100113\_2.png

这里大家是否想过一个问题,为什么单个线程的EventLoop(SingleThreadEventLoop)的事件循环器,里面需要用一个线程池呢?这里用单个线程当事件循环器有什么作用呢?。我目前的理解是,事件循环器(EventLoop)其实就是一个IO线程,首先使用单个线程来实现,简单高效,没有线程切换开销,多线程访问等问题;并且Netty将一个通道的读、写等操作都绑定到一个相同的事件循环器,这样有利于状态的保存,比如说可以比较方便的在Handler(Handler在IO线程中执行)使用线程本地变量ThreadLocal、同时减少线程切换。而使用一个线程池,而不是一个简单的线程,主要是为了提高程序的健壮性,如果单一线程由于异常,导致该线程消亡后,线程池会另起一个新的线程继续提供服务。[但是,从后面的分析看,NioEventLoop中的线程也是轮流执行的。]

接下来,将从SingleThreadEventExecutor,整个线程模型的执行者开始相信接口执行器内部运作逻辑。

4、SingleThreadEventExecutor源码分析

SingleThreadEventExecutor是NioEventLoopGroup的具体执行器,也就是NioEventLoopGroup中运行的线程,其实就是SingleThreadEventExecutor。本文将从重点属性、构造方法、核心方法三方面剖析该类的实现。

4.1 重要属性

    private static final int ST_NOT_STARTED = 1;        //状态,,,未启动,未启动接受任务
        private static final int ST_STARTED = 2;                 //   已启动,运行
        private static final int ST_SHUTTING_DOWN = 3; //关闭中(平滑关闭)
        private static final int ST_SHUTDOWN = 4;       //已关闭
        private static final int ST_TERMINATED = 5;    // 终止

        private final Queue<Runnable> taskQueue;   // 该线程模型执行器 任务队列,子类可定制自己的任务队列
        @SuppressWarnings({ "FieldMayBeFinal", "unused" })
        private volatile Thread thread;                         // 当前执行器运行的线程。
        private final Executor executor;                       // 具体的线程池,此处为 Netty实现的ForkJoinPool。
        private final Semaphore threadLock = new Semaphore(0);
        private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
        private final boolean addTaskWakesUp;      //如果设置为true, 当且仅当 添加              一个任务时,才唤醒选择器,比如是否唤醒 select() 函数。
        @SuppressWarnings({ "FieldMayBeFinal", "unused" })
        private volatile int state = ST_NOT_STARTED;     // 当前的状态

4.2 构造方法解读

    /**
         * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it.
         * @param executor the {@link Executor} which will be used for executing.
         * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up
         * the executor thread.
         */
        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean                                                      addTaskWakesUp) {
            super(parent);
            if (executor == null) {
                throw new NullPointerException("executor");
            }
            this.addTaskWakesUp = addTaskWakesUp;
            this.executor = executor;
            taskQueue = newTaskQueue();
        }
        /**
         * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
         * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
         * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
         * implementation that does not support blocking operations at all.
         */
        protected Queue<Runnable> newTaskQueue() {
            return new LinkedBlockingQueue<Runnable>();
        }

子类可以通过重写newTaskQueue方法,重写底层的任务队列。

4.3 核心方法

jdk并发包中线程池的实现,比如submit等方法,都是先包装成相关Task,然后调用execute方法。SingleThreadEventExecutor类,最终父类就是Executor,所以,我们从executor方法开始研究。

4.3.1 executor 方法

    @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            boolean inEventLoop = inEventLoop();  //@1
            if (inEventLoop) {
                addTask(task);                                     //@2
            } else {
                startExecution();                                 //@3
                addTask(task);                                   //@4
                if (isShutdown() && removeTask(task)) {    //@5
                    reject();                                                       //@6
                }
            }
            if (!addTaskWakesUp && wakesUpForTask(task)) {    //@7
                wakeup(inEventLoop);                                               //@8
            }
        }

代码@1,是否在事件循环中,具体实现如下:

    @Override

    public boolean inEventLoop(Thread thread) {

        return thread == this.thread;

    }

怎么解释呢?提交任务的操作,比如需要提交一个读任务,或写任务,如果调用的API的线程是当前的EventLoop,则直接加入到任务队列中等待执行,如果是其他线程调用的,则启动该EventLoop线程进行调度,然后放入到任务队列中。举个简单的例子,在IO线程中(解码出请求信息后),将任务放入到业务队列中去处理的时候,如果调用channel.write方法,此时要注意的是,真实的writer方法的调用,不会在业务线程中调用,因为次数该业务线程并不是EventLoop的执行线程,只会将任务放入到队列,然后业务线程中直接返回(理解这里的异步操作)。

代码@2:如果是EventLoop执行的任务,直接加入任务队列

代码@3:如果是其他线程,则启动调度,稍后进行详细的代码分析。

代码@5,代码@6:如果停止执行,则拒绝服务。

代码@6,7:唤醒选择器,感觉这里addTaskWakesUp 这个变量有点问题,不过具体的唤醒选择器逻辑是对于事件模型来说比较重要,在相关的子类中都有重写。故这部分在研究NioEventLoop时再重点关注。

接下来重点研究步骤3,startExecution方法的实现

4.3.2 startExecution方法详解

    private void startExecution() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // @1
                    schedule(new ScheduledFutureTask<Void>(
                            this, Executors.<Void>callable(new PurgeTask(), null),
                            ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));   // @2
                    scheduleExecution();   //@3
                }
            }
        }

    protected final void scheduleExecution() {
            updateThread(null);
            executor.execute(asRunnable); // @4
        }

    private void updateThread(Thread t) { 
        THREAD_UPDATER.lazySet(this, t);     //@5
    }

    private final Runnable asRunnable = new Runnable() {  //@6
            @Override
            public void run() {
                updateThread(Thread.currentThread());
                // lastExecutionTime must be set on the first run
                // in order for shutdown to work correctly for the
                // rare case that the eventloop did not execute
                // a single task during its lifetime.
                if (firstRun) {
                    firstRun = false;
                    updateLastExecutionTime();
                }
                try {
                    SingleThreadEventExecutor.this.run();
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                    cleanupAndTerminate(false);
                }
            }
        };

代码@1:该startExecution方法支持多次调用,因为对状态进行了CAS检测并设置,如果启动状态为未启动,并设置为启动中成功,则继续下文的启动流程。

代码@2:新建一个任务,该任务中取消任务队列中的任务,并移除。具体代码下文会重点分析。

代码@3:启动调度器,此处先将当前运行的线程设置为空,代码@5,可以使用lazySet来更新的原因是,对其可见性要求没那么高,因为在添加一个任务的时候,就算检测到当前线程不是EventLoop线程,也就是asRunnable线程,也没关系,会先调用startExecution方法,等其asRunable运行,然后再放入队列中。

代码@4:核心所在呀,这里让SingleThreadEventExecutor真正的名副其实是单个线程。尽管每个EventLoop的执行器是一个并发度为nEventCount的ForkJoinPool线程池。

代码@6:在ForkJoinPool中执行的任务,就是asRunnable中run方法的逻辑,而该方法里面首先先设置当前线程,然后执行SingleThreadEvent的run方法,run方法的具体实现由子类实现。

4.3.3 关于代码@2,关于schedule方法详解

    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduledTaskQueue().add(task);
            } else {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }

            return task;
        }

整个调度任务的执行过程如下:先将任务封装成ScheduledFutureTask,然后如果当前线程是当前执行器线程,则直接加入到优先级队列中,如果不是,则调用execute方法,由执行器线程加入到调度任务的优先级队列中。这里是Netty线程模型的核心所在,通道的相关IO操作等最终都有由IO线程放入到队列并执行之。避免了多线程的竞争。

5、源码分析NioEventLoop

5.1 NioEventLoop重要属性详解

    private static final int CLEANUP_INTERVAL = 256;     //取消键的个数超过改造,将cancelKeys清空为0,并执行一次重新选择
    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
                SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); // 是否启用选择键优化(SelectionKeys),默认为true

    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;   // 众所周知,jdk在1.7之前的Selector的select方法会出现空轮询,导致CPU资源紧张,解决
                                                                      //空轮询最小的空轮训次数,如果SELECTOR_AUTO_REBUILD_THRESHOLD小于该值,则不触发Selector的重建工作。
     private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;  //如果出现轮询(select),连续多少次未返回准备的键,则触发Selector重建。默认为512
    /**
         * The NIO {@link Selector}.
         */
        Selector selector;                // NIO Selector
        private SelectedSelectionKeySet selectedKeys;      // 可选择的键,netty对原生Selector的select()方法返回的键的一个优化集合

        private final SelectorProvider provider;

        /**
         * Boolean that controls determines if a blocked Selector.select should
         * break out of its selection process. In our case we use a timeout for
         * the select method and the select method will block for that time unless
         * waken up.
         */
        private final AtomicBoolean wakenUp = new AtomicBoolean();    //是否需要执行 selector的wakenup()方法

        private volatile int ioRatio = 50;   // ioRatio执行比例
        private int cancelledKeys;            //取消键的个数
        private boolean needsToSelectAgain; //是否需要重新select

5.2 核心入口方法 run

继上文的分析,一个任务提交到EventExecutor,首先会先确认是否开始执行(startExecution),在启动调度之前,会运行具体的线程调度处理逻辑run方法里的逻辑。NioEventLoop的间接父类为SingleThreadEventExecutor。

    @Override
        protected void run() {
            boolean oldWakenUp = wakenUp.getAndSet(false);      //@1
            try {
                if (hasTasks()) {      // @2 
                    selectNow();
                } else {
                    select(oldWakenUp);    //@3 

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {         // @4
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {          // @51
                    processSelectedKeys();  //@52
                    runAllTasks();                //53
                } else { //@61
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();//@62

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);    //@63
                }

                if (isShuttingDown()) {   //@7
                    closeAll();
                    if (confirmShutdown()) {
                        cleanupAndTerminate(true);
                        return;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // TODO: After using a ForkJoinPool that is potentially shared with other software
                // than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever
                // happen anyways.

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }

            scheduleExecution();     //@7
        }

代码@1:获取当前的wakeup状态并充值为false

代码@2:如果任务队列中有任务,则执行一次快速selectNow操作,该方法不阻塞。

代码@3:如果任务队列中,没有任务,则执行select方法。select方法,不会阻塞,因为调用的是selectNow或select(超时时间)

代码@4:如果需要weakup,则调用selector的weakup()方法。

代码@5,6:是处理具体的任务相关逻辑。相关方法在后文详细讲解。(根据ioRatio的值不同,处理的逻辑不同)。

代码@6:关闭流程。

代码@7:这里重新向EventExecutor提交任务,再次开始执行select方法。

20200109100113\_3.png

这里的设计,还得琢磨一下,为什么不直接在一个线程中发送执行。

先重点研究一下代码@3处select(wakeup)方法

5.2.1 select(wakeup)方法详解

    private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);   // @1
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }

                    int selectedKeys = selector.select(timeoutMillis);   // @2
                    selectCnt ++;

                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {   // @3
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }

                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {   // @4
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {      //@5
                        // The selector returned prematurely many times in a row.
                        // Rebuild the selector to work around the problem.
                        logger.warn(
                                "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                                selectCnt);

                        rebuildSelector();                                                                     //@6
                        selector = this.selector;

                        // Select again to populate selectedKeys.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }

                    currentTimeNanos = time;
                }

                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
                }
                // Harmless exception - log anyway
            }
        }

代码@1:计算select方法应该传入的超时时间,方法主要是从优先级队列(调度队列)中,取第一个节点,计算该任务在多久后应该被调度。

    protected long delayNanos(long currentTimeNanos) {

            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();

            if (scheduledTask == null) {

                return SCHEDULE_PURGE_INTERVAL;

            }

            return scheduledTask.delayNanos(currentTimeNanos);

        }

代码@2:执行带超时时间的select方法。

代码@3:如果本次有选择出感兴趣的键、或有调度任务处理,则跳出,去执行相关的操作。

代码@4:此处的判断主要是判断是否是空轮询,由于select是带超时时间的,如果没有超过其超时时间,就返回并且没有选择到键,则认为发生了空轮训,然后执行@5的逻辑,如果连续发生空轮询,超过SELECTOR_AUTO_REBUILD_THRESHOLD的值(默认512)次的话,执行重建Selector操作,也就是@6 rebuildSelector()方法执行逻辑。

5.2.2 rebuildSelector 方法详解

    /**
         * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
         * around the infamous epoll 100% CPU bug.
         */
        public void rebuildSelector() {
            if (!inEventLoop()) {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        rebuildSelector();
                    }
                });
                return;
            }

            final Selector oldSelector = selector;
            final Selector newSelector;

            if (oldSelector == null) {
                return;
            }

            try {
                newSelector = openSelector();
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }

            // Register all channels to the new Selector.
            int nChannels = 0;
            for (;;) {
                try {
                    for (SelectionKey key: oldSelector.keys()) {
                        Object a = key.attachment();
                        try {
                            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                                continue;
                            }

                            int interestOps = key.interestOps();
                            key.cancel();
                            SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                            if (a instanceof AbstractNioChannel) {
                                // Update SelectionKey
                                ((AbstractNioChannel) a).selectionKey = newKey;
                            }
                            nChannels ++;
                        } catch (Exception e) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", e);
                            if (a instanceof AbstractNioChannel) {
                                AbstractNioChannel ch = (AbstractNioChannel) a;
                                ch.unsafe().close(ch.unsafe().voidPromise());
                            } else {
                                @SuppressWarnings("unchecked")
                                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                                invokeChannelUnregistered(task, key, e);
                            }
                        }
                    }
                } catch (ConcurrentModificationException e) {
                    // Probably due to concurrent modification of the key set.
                    continue;
                }

                break;
            }

            selector = newSelector;

            try {
                // time to close the old selector as everything else is registered to the new one
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }

            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }

该方法的实现思路如下:

首先,调用者必须是EventExecutor的当前线程,然后就是新建一个Selector,然后将原来注册在Selector的通道,事件重新注册到新的Selector( selector.keys()),并取消在原Selector上的事件(取消操作非常重要,因为如果不取消,Selector关闭后,注册在Selector上的通道都将关闭)然后关闭旧的Selector以释放相关资源。上述代码就不一一详细解读了。

5.2.3 关于run方法 @52,@62 processSelectedKeys()方法详解

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            // check if the set is empty and if so just return to not create garbage by
            // creating a new Iterator every time even if there is nothing to process.
            // See https://github.com/netty/netty/issues/597
            if (selectedKeys.isEmpty()) {
                return;
            }

            Iterator<SelectionKey> i = selectedKeys.iterator();  
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();

                if (a instanceof AbstractNioChannel) {       //@1
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {                //@2
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }

                if (!i.hasNext()) {
                    break;
                }

                if (needsToSelectAgain) {     //@3
                    selectAgain();
                    selectedKeys = selector.selectedKeys();

                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }

该方法,主要就是遍历选择键,真正对键的处理在代码@1,代码@2中。主要看一下代码@1的处理逻辑吧。

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {                                                                             //@1
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }

            try {
                int readyOps = k.readyOps();   //@2
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {     //@3
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {   //@4
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {   //@5
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);

                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }

代码@1:如果key不可用,直接将通道关闭。

代码@2:获取该key已准备就绪的操作事件。

代码@3:判断是否有读事件就绪,如果就绪,则直接调用该Channel的unsafe对象的read操作。如果通道已经关闭,则返回。

代码@4:如果有写事件就绪,调用通道 ch.unsafe().forceFlush()方法,强制进行写操作。

代码@5:如果是连接操作,则将该键的感兴趣的连接事件取消,然后调用finishConnect()实现。

5.2.4 关于run方法 @53,@63 runAllTask详解

    /**
         * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
         *
         * @return {@code true} if and only if at least one task was run
         */
        protected boolean runAllTasks() {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }

            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }

                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    return true;
                }
            }
        }

        /**
         * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
         * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
         */
        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }

            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }

                runTasks ++;

                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }

                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }

            this.lastExecutionTime = lastExecutionTime;
            return true;
        }

    private void fetchFromScheduledTaskQueue() {
            if (hasScheduledTasks()) {
                long nanoTime = AbstractScheduledEventExecutor.nanoTime();
                for (;;) {
                    Runnable scheduledTask = pollScheduledTask(nanoTime);
                    if (scheduledTask == null) {
                        break;
                    }
                    taskQueue.add(scheduledTask);
                }
            }
        }

上述代码就是运行积压在队列中的任务,在获取任务时没有使用队列的阻塞方法,故这些方法最终不会阻塞。

从这里可以更加具体说一下ioRatio这个参数的含义。io执行时间比例,取值为1到100,Netty把处理Selector.selecteKeys的时间,也就是processSelectedKey()方法执行的时间做执行时间的基数,标记为t1时间,从上文的processSelectedKey讲解,其实可以得出t1的时间就绪事件的网络读写时间。另一部分时间是运行任务队列中的时间。如果ioRatio为100的话,表示processSelectedKey,runAllTask时间依次执行,但如果ioRatio设置小于100的话,runAllTask的运行时间将减少(权重),网络事件(select)就绪事件更加容易得到执行,那我们不妨思考一下任务队列中的task是从如何被加入的?一般也是一些读写事件,但不是由IO线程(EventLoop触发的),而是在业务线程中调用网络读写相关API,此时会先加入队列,然后再被调度执行。但这两部分,其实都是IO时间,所以对于ioRatio这个参数,我认为是 运行任务队列的权重更为直观。

总结:本文深入细致的分析了Netty NIO的事件模型,从事件模型的初始化(构造的全过程)、核心属性、构造方法、核心入口方法等方面细致分析了NioEventLoopGroup、SingleThreadEventExecutor的原理。

Netty事件模型:核心思路基于主从Reactor模型,一个NioEventLoopGroup包含n个NioEventLoop,每一个NioEventLoop持有一个Selector和一个线程池(执行器EventExecute,其实是netty ForkJoinPool,并发度为n),在选择器的NioEventLoop的run方法每次运行后,就会交给NioEventLoop中线程池的另外一个线程,这里的设计,其实我不太明白为什么要这样,一直在一个线程中执行我个人觉得更好。

问题:

不知道netty为什么要这样设计,,在NioEventLoop中,内部会持有一个EventExecutor(ForkjoinPool,并发度为nEventLoops),我开始以为,一个NioEventLoop,只会用到ForkJoinPool中的一个线程,除非那个线程异常退出了后,才会用一个新的线程来提供服务,这样保证健壮性),,但我仔细看源码时,我发现,每一次select后,会重新进行一次scheduleExecution方法调用,这样会使用ForkJoinPool中的另外一个线程,,这样的设计,不利于ThreadLocal的使用,特别是线程本地内存分配,比如同一个通道的读操作,从线程本地变量中分配一个ByteBuf,然后写操作又会用ForkJoinPool的第二个线程,这样的ByteBuf又不会重复使用,为什么会这样设计呢?为什么在EventExecutor的线程池(ForkJoinPool),只使用一个线程,只有当这个线程奔溃后,再切换另一个线程进行处理(线程池自动处理)。


来源:https://blog.csdn.net/prestigeding/article/details/53977445

赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 » Netty(十四):再谈线程模型之源码分析NioEventLoopGroup、SingleThreadEventExecutor

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏