Java 技术栈中间件优雅停机方案设计与实现全景图(下)

5个月前 (12-08) 0 点赞 0 收藏 0 评论 9 已阅读

我们接着上篇文章Java 技术栈中间件优雅停机方案设计与实现全景图(上)继续 Netty 优雅关闭的相关内容~~~

本文概要

6. Netty 的优雅关闭

通过上小节介绍 dubbo 优雅关闭的相关内容,我们很自然的引出了 Netty 的优雅关闭触发时机,那么在本小节中笔者将为大家详细介绍下 Netty 是如何优雅地装..........优雅地谢幕的~~

image.png

在之前的系列文章中,我们围绕下图所展示的 Netty 整个核心框架的运转流程介绍了主从 ReactorGroup 的创建,启动,运行,接收网络连接,接收网络数据,发送网络数据,以及如何在pipeline中处理相关IO事件的整个源码实现。

netty中的reactor.png

本小节就到了 Netty 优雅谢幕的时刻了,在这谢幕的过程中,Netty 会对它的主从 ReactorGroup ,以及对应 ReactorGroup 中的 Reacto r进行优雅的关闭。下面让我们一起来看下这个优雅关闭的过程~~~

6.1 ReactorGroup 的优雅谢幕


public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

   @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

}

在 Netty 进行优雅关闭的整个过程中,这里涉及到了两个非常重要的控制参数:

gracefulShutdownQuietPeriod:优雅关闭静默期,默认为 2s 。这个参数主要来保证 Netty 整个关闭过程中的优雅。在关闭流程开始后,如果 Reactor 中还有遗留的异步任务需要执行,那么 Netty 就不能关闭,需要把所有异步任务执行完毕才可以。当所有异步任务执行完毕后,Netty 为了实现更加优雅的关闭操作,一定要保障业务无损,这时候就引入了静默期这个概念,如果在这个静默期内,用户没有新的任务向 Reactor 提交那么就开始关闭。如果在这个静默期内,还有用户继续提交异步任务,那么就不能关闭,需要把静默期内用户提交的异步任务执行完毕才可以放心关闭。

gracefulShutdownTimeout:优雅关闭超时时间,默认为 15s 。这个参数主要来保证 Netty 整个关闭过程的可控。我们知道一个生产级的优雅关闭方案既要保证优雅做到业务无损,更重要的是要保证关闭流程的可控,不能无限制的优雅下去。导致长时间无法完成关闭动作。于是 Netty 就引入了这个参数,如果优雅关闭超时,那么无论此时有无异步任务需要执行都要开始关闭了。

这两个控制参数是非常重要核心的两个参数,我们在后面介绍 Netty 关闭细节的时候还会为大家详细剖析,这里大家先从概念上大概理解一下。

在介绍完这两个重要核心参数之后,我们接下来看下 ReactorGroup 的关闭流程:

我们都知道 Netty 为了保证整个系统的吞吐量以及保证 Reactor 可以线程安全地,有序地处理各个 Channel 上的 IO 事件。基于这个目的 Netty 将其承载的海量连接分摊打散到不同的 Reactor 上处理。

ReactorGroup 中包含多个 Reactor ,每个 Channel 只能注册到一个固定的 Reactor 上,由这个固定的 Reactor 负责处理该 Channel 上整个生命周期的事件。

一个 Reactor 上注册了多个 Channel ,负责处理注册在其上的所有 Channel 的 IO 事件以及异步任务。

ReactorGroup 的结构如下图所示:

image.png

ReactorGroup 的关闭流程本质上其实是 ReactorGroup 中包含的所有 Reactor 的关闭,当 ReactorGroup 中的所有 Reactor 完成关闭后,ReactorGroup 才算是真正的关闭。


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    // Reactor线程组中的Reactor集合
    private final EventExecutor[] children;

    // 关闭future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

}

EventExecutor[] children:数组中存放的是当前 ReactorGroup 中包含的所有 Reactor,类型为 EventExecutor。

Promise<?> terminationFuture:ReactorGroup 中的关闭 Future ,用户线程通过这个 terminationFuture 可以知道 ReactorGroup 完成关闭的时机,也可以向 terminationFuture 注册一些 listener 。当 ReactorGroup 完成关闭动作后,会回调用户注册的这些 listener 。大家可以根据各自的业务场景灵活运用。

