文章

并发任务结果归集

并发任务结果归集

Future,FutureTask,ExecutorService…

  • 用上FutureTask任务获取结果老少皆宜, 就是CPU有消耗. FutureTask也可以做闭锁(实现了Future的语义, 表示一种抽象的可计算的结果). 通过把Callable作为一个属性, 进而把它自己作为一个执行器去继承Runnable, FutureTask实际上就是一个支持取消行为的异步任务执行器.
  • Callable就是一个回调接口, 可以泛型声明返回类型, 而Runnable是线程去执行的方法
  • FutureTask实现了Future语义,提供了start,cancel,query等功能,并且实现了Runnable接口,可以提交给线程执行
  • Java并发工具类的三板斧: 状态,队列, CAS

状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * The run state of this task, initially NEW. The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:         //可能发生的状态过度过程
 * NEW -> COMPLETING -> NORMAL         // 创建-->完成-->正常
 * NEW -> COMPLETING -> EXCEPTIONAL    // 创建-->完成-->异常
 * NEW -> CANCELLED                    // 创建-->取消
 * NEW -> INTERRUPTING -> INTERRUPTED  // 创建-->中断中-->中断结束
 */
private volatile int state; // 执行器状态
private static final int NEW = 0; // 初始值 由构造函数保证
private static final int COMPLETING = 1; // 完成进行时 正在设置任务结果
private static final int NORMAL = 2; // 正常结束 任务正常执行完毕
private static final int EXCEPTIONAL = 3; // 发生异常 任务执行过程中发生异常
private static final int CANCELLED = 4; // 已经取消 任务已经取消
private static final int INTERRUPTING = 5; // 中断进行时 正在中断运行任务的线程
private static final int INTERRUPTED = 6; // 中断结束 任务被中断

/** The underlying callable; nulled out after running */
private Callable callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

看图会更清楚

FutureTask状态转换图

Future

  • cancel 可以停止任务的执行,但不一定成功 看返回值是 true or false
  • get 阻塞获取callable的任务结果,即get阻塞住调用线程,直至计算完成返回结果
  • isCancelled 是否取消成功
  • isDone 是否完成

Future.get()获取执行结果的值,取决于执行的状态,如果任务完成会立即返回结果;否则一直阻塞直到任务进入完成状态,然后返回结果或抛出异常.

Future.get()获取执行结果的值,取决于执行的状态,如果任务完成会立即返回结果;否则一直阻塞直到任务进入完成状态,然后返回结果或抛出异常.

“运行完成”表示计算的所有可能结束的状态,包含正常结束,由于取消而结束,由于异常而结束.

当进入完成状态,他会停止在这个状态上,只要state不处于NEW状态,就说明任务已经执行完毕

FutureTask负责将计算结果从执行任务的线程传递到调用这个线程的线程, 而且确保了传递过程中结果的安全发布

UNSAFE无锁变成技术,确保了线程的安全性; 为了保持无锁变成CPU的消耗, 所以用状态标记, 减少空转的时候CPU的压力

  • 任务本尊: callable
  • 任务的执行着: runner
  • 任务的结果: outcome
  • 获取任务的结果: state+outcome+waiters
  • 中断或取消任务: state+runner+waiters

