KotlinのCoroutineを試す (Coroutine exceptions handling)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのCoroutine exceptions handlingの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/exception-handling.html

下記バージョンで試してみます。

  • kotlin 1.6.10
  • kotlinx-coroutines-core:1.6.0

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.6.10'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Util

実行時間と実行スレッドを表示しながら標準出力するために下記のlog関数を準備しておきます。

Utils.kt

fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")

Coroutine exceptions handling

このセクションは例外ハンドリングと例外時のキャンセルを扱います。
我々はすでにsuspendポイントでキャンセルされたcoroutineはCancellationExceptionを投げ、
coroutineの機構によって無視されることを知っています。
ここではもしキャンセルの間に例外が投げられたときや、同じcoroutineの複数の子が例外を投げたときに何が起こるか見ていきます。

Exception propagation

coroutine builderには2つの種類があります。
例外を自動的に伝播する(launchactor)、またはユーザへそれらを公開する(asyncproduce)。
これらのbuilderが他のcoroutineの子ではないルートのcoroutineを作成するために使われた場合、
前者のbuilderはJavaThread.uncaughtExceptionHandlerと同じように、例外をキャッチされない例外として扱い、
一方後者は例えばawaitreceiveを経由して最後の例外を消費するようにユーザに依存します。
(producereceiveChannelsのセクションで扱っています)

これはGlobalScopeを使ったルートcoroutineを作成する簡単な例で実際にやってみることが出来ます。

GlobalScopeは自明ではない方法でバックファイアが起こる(裏目に出る)繊細なAPIです。
アプリケーション全体でルートcoroutineを作ることは、珍しい正当なGlobalScopeの使い方のひとつです。
そのため、@OptIn(DelicateCoroutinesApi::class)GlobalScopeを使うことを明示的にopt-inすべきです。

ExceptionPropagation.kt

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val job = GlobalScope.launch { // root coroutine with launch
        log("Throwing exception from launch")
        throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    log("Joined failed job")
    val deferred = GlobalScope.async { // root coroutine with async
        log("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        log("Unreached")
    } catch (e: ArithmeticException) {
        log("Caught ArithmeticException")
    }
}

実行結果

Throwing exception from launch [2022-02-09T15:43:03.905020700Z] [DefaultDispatcher-worker-1]
Joined failed job [2022-02-09T15:43:03.956020600Z] [main]
Throwing exception from async [2022-02-09T15:43:03.957017800Z] [DefaultDispatcher-worker-1]
Caught ArithmeticException [2022-02-09T15:43:03.958018900Z] [main]
Exception in thread "DefaultDispatcher-worker-1" java.lang.IndexOutOfBoundsException
    at com.example.coroutine.coroutineexceptionshandling.ExceptionPropagationKt$main$1$job$1.invokeSuspend(ExceptionPropagation.kt:13)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)

CoroutineExceptionHandler

コンソールへのキャッチされない(uncaught)例外の出力のデフォルトの振る舞いをカスタマイズすることが出来ます。
ルートcoroutineのCoroutineExceptionHandlerコンテキスト要素は、このルートcoroutineとすべてのその子のために汎用のcatchブロックとして使われ、カスタム例外ハンドリングされることがります。
これはThread.uncaughtExceptionHandlerと同様です。CoroutineExceptionHandlerの例外からは回復出来ません。
ハンドラが呼ばれた際にそのcoroutineは対応する例外ですでに完了しています。
通常、ハンドラは例外のログ出力、エラーメッセージ出力、終了、アプリケーションの再起動のために使われます。

JVM上ではServiceLoader経由でCoroutineExceptionHandlerを登録すると、すべてのcoroutineに対する
グローバルの例外ハンドラを再定義することが出来ます。
グローバル例外ハンドラは何もハンドラが指定されなかった時に使用されるThread.defaultUncaughtExceptionHandlerと同様です。
Androidでは、uncaughtExceptionPreHandlerがグローバルcoroutine例外ハンドラとして登録されています。