在 ReactorGroup 的关闭过程中,会挨个触发它所包含的所有 Reactor 的关闭流程。并返回 terminationFuture 给用户线程。

当 ReactorGroup 中的所有 Reactor 完成关闭之后,这个 terminationFuture 会被设置为 success,这样一来用户线程可以感知到 ReactorGroup 已经完成关闭了。

这一点笔者也在《Reactor在Netty中的实现(创建篇)》一文中的第四小节《4. 向Reactor线程组中所有的Reactor注册terminated回调函数》强调过。

在 ReactorGroup 创建的最后一步,会定义 Reactor 关闭的 terminationListener。在 Reactor 的 terminationListener 中会判断当前 ReactorGroup 中的 Reactor 是否全部关闭,如果已经全部关闭,则会设置 ReactorGroup的 terminationFuture 为 success 。

    //记录关闭的Reactor个数,当Reactor全部关闭后,ReactorGroup才可以认为关闭成功
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    //ReactorGroup的关闭future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {

        ........挨个创建Reactor............

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //当所有Reactor关闭后 ReactorGroup才认为是关闭成功
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            //向每个Reactor注册terminationListener
            e.terminationFuture().addListener(terminationListener);
        }
    }

从以上 ReactorGroup 的关闭流程我们可以看出,ReactorGroup 的关闭逻辑只是挨个去触发它所包含的所有 Reactor 的关闭,Netty 的整个优雅关闭核心其实是在单个 Reactor 的关闭逻辑上。毕竟 Reactor 才是真正驱动 Netty 运转的核心引擎。

6.2 Reactor 的优雅谢幕

Reactor的优雅谢幕流程.png

Reactor 的状态特别重要,从《一文聊透Netty核心引擎Reactor的运转架构》一文中我们知道 Reactor 是在一个 for (;;) {....} 死循环中 996 不停地工作。比如轮询 Channel 上的 IO 就绪事件,处理 IO 就绪事件,执行异步任务就是在这个死循环中完成的。

而 Reactor 在每一次循环任务结束之后,都会先去判断一下当前 Reactor 的状态,如果状态变为准备关闭状态 ST_SHUTTING_DOWN 后,Reactor 就会开启优雅关闭流程。

所以在介绍 Reactor 的关闭流程之前,笔者先来为大家捋一捋 Reactor 中的各种状态。

ST_NOT_STARTED = 1:Reactor 的初始状态。在 Reactor 刚被创建出来的时候,状态为 ST_NOT_STARTED 。

ST_STARTED = 2:Reactor 的启动状态。当向 Reactor 提交第一个异步任务的时候会触发 Reactor 的启动。启动之后状态变为 ST_STARTED 。

相关细节可在回顾下《详细图解Netty Reactor启动全流程》一文。

ST_SHUTTING_DOWN = 3:Reactor 准备开始关闭状态。当 Reactor 的 shutdownGracefully 方法被调用的时候,Reactor 的状态就会变为ST_SHUTTING_DOWN。在这个状态下,用户仍然可以向 Reactor 提交任务。

ST_SHUTDOWN = 4:Reactor 停止状态。表示 Reactor 的优雅关闭流程已经结束,此时用户不能在向 Reactor 提交任务,Reactor 会在这个状态下最后一次执行剩余的异步任务。

ST_TERMINATED = 5:Reactor 真正的终结状态,该状态表示 Reactor 已经完全关闭了。在这个状态下 Reactor 会设置自己的 terminationFuture 为 Success。进而开始回调上小节末尾提到的 terminationListener 。