run方法

  1. 检查state, 非NEW, 说明已经启动, 直接返回; 否则, 设置runner为当前线程, 成功则继续, 否则,返回.
  2. 调用Callable.call()方法执行任务, 成功调用set(result)方法, 失败则调用finishCompletion()方法, 唤醒阻塞在get()方法上的线程们.
  3. 如果省略ran变量, 并把”set(result);”语句移动到try代码块”ran=true;”语句处,会怎样呢?首先,从代码逻辑上看,是没有问题的,但是, 考虑到”set(result);”方法万一抛出异常甚至错误了呢? set() 方法最终会调用到用户自定义的done()方法, 所以, 不可省略.
  4. 如果state为INTERRUPTING, 则主动让出CPU, 自旋等特别的线程执行完中断流程. 见handlePossibleCancellationInterrupt(int s)方法.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void run() {
    // UNSAFE.compareAndSwapObject, CAS保证Callable任务只被执行一次 无锁编程
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return;
    try {
        Callable c = callable;
        // 拿到执行任务
        if (c != null && state == NEW) {

            // 任务不为空,并且执行器状态是初始值,才会执行;如果取消就不执行了
            V result;
            boolean ran;
            // 记录是否执行成功
            try {
                result = c.call();

                // 执行任务
                ran = true;
                // 成功
            } catch (Throwable ex) {
                result = null;

                // 异常,清空结果
                ran = false;

                // 失败
                setException(ex);
                // 记录异常
            }
            if (ran) // 问题:ran变量可以省略吗,把set(result);
                // 移到try块里面?
                set(result);
            // 设置结果
        }
        finally {
            runner = null;
            // 直到set状态前,runner一直都是非空的,为了防止并发调用run()方法。
            int s = state;
            if (s >= INTERRUPTING)
                // 有别的线程要中断当前线程,把CPU让出去,自旋等一下
                handlePossibleCancellationInterrupt(s);
        }
    }
    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            // 当state为INTERRUPTING时
            while (state == INTERRUPTING) // 表示有线程正在中断当前线程
                Thread.yield();
        // 让出CPU,自旋等待中断
    }
}

run方法重点做了以下几件事情

将runner属性设置成当前正在执行run方法的线程

调用callable成员变量的call方法来执行任务

设置执行结果outcome, 如果执行成功, 则outcome保存的就是执行结果; 如果执行过程中发生了异常, 那么outcome中保存的就是异常, 设置结果之前, 先将state状态设为中间态,对outcome的赋值完成后, 设置state状态为终止态

唤醒Treiber栈中所有等待的线程

善后清理(waiters, callable, runner设为null)

检查是否有遗漏的中断, 如果有, 等待中断状态完成

get方法详情, 阻塞等待获取

1
2
3
4
5
6
7
8
9
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 执行器状态
    if (s <= COMPLETING) // 如果状态小于等于COMPLETING,说明任务正在执行,需要等待
        s = awaitDone(false, 0L);
    // 等待
    return report(s);
    // 报告结果
}
1
2
3
4
5
6
7
8
9
10
11
12
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        // 参数校验
        throw new NullPointerException();
    int s = state;
    // 执行器状态
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果状态小于等于COMPLETING,说明任务正在执行,需要等待;等待指定时间,state依然小于等于COMPLETING
        throw new TimeoutException();
    // 抛出超时异常
    return report(s);
    // 报告结果
}
再看看awaitDone的实现, 要知道写死循环while(true)for(;;)的都是高手
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 计算deadline
    WaitNode q = null;
    // 等待结点
    boolean queued = false;
    // 是否已经入队
    for (;;) {
        if (Thread.interrupted()) {

            // 如果当前线程已经标记中断,则直接移除此结点,并抛出中断异常
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // 执行器状态
        if (s > COMPLETING) {
            // 如果状态大于COMPLETING,说明任务已经完成,或者已经取消,直接返回
            if (q != null)
                q.thread = null;
            // 复位线程属性
            return s;
            // 返回
        }
        else if (s == COMPLETING)
            // 如果状态等于COMPLETING,说明正在整理结果,自旋等待一会儿
            Thread.yield();
        else if (q == null)
            // 初始,构建结点
            q = new WaitNode();
        else if (!queued)
            // 还没入队,则CAS入队
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) {
            // 是否允许超时
            nanos = deadline - System.nanoTime();
            // 计算等待时间
            if (nanos <= 0L) {
                // 超时
                removeWaiter(q);
                // 移除结点
                return state;
                // 返回结果
            }
            LockSupport.parkNanos(this, nanos);
            // 线程阻塞指定时间
        }
        else
            LockSupport.park(this);
        // 阻塞线程
    }
}

队列

接着来看队列, 在FutureTask中, 队列的实现是一个单向链表, 它表示所有等待任务执行完毕的线程的集合. 我们知道, FutureTask实现了Future接口, 可以获取”Task”的执行结果, 那么如果获取结果时, 任务还没有执行完毕怎么办呢? 此时获取结果的线程就会在一个等待队列中挂起, 直到任务执行完毕被唤醒. 这一点有点类似于AQS中的sync queue,

