KotlinのCoroutineを試す (Cancellation and timeouts)
kotlinのcoroutineを試してみたメモです。
kotlin公式のcoroutineチュートリアルのCancellation and timeoutsの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/cancellation-and-timeouts.html
下記バージョンで試してみます。
- kotlin 1.4.31
- kotlinx-coroutines-core:1.4.3
Dependency
gradleを使って試してみます。
build.gradle
plugins { id 'org.jetbrains.kotlin.jvm' version '1.4.31' } group 'com.example.coroutine.kotlin' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3' } compileKotlin { kotlinOptions.jvmTarget = "11" } compileTestKotlin { kotlinOptions.jvmTarget = "11" }
Cancellation and timeouts
Cancelling coroutine execution
launch
は戻り値としてjob
を返すので、それを利用すると長時間実行されているbackgroud coroutineをキャンセルする事が出来ます。
CancellingCoroutineExecution.kt
fun main() = runBlocking { val job = launch { repeat(1_000) { i -> println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) } } delay(2_300L) // delay a bit println("main: I'm tired of waiting! [${Instant.now()}] [${Thread.currentThread().name}]") job.cancel() // cancels the job job.join() // waits for job's completion println("main: Now I can quit [${Instant.now()}] [${Thread.currentThread().name}]") }
job.cancel()
が実行されるとすぐにキャンセルされます。
実行
job: I'm sleeping 0 ... [2021-03-17T18:41:29.549447400Z] [main] job: I'm sleeping 1 ... [2021-03-17T18:41:30.566447600Z] [main] job: I'm sleeping 2 ... [2021-03-17T18:41:31.568447600Z] [main] main: I'm tired of waiting! [2021-03-17T18:41:31.845448Z] [main] main: Now I can quit [2021-03-17T18:41:31.858448900Z] [main]
cancel()
とjoin()
をまとめたcancelAndJoin()
という拡張関数があるので、それを利用すると同様の結果が得られます。
CancellingCoroutineExecutionUsingCancelAndJoin.kt
fun main() = runBlocking { val job = launch { repeat(1_000) { i -> println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) } } delay(2_300L) // delay a bit println("main: I'm tired of waiting! [${Instant.now()}] [${Thread.currentThread().name}]") job.cancelAndJoin() // cancels the job and waits for job's completion println("main: Now I can quit [${Instant.now()}] [${Thread.currentThread().name}]") }
Cancellation is cooperative
suspend関数はすべてキャンセル可能になっており、キャンセルされるとCancellationException
がthrowされます。
ただし、下記の様にcoroutineが計算処理をしていてキャンセルをチェックしてない場合はキャンセル出来ません。
CancellationIsCooperative.kt
fun main() = runBlocking { val startTime = System.currentTimeMillis() val job = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (i < 10) { // computation loop, just wastes CPU // print a message once a second if (System.currentTimeMillis() >= nextPrintTime) { println("job: I'm sleeping ${i++} ... [${Instant.now()}] [${Thread.currentThread().name}]") nextPrintTime += 1_000L } } } delay(2_300L) // delay a bit println("main: I'm tired of waiting! [${Instant.now()}] [${Thread.currentThread().name}]") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit [${Instant.now()}] [${Thread.currentThread().name}]") }
実行
job: I'm sleeping 0 ... [2021-03-17T19:19:59.956846800Z] [DefaultDispatcher-worker-2] job: I'm sleeping 1 ... [2021-03-17T19:20:00.898996400Z] [DefaultDispatcher-worker-2] job: I'm sleeping 2 ... [2021-03-17T19:20:01.898682900Z] [DefaultDispatcher-worker-2] main: I'm tired of waiting! [2021-03-17T19:20:02.263785700Z] [main] job: I'm sleeping 3 ... [2021-03-17T19:20:02.898595900Z] [DefaultDispatcher-worker-2] job: I'm sleeping 4 ... [2021-03-17T19:20:03.898597300Z] [DefaultDispatcher-worker-2] job: I'm sleeping 5 ... [2021-03-17T19:20:04.898990700Z] [DefaultDispatcher-worker-2] job: I'm sleeping 6 ... [2021-03-17T19:20:05.898904800Z] [DefaultDispatcher-worker-2] job: I'm sleeping 7 ... [2021-03-17T19:20:06.898527100Z] [DefaultDispatcher-worker-2] job: I'm sleeping 8 ... [2021-03-17T19:20:07.898805500Z] [DefaultDispatcher-worker-2] job: I'm sleeping 9 ... [2021-03-17T19:20:08.898662600Z] [DefaultDispatcher-worker-2] main: Now I can quit [2021-03-17T19:20:08.900646200Z] [main]
2300ミリ秒実行している間に、"job: I'm sleeping..."は3回実行され、その後すぐにcancelAndJoin()
が実行されます。
ただし、whileで計算処理をしていてキャンセルがチェックされないのでこの処理の完了まで待って終了します。
先程の例はdelay()
を使用しており、delay()
のようなすべてのsuspend関数はキャンセル可能ですが、
この例ではsuspend関数ではないためキャンセル出来ません。
Making computation code cancellable
先程のようなキャンセル出来ない処理をcancellableにするには2つの方法があります。
1つはsuspend関数(yield
など)を定期的に呼び出してキャンセルをチェックする方法で、
もう1つはキャンセルのステータスをチェックする方法です。
CoroutineScope
の拡張propertyのisActive
を利用してチェックしてみます。
MakingComputationCodeCancellable.kt
fun main() = runBlocking { val startTime = System.currentTimeMillis() val job = launch(Dispatchers.Default) { var nextPrintTime = startTime var i = 0 while (isActive) { // cancellable computation loop // print a message once a second if (System.currentTimeMillis() >= nextPrintTime) { println("job: I'm sleeping ${i++} ... [${Instant.now()}] [${Thread.currentThread().name}]") nextPrintTime += 1_000L } } } delay(2_300L) // delay a bit println("main: I'm tired of waiting! [${Thread.currentThread().name}]") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit [${Thread.currentThread().name}]") }
実行
job: I'm sleeping 0 ... [2021-03-18T18:14:47.670175200Z] [DefaultDispatcher-worker-1] job: I'm sleeping 1 ... [2021-03-18T18:14:48.611173400Z] [DefaultDispatcher-worker-1] job: I'm sleeping 2 ... [2021-03-18T18:14:49.611173700Z] [DefaultDispatcher-worker-1] main: I'm tired of waiting! [main] main: Now I can quit [main]
Closing resources with finally
suspend関数はキャンセル時にCancellationException
をthrowします。
そのため、try~catch
やuse
を使用するとキャンセルされた場合に終了処理を実行します。
join()
やcancelAndJoin()
は終了処理を待って完了します。
ClosingResourcesWithFinally.kt
fun main() = runBlocking { val job = launch { try { repeat(1_000) { i -> println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) } } finally { println("job: I'm running finally. [${Instant.now()}] [${Thread.currentThread().name}]") } } delay(2_300L) // delay a bit println("main: I'm tired of waiting! [${Thread.currentThread().name}]") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit [${Thread.currentThread().name}]") }
実行
job: I'm sleeping 0 ... [2021-03-18T18:18:20.921596800Z] [main] job: I'm sleeping 1 ... [2021-03-18T18:18:21.942110500Z] [main] job: I'm sleeping 2 ... [2021-03-18T18:18:22.943108800Z] [main] main: I'm tired of waiting! [main] job: I'm running finally. [2021-03-18T18:18:23.219114800Z] [main] main: Now I can quit [main]
Run non-cancellable block
先程の例のfinally
でsuspend関数を使用すると、coroutineがキャンセルされるのでCancellationException
が発生します。
通常、ファイルなどのclose処理はsuspendではないノンブロッキングなので問題ありませんが、
キャンセルされたcoroutineでsuspendする必要がある場合は、withContext(NonCancellable)
を使用して処理出来ます。
RunNonCancellableBlock.kt
fun main() = runBlocking { val job = launch { try { repeat(1_000) { i -> println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) } } finally { withContext(NonCancellable) { println("job: I'm running finally. [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) println("job: And I've just delayed for 1 sec because I'm non-cancelable. [${Instant.now()}] [${Thread.currentThread().name}]") } } } delay(2_300L) // delay a bit println("main: I'm tired of waiting! [${Thread.currentThread().name}]") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit [${Thread.currentThread().name}]") }
実行
job: I'm sleeping 0 ... [2021-03-18T19:12:36.356649300Z] [main] job: I'm sleeping 1 ... [2021-03-18T19:12:37.371110900Z] [main] job: I'm sleeping 2 ... [2021-03-18T19:12:38.372137800Z] [main] main: I'm tired of waiting! [main] job: I'm running finally. [2021-03-18T19:12:38.662108400Z] [main] job: And I've just delayed for 1 sec because I'm non-cancelable. [2021-03-18T19:12:39.664122900Z] [main] main: Now I can quit [main]
Timeout
処理が一定時間を超えてタイムアウトした場合にキャンセルする場合、
Job
を利用して明示的に処理をキャンセルすることも出来ますが、
withTimeout()
を使って一定時間後にキャンセルすることが出来ます。
Timeout.kt
fun main() = runBlocking { withTimeout(2_300L) { repeat(1_000) { i -> println("I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) } } }
withTimeout()
で指定した時間経過後にCancellationException
のサブクラスであるTimeoutCancellationException
がthrowされます。
実行
I'm sleeping 0 ... [2021-03-23T18:11:35.863515300Z] [main] I'm sleeping 1 ... [2021-03-23T18:11:36.884656600Z] [main] I'm sleeping 2 ... [2021-03-23T18:11:37.885753700Z] [main] Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2300 ms at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186) at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156) at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497) at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274) at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69) at java.base/java.lang.Thread.run(Thread.java:834)
タイムアウトした際に何かアクションを行う必要がある場合は、
try {...} catch (e: TimeoutCancellationException) {...}
で例外をキャッチして処理を行うか、withTimeoutOrNull()
を使用します。
withTimeoutOrNull()
はwithTimeout()
と同じですが、例外をthrowする代わりにnull
を返します。
TimeoutWithTimeoutOrNull.kt
fun main() = runBlocking { val result = withTimeoutOrNull(2_300L) { repeat(1_000) { i -> println("I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]") delay(1_000L) } "Done" // will get cancelled before it produces this result } println("Result is $result. [${Instant.now()}] [${Thread.currentThread().name}]") }
実行してみると例外が発生しないことが分かります。
実行
I'm sleeping 0 ... [2021-03-23T18:36:16.971694Z] [main] I'm sleeping 1 ... [2021-03-23T18:36:17.992693700Z] [main] I'm sleeping 2 ... [2021-03-23T18:36:18.994699300Z] [main] Result is null. [2021-03-23T18:36:19.263762200Z] [main]
Asynchronous timeout and resources
withTimeout()
でのタイムアウトイベントはブロック内で実行している処理とは非同期のため、
常に発生する可能性があります。
ブロック内で何らかのリソースを取得し、ブロックの外でリソースを開放する必要がある場合が注意が必要です。
下記のResouce
クラスはクローズできるリソースの例です。
ここでは単純にリソースを取得した際にカウンタをインクリメントして、close()
でデクリメントします。
短いタイムアウトでwithTimeout()
ブロック内でリソースを取得して、外側で開放するのを大量に繰り返す処理を実行してみます。
AsynchronousTimeoutAndResources.kt
var acquired = 0 class Resource { init { acquired++ // Acquire the resource } fun close() = acquired-- // Release the resource } fun main() { runBlocking { repeat(100_000) { // Launch 100K coroutines launch { val resource = withTimeout(60) { // Timeout of 60 ms delay(50) // Delay for 50 ms Resource() // Acquire a resource and return it from withTimeout block } resource.close() // Release the resource } } } // not always print zero // Outside of runBlocking all coroutines have completed println("$acquired [${Instant.now()}] [${Thread.currentThread().name}]") // Print the number of resources still acquired }
実行すると0が表示されません。(0になる場合もある)
実行
158 [2021-03-23T19:08:13.680680100Z] [main]
これをきちんとリソースを開放して0にするためには、
withTimeout()
からリソースを返すのではなく、リソースへの参照を返すようにします。
するとfinally
で確実にリソースを開放することが出来ます。
AsynchronousTimeoutAndResourcesWithTryFinally.kt
fun main() { runBlocking { repeat(100_000) { // Launch 100K coroutines launch { var resource: Resource? = null // Not acquired yet try { withTimeout(60) { // Timeout of 60 ms delay(50) // Delay for 50 ms resource = Resource() // Store a resource to the variable if acquired } // We can do something else with the resource here } finally { resource?.close() // Release the resource if it was acquired } } } } // Outside of runBlocking all coroutines have completed println("$acquired [${Instant.now()}] [${Thread.currentThread().name}]") // Print the number of resources still acquired }
実行すると0になりました。
実行
0 [2021-03-23T19:12:21.912473900Z] [main]
サンプルコードは下記にあげました。
おわり。