KotlinのCoroutineを試す (Cancellation and timeouts)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのCancellation and timeoutsの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/cancellation-and-timeouts.html

下記バージョンで試してみます。

  • kotlin 1.4.31
  • kotlinx-coroutines-core:1.4.3

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.4.31'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Cancellation and timeouts

Cancelling coroutine execution

launchは戻り値としてjobを返すので、それを利用すると長時間実行されているbackgroud coroutineをキャンセルする事が出来ます。

CancellingCoroutineExecution.kt

fun main() = runBlocking {
    val job = launch {
        repeat(1_000) { i ->
            println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]")
            delay(1_000L)
        }
    }
    delay(2_300L) // delay a bit
    println("main: I'm tired of waiting! [${Instant.now()}] [${Thread.currentThread().name}]")
    job.cancel() // cancels the job
    job.join() // waits for job's completion
    println("main: Now I can quit [${Instant.now()}] [${Thread.currentThread().name}]")
}

job.cancel()が実行されるとすぐにキャンセルされます。

実行

job: I'm sleeping 0 ... [2021-03-17T18:41:29.549447400Z] [main]
job: I'm sleeping 1 ... [2021-03-17T18:41:30.566447600Z] [main]
job: I'm sleeping 2 ... [2021-03-17T18:41:31.568447600Z] [main]
main: I'm tired of waiting! [2021-03-17T18:41:31.845448Z] [main]
main: Now I can quit [2021-03-17T18:41:31.858448900Z] [main]

cancel()join()をまとめたcancelAndJoin()という拡張関数があるので、それを利用すると同様の結果が得られます。

CancellingCoroutineExecutionUsingCancelAndJoin.kt

fun main() = runBlocking {
    val job = launch {
        repeat(1_000) { i ->
            println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]")
            delay(1_000L)
        }
    }
    delay(2_300L) // delay a bit
    println("main: I'm tired of waiting! [${Instant.now()}] [${Thread.currentThread().name}]")
    job.cancelAndJoin() // cancels the job and waits for job's completion
    println("main: Now I can quit [${Instant.now()}] [${Thread.currentThread().name}]")
}

Cancellation is cooperative

suspend関数はすべてキャンセル可能になっており、キャンセルされるとCancellationExceptionがthrowされます。
ただし、下記の様にcoroutineが計算処理をしていてキャンセルをチェックしてない場合はキャンセル出来ません。

CancellationIsCooperative.kt

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 10) { // computation loop, just wastes CPU
            // print a message once a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ... [${Instant.now()}] [${Thread.currentThread().name}]")
                nextPrintTime += 1_000L
            }
        }
    }
    delay(2_300L) // delay a bit
    println("main: I'm tired of waiting! [${Instant.now()}] [${Thread.currentThread().name}]")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行

job: I'm sleeping 0 ... [2021-03-17T19:19:59.956846800Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 1 ... [2021-03-17T19:20:00.898996400Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 2 ... [2021-03-17T19:20:01.898682900Z] [DefaultDispatcher-worker-2]
main: I'm tired of waiting! [2021-03-17T19:20:02.263785700Z] [main]
job: I'm sleeping 3 ... [2021-03-17T19:20:02.898595900Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 4 ... [2021-03-17T19:20:03.898597300Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 5 ... [2021-03-17T19:20:04.898990700Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 6 ... [2021-03-17T19:20:05.898904800Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 7 ... [2021-03-17T19:20:06.898527100Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 8 ... [2021-03-17T19:20:07.898805500Z] [DefaultDispatcher-worker-2]
job: I'm sleeping 9 ... [2021-03-17T19:20:08.898662600Z] [DefaultDispatcher-worker-2]
main: Now I can quit [2021-03-17T19:20:08.900646200Z] [main]

2300ミリ秒実行している間に、"job: I'm sleeping..."は3回実行され、その後すぐにcancelAndJoin()が実行されます。
ただし、whileで計算処理をしていてキャンセルがチェックされないのでこの処理の完了まで待って終了します。
先程の例はdelay()を使用しており、delay()のようなすべてのsuspend関数はキャンセル可能ですが、
この例ではsuspend関数ではないためキャンセル出来ません。