在并发编程中使用队列通常是将当前线程包装成某种类型的数据结构扔到等待队列中, 我们先来看看每一个节点是怎么个结构

1
2
3
4
5
6
7
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() {
        thread = Thread.currentThread();
    }
}

可见,相比于AQS的sync queue所使用的双向链表中的Node,这个WaitNode要简单多了,它只包含了一个记录线程的thread属性和指向下一个节点的next属性。

值得一提的是,FutureTask中的这个单向链表是当做栈来使用的,确切来说是当做Treiber栈来使用的,不了解Treiber栈是个啥的可以简单的把它当做是一个线程安全的栈,它使用CAS来完成入栈出栈操作(想进一步了解的话可以看这篇文章)。为啥要使用一个线程安全的栈呢,因为同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用CAS操作保证入栈的线程安全,对于出栈的情况也是同理。

由于FutureTask中的队列本质上是一个Treiber(驱动)栈,那么使用这个队列就只需要一个指向栈顶节点的指针就行了,在FutureTask中,就是waiters属性:

1
2
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事实上,它就是整个单向链表的头节点。

综上,FutureTask中所使用的队列的结构如下:

FutureTask队列结构图

CAS操作

CAS操作大多数是用来改变状态的,在FutureTask中也不例外。我们一般在静态代码块中初始化需要CAS操作的属性的偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    }
    catch (Exception e) {
        throw new Error(e);
    }
}

从这个静态代码块中我们也可以看出,CAS操作主要针对3个属性,包括state、runner和waiters,说明这3个属性基本是会被多个线程同时访问的。其中state属性代表了任务的状态,waiters属性代表了指向栈顶节点的指针,这两个我们上面已经分析过了。runner属性代表了执行FutureTask中的”Task”的线程。为什么需要一个属性来记录执行任务的线程呢?这是为了中断或者取消任务做准备的,只有知道了执行任务的线程是谁,我们才能去中断它。

定义完属性的偏移量之后,接下来就是CAS操作本身了。在FutureTask,CAS操作最终调用的还是Unsafe类的compareAndSwapXXX方法,关于Unsafe,由于带薪码文 这里不再赘述。

java8中的异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class TestFuture {
    static ExecutorService executor = Executors.newFixedThreadPool(2);
    public static void main(String[] args) throws InterruptedException {
        //两个线程的线程池 //小红买酒任务,这里的future2代表的是小红未来发生的操作,返回小红买东西这个操作的结果
        CompletableFuture future2 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("爸:小红你去买瓶酒!");
            try {
                System.out.println("小红出去买酒了,女孩子跑的比较慢,估计5s后才会回来…");
                Thread.sleep(5000);
                return "我买回来了!";
            }
            catch (InterruptedException e) {
                System.err.println("小红路上遭遇了不测");
                return "来世再见!";
            }
        }, executor);
        //小明买烟任务,这里的future1代表的是小明未来买东西会发生的事,返回值是小明买东西的结果
        CompletableFuture future1 = CompletableFuture.supplyAsync(() ->
        {
            System.out.println("爸:小明你去买包烟!");
            try {
                System.out.println("小明出去买烟了,可能要3s后回来…");
                Thread.sleep(3000);
                throw new InterruptedException();
                /// return "我买回来了!";
            }
            catch (InterruptedException e) {
                System.out.println("小明路上遭遇了不测!");
                return "这是我托人带来的口信,我已经不在了。";
            }
        }, executor);

        //获取小红买酒结果,从小红的操作中获取结果,把结果打印
        future2.thenAccept((e) ->
        {
            System.out.println("小红说:" + e);
        });

        //获取小明买烟的结果
        future1.thenAccept((e) ->
        {
            System.out.println("小明说:" + e);
        });
        System.out.println("爸:等啊等 西湖美景三月天嘞……");
        System.out.println("爸: 我觉得无聊甚至去了趟厕所。");
        Thread.currentThread().join(9 * 1000);
        System.out.println("爸:终于给老子买来了……huo 酒");

        //关闭线程池
        executor.shutdown();
    }
}
本文由作者按照 CC BY 4.0 进行授权