理解 CompletableFuture 的任务与回调函数的线程

继续对 CompletableFuture 的学习,本然依然不对它的众多方法的介绍,其实也不容易通过一篇述说完所有 CompletableFuture 的操作。此处重点了解下 CompletableFuture 几类操作时所使用的线程,CompletableFuture 的方法重点在它的静态方法以及实现自 CompletionStage 接口的方法,如果是意图异步化编程,反而自我标榜的 Future 中的方法用的少了。

CompletableFuture 根据任务的主从关系为

  1. 提交任务的方法,如静态方法 supplyAsync(supplier[, executor]),  runAsync(runnable[, executor])
  2. 回调函数,即对任务执行后所作出回应的方法,多数方法了,如 thenRun(action), thenRunAsync(action[, executor]), whenComplete(action), whenCompleteAsync(action[, executor]) 等

根据执行方法可分为同步与异步方法,任务都是要被异步执行,所以提交任务的方法都是异步的。而对任务作出回应的方法很多分为两个版本,如

  1. 同步方法,如 thenRun(action), whenComplete(action)
  2. 异步方法,如 thenRunAsync(action[, executor]), whenCompleteAsync(action[, executor]), 异步方法可以传入线程池,否则用默认的

因此所要理解的 CompletableFuture 的线程会涉及到任务与回调函数所使用的线程。

先来看一下执行任务的线程

我们之所以使用 Future(CompletableFuture) 是希望任务不阻塞当前线程,所以它总是会在一个新的线程中去执行,正如方法名所示 supplyAsync()runAsync() ,而不会有 supply()run()sypplyAsync()runAsync() 的区别是前者有返回值,后者无返回值,我们这里不关心返回值的问题,因此选择 runAsync() 方法为例,它有两个重载方法

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

第二个方法,我们可以安排任务在指定的 executor(线程池) 中执行,可以调用 Executors 的工具方法获得所需类型的 ExecutorService,如 newCachedThreadPool, newFixedThreadPool, newWorkStrealingPool, newSingleThreadExecutor。所以手工方法创建,如 new ForkJoinPool()。

那我们看不传入 Executor, 默认时任务在什么线程中执行,跟踪一下 runAsync(Runnable runnable) 方法就知道了

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

追踪 asyncPool

private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) { new Thread(r).start(); }
    

在我的 i5 CPU 上跑出的 ForkJoinPool.getCommonPoolParallelism() 值是 3,所以任务默认会提交到 ForkJoinPool.commonPool() 中去执行。

小结一下,执行任务时所用的线程可以自己提供,或采用默认的 ForkJoinPool.commonPool()

再看回调函数代码所用的线程

与提交任务的方法不同,回调函数通常有三个版本,以 thenRun 为例

  1. public CompletableFuture<Void> thenRun(Runnable action)
  2. public CompletableFuture<Void> thenRunAsync(Runnable action)
  3. public CompletableFuture<Void> thenRunAsync(Runnable action,  Executor executor)

第一个方法的 action 会在当前线程中执行,也就是调用 thenRun 方法所在的线程??? 这里有个疑问,可参考后面一个试验结果

thenRun() 可能使用当前线程或是执行任务的线程池来执行回调函数的代码,它同样不会阻塞当前线程,仍然是一个异步调用, 想像它是一个监听模式,可能会在主线程或是用任务的线程池来执行 thenRun() 中的代码。

第二个方法从实现中看到的也是用的 asyncPool 线程池,也就是说它将与前面的 runAsync(runnable) 共用 ForkJoinPool.commonPool() 线程池。

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action); 

第三个方法,更不必多说,通过参数 executor 传入指定的线程池实现。

下面一个完整的测试例子看看每个子代码块是由什么线程来执行的