Making computation code cancellable

先程のようなキャンセル出来ない処理をcancellableにするには2つの方法があります。
1つはsuspend関数(yieldなど)を定期的に呼び出してキャンセルをチェックする方法で、
もう1つはキャンセルのステータスをチェックする方法です。
CoroutineScopeの拡張propertyのisActiveを利用してチェックしてみます。

MakingComputationCodeCancellable.kt

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message once a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ... [${Instant.now()}] [${Thread.currentThread().name}]")
                nextPrintTime += 1_000L
            }
        }
    }
    delay(2_300L) // delay a bit
    println("main: I'm tired of waiting! [${Thread.currentThread().name}]")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit [${Thread.currentThread().name}]")
}

実行

job: I'm sleeping 0 ... [2021-03-18T18:14:47.670175200Z] [DefaultDispatcher-worker-1]
job: I'm sleeping 1 ... [2021-03-18T18:14:48.611173400Z] [DefaultDispatcher-worker-1]
job: I'm sleeping 2 ... [2021-03-18T18:14:49.611173700Z] [DefaultDispatcher-worker-1]
main: I'm tired of waiting! [main]
main: Now I can quit [main]

Closing resources with finally

suspend関数はキャンセル時にCancellationExceptionをthrowします。
そのため、try~catchuseを使用するとキャンセルされた場合に終了処理を実行します。
join()cancelAndJoin()は終了処理を待って完了します。

ClosingResourcesWithFinally.kt

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1_000) { i ->
                println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]")
                delay(1_000L)
            }
        } finally {
            println("job: I'm running finally. [${Instant.now()}] [${Thread.currentThread().name}]")
        }
    }
    delay(2_300L) // delay a bit
    println("main: I'm tired of waiting! [${Thread.currentThread().name}]")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit [${Thread.currentThread().name}]")
}

実行

job: I'm sleeping 0 ... [2021-03-18T18:18:20.921596800Z] [main]
job: I'm sleeping 1 ... [2021-03-18T18:18:21.942110500Z] [main]
job: I'm sleeping 2 ... [2021-03-18T18:18:22.943108800Z] [main]
main: I'm tired of waiting! [main]
job: I'm running finally. [2021-03-18T18:18:23.219114800Z] [main]
main: Now I can quit [main]

Run non-cancellable block

先程の例のfinallyでsuspend関数を使用すると、coroutineがキャンセルされるのでCancellationExceptionが発生します。
通常、ファイルなどのclose処理はsuspendではないノンブロッキングなので問題ありませんが、
キャンセルされたcoroutineでsuspendする必要がある場合は、withContext(NonCancellable)を使用して処理出来ます。

RunNonCancellableBlock.kt

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1_000) { i ->
                println("job: I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]")
                delay(1_000L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally. [${Instant.now()}] [${Thread.currentThread().name}]")
                delay(1_000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancelable. [${Instant.now()}] [${Thread.currentThread().name}]")
            }
        }
    }
    delay(2_300L) // delay a bit
    println("main: I'm tired of waiting! [${Thread.currentThread().name}]")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit [${Thread.currentThread().name}]")
}

実行

job: I'm sleeping 0 ... [2021-03-18T19:12:36.356649300Z] [main]
job: I'm sleeping 1 ... [2021-03-18T19:12:37.371110900Z] [main]
job: I'm sleeping 2 ... [2021-03-18T19:12:38.372137800Z] [main]
main: I'm tired of waiting! [main]
job: I'm running finally. [2021-03-18T19:12:38.662108400Z] [main]
job: And I've just delayed for 1 sec because I'm non-cancelable. [2021-03-18T19:12:39.664122900Z] [main]
main: Now I can quit [main]

Timeout

処理が一定時間を超えてタイムアウトした場合にキャンセルする場合、
Jobを利用して明示的に処理をキャンセルすることも出来ますが、
withTimeout()を使って一定時間後にキャンセルすることが出来ます。

Timeout.kt