在我们了解了 Reactor 的各种状态之后,下面就该来正式开始介绍 Reactor 的关闭流程了:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    //Reactor的状态  初始为未启动状态
    private volatile int state = ST_NOT_STARTED;
 
    //Reactor的初始状态,未启动
    private static final int ST_NOT_STARTED = 1;
    //Reactor启动后的状态
    private static final int ST_STARTED = 2;
    //准备正在进行优雅关闭,此时用户仍然可以提交任务,Reactor仍可以执行任务
    private static final int ST_SHUTTING_DOWN = 3;
    //Reactor停止状态,表示优雅关闭结束,此时用户不能在提交任务,Reactor最后一次执行剩余的任务
    private static final int ST_SHUTDOWN = 4;
    //Reactor中的任务已被全部执行完毕,且不在接受新的任务,真正的终止状态
    private static final int ST_TERMINATED = 5;

    //优雅关闭的静默期
    private volatile long gracefulShutdownQuietPeriod;
    //优雅关闭超时时间
    private volatile long gracefulShutdownTimeout;

    //Reactor的关闭Future
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {

        ......省略参数校验.......

        //此时Reactor的状态为ST_STARTED
        if (isShuttingDown()) {
            return terminationFuture();
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            //需要唤醒Reactor去执行关闭流程
            wakeup = true;
            oldState = state;
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        //Reactor正在关闭或者已经关闭
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        //优雅关闭静默期,在该时间内,用户还是可以向Reactor提交任务并且执行,只要有任务在Reactor中,就不能进行关闭
        //每隔100ms检测是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭 保证关闭行为的优雅
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        //优雅关闭的最大超时时间,优雅关闭行为不能超过该时间,如果超过的话 不管当前是否还有任务 都要进行关闭
        //保证关闭行为的可控
        gracefulShutdownTimeout = unit.toNanos(timeout);

        //这里需要保证Reactor线程是在运行状态,如果已经停止,那么就不在进行后续关闭行为,直接返回terminationFuture
        if (ensureThreadStarted(oldState)) {
            return terminationFuture;
        }

        //将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程
        if (wakeup) {
            //确保Reactor线程在执行完任务之后 不会在selector上停留
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                //如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒
                wakeup(inEventLoop);
            }
        }

        return terminationFuture();
    }

    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

}

首先在开启关闭流程之前,需要调用 isShuttingDown() 判断一下当前 Reactor 是否已经开始关闭流程或者已经完成关闭。如果已经开始关闭了,这里会直接返回 Reactor 的 terminationFuture 。


    @Override
    public boolean isShuttingDown() {
        return state >= ST_SHUTTING_DOWN;
    }

剩下的逻辑就是不停的在一个 for 循环中通过 CAS 不停的尝试将 Reactor 的当前 ST_STARTED 状态改为 ST_SHUTTING_DOWN 正在关闭状态。

如果通过 inEventLoop() 判断出当前执行线程是 Reactor 线程,那么表示当前 Reactor 的状态只会是 ST_STARTED 运行状态,那么就可以直接将 newState 设置为 ST_SHUTTING_DOWN 。因为只有 Reactor 处于 ST_STARTED 状态的时候才会运行到这里。否则在前边就直接返回 terminationFuture了。

如果当前执行线程为用户线程并不是 Reactor 线程的话,那么此时 Reactor 的状态可能是正在关闭状态或者已经关闭状态,用户线程在重复发起 Reactor 的关闭流程。所以这些异常场景的处理会在 switch(oldState){....} 语句中完成。

            switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        //Reactor正在关闭或者已经关闭
                        newState = oldState;
                        //当前Reactor已经处于关闭流程中,则无需在唤醒Reactor了
                        wakeup = false;
                }

如果当前 Reactor 还未发起关闭流程,比如状态为 ST_NOT_STARTED 或者 ST_STARTED ,那么直接可以放心的将 newState 设置为 ST_SHUTTING_DOWN 。

如果当前 Reactor 已经处于关闭流程中或者已经完成关闭,比如状态为 ST_SHUTTING_DOWN ,ST_SHUTDOWN 或者 ST_TERMINATED 。则没有必要在唤醒 Reactor 重复执行关闭流程了 wakeup = false。Reactor 的状态维持当前状态不变。

当 Reactor 的状态确定完毕后,则在 for 循环中不断的通过 CAS 修改 Reactor 的当前状态。此时 oldState = ST_STARTED ,newState = ST_SHUTTING_DOWN 。


          if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }

随后在 Reactor 中设置我们在《6.1 ReactorGroup 的优雅谢幕》小节开始处介绍的控制 Netty 优雅关闭的两个非常重要的核心参数:

gracefulShutdownQuietPeriod:优雅关闭静默期,默认为 2s 。当 Reactor 中已经没有异步任务需要在执行时,该静默期开始触发,Netty 在这里会每隔 100ms 检测一下是否有任务提交进来,如果在静默期内没有新的任务提交,那么才会进行关闭,保证关闭行为的优雅。

