KotlinのCoroutineを試す (Composing suspending functions)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのComposing suspending functionsの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/composing-suspending-functions.html

下記バージョンで試してみます。

  • kotlin 1.4.31
  • kotlinx-coroutines-core:1.4.3

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.4.31'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Composing suspending functions

Sequential by default

2つのsuspend関数を用意します。それぞれ少しdelayしてから数値を返します。
最初にdoSomethingUsefulOne()を実行し、次にdoSomethingUsefulTwo()を実行し、結果を足し算します。
coroutine内のコードはデフォルトでシーケンシャルです。処理時間のtimeを見ると約2秒になっています。

SequentialByDefault.kt

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

suspend fun doSomethingUsefulOne(): Int {
    println("processing task one... [${Instant.now()}] [${Thread.currentThread().name}]")
    // pretend we are doing something useful here
    delay(1_000L)
    println("done task one. [${Instant.now()}] [${Thread.currentThread().name}]")
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    println("processing task two... [${Instant.now()}] [${Thread.currentThread().name}]")
    // pretend we are doing something useful here, too
    delay(1_000L)
    println("done task two. [${Instant.now()}] [${Thread.currentThread().name}]")
    return 29
}

実行

processing task one... [2021-03-24T18:53:08.374509100Z] [main]
done task one. [2021-03-24T18:53:09.395847300Z] [main]
processing task two... [2021-03-24T18:53:09.396846400Z] [main]
done task two. [2021-03-24T18:53:10.399852800Z] [main]
The answer is 42. [2021-03-24T18:53:10.399852800Z] [main]
Completed in 2026 ms. [2021-03-24T18:53:10.399852800Z] [main]

Concurrent using async

doSomethingUsefulOne()doSomethingUsefulTwo()の間に依存関係がなく、
両方を同時に実行して結果を早く取得するにはasyncを利用します。

asynchlaunchと同様にcoroutineを作成しますが、Deferredを返します。
Deferredは軽量なnon-blocking futureで後から結果を取得出来ます。
await()でスレッドをblockせずに結果を取得出来ます。
DeferredJobを実装しているので、キャンセルすることも出来ます。

ConcurrentUsingAsync.kt

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行してみると並行に実行されるため、約1秒で完了しています。

実行

processing task one... [2021-03-24T19:12:32.398027500Z] [main]
processing task two... [2021-03-24T19:12:32.423026900Z] [main]
done task one. [2021-03-24T19:12:33.420576700Z] [main]
done task two. [2021-03-24T19:12:33.423578400Z] [main]
The answer is 42. [2021-03-24T19:12:33.423578400Z] [main]
Completed in 1061 ms. [2021-03-24T19:12:33.423578400Z] [main]

Lazily started async

asyncstartパラメータにCoroutineStart.LAZYを指定すると、asyncの実行を遅延させることが出来ます。
LAZYモードでは、await()で結果を要求した時、またはそのJobstartが実行された場合にcoroutineを開始します。

先程の例とは違い、今回の例ではstartを呼び出すことで、いつ実行されるかをプログラマが制御しています。
oneを起動し、その後にtwoを起動してそれぞれの終了を待ちます。

LazilyStartedAsync.kt

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // some computation
        one.start()
        two.start()
        println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行

processing task one... [2021-03-24T19:27:14.161246900Z] [main]
processing task two... [2021-03-24T19:27:14.178249300Z] [main]
done task one. [2021-03-24T19:27:15.177339Z] [main]
done task two. [2021-03-24T19:27:15.179246600Z] [main]
The answer is 42. [2021-03-24T19:27:15.179246600Z] [main]
Completed in 1036 ms. [2021-03-24T19:27:15.179246600Z] [main]

もしそれぞれのcoroutinestartを呼び出さずにawaitを呼び出した場合、
awaitcoroutineを実行を開始して終了を待つので、シーケンシャルな動作になります。
これはCoroutineStart.LAZYを指定した遅延処理の意図した使用例ではないので、注意が必要です。

fun main() = runBlocking {
    // If don't call start(),this will lead to sequential behavior
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行すると逐次的に処理されて約2秒かかっています。

実行

processing task one... [2021-03-24T19:45:56.970930400Z] [main]
done task one. [2021-03-24T19:45:57.990001100Z] [main]
processing task two... [2021-03-24T19:45:57.992991900Z] [main]
done task two. [2021-03-24T19:45:58.994409Z] [main]
The answer is 42. [2021-03-24T19:45:58.994409Z] [main]
Completed in 2052 ms. [2021-03-24T19:45:58.994409Z] [main]

Async-style functions

doSomethingUsefulOnedoSomethingUsefulTwoGlobalScopeを明示的に指定してAsync styleの関数を定義出来ます。
このような関数には関数名にAsyncのsuffixを付けるとAsyncであることが明示的になります。

これらの関数はsuspend関数ではありません。
これらはどこからでも呼び出す事が出来ますが、この関数を実行すると常に非同期で実行されます。

AsyncStyleFunctions.kt

// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

coroutineの外からAsync styleの関数を実行してみます。

AsyncStyleFunctions.kt

fun main() {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]")
        }
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行

