Java8 CompletableFuture
Java8で追加されたCompletableFutureについてのメモです。
Java8で便利なCompletableFutureが追加されたので試したメモです。
CompletableFuture
CompletableFutureでは、結果の取得後にその結果に対して処理を実行することが出来ます。
また、複数のCompletableFutureの完了を待って処理を行ったり、
いずれかのCompletableFutureの完了を待って処理を行えます。
完了後に処理を行うメソッドは、引数に取る関数型インターフェースによって
- Functionの場合applyXXXX()
- Consumerの場合acceptXXX()
- Runnableの場合runXXX()
というメソッド名になっています。
Futureで結果に対して後処理をする場合
Futureから取得した結果に対して、下記のメソッドdoubleNum()を適用したい場合。
private static class NumUtil { static int doubleNum(int num) { return 2 * num; } }
Future.get()で結果を取得し、結果に対してdoubleNum()を適用します。
private static AtomicInteger atomicInt = new AtomicInteger(); @Test public void futureTest() throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<Integer> callable = () -> atomicInt.incrementAndGet(); Future<Integer> future = executor.submit(callable); int num = future.get(); int doubled = NumUtil.doubleNum(num); assertThat(doubled).isEqualTo(2); }
CompletableFutureで結果に対して後処理をする場合
CompletableFutureの場合、非同期でタスクを実行するsupplyAsync()にタスクを指定します。
supplyAsync()は完了を待たずに戻り値としてCompletableFutureが返ります。
それに対してthenApply()で後処理を指定します。
最終的な結果はFutureと同様にget()で取得します。
@Test public void completableFutureTest() throws ExecutionException, InterruptedException { Supplier<Integer> supplier = () -> atomicInt.incrementAndGet(); int doubled = CompletableFuture.supplyAsync(supplier) .thenApply(NumUtil::doubleNum) .get(); assertThat(doubled).isEqualTo(2); }
CompletableFutureのメソッド
get()
get()でCompletableFutureが完了するのを待機して結果を取得します。
@Test public void supplyAsyncTest() throws ExecutionException, InterruptedException { Supplier<Integer> supplier = () -> atomicInt.incrementAndGet(); int result = CompletableFuture.supplyAsync(supplier) .get(); assertThat(result).isEqualTo(1); }
getNow()
getNow()でfutureが完了している場合には値を取得して、完了していない場合には引数で指定した値を返します。
@Test public void getNowTest() throws ExecutionException, InterruptedException { Supplier<Integer> supplier = () -> { try { Thread.sleep(1_000L); } catch (InterruptedException ignored) { } return atomicInt.incrementAndGet(); }; int result = CompletableFuture.supplyAsync(supplier) .getNow(999); assertThat(result).isEqualTo(999); }
thenApply()
futureの結果に対して引数で指定したFunctionを実行します。
@Test public void thenApplyTest() throws ExecutionException, InterruptedException { Supplier<Integer> supplier = () -> atomicInt.incrementAndGet(); int doubled = CompletableFuture.supplyAsync(supplier) .thenApply(NumUtil::doubleNum) .get(); assertThat(doubled).isEqualTo(2); }
thenAccept()
futureの結果に対して引数で指定したConsumerを実行します。
Consumerなので結果は戻りません。
@Test public void thenAcceptTest() throws ExecutionException, InterruptedException { Supplier<Integer> supplier = () -> atomicInt.incrementAndGet(); CompletableFuture.supplyAsync(supplier) .thenAccept(System.out::println); }
thenRun()
futureの結果に対して引数で指定したRunnableを実行します。
Runnableなので結果は戻りません。
@Test public void thenRunTest() { Supplier<Integer> supplier = () -> { int generated = atomicInt.incrementAndGet(); try { Thread.sleep(3_000L); } catch (InterruptedException ignored) { } return generated; }; Runnable printTask = () -> System.out.println("task done : " + atomicInt.get()); CompletableFuture.supplyAsync(supplier) .thenRun(printTask); }
handle()
handle()でfutureが異常終了した場合の処理(結果を戻す処理)を記述できます。
第1引数に結果、第2引数に例外が指定されて呼ばれます。
例外が発生してない場合、第2引数の例外はnullになります。
@Test public void handleTest() throws ExecutionException, InterruptedException { String[] strings = new String[2]; strings[0] = "aaa"; strings[1] = "bbb"; Supplier<String> supplier = () -> strings[3]; String result = CompletableFuture.supplyAsync(supplier) .handle((t, error) -> { if (error != null) { System.out.println("cause : " + error); return "fallback value"; } else { return t; } }) .get(); assertThat(result).isEqualTo("fallback value"); }
whenComplete()
whenCopmplete()もhandle()と同様に例外が発生した場合の処理を記述できますが、
結果は返さずに完了します。
@Test public void whenCompleteTest() { String[] strings = new String[2]; strings[0] = "aaa"; strings[1] = "bbb"; Supplier<String> supplier = () -> strings[3]; CompletableFuture.supplyAsync(supplier) .whenComplete((t, error) -> { if (error != null) { System.out.println("cause : " + error); } else { System.out.println("result : " + t); } }); }
結果
cause : java.util.concurrent.CompletionException: java.lang.ArrayIndexOutOfBoundsException: 3
thenCombine()
指定した両方のfutureが完了した後に、両方の結果に対して指定したBiFunctionを実行します。
下記の場合、addNumFutureとrandomFutureの完了後に指定したBiFunctionを実行します。
@Test public void thenCombineTest() throws ExecutionException, InterruptedException { Random random = new Random(); CompletableFuture<Integer> addNumFuture = CompletableFuture .supplyAsync(() -> atomicInt.addAndGet(5)); CompletableFuture<Integer> randomFuture = CompletableFuture .supplyAsync(() -> random.nextInt(100)); int result = addNumFuture.thenCombine(randomFuture, (add, rand) -> { System.out.println("add :" + add + ", random : " + rand); return add * rand; }).get(); System.out.println("result : " + result); }
結果
add :5, random : 59 result : 295
thenAcceptBoth()
指定した両方のfutureが完了した後に、両方の結果に対して指定したBiConsumerを実行します。
下記の場合、addNumFutureとrandomFutureの完了後に指定したBiConsumerを実行します。
BiConsumerなので結果は返しません。
@Test public void thenAcceptBothTest() throws ExecutionException, InterruptedException { Random random = new Random(); CompletableFuture<Integer> addNumFuture = CompletableFuture .supplyAsync(() -> atomicInt.addAndGet(5)); CompletableFuture<Integer> randomFuture = CompletableFuture .supplyAsync(() -> random.nextInt(100)); addNumFuture.thenAcceptBoth(randomFuture, (add, rand) -> { System.out.println("add :" + add + ", random : " + rand); System.out.println("result : " + (add + rand)); }); }
結果
add :5, random : 62 result : 67
runAfterBoth()
指定した両方のfutureが完了した後に、指定したRunnablerを実行します。
下記の場合、addNumFutureとrandomFutureの完了後に指定したRunnableを実行します。
Runnableなので結果は返しません。
@Test public void runAfterBothTest() throws ExecutionException, InterruptedException { CompletableFuture<Integer> addNumFuture = CompletableFuture .supplyAsync(() -> atomicInt.addAndGet(5)); CompletableFuture<Integer> randomFuture = CompletableFuture .supplyAsync(() -> atomicInt.addAndGet(10)); addNumFuture.runAfterBoth(randomFuture, () -> System.out.println("result : " + atomicInt.get())); }
結果
result : 15
applyToEither()
指定したfutureのどちらか一方が完了した後に、その結果に対して指定したFunctionを実行します。
下記の場合、firstTaskかsecondTaskのどちらかが完了後に指定したFunctionを実行します。
private Supplier<Integer> firstTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("first : " + randomValue); return randomValue; }; private Supplier<Integer> secondTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("second : " + randomValue); return randomValue; }; @Test public void applyToEitherTest() throws InterruptedException, ExecutionException { CompletableFuture<Integer> first = CompletableFuture.supplyAsync(firstTask); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask); int result = first.applyToEither(second, (done) -> { System.out.println("done :" + done); return done; }).get(); System.out.println("result : " + result); Thread.sleep(2_000); }
結果
first : 546 done :546 result : 546 second : 1943
acceptEither()
指定したfutureのどちらか一方が完了した後に、その結果に対して指定したConsumerを実行します。
下記の場合、firstTaskかsecondTaskのどちらかが完了後に指定したConsumerを実行します。
Consumerなので結果は返しません。
private Supplier<Integer> firstTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("first : " + randomValue); return randomValue; }; private Supplier<Integer> secondTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("second : " + randomValue); return randomValue; }; @Test public void acceptEitherTest() throws ExecutionException, InterruptedException { CompletableFuture<Integer> first = CompletableFuture.supplyAsync(firstTask); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask); first.acceptEither(second, (done) -> { System.out.println("done :" + done); }); Thread.sleep(2_000); }
結果
second : 69 done :69 first : 1945
runAfterEither()
指定したfutureのどちらか一方が完了した後に、指定したRunnableを実行します。
下記の場合、firstTaskかsecondTaskのどちらかが完了後に指定したRunnableを実行します。
Runnableなので結果は返しません。
private Supplier<Integer> firstTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("first : " + randomValue); return randomValue; }; private Supplier<Integer> secondTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("second : " + randomValue); return randomValue; }; @Test public void runAfterEitherTest() throws InterruptedException { CompletableFuture<Integer> first = CompletableFuture.supplyAsync(firstTask); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask); first.runAfterEither(second, () -> { System.out.println("done!"); }); Thread.sleep(2_000); }
結果
first : 1158 done! second : 1772
allOf()
指定したすべてのfutureが完了した後に終了するCompletableFutureを返します。
返ってきたCompletableFutureに対してjoin()で完了するまで待機します。
allOf()の返り値はCompletableFuture<Void>なので、join()の戻り値で結果は取得できません。
結果は元の各CompletableFutureからget()などで取得します。
private Supplier<Integer> firstTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("first : " + randomValue); return randomValue; }; private Supplier<Integer> secondTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("second : " + randomValue); return randomValue; }; private Supplier<Integer> thirdTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("third : " + randomValue); return randomValue; }; @Test public void allOfTest() { CompletableFuture<Integer> first = CompletableFuture.supplyAsync(firstTask); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask); CompletableFuture<Integer> third = CompletableFuture.supplyAsync(thirdTask); List<CompletableFuture> futureList = Arrays.asList(first, second, third); CompletableFuture.allOf( futureList.toArray(new CompletableFuture[futureList.size()]) ).join(); futureList.forEach(done -> { try { System.out.println("done : " + done.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }
結果
third : 461 first : 750 second : 1982 done : 750 done : 1982 done : 461
anyOf()
指定したfutureのうちどれか1つが完了した後に終了するCompletableFutureを返します。
返ってきたCompletableFutureに対してjoin()で完了するまで待機します。
allOf()の返り値はCompletableFuture<Void>なので、join()の戻り値で結果は取得できません。
結果は元の各CompletableFutureからget()などで取得します。
各CompletableFutureが完了しているかどうかはisDone()で判断出来ます。
private Supplier<Integer> firstTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("first : " + randomValue); return randomValue; }; private Supplier<Integer> secondTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("second : " + randomValue); return randomValue; }; private Supplier<Integer> thirdTask = () -> { int randomValue = random.nextInt(2_000); try { Thread.sleep(randomValue); } catch (InterruptedException ignored) { } System.out.println("third : " + randomValue); return randomValue; }; @Test public void anyOfTest() { CompletableFuture<Integer> first = CompletableFuture.supplyAsync(firstTask); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(secondTask); CompletableFuture<Integer> third = CompletableFuture.supplyAsync(thirdTask); List<CompletableFuture> futureList = Arrays.asList(first, second, third); CompletableFuture.anyOf( futureList.toArray(new CompletableFuture[futureList.size()]) ).join(); futureList.forEach(mayDone -> { if (mayDone.isDone()) { try { System.out.println("done : " + mayDone.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } else { System.out.println("This future has not done yet."); } }); }
はじめに完了したsecondは結果が取得出来て表示されますが、
完了していないfirst,thirdはまだ完了していないメッセージが出力されます。
結果
second : 915 This future has not done yet. done : 915 This future has not done yet.
こんなとこです。
ソースは下記にあげました。
github.com