gracefulShutdownTimeout:优雅关闭超时时间,默认为 15s 。优雅关闭行为不能超过该时间,如果超过的话不管当前是否还有任务都要进行关闭,保证关闭行为的可控。

流程走到这里,Reactor 就开始准备执行关闭流程了,那么在进行关闭操作之前,我们需要确保 Reactor 线程此时应该是运行状态,如果此时 Reactor 线程还未开始运行那么就需要让它运行起来执行关闭操作。


        //这里需要保证Reactor线程是在运行状态,如果已经停止,
        //那么就不在进行后续关闭行为,直接返回terminationFuture
        if (ensureThreadStarted(oldState)) {
            return terminationFuture;
        }


    private boolean ensureThreadStarted(int oldState) {
        if (oldState == ST_NOT_STARTED) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_TERMINATED);
                terminationFuture.tryFailure(cause);

                if (!(cause instanceof Exception)) {
                    // Also rethrow as it may be an OOME for example
                    PlatformDependent.throwException(cause);
                }
                return true;
            }
        }
        return false;
    }

如果此时 Reactor 线程刚刚执行完异步任务或者正在 Selector 上阻塞,那么我们需要确保 Reactor 线程被及时的唤醒,从而可以直接进入关闭流程。wakeup == true。

这里的 addTaskWakesUp 默认为 false 。表示并不是只有 addTask 方法才能唤醒 Reactor 线程 还有其他方法可以唤醒 Reactor 线程,比如 SingleThreadEventExecutor#execute 方法还有本小节介绍的 SingleThreadEventExecutor#shutdownGracefully 方法都会唤醒 Reactor 线程。

关于 addTaskWakesUp 字段的详细含义和作用,大家可以回顾下《一文聊透 Netty 核心引擎 Reactor 的运转架构》一文中的《1.2.2 Reactor 开始轮询 IO 就绪事件》小节。


     //将正在监听IO事件的Reactor从Selector上唤醒,表示要关闭了,开始执行关闭流程
        if (wakeup) {
            //确保Reactor线程在执行完任务之后 不会在selector上停留
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                //如果此时Reactor正在Selector上阻塞,则可以确保Reactor被及时唤醒
                wakeup(inEventLoop);
            }
        }

通过 taskQueue.offer(WAKEUP_TASK) 向 Reactor 中添加 WAKEUP_TASK,可以确保 Reactor 在执行完异步任务之后不会在 Selector 上做停留,直接执行关闭操作。

如果此时 Reactor 线程正在 Selector 上阻塞,那么直接调用 wakeup(inEventLoop) 唤醒 Reactor 线程,直接来到关闭流程。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
    }
}

6.3 Reactor 线程的优雅关闭

我们先来通过一张 Reactor 优雅关闭整体流程图来从总体上俯撼一下关闭流程:

Reactor线程优雅关闭流程.png