CoroutineExceptionHandlerはキャッチされない(uncaught)例外上でのみ呼び出されます。(他の方法では処理されなかった例外)
特に、すべての子coroutine(他のJobのコンテキストで作成されたcoroutine)はその親のcoroutineに例外のハンドリングを委譲します。
それもまたルートになるまでその親へ委譲され、それらのコンテキストに登録されたCoroutineExceptionHandlerは決して使用されません。
それに加え、asyncbuilderは常にすべての例外をキャッチし、結果のDeferredオブジェクトでそれらを表します。
そのためそのCoroutineExceptionHandlerは何も効果を持ちません。

管理スコープで動いているcoroutineは親に例外が伝搬されません、そしてこのルールから外されます。
さらなる詳細はSupervisionセクションを参照。

CoroutineExceptionHandler.kt

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) { // root coroutine, running in GlobalScope
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) { // also root, but async instead of launch
        throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
    }
    joinAll(job, deferred)
}

実行結果

CoroutineExceptionHandler got java.lang.AssertionError [2022-02-09T16:51:22.995709Z] [DefaultDispatcher-worker-1]

Cancellation and exceptions

キャンセルは例外とかなり関連しています。coroutineはキャンセルのためにCancellationExceptionを内部で使用しており、
これらの例外はすべてのハンドラで無視されます、そのため、catchブロックで得られるように追加のデバッグ情報のソースとしてのみ使用するべきです。
coroutineがJob.cancelを使用してキャンセルされた場合、終了されますが、その親はキャンセルされません。

CancellationAndExceptions.kt

fun main() = runBlocking {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                log("Child is cancelled")
            }
        }
        yield()
        log("Cancelling child")
        child.cancel()
        child.join()
        yield()
        log("Parent is not cancelled")
    }
    job.join()
}

実行結果

Cancelling child [2022-02-10T17:21:14.381561800Z] [main]
Child is cancelled [2022-02-10T17:21:14.452562900Z] [main]
Parent is not cancelled [2022-02-10T17:21:14.454562200Z] [main]

もしcoroutineがCancellationException以外の例外に遭遇した場合、その例外でその親をキャンセルします。
この振る舞いは上書きすることは出来ず、structured concurrencyのための安定したcoroutineの階層を提供するために使われます。
CoroutineExceptionHandlerの実装は子のcoroutineのためには使用されません。

ここでの例ではCoroutineExceptionHandlerGlobalScopeで作成されるcoroutineに常に導入されます。
mainのrunBlockingスコープで起動されたcoroutineに例外ハンドラを導入することは、
main coroutineは、ハンドラが導入されていたとしてもその子が例外で完了した場合常にキャンセルされるため意味がありません。

元の例外はそのすべての子が終了した場合にのみ、その親によって処理されます。
下記がそのデモンストレーションです。

CancellationAndExceptions2.kt

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    log("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    log("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            log("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
}

実行結果

Second child throws an exception [2022-02-15T13:57:54.165213100Z] [DefaultDispatcher-worker-2]
Children are cancelled, but exception is not handled until all children terminate [2022-02-15T13:57:54.240209600Z] [DefaultDispatcher-worker-2]
The first child finished its non cancellable block [2022-02-15T13:57:54.348211100Z] [DefaultDispatcher-worker-2]
CoroutineExceptionHandler got java.lang.ArithmeticException [2022-02-15T13:57:54.357212400Z] [DefaultDispatcher-worker-2]

Exceptions aggregation

couroutineの複数の子が例外で失敗した場合、一般的なルールは"最初の例外が勝つ"です。
そのため最初の例外が処理されます。最初の例外の後に発生したすべての追加の例外は隠され最初の例外に結び付けられます。

ExceptionsAggregation.kt

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException
            } finally {
                throw ArithmeticException() // the second exception
            }
        }
        launch {
            delay(100)
            throw IOException() // the first exception
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}

実行結果

CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException] [2022-02-15T16:00:12.208346800Z] [DefaultDispatcher-worker-1]

キャンセル例外は透過的でデフォルトではアンラップされます。

ExceptionsAggregation2.kt

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        val inner = launch { // all this stack of coroutines will get cancelled
            launch {
                launch {
                    throw IOException() // the original exception
                }
            }
        }
        try {
            inner.join()
        } catch (e: CancellationException) {
            log("Rethrowing CancellationException with original cause")
            throw e // cancellation exception is rethrown, yet the original IOException gets to the handler
        }
    }
    job.join()
}

