KotlinのCoroutineを試す (Composing suspending functions)
kotlinのcoroutineを試してみたメモです。
kotlin公式のcoroutineチュートリアルのComposing suspending functionsの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/composing-suspending-functions.html
下記バージョンで試してみます。
- kotlin 1.4.31
- kotlinx-coroutines-core:1.4.3
Dependency
gradleを使って試してみます。
build.gradle
plugins { id 'org.jetbrains.kotlin.jvm' version '1.4.31' } group 'com.example.coroutine.kotlin' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3' } compileKotlin { kotlinOptions.jvmTarget = "11" } compileTestKotlin { kotlinOptions.jvmTarget = "11" }
Composing suspending functions
Sequential by default
2つのsuspend関数を用意します。それぞれ少しdelay
してから数値を返します。
最初にdoSomethingUsefulOne()
を実行し、次にdoSomethingUsefulTwo()
を実行し、結果を足し算します。
coroutine内のコードはデフォルトでシーケンシャルです。処理時間のtime
を見ると約2秒になっています。
SequentialByDefault.kt
fun main() = runBlocking { val time = measureTimeMillis { val one = doSomethingUsefulOne() val two = doSomethingUsefulTwo() println("The answer is ${one + two}. [${Instant.now()}] [${Thread.currentThread().name}]") } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") } suspend fun doSomethingUsefulOne(): Int { println("processing task one... [${Instant.now()}] [${Thread.currentThread().name}]") // pretend we are doing something useful here delay(1_000L) println("done task one. [${Instant.now()}] [${Thread.currentThread().name}]") return 13 } suspend fun doSomethingUsefulTwo(): Int { println("processing task two... [${Instant.now()}] [${Thread.currentThread().name}]") // pretend we are doing something useful here, too delay(1_000L) println("done task two. [${Instant.now()}] [${Thread.currentThread().name}]") return 29 }
実行
processing task one... [2021-03-24T18:53:08.374509100Z] [main] done task one. [2021-03-24T18:53:09.395847300Z] [main] processing task two... [2021-03-24T18:53:09.396846400Z] [main] done task two. [2021-03-24T18:53:10.399852800Z] [main] The answer is 42. [2021-03-24T18:53:10.399852800Z] [main] Completed in 2026 ms. [2021-03-24T18:53:10.399852800Z] [main]
Concurrent using async
doSomethingUsefulOne()
とdoSomethingUsefulTwo()
の間に依存関係がなく、
両方を同時に実行して結果を早く取得するにはasync
を利用します。
asynch
はlaunch
と同様にcoroutine
を作成しますが、Deferred
を返します。
Deferred
は軽量なnon-blocking futureで後から結果を取得出来ます。
await()
でスレッドをblockせずに結果を取得出来ます。
Deferred
はJob
を実装しているので、キャンセルすることも出来ます。
ConcurrentUsingAsync.kt
fun main() = runBlocking { val time = measureTimeMillis { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]") } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") }
実行してみると並行に実行されるため、約1秒で完了しています。
実行
processing task one... [2021-03-24T19:12:32.398027500Z] [main] processing task two... [2021-03-24T19:12:32.423026900Z] [main] done task one. [2021-03-24T19:12:33.420576700Z] [main] done task two. [2021-03-24T19:12:33.423578400Z] [main] The answer is 42. [2021-03-24T19:12:33.423578400Z] [main] Completed in 1061 ms. [2021-03-24T19:12:33.423578400Z] [main]
Lazily started async
async
のstart
パラメータにCoroutineStart.LAZY
を指定すると、async
の実行を遅延させることが出来ます。
LAZYモードでは、await()
で結果を要求した時、またはそのJob
のstart
が実行された場合にcoroutine
を開始します。
先程の例とは違い、今回の例ではstart
を呼び出すことで、いつ実行されるかをプログラマが制御しています。
oneを起動し、その後にtwoを起動してそれぞれの終了を待ちます。
LazilyStartedAsync.kt
fun main() = runBlocking { val time = measureTimeMillis { val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() } val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() } // some computation one.start() two.start() println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]") } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") }
実行
processing task one... [2021-03-24T19:27:14.161246900Z] [main] processing task two... [2021-03-24T19:27:14.178249300Z] [main] done task one. [2021-03-24T19:27:15.177339Z] [main] done task two. [2021-03-24T19:27:15.179246600Z] [main] The answer is 42. [2021-03-24T19:27:15.179246600Z] [main] Completed in 1036 ms. [2021-03-24T19:27:15.179246600Z] [main]
もしそれぞれのcoroutine
でstart
を呼び出さずにawait
を呼び出した場合、
await
がcoroutine
を実行を開始して終了を待つので、シーケンシャルな動作になります。
これはCoroutineStart.LAZY
を指定した遅延処理の意図した使用例ではないので、注意が必要です。
fun main() = runBlocking { // If don't call start(),this will lead to sequential behavior val time = measureTimeMillis { val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() } val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() } println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]") } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") }
実行すると逐次的に処理されて約2秒かかっています。
実行
processing task one... [2021-03-24T19:45:56.970930400Z] [main] done task one. [2021-03-24T19:45:57.990001100Z] [main] processing task two... [2021-03-24T19:45:57.992991900Z] [main] done task two. [2021-03-24T19:45:58.994409Z] [main] The answer is 42. [2021-03-24T19:45:58.994409Z] [main] Completed in 2052 ms. [2021-03-24T19:45:58.994409Z] [main]
Async-style functions
doSomethingUsefulOne
とdoSomethingUsefulTwo
をGlobalScope
を明示的に指定してAsync styleの関数を定義出来ます。
このような関数には関数名にAsyncのsuffixを付けるとAsyncであることが明示的になります。
これらの関数はsuspend関数ではありません。
これらはどこからでも呼び出す事が出来ますが、この関数を実行すると常に非同期で実行されます。
AsyncStyleFunctions.kt
// The result type of somethingUsefulOneAsync is Deferred<Int> fun somethingUsefulOneAsync() = GlobalScope.async { doSomethingUsefulOne() } // The result type of somethingUsefulTwoAsync is Deferred<Int> fun somethingUsefulTwoAsync() = GlobalScope.async { doSomethingUsefulTwo() }
coroutine
の外からAsync styleの関数を実行してみます。
AsyncStyleFunctions.kt
fun main() { val time = measureTimeMillis { // we can initiate async actions outside of a coroutine val one = somethingUsefulOneAsync() val two = somethingUsefulTwoAsync() // but waiting for a result must involve either suspending or blocking. // here we use `runBlocking { ... }` to block the main thread while waiting for the result runBlocking { println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]") } } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") }
実行
processing task one... [2021-03-24T19:52:59.251246900Z] [DefaultDispatcher-worker-1] processing task two... [2021-03-24T19:52:59.252246600Z] [DefaultDispatcher-worker-2] done task two. [2021-03-24T19:53:00.273330Z] [DefaultDispatcher-worker-1] done task one. [2021-03-24T19:53:00.275332700Z] [DefaultDispatcher-worker-2] The answer is 42. [2021-03-24T19:53:00.276326500Z] [main] Completed in 1149 ms. [2021-03-24T19:53:00.276326500Z] [main]
val one = somethingUsefulOneAsync()
の行からone.await()
の箇所までにエラーで何らかの例外が発生した場合、
通常はグローバルなエラーハンドラが例外をcatchしてエラーが報告されます。
しかし、この例では開始した操作が中止された場合でもsomethingUsefulOneAsync
がバックグラウンドで実行されてしまいます。
下記のようにusefulOneAsync()
からone.await()
の間に、usefulTwoAsync()
で例外を発生させてみます。
AsyncStyleFunctionsThrowException.kt
fun main() { val time = measureTimeMillis { // we can initiate async actions outside of a coroutine val one = usefulOneAsync() val two = usefulTwoAsync() // but waiting for a result must involve either suspending or blocking. // here we use `runBlocking { ... }` to block the main thread while waiting for the result runBlocking { println("The answer is ${one.await() + two.await()}. [${Instant.now()}] [${Thread.currentThread().name}]") } } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") } // The result type of somethingUsefulOneAsync is Deferred<Int> fun usefulOneAsync() = GlobalScope.async<Int> { try { delay(Long.MAX_VALUE) // Emulates very long computation doSomethingUsefulOne() } finally { println("First child was cancelled. [${Instant.now()}] [${Thread.currentThread().name}]") } } // The result type of somethingUsefulTwoAsync is Deferred<Int> fun usefulTwoAsync() = GlobalScope.async<Int> { println("Second child throws an exception. [${Instant.now()}] [${Thread.currentThread().name}]") throw ArithmeticException() }
実行するとusefulTwoAsync()
で例外が発生してるにもかかわらず、usefulOneAsync()
はバックグラウンドで実行されたままです。
実行
Second child throws an exception. [2021-04-06T18:51:32.314642Z] [DefaultDispatcher-worker-2] Process finished with exit code -1 (手動で強制終了)
Structured concurrency with async
doSomethingUsefulOne
とdoSomethingUsefulTwo
を同時に実行し、その結果を返すように関数として抽出してみます。
async
は CoroutineScope
の拡張関数として定義されているので、scope内で定義する必要があります。
coroutineScope
でCoroutineScope
を作成することができます。
StructuredConcurrencyWithAsync.kt
fun main() = runBlocking { val time = measureTimeMillis { println("The answer is ${concurrentSum()}. [${Instant.now()}] [${Thread.currentThread().name}]") } println("Completed in $time ms. [${Instant.now()}] [${Thread.currentThread().name}]") } suspend fun concurrentSum(): Int = coroutineScope { val one = async { doSomethingUsefulOne() } val two = async { doSomethingUsefulTwo() } one.await() + two.await() }
実行
processing task one... [2021-03-30T19:20:58.936223700Z] [main] processing task two... [2021-03-30T19:20:58.982222900Z] [main] done task one. [2021-03-30T19:20:59.973737500Z] [main] done task two. [2021-03-30T19:20:59.982734400Z] [main] The answer is 42. [2021-03-30T19:20:59.982734400Z] [main] Completed in 1099 ms. [2021-03-30T19:20:59.982734400Z] [main]
下記のように2つ目のasync
の処理でArithmeticException
をthrowするようにしてみます。
StructuredConcurrencyWithAsyncThrowException.kt
fun main() = runBlocking<Unit> { try { failConcurrentSum() } catch (e: java.lang.ArithmeticException) { println("Computation failed with ArithmeticException. [${Instant.now()}] [${Thread.currentThread().name}]") } } suspend fun failConcurrentSum(): Int = coroutineScope { val one = async<Int> { try { delay(Long.MAX_VALUE) // Emulates very long computation 42 } catch (e: Exception) { println("Caught ${e.cause}. ${e.message}") e.printStackTrace() 0 } finally { println("First child was cancelled. [${Instant.now()}] [${Thread.currentThread().name}]") } } val two = async<Int> { println("Second child throws an exception. [${Instant.now()}] [${Thread.currentThread().name}]") throw ArithmeticException() } one.await() + two.await() }
キャンセルは常にcoroutine
の階層を伝搬していきます。
実行すると2つ目のasync
でArithmeticException
がthrow
されると、外側のCoroutineScope
の処理が終了するため
1つ目のasync
のcoroutine
もキャンセルされることが分かります。
実行
Second child throws an exception. [2021-04-06T18:53:40.763287100Z] [main] Caught java.lang.ArithmeticException. Parent job is Cancelling First child was cancelled. [2021-04-06T18:53:40.778284200Z] [main] Computation failed with ArithmeticException. [2021-04-06T18:53:40.778284200Z] [main] kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=ScopeCoroutine{Cancelling}@7a187f14 Caused by: java.lang.ArithmeticException at com.example.coroutine.composingsuspendingfunctions.StructuredConcurrencyWithAsyncThrowExceptionKt$failConcurrentSum$2$two$1.invokeSuspend(StructuredConcurrencyWithAsyncThrowException.kt:32) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274) at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84) at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59) at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38) at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source) at com.example.coroutine.composingsuspendingfunctions.StructuredConcurrencyWithAsyncThrowExceptionKt.main(StructuredConcurrencyWithAsyncThrowException.kt:9) at com.example.coroutine.composingsuspendingfunctions.StructuredConcurrencyWithAsyncThrowExceptionKt.main(StructuredConcurrencyWithAsyncThrowException.kt)
サンプルコードは下記にあげました。
おわり。