通过《一文聊透Netty核心引擎Reactor的运转架构》一文的介绍,我们知道 Reacto r是在一个 for 循环中 996 不停地处理 IO 事件以及执行异步任务。如下面笔者提取的 Reactor 运行框架所示:

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            try {
                  .......1.监听Channel上的IO事件.......
                  .......2.处理Channel上的IO事件.......
                  .......3.执行异步任务..........
            } finally {
                try {
                    if (isShuttingDown()) {
                        //关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件
                        closeAll();
                        //注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

}

在 Reactor 在每次 for 循环的末尾 finally{....} 语句块中都会通过 isShuttingDown() 方法去检查当前 Reactor 的状态是否是关闭状态,如果是关闭状态则开始正式进入 Reactor 的优雅关闭流程。

我们在本文前边《1.2 优雅关闭》小节中在讨论优雅关闭方案的时候提到,我们要着重从以下两个方面来实施优雅关闭:

首先需要切走程序承担的现有流量。

保证现有剩余的任务可以执行完毕,保证业务无损。

Netty 这里实现的优雅关闭同样也遵从这两个要点。

在优雅关闭流程开始之前首先会调用 closeAll() 方法,将 Reactor 上注册的所有 Channel 全部关闭掉,切掉现有流量。

随后会调用 confirmShutdown() 方法,将剩余的异步任务执行完毕。在该方法中只要有异步任务需要执行,就不能关闭,保证业务无损。该方法返回值为 true 时表示可以进行关闭。返回 false 时表示不能马上关闭。

6.3.1 切走流量

    private void closeAll() {
        //这里的目的是清理selector中的一些无效key
        selectAgain();
        //获取Selector上注册的所有Channel
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            //获取NioSocketChannel
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                .........省略......
            }
        }

        for (AbstractNioChannel ch: channels) {
            //关闭Reactor上注册的所有Channel,并在pipeline中触发unActive事件和unRegister事件
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }

首先会通过 selectAgain() 最后一次在 Selector 上执行一次非阻塞轮询操作,目的是清除 Selector 上的一些无效 Key 。

关于无效 Key 的清除,详细细节大家可以回看下《一文聊透Netty核心引擎Reactor的运转架构》一文中的《3.1.3 从Selector中移除失效的SelectionKey》小节。

随后通过 selector.keys() 获取在 Selector 上注册的所有 SelectionKey 。进而获取到 Netty 中的 NioSocketChannel 。SelectionKey 与 NioSocketChannel 的对应关系如下图所示:

channel与SelectionKey对应关系.png

最后将注册在 Reactor 上的这些 NioSocketChannel 挨个进行关闭。

Channel 的关闭流程可以回看下笔者的这篇文章 《且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景》

6.3.2 保证业务无损

该方法中的逻辑是保证 Reactor 进行优雅关闭的核心,Netty 这里为了保证业务无损,采取的是只要有异步任务 Task 或者 ShutdwonHooks 需要执行,就不能关闭,需要等待所有 tasks 或者 ShutdownHooks 执行完毕,才会考虑关闭的事情。

    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        //取消掉所有的定时任务
        cancelScheduledTasks();

        if (gracefulShutdownStartTime == 0) {
            //获取优雅关闭开始时间,相对时间
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }

        //这里判断只要有task任务需要执行就不能关闭
        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }

            /**
             * gracefulShutdownQuietPeriod表示在这段时间内,用户还是可以继续提交异步任务的,Reactor在这段时间内
             * 是会保证这些任务被执行到的。
             *
             * gracefulShutdownQuietPeriod = 0 表示 没有这段静默时期,当前Reactor中的任务执行完毕后,无需等待静默期,执行关闭
             * */
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            //避免Reactor在Selector上阻塞,因为此时已经不会再去处理IO事件了,专心处理关闭流程
            taskQueue.offer(WAKEUP_TASK);
            return false;
        }

        //此时Reactor中已经没有任务可执行了,是时候考虑关闭的事情了
        final long nanoTime = ScheduledFutureTask.nanoTime();

        //当Reactor中所有的任务执行完毕后,判断是否超过gracefulShutdownTimeout
        //如果超过了 则直接关闭
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }

        //即使现在没有任务也还是不能进行关闭,需要等待一个静默期,在静默期内如果没有新的任务提交,才会进行关闭
        //如果在静默期内还有任务继续提交,那么静默期将会重新开始计算,进入一轮新的静默期检测
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            taskQueue.offer(WAKEUP_TASK);
            try {
                //gracefulShutdownQuietPeriod内每隔100ms检测一下 是否有任务需要执行
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }

            return false;
        }

        // 在整个gracefulShutdownQuietPeriod期间内没有任务需要执行或者静默期结束 则无需等待gracefulShutdownTimeout超时,直接关闭
        return true;
    }

在关闭流程开始之前,Netty 首先会调用 cancelScheduledTasks() 方法将 Reactor 中剩余需要执行的定时任务全部取消掉。

记录优雅关闭开始时间 gracefulShutdownStartTime ,这是为了后续判断优雅关闭流程是否超时。

调用 runAllTasks() 方法将 Reactor 中 TaskQueue 里剩余的异步任务全部取出执行。

运行剩余tasks和hooks.png

调用 runShutdownHooks() 方法将用户注册在 Reactor 上的 ShutdownHook 取出执行。