実行結果

Rethrowing CancellationException with original cause [2022-02-15T16:08:17.987100900Z] [DefaultDispatcher-worker-4]
CoroutineExceptionHandler got java.io.IOException [2022-02-15T16:08:18.058098200Z] [DefaultDispatcher-worker-4]

Supervision

これまで学習してきた通り、キャンセルはがoroutineの全階層に伝搬する双方向性の関係です。
一方向性のキャンセルが必要なケースをみてみましょう。

そのケースの良い例はUIのスコープで定義されたUIコンポーネントとjobです。
もしUIの子タスクのいくつかが失敗した場合、常にUIコンポーネント全体をキャンセル(事実上のkill)する必要はありません。
しかし、UIコンポーネントが破棄されると(そしてそのjobがキャンセルされると)、すべての子のjobの結果はもはや不要なため
すべての子jobをキャンセルする必要があります。

別の例としては複数の子のjobを生成するサーバプロセスです。それらの実行を管理、失敗の追跡、失敗したものの再起動をする必要があります。

Supervision job

SupervisorJobはこれらの目的のために使用できます。これは通常のJobと同様です。唯一の例外は下方へのみキャンセルが伝搬されることです。
これは下記の例で簡単に実証出来ます。

SupervisionJob.kt

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
            log("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            log("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                log("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        log("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

実行結果

The first child is failing [2022-02-16T16:18:10.956179Z] [main]
The first child is cancelled: true, but the second one is still active [2022-02-16T16:18:11.017180300Z] [main]
Cancelling the supervisor [2022-02-16T16:18:11.018179900Z] [main]
The second child is cancelled because the supervisor was cancelled [2022-02-16T16:18:11.021178900Z] [main]

Supervision scope

coroutineScopeの代わりに、スコープされた並行処理にsupervisorScopeを使うことが出来ます。
これは一方向にのみキャンセルを伝搬し、自身が失敗した場合のみすべての子をキャンセルします。
coroutineScopeがそうするように、完了するまですべての子を待ちます。

SupervisionScope.kt

fun main() = runBlocking {
    try {
        supervisorScope {
            val child = launch {
                try {
                    log("The child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    log("The child is cancelled")
                }
            }
            // Give our child a chance to execute and print using yield
            yield()
            log("Throwing an exception from the scope")
            throw AssertionError()
        }
    } catch (e: AssertionError) {
        log("Caught an assertion error")
    }
}

実行結果

The child is sleeping [2022-02-16T16:39:00.500962Z] [main]
Throwing an exception from the scope [2022-02-16T16:39:00.580962Z] [main]
The child is cancelled [2022-02-16T16:39:00.590962100Z] [main]
Caught an assertion error [2022-02-16T16:39:00.592960Z] [main]
Exceptions in supervised coroutines

通常のjobとsupervisor jobの他の重要な違いは例外の扱いです。それぞれの子は自身の例外を例外ハンドリングのメカニズムを利用して処理すべきです。
この違いは子の失敗は親に伝搬しないという事実から来ています。
これはsupervisorScopeの中で直接起動されたcoroutineが、ルートのcoroutinと同様の方法でそのスコープにインストールされたCoroutineExceptionHandlerを使用するということを意味します。
(詳細はCoroutineExceptionHandlerセクションを参照)

ExceptionsInSupervisedCoroutines.kt

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    supervisorScope {
        val child = launch(handler) {
            log("The child throws an exception")
            throw AssertionError()
        }
        log("The scope is completing")
    }
    log("The scope is completed")
}

実行結果

The scope is completing [2022-02-20T06:12:21.581204700Z] [main]
The child throws an exception [2022-02-20T06:12:21.644203300Z] [main]
CoroutineExceptionHandler got java.lang.AssertionError [2022-02-20T06:12:21.656205300Z] [main]
The scope is completed [2022-02-20T06:12:21.656205300Z] [main]

 
サンプルコードは下記にあげました。

github.com

おわり。