Java8 CompletableFuture

Java8で追加されたCompletableFutureについてのメモです。

Java8で便利なCompletableFutureが追加されたので試したメモです。

CompletableFuture

CompletableFutureでは、結果の取得後にその結果に対して処理を実行することが出来ます。
また、複数のCompletableFutureの完了を待って処理を行ったり、
いずれかのCompletableFutureの完了を待って処理を行えます。

完了後に処理を行うメソッドは、引数に取る関数型インターフェースによって

  • Functionの場合applyXXXX()
  • Consumerの場合acceptXXX()
  • Runnableの場合runXXX()

というメソッド名になっています。


Futureで結果に対して後処理をする場合
Futureから取得した結果に対して、下記のメソッドdoubleNum()を適用したい場合。

private static class NumUtil {
    static int doubleNum(int num) {
        return 2 * num;
    }
}

Future.get()で結果を取得し、結果に対してdoubleNum()を適用します。

private static AtomicInteger atomicInt = new AtomicInteger();

@Test
public void futureTest() throws ExecutionException, InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    Callable<Integer> callable = () -> atomicInt.incrementAndGet();

    Future<Integer> future = executor.submit(callable);
    int num = future.get();
    int doubled = NumUtil.doubleNum(num);

    assertThat(doubled).isEqualTo(2);
}


CompletableFutureで結果に対して後処理をする場合
CompletableFutureの場合、非同期でタスクを実行するsupplyAsync()にタスクを指定します。

supplyAsync()は完了を待たずに戻り値としてCompletableFutureが返ります。
それに対してthenApply()で後処理を指定します。
最終的な結果はFutureと同様にget()で取得します。

@Test
public void completableFutureTest() throws ExecutionException, InterruptedException {
    Supplier<Integer> supplier = () -> atomicInt.incrementAndGet();

    int doubled = CompletableFuture.supplyAsync(supplier)
            .thenApply(NumUtil::doubleNum)
            .get();

    assertThat(doubled).isEqualTo(2);
}

CompletableFutureのメソッド

get()

get()でCompletableFutureが完了するのを待機して結果を取得します。

@Test
public void supplyAsyncTest() throws ExecutionException, InterruptedException {
    Supplier<Integer> supplier = () -> atomicInt.incrementAndGet();

    int result = CompletableFuture.supplyAsync(supplier)
            .get();

    assertThat(result).isEqualTo(1);
}
getNow()

getNow()でfutureが完了している場合には値を取得して、完了していない場合には引数で指定した値を返します。

@Test
public void getNowTest() throws ExecutionException, InterruptedException {
    Supplier<Integer> supplier = () -> {
        try {
            Thread.sleep(1_000L);
        } catch (InterruptedException ignored) {
        }
        return atomicInt.incrementAndGet();
    };

    int result = CompletableFuture.supplyAsync(supplier)
            .getNow(999);

    assertThat(result).isEqualTo(999);
}
thenApply()

futureの結果に対して引数で指定したFunctionを実行します。

@Test
public void thenApplyTest() throws ExecutionException, InterruptedException {
    Supplier<Integer> supplier = () -> atomicInt.incrementAndGet();

    int doubled = CompletableFuture.supplyAsync(supplier)
            .thenApply(NumUtil::doubleNum)
            .get();

    assertThat(doubled).isEqualTo(2);
}
thenAccept()

futureの結果に対して引数で指定したConsumerを実行します。
Consumerなので結果は戻りません。

@Test
public void thenAcceptTest() throws ExecutionException, InterruptedException {
    Supplier<Integer> supplier = () -> atomicInt.incrementAndGet();

    CompletableFuture.supplyAsync(supplier)
            .thenAccept(System.out::println);
}
thenRun()

futureの結果に対して引数で指定したRunnableを実行します。
Runnableなので結果は戻りません。

@Test
public void thenRunTest() {
    Supplier<Integer> supplier = () -> {
        int generated = atomicInt.incrementAndGet();
        try {
            Thread.sleep(3_000L);
        } catch (InterruptedException ignored) {
        }
        return generated;
    };

    Runnable printTask = () -> System.out.println("task done : " + atomicInt.get());

    CompletableFuture.supplyAsync(supplier)
            .thenRun(printTask);
}
handle()

handle()でfutureが異常終了した場合の処理(結果を戻す処理)を記述できます。
第1引数に結果、第2引数に例外が指定されて呼ばれます。
例外が発生してない場合、第2引数の例外はnullになります。