我们可以在用户线程中通过如下方式向 Reactor 中注册 ShutdownHooks :

        NioEventLoop reactor = (NioEventLoop) ctx.channel().eventLoop();
        reactor.addShutdownHook(new Runnable() {
            @Override
            public void run() {
                .....关闭逻辑....
            }
        });

在 Reactor 进行关闭的时候,会取出用户注册的这些 ShutdownHooks 进行运行。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

   //可以向Reactor添加shutdownHook,当Reactor关闭的时候会被调用
   private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();

   private boolean runShutdownHooks() {
        boolean ran = false;
        while (!shutdownHooks.isEmpty()) {
            List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
            shutdownHooks.clear();
            for (Runnable task: copy) {
                try {
                    //Reactor线程挨个顺序同步执行
                    task.run();
                } catch (Throwable t) {
                    logger.warn("Shutdown hook raised an exception.", t);
                } finally {
                    ran = true;
                }
            }
        }

        if (ran) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }

        return ran;
    }

}

需要注意的是这里的 ShutdownHooks 是 Netty 提供的一种机制并不是我们在《3. JVM 中的 ShutdownHook》小节中介绍的 JVM 中的 ShutdownHooks 。

JVM 中的 ShutdownHooks 是一个 Thread ,JVM 在关闭之前会并发无序地运行。而 Netty 中的 ShutdownHooks 是一个 Runnable ,Reactor 在关闭之前,会由 Reactor 线程同步有序地执行。

这里需要注意的是只要有 tasks 和 hooks 需要执行 Netty 就会一直执行下去直到这些任务全部执行完为止

当 Reactor 没有任何任务需要执行时,这时就会判断当前关闭流程所用时间是否超过了我们前边设定的优雅关闭最大超时时间 gracefulShutdownTimeout 。

nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout

如果关闭流程因为前边这些任务的执行导致已经超时,那么就直接关闭 Reactor ,退出 Reactor 的工作循环。

如果没有超时,那么这时就会触发前边介绍的优雅关闭的静默期 gracefulShutdownQuietPeriod 。

在静默期中 Reactor 线程会每隔 100ms 检查一下是否有用户提交任务请求,如果有的话,就需要保证将用户提交的这些任务执行完毕。然后静默期将会重新开始计算,进入一轮新的静默期检测。

如果在整个静默期内,没有任何任务提交,则无需等待 gracefulShutdownTimeout 超时,直接关闭 Reactor ,退出 Reactor 的工作循环。

从以上过程我们可以看出 Netty 的优雅关闭至少需要等待一个静默期的时间。还有一点是 Netty 优雅关闭的时间可能会超出 gracefulShutdownTimeout ,因为 Netty 需要保证遗留剩余的任务被执行完毕。当所有任务执行完毕之后,才会去检测是否超时。

6.4 Reactor 的最终关闭流程