processing task one... [2021-03-24T19:52:59.251246900Z] [DefaultDispatcher-worker-1]
processing task two... [2021-03-24T19:52:59.252246600Z] [DefaultDispatcher-worker-2]
done task two. [2021-03-24T19:53:00.273330Z] [DefaultDispatcher-worker-1]
done task one. [2021-03-24T19:53:00.275332700Z] [DefaultDispatcher-worker-2]
The answer is 42. [2021-03-24T19:53:00.276326500Z] [main]
Completed in 1149 ms. [2021-03-24T19:53:00.276326500Z] [main]

val one = somethingUsefulOneAsync()の行からone.await()の箇所までにエラーで何らかの例外が発生した場合、
通常はグローバルなエラーハンドラが例外をcatchしてエラーが報告されます。
しかし、この例では開始した操作が中止された場合でもsomethingUsefulOneAsyncがバックグラウンドで実行されてしまいます。

下記のようにusefulOneAsync()からone.await()の間に、usefulTwoAsync()で例外を発生させてみます。

AsyncStyleFunctionsThrowException.kt

fun main() {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = usefulOneAsync()
        val two = usefulTwoAsync()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]")
        }
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

// The result type of somethingUsefulOneAsync is Deferred<Int>
fun usefulOneAsync() = GlobalScope.async<Int> {
    try {
        delay(Long.MAX_VALUE) // Emulates very long computation
        doSomethingUsefulOne()
    } finally {
        println("First child was cancelled. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun usefulTwoAsync() = GlobalScope.async<Int> {
    println("Second child throws an exception. [${Instant.now()}] [${Thread.currentThread().name}]")
    throw ArithmeticException()
}

実行するとusefulTwoAsync()で例外が発生してるにもかかわらず、usefulOneAsync()はバックグラウンドで実行されたままです。

実行

Second child throws an exception. [2021-04-06T18:51:32.314642Z] [DefaultDispatcher-worker-2]

Process finished with exit code -1
(手動で強制終了)

Structured concurrency with async

doSomethingUsefulOnedoSomethingUsefulTwoを同時に実行し、その結果を返すように関数として抽出してみます。
asyncCoroutineScopeの拡張関数として定義されているので、scope内で定義する必要があります。
coroutineScopeCoroutineScopeを作成することができます。

StructuredConcurrencyWithAsync.kt

fun main() = runBlocking {
    val time = measureTimeMillis {
        println("The answer is ${concurrentSum()}. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]")
}

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

実行

processing task one... [2021-03-30T19:20:58.936223700Z] [main]
processing task two... [2021-03-30T19:20:58.982222900Z] [main]
done task one. [2021-03-30T19:20:59.973737500Z] [main]
done task two. [2021-03-30T19:20:59.982734400Z] [main]
The answer is 42. [2021-03-30T19:20:59.982734400Z] [main]
Completed in 1099 ms. [2021-03-30T19:20:59.982734400Z] [main]

下記のように2つ目のasyncの処理でArithmeticExceptionをthrowするようにしてみます。

StructuredConcurrencyWithAsyncThrowException.kt

fun main() = runBlocking<Unit> {
    try {
        failConcurrentSum()
    } catch (e: java.lang.ArithmeticException) {
        println("Computation failed with ArithmeticException. [${Instant.now()}] [${Thread.currentThread().name}]")
    }
}

suspend fun failConcurrentSum(): Int = coroutineScope {
    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE) // Emulates very long computation
            42
        } catch (e: Exception) {
            println("Caught ${e.cause}. ${e.message}")
            e.printStackTrace()
            0
        } finally {
            println("First child was cancelled. [${Instant.now()}] [${Thread.currentThread().name}]")
        }
    }
    val two = async<Int> {
        println("Second child throws an exception. [${Instant.now()}] [${Thread.currentThread().name}]")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

キャンセルは常にcoroutineの階層を伝搬していきます。
実行すると2つ目のasyncArithmeticExceptionthrowされると、外側のCoroutineScopeの処理が終了するため
1つ目のasynccoroutineもキャンセルされることが分かります。

実行

Second child throws an exception. [2021-04-06T18:53:40.763287100Z] [main]
Caught java.lang.ArithmeticException. Parent job is Cancelling
First child was cancelled. [2021-04-06T18:53:40.778284200Z] [main]
Computation failed with ArithmeticException. [2021-04-06T18:53:40.778284200Z] [main]
kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=ScopeCoroutine{Cancelling}@7a187f14
Caused by: java.lang.ArithmeticException
    at com.example.coroutine.composingsuspendingfunctions.StructuredConcurrencyWithAsyncThrowExceptionKt$failConcurrentSum$2$two$1.invokeSuspend(StructuredConcurrencyWithAsyncThrowException.kt:32)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
    at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
    at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
    at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    at com.example.coroutine.composingsuspendingfunctions.StructuredConcurrencyWithAsyncThrowExceptionKt.main(StructuredConcurrencyWithAsyncThrowException.kt:9)
    at com.example.coroutine.composingsuspendingfunctions.StructuredConcurrencyWithAsyncThrowExceptionKt.main(StructuredConcurrencyWithAsyncThrowException.kt)

 
サンプルコードは下記にあげました。

github.com

おわり。