fun main() = runBlocking {
    withTimeout(2_300L) {
        repeat(1_000) { i ->
            println("I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]")
            delay(1_000L)
        }
    }
}

withTimeout()で指定した時間経過後にCancellationExceptionのサブクラスであるTimeoutCancellationExceptionがthrowされます。

実行

I'm sleeping 0 ... [2021-03-23T18:11:35.863515300Z] [main]
I'm sleeping 1 ... [2021-03-23T18:11:36.884656600Z] [main]
I'm sleeping 2 ... [2021-03-23T18:11:37.885753700Z] [main]
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2300 ms
  at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
  at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
  at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
  at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
  at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
  at java.base/java.lang.Thread.run(Thread.java:834)

タイムアウトした際に何かアクションを行う必要がある場合は、
try {...} catch (e: TimeoutCancellationException) {...}で例外をキャッチして処理を行うか、withTimeoutOrNull()を使用します。
withTimeoutOrNull()withTimeout()と同じですが、例外をthrowする代わりにnullを返します。

TimeoutWithTimeoutOrNull.kt

fun main() = runBlocking {
    val result = withTimeoutOrNull(2_300L) {
        repeat(1_000) { i ->
            println("I'm sleeping $i ... [${Instant.now()}] [${Thread.currentThread().name}]")
            delay(1_000L)
        }
        "Done" // will get cancelled before it produces this result
    }
    println("Result is $result. [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行してみると例外が発生しないことが分かります。

実行

I'm sleeping 0 ... [2021-03-23T18:36:16.971694Z] [main]
I'm sleeping 1 ... [2021-03-23T18:36:17.992693700Z] [main]
I'm sleeping 2 ... [2021-03-23T18:36:18.994699300Z] [main]
Result is null. [2021-03-23T18:36:19.263762200Z] [main]

Asynchronous timeout and resources

withTimeout()でのタイムアウトイベントはブロック内で実行している処理とは非同期のため、
常に発生する可能性があります。
ブロック内で何らかのリソースを取得し、ブロックの外でリソースを開放する必要がある場合が注意が必要です。

下記のResouceクラスはクローズできるリソースの例です。
ここでは単純にリソースを取得した際にカウンタをインクリメントして、close()でデクリメントします。
短いタイムアウトwithTimeout()ブロック内でリソースを取得して、外側で開放するのを大量に繰り返す処理を実行してみます。

AsynchronousTimeoutAndResources.kt

var acquired = 0

class Resource {
    init {
        acquired++ // Acquire the resource
    }

    fun close() = acquired-- // Release the resource
}

fun main() {
    runBlocking {
        repeat(100_000) { // Launch 100K coroutines
            launch {
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // Acquire a resource and return it from withTimeout block
                }
                resource.close() // Release the resource
            }
        }
    }

    // not always print zero
    // Outside of runBlocking all coroutines have completed
    println("$acquired [${Instant.now()}] [${Thread.currentThread().name}]") // Print the number of resources still acquired
}

実行すると0が表示されません。(0になる場合もある)

実行

158 [2021-03-23T19:08:13.680680100Z] [main]

これをきちんとリソースを開放して0にするためには、
withTimeout()からリソースを返すのではなく、リソースへの参照を返すようにします。
するとfinallyで確実にリソースを開放することが出来ます。

AsynchronousTimeoutAndResourcesWithTryFinally.kt

fun main() {
    runBlocking {
        repeat(100_000) { // Launch 100K coroutines
            launch {
                var resource: Resource? = null // Not acquired yet
                try {
                    withTimeout(60) { // Timeout of 60 ms
                        delay(50) // Delay for 50 ms
                        resource = Resource() // Store a resource to the variable if acquired
                    }
                    // We can do something else with the resource here
                } finally {
                    resource?.close() // Release the resource if it was acquired
                }
            }
        }
    }

    // Outside of runBlocking all coroutines have completed
    println("$acquired [${Instant.now()}] [${Thread.currentThread().name}]") // Print the number of resources still acquired
}

実行すると0になりました。

実行

0 [2021-03-23T19:12:21.912473900Z] [main]

 
サンプルコードは下記にあげました。

github.com

おわり。