当在静默期内没有任何任务提交或者关闭流程超时时,上小节中介绍的 confirmShutdown() 就会返回 true 。随即 Reactor 线程就会退出工作循环。

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            try {
                  .......1.监听Channel上的IO事件.......
                  .......2.处理Channel上的IO事件.......
                  .......3.执行异步任务..........
            } finally {
                try {
                    if (isShuttingDown()) {
                        //关闭Reactor上注册的所有Channel,停止处理IO事件,触发unActive以及unRegister事件
                        closeAll();
                        //注销掉所有Channel停止处理IO事件之后,剩下的就需要执行Reactor中剩余的异步任务了
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

}

我们在《详细图解 Netty Reactor 启动全流程》一文中的《1.3.3 Reactor 线程的启动》小节中的介绍中提到,Reactor 线程的启动是通过第一个异步任务被提交到 Reactor 中的时候被触发的。在向 Reactor 提交任务的方法 SingleThreadEventExecutor#execute(java.lang.Runnable, boolean) 中会触发下面 doStartThread() 方法的调用,在这里会调用前边提到的 Reactor 工作循环 run() 方法。

在 doStartThread() 方法的 finally{...} 语句块中会完成 Reactor 的最终关闭流程,也就是 Reactor 在退出 run 方法中的 for 循环之后的后续收尾流程。

最终 Reactor 的优雅关闭完整流程如下图所示:

Reactor优雅关闭全流程.png

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {

                ..........省略.........

                try {
                    //Reactor线程开始轮询处理IO事件,执行异步任务
                    SingleThreadEventExecutor.this.run();
                    //后面的逻辑为用户调用shutdownGracefully关闭Reactor退出循环 走到这里
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //走到这里表示在静默期内已经没有用户在向Reactor提交任务了,或者达到优雅关闭超时时间,开始对Reactor进行关闭
                    //如果当前Reactor不是关闭状态则将Reactor的状态设置为ST_SHUTTING_DOWN
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    try {
                        for (;;) {
                            //此时Reactor线程虽然已经退出,而此时Reactor的状态为shuttingdown,但任务队列还在
                            //用户在此时依然可以提交任务,这里是确保用户在最后的这一刻提交的任务可以得到执行。
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        for (;;) {
                            // 当Reactor的状态被更新为SHUTDOWN后,用户提交的任务将会被拒绝
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // 这里Reactor的状态已经变为SHUTDOWN了,不会在接受用户提交的新任务了
                        // 但为了防止用户在状态变为SHUTDOWN之前,也就是Reactor在SHUTTINGDOWN的时候 提交了任务
                        // 所以此时Reactor中可能还会有任务,需要将剩余的任务执行完毕
                        confirmShutdown();
                    } finally {
                        try {
                            //SHUTDOWN状态下,在将全部的剩余任务执行完毕后,则将Selector关闭
                            cleanup();
                        } finally {
                            // 清理Reactor线程中的threadLocal缓存,并通知相应future。
                            FastThreadLocal.removeAll();

                            //ST_TERMINATED状态为Reactor真正的终止状态
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            
                            //使得awaitTermination方法返回
                            threadLock.countDown();

                            //统计一下当前reactor任务队列中还有多少未执行的任务,打出日志
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }

                            /**
                             * 通知Reactor的terminationFuture成功,在创建Reactor的时候会向其terminationFuture添加Listener
                             * 在listener中增加terminatedChildren个数,当所有Reactor关闭后 ReactorGroup关闭成功
                             * */
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
}

流程走到 doStartThread 方法中的 finally{...} 语句块中的时候,这个时候表示在优雅关闭的静默期内,已经没有任务继续向 Reactor 提交了。或者关闭耗时已经超过了设定的优雅关闭最大超时时间。

现在正式来到了 Reactor 的关闭流程。在流程开始之前需要确保当前 Reactor 的状态为 ST_SHUTTING_DOWN 正在关闭状态。

注意此刻用户线程依然可以向 Reactor 提交任务。当 Reactor 的状态变为 ST_SHUTDOWN 或者 ST_TERMINATED 时,用户向 Reactor 提交的任务就会被拒绝,但是此时 Reactor 的状态为 ST_SHUTTING_DOWN ,依然可以接受用户提交过来的任务。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
  @Override
  public boolean isShutdown() {
        return state >= ST_SHUTDOWN;
  }

  private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            //当Reactor的状态为ST_SHUTDOWN时,拒绝用户提交的异步任务,但是在优雅关闭ST_SHUTTING_DOWN状态时还是可以接受用户提交的任务的
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                }
                if (reject) {
                    reject();
                }
            }
        }

        .........省略........
    }
}

所以 Reactor 从工作循环 run 方法中退出随后流程一路走到这里来的这段时间,用户仍然有可能向 Reactor 提交任务,为了确保关闭流程的优雅,这里会在 for 循环中不停的执行 confirmShutdown() 方法直到所有的任务全部执行完毕。

随后会将 Reactor 的状态改为 ST_SHUTDOWN 状态,此时用户就不能在向 Reactor 提交任务了。如果此时在提交任务就会收到 RejectedExecutionException 异常。

大家这里可能会有疑问,Netty 在 Reactor 的状态变为 ST_SHUTDOWN 之后,又一次调用了 confirmShutdown() 方法,这是为什么呢?

其实这样做的目的是为了防止 Reactor 状态在变为 SHUTDOWN 之前,在这个极限的时间里,用户又向 Reactor 提交了任务,所以还需要最后一次调用 confirmShutdown() 将在这个极限时间内提交的任务执行完毕。

以上逻辑步骤就是真正优雅关闭的精髓所在,确保任务全部执行完毕,保证业务无损。

在我们优雅处理流程介绍完了之后,下面就是关闭 Reactor 的流程了:

Reactor 会在 SHUTDOWN 状态下,将 Selector 进行关闭。

    @Override
    protected void cleanup() {
        try {
            selector.close();
        } catch (IOException e) {
            logger.warn("Failed to close a selector.", e);
        }
    }

清理 Reactor 线程中遗留的所有 ThreadLocal 缓存。

FastThreadLocal.removeAll();

将 Reactor 的状态由 SHUTDOWN 改为 ST_TERMINATED 状态。此时 Reactor 就算真正的关闭了

 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);