@Test
public void handleTest() throws ExecutionException, InterruptedException {
    String[] strings = new String[2];
    strings[0] = "aaa";
    strings[1] = "bbb";

    Supplier<String> supplier = () -> strings[3];

    String result = CompletableFuture.supplyAsync(supplier)
            .handle((t, error) -> {
                if (error != null) {
                    System.out.println("cause : " + error);
                    return "fallback value";
                } else {
                    return t;
                }
            })
            .get();

    assertThat(result).isEqualTo("fallback value");
}
whenComplete()

whenCopmplete()もhandle()と同様に例外が発生した場合の処理を記述できますが、
結果は返さずに完了します。

@Test
public void whenCompleteTest() {
    String[] strings = new String[2];
    strings[0] = "aaa";
    strings[1] = "bbb";

    Supplier<String> supplier = () -> strings[3];

    CompletableFuture.supplyAsync(supplier)
            .whenComplete((t, error) -> {
                if (error != null) {
                    System.out.println("cause : " + error);
                } else {
                    System.out.println("result : " + t);
                }
            });
}

結果

cause : java.util.concurrent.CompletionException: java.lang.ArrayIndexOutOfBoundsException: 3
thenCombine()

指定した両方のfutureが完了した後に、両方の結果に対して指定したBiFunctionを実行します。
下記の場合、addNumFutureとrandomFutureの完了後に指定したBiFunctionを実行します。

@Test
public void thenCombineTest() throws ExecutionException, InterruptedException {
    Random random = new Random();

    CompletableFuture<Integer> addNumFuture = CompletableFuture
            .supplyAsync(() -> atomicInt.addAndGet(5));

    CompletableFuture<Integer> randomFuture = CompletableFuture
            .supplyAsync(() -> random.nextInt(100));

    int result = addNumFuture.thenCombine(randomFuture, (add, rand) -> {
        System.out.println("add :" + add + ", random : " + rand);
        return add * rand;
    }).get();

    System.out.println("result : " + result);
}

結果

add :5, random : 59
result : 295
thenAcceptBoth()

指定した両方のfutureが完了した後に、両方の結果に対して指定したBiConsumerを実行します。
下記の場合、addNumFutureとrandomFutureの完了後に指定したBiConsumerを実行します。
BiConsumerなので結果は返しません。

@Test
public void thenAcceptBothTest() throws ExecutionException, InterruptedException {
    Random random = new Random();

    CompletableFuture<Integer> addNumFuture = CompletableFuture
            .supplyAsync(() -> atomicInt.addAndGet(5));

    CompletableFuture<Integer> randomFuture = CompletableFuture
            .supplyAsync(() -> random.nextInt(100));

    addNumFuture.thenAcceptBoth(randomFuture, (add, rand) -> {
        System.out.println("add :" + add + ", random : " + rand);
        System.out.println("result : " + (add + rand));
    });
}

結果

add :5, random : 62
result : 67
runAfterBoth()

指定した両方のfutureが完了した後に、指定したRunnablerを実行します。
下記の場合、addNumFutureとrandomFutureの完了後に指定したRunnableを実行します。
Runnableなので結果は返しません。

@Test
public void runAfterBothTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> addNumFuture = CompletableFuture
            .supplyAsync(() -> atomicInt.addAndGet(5));

    CompletableFuture<Integer> randomFuture = CompletableFuture
            .supplyAsync(() -> atomicInt.addAndGet(10));

    addNumFuture.runAfterBoth(randomFuture, () -> System.out.println("result : " + atomicInt.get()));
}

結果

result : 15
applyToEither()

指定したfutureのどちらか一方が完了した後に、その結果に対して指定したFunctionを実行します。
下記の場合、firstTaskかsecondTaskのどちらかが完了後に指定したFunctionを実行します。

private Supplier<Integer> firstTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("first : " + randomValue);
    return randomValue;
};

private Supplier<Integer> secondTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("second : " + randomValue);
    return randomValue;
};

@Test
public void applyToEitherTest() throws InterruptedException, ExecutionException {
    CompletableFuture<Integer> first  = CompletableFuture.supplyAsync(firstTask);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask);

    int result = first.applyToEither(second, (done) -> {
        System.out.println("done :" + done);
        return done;
    }).get();

    System.out.println("result : " + result);

    Thread.sleep(2_000);
}

結果

first : 546
done :546
result : 546
second : 1943
acceptEither()

