文章

优雅停机

优雅停机

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文分享 Dubbo 的优雅停机 Graceful Shutdown ,对应 《Dubbo 用户指南 —— 优雅停机》

定义如下:

Dubbo 是通过 JDK 的 ShutdownHook 来完成优雅停机的,所以如果用户使用 kill -9 PID 等强制关闭指令,是不会执行优雅停机的,只有通过 kill PID 时,才会执行。

原理如下:

服务提供方

  • 停止时,先标记为不接收新请求,新请求过来时直接报错,让客户端重试其它机器。 //<1>
  • 然后,检测线程池中的线程是否正在运行,如果有,等待所有线程执行完成,除非超时,则强制关闭。 // <2>

服务消费方

2. ShutdownHook

Dubbo 的优雅停机 ShutdownHook 在 AbstractConfig 的静态代码块初始化,代码如下:

1
2
3
4
5
6
7
8
9
10
static {
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        public void run() {
            if (logger.isInfoEnabled()) {
                logger.info("Run shutdown hook now.");
            }
            ProtocolConfig.destroyAll();
        }
    }, "DubboShutdownHook"));
}

  • 从代码的位置上来说,这不是一个好能够合适 的位置。但是考虑到保证 被初始化到 ShutdownHook ,这又是一个 的位置。当然,从官方的 TODO 来说,未来可能会换地方。
  • ProtocolConfig#destroyAll() 方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void destroyAll() {
    // 忽略,若已经销毁
    if (!destroyed.compareAndSet(false, true)) {
        return;
    }
    // 销毁 Registry 相关
    AbstractRegistryFactory.destroyAll();

    // 等到服务消费,接收到注册中心通知到该服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。
    // Wait for registry notification
    try {
        Thread.sleep(ConfigUtils.getServerShutdownTimeout());
    } catch (InterruptedException e) {
        logger.warn("Interrupted unexpectedly when waiting for registry notification during shutdown process!");
    }

    // 销毁 Protocol 相关
    ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
    for (String protocolName : loader.getLoadedExtensions()) {
        try {
            Protocol protocol = loader.getLoadedExtension(protocolName);
            if (protocol != null) {
                protocol.destroy();
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}

1
- 第 2 至 5 行:**忽略**,若已经销毁。- 第 7 行:调用 `AbstractRegistryFactory#destroyAll()` 方法,销毁**所有** Registry ,取消**应用程序**中的服务提供者和消费者的**订阅**与**注册**。详细解析,见 [「2.1 AbstractRegistryFactory」](http://svip.iocoder.cn/Dubbo/graceful-shutdown/#) 中。- 第 9 至 15 行:sleep **等待**一段时间,用于**其他应用程序**的服务消费者,接收到注册中心通知,该**应用程序**的服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。    * 当然,这不是**绝对**能等待到,而是开发者自己配置 `"dubbo.service.shutdown.wait"` 参数,设置等待的时长(单位:毫秒)。`ConfigUtils#getServerShutdownTimeout()` 方法,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static int getServerShutdownTimeout() {
    // 默认,10 * 1000 毫秒
    int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
    // 获得 "dubbo.service.shutdown.wait" 配置项,单位:毫秒
    String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
    if (value != null && value.length() > 0) {
        try {
            timeout = Integer.parseInt(value);
        } catch (Exception e) {
        }
    // 若为空,获得 "dubbo.service.shutdown.wait.seconds" 配置项,单位:秒。
    // ps:目前已经废弃该参数,推荐使用 "dubbo.service.shutdown.wait"
    } else {
        value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
        if (value != null && value.length() > 0) {
            try {
                timeout = Integer.parseInt(value) * 1000;
            } catch (Exception e) {
            }
        }
    }
    // 返回
    return timeout;
}

1
        + **默认** 10 * 1000 毫秒。    * 在 [ISSUE#1021:Enhancement for graceful shutdown](https://github.com/apache/incubator-dubbo/pull/1021) ,是针对这块的讨论,非常有趣,胖友一定要看看。目前不管是用的最多的2.5.3版本还是最新的2.5.7版本,亲自测试在不设置重试机制下是无法做到优雅停机的,这次改动主要是修改一点点代码,加上可配置的等待时间,就能简单的做到"不开启重试也能优雅停机"。其主要实现机制就是在【provider断连注册中心之后,关闭应答之前】和【consumer移除掉invoker后,关闭client之前】这两个阶段加入可配置的等待时间,目前亲测可以做到不配置重试也能优雅停机。因为现在大多数用dubbo的公司,为了避免极端情况下的雪崩和流量风暴,大部分接口都会关闭重试机制,这样,对于当前dubbo优雅停机的设定,就无法做到优雅停机了,所以这里通过比较简单的方式,加大了在不重试情况下优雅停机的成功率。
  • 第 17 至 28 行:销毁所有 Protocol 。目前分层两类 Protocol :
    • 和 Registry 集成的 Protocol 实现类 RegistryProtocol注册「2.3 RegistryProtocol」 ,关注服务的 。具体的销毁逻辑,见 中。
    • 具体协议暴露引用常用「2.2 DubboProtocol」 对应的 Protocol 实现类,例如 dubbo:// 对应的 DubboProtocol 、 hessian:// 对应的 HessianProtocol ,关注服务的 和 。因为 DubboProtocol 是最 的,所以我们以它为例子,在 中分享。

2.1 AbstractRegistryFactory

#destroyAll() 方法,销毁所有 Registry 。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

public static void destroyAll() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Close all registries " + getRegistries());
    }
    // 获得锁
    LOCK.lock();
    try {
        // 销毁
        for (Registry registry : getRegistries()) {
            try {
                registry.destroy();
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        // 清空缓存
        REGISTRIES.clear();
    } finally {
        // 释放锁
        LOCK.unlock();
    }
}

  • 调用 Registry#destroy() 方法,销毁每个 Registry 。
  • AbstractRegistry 实现了公用注册订阅 的销毁逻辑:取消 和 。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public void destroy() {
    // 已销毁,跳过
    if (!destroyed.compareAndSet(false, true)) {
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    // 取消注册
    Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<URL>(getRegistered())) {
            if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                try {
                    unregister(url); // 取消注册
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
    // 取消订阅
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    unsubscribe(url, listener); // 取消订阅
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

1
- 无论是服务**提供者**还是**消费者**,都会向 Registry 发起**注册**和**订阅**,所以**都**需要进行取消。
  • AbstractRegistry 的子类公用重试任务 FailbackRegistry ,实现销毁 的 。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void destroy() {
    // 忽略,若已经销毁
    if (!canDestroy()) {
        return;
    }
    // 调用父方法,取消注册和订阅
    super.destroy();
    // 销毁重试任务
    try {
        retryFuture.cancel(true);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

protected boolean canDestroy(){
    return destroyed.compareAndSet(false, true);
}

  • FailbackRegistry 有多种客户端连接Zookeeper 实现类,会有销毁其对应的 的逻辑。以 Registry 举例子。代码如下:
1
2
3
4
5
6
7
8
9
10
11
@Override
public void destroy() {
    // 调用父方法,取消注册和订阅
    super.destroy();
    try {
        // 关闭 Zookeeper 客户端连接
        zkClient.close();
    } catch (Exception e) {
        logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

2.2 DubboProtocol

#destroy() 方法,销毁所有通信 ExchangeClient 和 ExchangeServer 。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@SuppressWarnings("Duplicates")
@Override
public void destroy() {
    // 销毁所有 ExchangeServer
    for (String key : new ArrayList<String>(serverMap.keySet())) {
        ExchangeServer server = serverMap.remove(key);
        if (server != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Close dubbo server: " + server.getLocalAddress());
                }
                server.close(ConfigUtils.getServerShutdownTimeout());
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }

    // 销毁所有 ExchangeClient
    for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
        ExchangeClient client = referenceClientMap.remove(key);
        if (client != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                }
                client.close(ConfigUtils.getServerShutdownTimeout()); // 销毁
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    // 销毁所有幽灵 ExchangeClient
    for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
        ExchangeClient client = ghostClientMap.remove(key);
        if (client != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                }
                client.close(ConfigUtils.getServerShutdownTimeout()); // 销毁
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    // 【TODO 8033】 参数回调
    stubServiceMethodsMap.clear();
    super.destroy();
}

  • 实际情况下,一个应用程序即可以是服务提供者消费者 ,又是服务 。因此,需要关闭 ExchangeClient 和 ExchangeServer 。
  • 第 4 至 17 行:循环「2.2.1 HeaderExchangeServer」 调用 HeaderExchangeServer#close(timeout) 方法,销毁所有 ExchangeServer 。详细解析,见 。
  • 第 19 至 32 行:循环在该方法内部「2.2.2 HeaderExchangeClient」 调用 ReferenceCountExchangeClient#close(timeout) 方法,销毁所有 ReferenceCountExchangeClient 。 ,会调用 HeaderExchangeClient#close(timeout) 方法,关闭 HeaderExchangeClient 对象。详细解析,见 。
  • 第 33 至 46 行:循环调用 LazyConnectExchangeClient#close(timeout) 方法,进行关闭。关于 LazyConnectExchangeClient ,详细见 精尽 Dubbo 源码分析 —— 服务引用(二)之远程引用(Dubbo)「5.2 LazyConnectExchangeClient」
  • 第 48 行:【TODO 8033】 参数回调
  • 第 49 行:调用 AbstractExporter#unexport() 方法,取消服务的暴露( Exporter )。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public void destroy() {
    //  销毁协议对应的服务消费者的所有 Invoker
    for (Invoker<?> invoker : invokers) {
        if (invoker != null) {
            invokers.remove(invoker);
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Destroy reference: " + invoker.getUrl());
                }
                invoker.destroy();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    // 销毁协议对应的服务提供者的所有 Exporter
    for (String key : new ArrayList<String>(exporterMap.keySet())) {
        Exporter<?> exporter = exporterMap.remove(key);
        if (exporter != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Unexport service: " + exporter.getInvoker().getUrl());
                }
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}

1
- 第 3 至 16 行:**循环**,销毁**协议( 此处为 DubboProtocol )对应**的服务_消费者_的所有 Invoker**( 此处为 DubboInvoker )** 。详细解析,见 [「2.2.3 DubboInvoker」](http://svip.iocoder.cn/Dubbo/graceful-shutdown/#) 。- 第 17 至 30 行:**循环**,销毁**协议( 此处为 DubboProtocol )对应**的服务_提供者_的所有 Exporter**( 此处为 DubboExporter )** 。详细解析,见 [「2.2.4 DubboExporter」](http://svip.iocoder.cn/Dubbo/graceful-shutdown/#) 。

2.2.1 HeaderExchangeServer

#close(timeout) 方法,整体流程如下图:

close流程图

close

  • 红Protocol 的 Dubbo SPI Wrapper 实现类 框部分:因为 ProtocolListenerWrapper 和 ProtocolFilterWrapper 和 ,所以调用 DubboProtocol#destroy() 方法时,会先调用它们。目前仅仅是一层包装,没有逻辑 ,代码如下:
1
2
3
4
5
6
7
8
9
10
11
// ProtocolListenerWrapper.java
@Override
public void destroy() {
    protocol.destroy();
}

// ProtocolFilterWrapper.java
@Override
public void destroy() {
    protocol.destroy();
}

2.2.2 HeaderExchangeClient

#close(timeout) 方法,整体流程如下图:

close流程图

close

2.2.3 DubboInvoker

#destroy() 方法,销毁 ExchangeClient 。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void destroy() {
    // 忽略,若已经销毁
    if (super.isDestroyed()) {
        return;
    } else {
        // double check to avoid dup close
        // 双重锁校验,避免已经关闭
        destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            // 标记关闭
            super.destroy();
            // 移除出 `invokers`
            if (invokers != null) {
                invokers.remove(this);
            }
            // 关闭 ExchangeClient 们
            for (ExchangeClient client : clients) {
                try {
                    client.close(ConfigUtils.getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        } finally {
            // 释放锁
            destroyLock.unlock();
        }
    }
}

  • 代码比较易懂,胖友看代码注释。下面只挑选几个方法分享。
  • AbstractInvoker#isDestroyed() 方法,判断是否已经销毁。代码如下:
1
2
3
4
5
6
7
8
/**
 * 是否销毁
 */
private AtomicBoolean destroyed = new AtomicBoolean(false);

public boolean isDestroyed() {
    return destroyed.get();
}

  • AbstractInvoker#destroy() 方法,标记已经销毁。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * 是否可用
 */
private volatile boolean available = true;

@Override
public void destroy() {
    if (!destroyed.compareAndSet(false, true)) {
        return;
    }
    setAvailable(false);
}

protected void setAvailable(boolean available) {
    this.available = available;
}

1
- 并且,会标记 DubboInvoker 已经**不可用**。- 标记已经销毁后,再调用 `#invoke(Invocation)` 方法,会抛出 RpcException 异常。代码如下:
1
2
3
4
5
6
7
8
9
10
@Override
public Result invoke(Invocation inv) throws RpcException {
    if (destroyed.get()) {
        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + " is DESTROYED, can not be invoked any more!");
    }

    // ... 省略其他代码
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    * x```

- 第 20 至 27 行:**循环**
,调用
ReferenceCountExchangeClient#close(timeout)
方法,关闭客户端。 实际上,在
DubboProtocol#destroy()
方法中,已经关闭客户端。虽然看起来重复,实际不然。因为远程服务提供者关闭时, DubboInvoker 需要进行销毁,此时必须关闭客户端的链接。所以,DubboInvoker 必须有这块逻辑。

### 2.2.4 DubboExporter

#unexport() 方法,取消暴露。代码如下:

```java
/**
 * 服务键
 */
private final String key;
/**
 * Exporter 集合
 *
 * key: 服务键
 *
 * 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
 */
private final Map<String, Exporter<?>> exporterMap;

@Override
public void unexport() {
    // 取消暴露
    super.unexport();
    // 移除自己
    exporterMap.remove(key);
}

  • 调用 AbstractExporter#unexport() 方法,取消暴露。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Invoker 对象
 */
private final Invoker<T> invoker;
/**
 * 是否取消暴露服务
 */
private volatile boolean unexported = false;

@Override
public void unexport() {
    // 标记已经取消暴露
    if (unexported) {
        return;
    }
    unexported = true;
    // 销毁
    getInvoker().destroy();
}

1
2
3
4
5
6
- 其中,invoker 如下图所示:![invoker结构图](/assets/images/learning/dubbo/dubbo-graceful-shutdown/e8787dbccddbeec1442406581efbe2c5.png)
    * 这个 Invoker 通过 JavassistProxyFactory 创建,实际实现了 AbstractProxyInvoker 抽象类。所以 `#destroy()` 方法如下,代码如下:`

```java
@Override
public void destroy() { }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
        +  空的,嘿嘿嘿。```

## 2.3 RegistryProtocol

#destroy() 方法,取消所有 Exporter 的暴露。代码如下:

```java
/**
 * 绑定关系集合。
 *
 * key:服务 Dubbo URL
 */
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();

@Override
public void destroy() {
    // 获得 Exporter 数组
    List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
    // 取消所有 Exporter 的暴露
    for (Exporter<?> exporter : exporters) {
        exporter.unexport();
    }
    // 清空
    bounds.clear();
}

  • 循环暴露 ,调用 ExporterChangeableWrapper#unexport() 方法,取消服务 。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * 暴露的 Exporter 对象
 */
private Exporter<T> exporter;

@Override
public void unexport() {
    String key = getCacheKey(this.originInvoker);
    // 移除出 `bounds`
    bounds.remove(key);
    // 取消暴露
    exporter.unexport();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
- 因为服务提供者</font>**集成</font>**了配置规则 Configurator ,所以需要使用到 ExporterChangeableWrapper ,保存原有 Invoker 对象。    * 也因此,上述所有取消暴露逻辑,是无法销毁 ExporterChangeableWrapper 在 </font>bounds</font> 的映射,需要通过 RegistryProtocol 的 </font>#destroy()</font> 方法实现。    * 也因此,此处调用</font>**暴露的 Exporter 对象</font>**exporter</font> ,已经被 </font>AbstractExporter#unexport()</font> 方法,取消暴露。但是呢,这里又不能去掉这块逻辑,因为没准有地方,需要调用 </font>ExporterChangeableWrapper#unexport()</font> 方法呢。```

# 3. ExecutorUtil

## 3.1 gracefulShutdown

#gracefulShutdown(executor, timeout) 方法,**优雅**关闭,禁止新的任务提交,将原有任务执行完。

```java
public static void gracefulShutdown(Executor executor, int timeout) {
    // 忽略,若不是 ExecutorService ,或者已经关闭
    if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
        return;
    }
    // 关闭,禁止新的任务提交,将原有任务执行完
    final ExecutorService es = (ExecutorService) executor;
    try {
        es.shutdown(); // Disable new tasks from being submitted <1>
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    // 等待原有任务执行完。若等待超时,强制结束所有任务
    try {
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        // 发生 InterruptedException 异常,也强制结束所有任务
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // 若未关闭成功,新开线程去关闭
    if (!isShutdown(es)) {
        newThreadToCloseExecutor(es);
    }
}

3.2 shutdownNow

#shutdownNow(executor, timeout) 方法,强制关闭,包括打断原有执行中的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void shutdownNow(Executor executor, final int timeout) {
    // 忽略,若不是 ExecutorService ,或者已经关闭
    if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
        return;
    }
    // 立即关闭,包括原有任务也打断
    final ExecutorService es = (ExecutorService) executor;
    try {
        es.shutdownNow(); // <1>
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    // 等待原有任务被打断完成
    try {
        es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
    // 若未关闭成功,新开线程去关闭
    if (!isShutdown(es)) {
        newThreadToCloseExecutor(es);
    }
}

  • 不是 #gracefulShutdown(executor, timeout) 方法不同的是, <1> 处调用的是 #shutdownNow() 方法,而 #shutdown() 方法。

3.3 newThreadToCloseExecutor

#newThreadToCloseExecutor(ExecutorService) 方法,新开线程,不断强制关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static void newThreadToCloseExecutor(final ExecutorService es) {
    if (!isShutdown(es)) {
        shutdownExecutor.execute(new Runnable() {
            public void run() {
                try {
                    // 循环 1000 次,不断强制结束线程池
                    for (int i = 0; i < 1000; i++) {
                        // 立即关闭,包括原有任务也打断
                        es.shutdownNow();
                        // 等待原有任务被打断完成
                        if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                            break;
                        }
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        });
    }
}

666. 彩蛋

理论来说,服务提供者如果要关闭,大体流程如下:

provider => registry :移除自己provider => consumer :我准备关闭了,不要调用我所有 consumer => provider :好的,我知道了provider => consumer :处理完所有原有请求provider 关闭

但是实际情况非常复杂,如果依赖 consumer 去应答和确认。所以 Dubbo 的选择是:

  • provider 从 registry ,移除自己。并且 sleep 等待一定时间(开发者可配),等待 consumer 获得到通知。当然,这个过程不是绝对能够成功 的。例如,consumer 连接不上 registry ,但是连的上 provider 。
  • provider 通知 consumer ,自己准备关闭,不要请求自己。全部通知完成后,等处理完原有请求。完成后,关闭本地服务器及线程池。

当然,consumer 也有优雅关闭,等待所有发起的请求结束。相对简单的多。

推荐阅读文章:

本文由作者按照 CC BY 4.0 进行授权