用户线程可能会调用 Reactor 的 awaitTermination 方法阻塞等待 Reactor 的关闭,当 Reactor 关闭之后会调用 threadLock.countDown() 使得用户线程从 awaitTermination 方法返回。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private final CountDownLatch threadLock = new CountDownLatch(1);

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        
         ........省略.......

        //等待Reactor关闭
        threadLock.await(timeout, unit);
        return isTerminated();
    }

    @Override
    public boolean isTerminated() {
        return state == ST_TERMINATED;
    }
}

当这一切处理完毕之后,最后就会设置 Reactor 的 terminationFuture 为 success 。此时注册在 Reactor 的 terminationFuture 上的 listener 就会被回调。

这里还记得我们在《Reactor 在 Netty 中的实现(创建篇)》一文中介绍的,在 ReactorGroup 中的所有 Reactor 被挨个全部创建成功之后,会向所有 Reactor 的 terminationFuture 注册一个 terminationListener 。

在 terminationListener 中检测当前 ReactorGroup 中的所有 Reactor 是否全部完成关闭,如果已经全部关闭,则设置 ReactorGroup 的 terminationFuture 为Success。此刻 ReactorGroup 关闭流程结束,Netty 正式优雅谢幕完毕~~


public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    //Reactor线程组中的Reactor集合
    private final EventExecutor[] children;
    //记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    //ReactorGroup关闭future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
      
        ........挨个创建Reactor........

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //当所有Reactor关闭后 才认为是关闭成功
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        ........省略........
    }

}


到现在为止,Netty 的整个优雅关闭流程,笔者就为大家详细介绍完了,下图为整个优雅关闭的完整流程图,大家可以对照下面这副总体流程图在回顾下我们前面介绍的源码逻辑。

Reactor优雅关闭总流程.png

6.5 Reactor 的状态变更流转

在本文的最后,笔者再来带着大家回顾下 Reactor 的状态变更流程。

Reactor的状态变更.png

在 Reactor 被创建出来之后状态为 ST_NOT_STARTED。

随着第一个异步任务的提交 Reactor 开始启动随后状态为 ST_STARTED 。

当调用 shutdownGracefully 方法之后,Reactor 的状态变为 ST_SHUTTING_DOWN 。表示正在进行优雅关闭。此时用户仍可向 Reactor 提交异步任务。

当 Reactor 中遗留的任务全部执行完毕之后,Reactor 的状态变为 ST_SHUTDOWN 。此时如果用户继续向 Reactor 提交异步任务,会被拒绝,并收到 RejectedExecutionException 异常。

当 Selector 完成关闭,并清理掉 Reactor 线程中所有的 TheadLocal 缓存之后,Reactor 的状态变为 ST_TERMINATED 。

总结

到这里关于优雅关闭的前世今生笔者就位大家全部交代完毕了,信息量比较大,需要好好消化一下,很佩服大家能够一口气看到这里。

本文我们从进程优雅启停方案开始聊起,以优雅关闭的实现方案为起点,先是介绍了优雅关闭的底层基石-内核的信号量机制,从内核又聊到了 JVM 的 ShutdownHook 原理以及执行过程,最后通过三个知名的开源框架为案例,分别从 Spring 的优雅关闭机制聊到了 Dubbo 的优雅关闭,最后通过 Dubbo 的优雅关闭引出了 Netty 优雅关闭的详细实现方案,前后呼应。

好了,本文的内容就到这里了,大家辛苦了,相信大家认真看完之后一定会收获很大,我们下篇文章见~~~


Java 技术栈中间件优雅停机方案设计与实现全景图(下)

本文收录在
0评论

登录

忘记密码 ?

切换登录

注册