指定したfutureのどちらか一方が完了した後に、その結果に対して指定したConsumerを実行します。
下記の場合、firstTaskかsecondTaskのどちらかが完了後に指定したConsumerを実行します。
Consumerなので結果は返しません。

private Supplier<Integer> firstTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("first : " + randomValue);
    return randomValue;
};

private Supplier<Integer> secondTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("second : " + randomValue);
    return randomValue;
};

@Test
public void acceptEitherTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> first  = CompletableFuture.supplyAsync(firstTask);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask);

    first.acceptEither(second, (done) -> {
        System.out.println("done :" + done);
    });

    Thread.sleep(2_000);
}

結果

second : 69
done :69
first : 1945
runAfterEither()

指定したfutureのどちらか一方が完了した後に、指定したRunnableを実行します。
下記の場合、firstTaskかsecondTaskのどちらかが完了後に指定したRunnableを実行します。
Runnableなので結果は返しません。

private Supplier<Integer> firstTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("first : " + randomValue);
    return randomValue;
};

private Supplier<Integer> secondTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("second : " + randomValue);
    return randomValue;
};

@Test
public void runAfterEitherTest() throws InterruptedException {
    CompletableFuture<Integer> first  = CompletableFuture.supplyAsync(firstTask);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask);

    first.runAfterEither(second, () -> {
        System.out.println("done!");
    });

    Thread.sleep(2_000);
}

結果

first : 1158
done!
second : 1772
allOf()

指定したすべてのfutureが完了した後に終了するCompletableFutureを返します。
返ってきたCompletableFutureに対してjoin()で完了するまで待機します。

allOf()の返り値はCompletableFuture<Void>なので、join()の戻り値で結果は取得できません。
結果は元の各CompletableFutureからget()などで取得します。

private Supplier<Integer> firstTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("first : " + randomValue);
    return randomValue;
};

private Supplier<Integer> secondTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("second : " + randomValue);
    return randomValue;
};

private Supplier<Integer> thirdTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("third : " + randomValue);
    return randomValue;
};

@Test
public void allOfTest() {
    CompletableFuture<Integer> first  = CompletableFuture.supplyAsync(firstTask);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask);
    CompletableFuture<Integer> third  = CompletableFuture.supplyAsync(thirdTask);
    List<CompletableFuture> futureList = Arrays.asList(first, second, third);

    CompletableFuture.allOf(
            futureList.toArray(new CompletableFuture[futureList.size()])
    ).join();

    futureList.forEach(done -> {
                try {
                    System.out.println("done : " + done.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
    });
}

結果

third : 461
first : 750
second : 1982
done : 750
done : 1982
done : 461
anyOf()

指定したfutureのうちどれか1つが完了した後に終了するCompletableFutureを返します。
返ってきたCompletableFutureに対してjoin()で完了するまで待機します。

allOf()の返り値はCompletableFuture<Void>なので、join()の戻り値で結果は取得できません。
結果は元の各CompletableFutureからget()などで取得します。

各CompletableFutureが完了しているかどうかはisDone()で判断出来ます。

private Supplier<Integer> firstTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("first : " + randomValue);
    return randomValue;
};

private Supplier<Integer> secondTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("second : " + randomValue);
    return randomValue;
};

private Supplier<Integer> thirdTask = () -> {
    int randomValue = random.nextInt(2_000);
    try {
        Thread.sleep(randomValue);
    } catch (InterruptedException ignored) {
    }
    System.out.println("third : " + randomValue);
    return randomValue;
};

@Test
public void anyOfTest() {
    CompletableFuture<Integer> first  = CompletableFuture.supplyAsync(firstTask);
    CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask);
    CompletableFuture<Integer> third  = CompletableFuture.supplyAsync(thirdTask);
    List<CompletableFuture> futureList = Arrays.asList(first, second, third);

    CompletableFuture.anyOf(
            futureList.toArray(new CompletableFuture[futureList.size()])
    ).join();

    futureList.forEach(mayDone -> {
        if (mayDone.isDone()) {
            try {
                System.out.println("done : " + mayDone.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("This future has not done yet.");
        }
    });
}

はじめに完了したsecondは結果が取得出来て表示されますが、
完了していないfirst,thirdはまだ完了していないメッセージが出力されます。

結果

second : 915
This future has not done yet.
done : 915
This future has not done yet.


こんなとこです。


ソースは下記にあげました。
github.com