package cc.unmi;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        System.out.println("main thread: " + Thread.currentThread());

        new Thread(Main::test1) {{
            setName("my-new-thread");
        }}.start();

        test2();
    }

    private static void test1() {

        CompletionStage<Void> futurePrice = CompletableFuture.runAsync(() -> {
                sleep(1000);
                System.out.println("test1:1 - runAsync(runnable), job thread: " + Thread.currentThread());
            }
        );

        System.out.println("test1:flag1");

        futurePrice.thenRun(() -> {
            System.out.println("test1:2 - thenRun(runnable)), action thread: " + Thread.currentThread());
        });

        System.out.println("test1:flag2");

        futurePrice.thenRunAsync(() -> {
            System.out.println("test1:3 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
        });

    }


    private static void test2() {

        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletionStage<Void> futurePrice = CompletableFuture.runAsync(() -> {
            sleep(1000);
            System.out.println("test2:1 - runAsync(runnable, executor), job thread: " + Thread.currentThread());
        }, executorService);

        System.out.println("test2:flag1");

        futurePrice.thenRunAsync(() -> {
            System.out.println("test2:2 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
        });

        System.out.println("test2:flag2");

        futurePrice.thenRun(() -> {
            System.out.println("test2:3 - thenRun(runnable), action thread: " + Thread.currentThread());
        });

        futurePrice.thenRunAsync(() -> {
            System.out.println("test2:4 - thenRunAsync(runnable, executor), action thread: " + Thread.currentThread());
        }, executorService);

        executorService.shutdown();
    }

    private static void sleep(long time) {
        //if(true) return;
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
        }
    }
}

下面是在 IntelliJ IDEA 中运行时的输出

main thread: Thread[main,5,main]
test1:flag1
test2:flag1
test2:flag2
test1:flag2
test1:1 - runAsync(runnable), job thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test2:1 - runAsync(runnable, executor), job thread: Thread[pool-1-thread-1,5,main]
test1:2 - thenRun(runnable)), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:3 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test2:3 - thenRun(runnable), action thread: Thread[pool-1-thread-1,5,main]
test2:2 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]

我们把两组测试放在一起了,用 test1, test2 进行区分,我们的执行任务设定为延时 1 秒,回调时 thenRun(runnable) 并不会阻塞主线程。并且我们从输出中可以发现执行 thenRun(runnable) 回调中的代码却是使用的与执行任务时一样的线程池,并非是调用 thenRun(runnable) 方法所用的线程。thenRunAsync(runnable) 方法总是使用 ForkJoinPool.commonPool 线程池。

同样是前面的代码,如果把方法 sleep(time) 中的 //if(true) return 代码启用,使用任务很快能完成,再执行多遍,我能够得到如下两组结果:

main thread: Thread[main,5,main]
test2:flag1
test2:flag2
test2:1 - runAsync(runnable, executor), job thread: Thread[pool-1-thread-1,5,main]
test2:3 - thenRun(runnable), action thread: Thread[main,5,main]
test1:flag1
test1:flag2
test2:2 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:1 - runAsync(runnable), job thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:2 - thenRun(runnable)), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:3 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test2:4 - thenRunAsync(runnable, executor), action thread: Thread[pool-1-thread-2,5,main]

thenRun(runnable) 也能跑到非调用者线程上去,这样的测试结果确实出乎我所料。

main thread: Thread[main,5,main]
test1:flag1
test1:1 - runAsync(runnable), job thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test2:1 - runAsync(runnable, executor), job thread: Thread[pool-1-thread-1,5,main]
test2:flag1
test1:2 - thenRun(runnable)), action thread: Thread[my-new-thread,5,main]
test1:flag2
test1:3 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test2:2 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test2:flag2
test2:3 - thenRun(runnable), action thread: Thread[main,5,main]
test2:4 - thenRunAsync(runnable, executor), action thread: Thread[pool-1-thread-1,5,main]

thenRun(runnable) 跑在调用者线程上。

所以对于简单的 thenRun() 方法,我们可能会想当然的认为它会阻塞当前线程,由当前线程去执行回调代码,其实从实验中看不带 asyncthenRun() 方法仍然是一个异步方法,谁来执行回调中的代码似乎更灵活,完全由 JVM 来定,或者是 Lambda 在其中作祟,把 Lambda 换成匿名类还是一样的效果。thenRun() 大概是能根据调用者线程是否空闲来使用当前线程还是用执行任务的线程池。带 async 的方法会把任务再次提交到线程池中去。

Leave a Reply

Be the First to Comment!

avatar
wpDiscuz