KotlinのCoroutineを試す (Coroutine context and dispatchers)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのCoroutine context and dispatchersの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html

下記バージョンで試してみます。

  • kotlin 1.6.10
  • kotlinx-coroutines-core:1.6.0

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.6.10'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Coroutine context and dispatchers

Dispatchers and threads

coroutineは常にkotlinの標準のライブラリで定義されているCoroutineContext型のコンテキストで実行されます。
コンテキストはいくつかの要素で集合で、主にJobとdispatcherです。

coroutineのコンテキストには、coroutineがどのスレッドで実行するかを決定するcoroutine dispatcherが含まれています。
coroutine dispatcherはcoroutineの実行を特定のスレッドに限定したり、スレッドプールにdispatchしたり、
特定のスレッドに限定されないように(Unconfined)することが出来ます。

launchasyncのようなすべてのcroutine builderはオプショナルでCoroutineContextをパラメータで取ることが出来ます。
CoroutineContextパラメータで新しいcoroutineと他のコンテキストを明示的に指定することが出来ます。

DispatchersAndThreads.kt

@ObsoleteCoroutinesApi
fun main() = runBlocking<Unit> {
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]")
    }
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]")
    }
    launch(Dispatchers.Default) {
        // will get dispatched to DefaultDispatcher
        println("Default: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]")
    }
    launch(newSingleThreadContext("MyOwnThread")) {
        // will get its own new thread
        println("newSingleThreadContext: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]")
    }
}

実行

Unconfined: I'm working in thread [2021-12-26T18:43:18.376325900Z] [main]
Default: I'm working in thread [2021-12-26T18:43:18.441323700Z] [DefaultDispatcher-worker-1]
newSingleThreadContext: I'm working in thread [2021-12-26T18:43:18.451323900Z] [MyOwnThread]
main runBlocking: I'm working in thread [2021-12-26T18:43:18.451323900Z] [main]

launch{}がパラメータ無しの場合、実行したCoroutineScopeのコンテキスト(とdispatcher)を継承します。
上の例ではmain threadのrunBlocking coroutineのコンテキストを継承します。

Dispatchers.Unconfinedは特別なdispatcherで、これはmain threadで実行されてるように見えますが、実際は異なるメカニズムです。
(詳細は後述)

GlobalScopeでcoroutineが実行された時のデフォルトのdispatcherは、Dispatchers.Defaultで表され、
共有バックグラウンドスレッドプールのthreadが使用されます。

newSingleThreadContextはcoroutineが実行するための専用threadを生成します。
専用threadはとても高コストのリソースなので、実際のアプリケーションでは不要になったらclose()で解放するか、 top-level変数に保持してアプリケーション全体で再利用する必要があります。

Unconfined vs confined dispatcher

Dispatchers.Unconfined dispatcherは呼び出し元のthreadでcoroutineを開始します。
しかし途中のsuspend関数までで、再開後はsuspend関数で決定したthreadでcoroutineを再開します。
Unconfined dispatcherはCPUを消費しないcoroutineやUIのような特定のthreadに限定された共有データを更新しないcroutineに適してます。

一方dispatcherはデフォルトで外側のCoroutineScopeから継承されます。
特にrunBlocking coroutineのデフォルトのdispatcherは呼び出し元のthreadに限定されます。
それを継承するとFIFOスケジューリングを使用して実行をこのthreadに制限することが出来ます。
つまりrunBlocking{}から継承されたコンテキストを持つcoroutineはmain threadで実行され、
unconfinedはdelayが使用しているデフォルトの実行threadで再開されます。
(Unconfined dispatcherは特定のコーナーケースで役立ちますが、一般的なコードではunconfined dispatcherは使用しないでください)

UnconfinedVsConfinedDispatcher.kt

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined: I'm working in thread [${Instant.now()}] [${Thread.currentThread().name}]")
        delay(500)
        println("Unconfined: After delay in thread [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread [${Instant.now()}] [${Thread.currentThread().name}]")
        delay(1000)
        println("main runBlocking: After delay in thread [${Instant.now()}] [${Thread.currentThread().name}]")
    }
}

実行

Unconfined: I'm working in thread [2021-12-24T18:27:45.925559600Z] [main]
main runBlocking: I'm working in thread [2021-12-24T18:27:45.980559800Z] [main]
Unconfined: After delay in thread [2021-12-24T18:27:46.482559100Z] [kotlinx.coroutines.DefaultExecutor]
main runBlocking: After delay in thread [2021-12-24T18:27:46.985561100Z] [main]

Debugging coroutines and threads

Debugging with IDEA

IntelliJでkotlin pluginを利用することでcoroutineのデバッグが出来ます。
Debug tool windowのCoroutinesタブで実行中とsuspend中のcoroutineの情報を確認出来ます。
coroutineは実行中のdispatcherごとにグルーピングされてます。

Debugging using logging

別な方法は、各logでthread名を表示することです。
この機能はロギングフレームワークでサポートされています。
thread名だけではcontextが分からないので、coroutineにはそれを簡単にするための機能があります。
-Dkotlinx.coroutines.debugのオプションを指定して実行します。
(IntelliJの場合、RunかdebugのEdit Run ConfigurationでVM optionsに設定)

DebuggingCoroutinesAndThreads.kt

fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")

fun main() = runBlocking {
    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}

3つのcoroutineがあり、runBlockin内のmain coroutineとa,bのdeferredを計算する2つのcoroutineです。
これらはすべてrunBlockingのcontextで実行され、main threadに束縛されます。

実行

I'm computing a piece of the answer [2021-12-26T19:08:17.020922300Z] [main @coroutine#2]
I'm computing another piece of the answer [2021-12-26T19:08:17.084920100Z] [main @coroutine#3]
The answer is 42 [2021-12-26T19:08:17.091919800Z] [main @coroutine#1]

log関数にはthreadと実行中のcoroutineの識別子が付与されます。
これはdebug modeがonの時に作成されたすべてのcoroutineに連番で割り当てられます。

Jumping between threads

下記のコードを-Dkotlinx.coroutines.debugオプションを指定して実行してみます。

いくつかの新しいテクニックが含まれています。
1つ目は明示的にrunBlockingにcontextを指定してます。
もう1つはwithContextを使って同じcoroutineの中でcontextを変更しています。

JumpingBetweenThreads.kt

@ObsoleteCoroutinesApi
fun main() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

この例ではkotlin標準関数のuseを使ってnewSingleThreadContextで生成したthreadが不要になったときに開放しています。

Started in ctx1 [2021-12-27T16:02:33.339543600Z] [Ctx1 @coroutine#1]
Working in ctx2 [2021-12-27T16:02:33.428543200Z] [Ctx2 @coroutine#1]
Back to ctx1 [2021-12-27T16:02:33.430542500Z] [Ctx1 @coroutine#1]

Job in the context

coroutineのjobはそのcontextの一部で、coroutineContext[Job]として取得することが出来ます。

JobInTheContext.kt

fun main() = runBlocking {
    println("My job is ${coroutineContext[Job]}")
    println(coroutineContext[Job]?.isActive)
}

実行結果

My job is BlockingCoroutine{Active}@fad74ee
true

Children of a coroutine

coroutineが他のcoroutineのCoroutineScopeで起動されると、CoroutineScope.coroutineContext経由でcontextを継承し、
新しいcoroutineのjobは親coroutineの子coroutineとなります。
親coroutineがキャンセルされるとすべての子coroutineも再帰的にキャンセルされます。

この親子関係は下記の2つの方法で上書きすることが出来ます。

  • coroutine起動時に明示的に異なるスコープが指定された場合(GlobalScope.launchなど)、親スコープからJobを継承しません。
  • 新しいcoroutienのcontextとして異なるJobobjectが渡された場合、親スコープのJobを上書きします。(下記の例)

どちらの場合でも起動されたcoroutineはそのスコープに束縛されず、独立して動きます。

ChildrenOfACoroutine.kt

fun main() = runBlocking {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs
        launch(Job()) {
            println("job1: I run in my own Job and execute independently! [${Instant.now()}] [${Thread.currentThread().name}]")
            delay(1000)
            println("job1: I am not affected by cancellation of the request [${Instant.now()}] [${Thread.currentThread().name}]")
        }

        // and the other inherits the parent context
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine [${Instant.now()}] [${Thread.currentThread().name}]")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled [${Instant.now()}] [${Thread.currentThread().name}]")
        }
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation? [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行結果

job1: I run in my own Job and execute independently! [2021-12-27T16:54:58.344898600Z] [main]
job2: I am a child of the request coroutine [2021-12-27T16:54:58.498931900Z] [main]
job1: I am not affected by cancellation of the request [2021-12-27T16:54:59.393901700Z] [main]
main: Who has survived request cancellation? [2021-12-27T16:54:59.857902700Z] [main]

Parental responsibilities

親coroutineは常にすべての子coroutineの完了を待ちます。
親coroutineは起動したすべての子coroutineを明示的に追跡する必要はなく、
待つためにJob.joinを使用する必要もありません。

ParentalResponsibilities.kt

fun main() = runBlocking {
    val request = launch {
        repeat(3) { i ->
            launch {
                delay((i + 1) * 200L)
                println("Coroutine $i is done [${Instant.now()}] [${Thread.currentThread().name}]")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active [${Instant.now()}] [${Thread.currentThread().name}]")
    }
    request.join()
    println("Now processing of the request is complete [${Instant.now()}] [${Thread.currentThread().name}]")
}

実行結果

request: I'm done and I don't explicitly join my children that are still active [2021-12-27T17:13:39.104911800Z] [main]
Coroutine 0 is done [2021-12-27T17:13:39.366907100Z] [main]
Coroutine 1 is done [2021-12-27T17:13:39.571309200Z] [main]
Coroutine 2 is done [2021-12-27T17:13:39.775935100Z] [main]
Now processing of the request is complete [2021-12-27T17:13:39.776897100Z] [main]

Naming coroutines for debugging

coroutineが頻繁にログを出力する場合や、同じcoroutineのログ出力を関連付ける必要がある場合、自動的にアサインされるidはよいものです。
しかし、coroutineが特定のリクエストや特定のバックグラウンドタスクに紐づく場合、デバッグのために明示的に名前をつけるのがよいです。
CoroutineNamecontext要素はthread名と同じ役割です。debugモードONの場合にcoroutineを実行しているthread名に含まれます。

下記のコードはこのコンセプトの例です。

NamingCoroutinesForDebugging.kt

fun main() = runBlocking {
    log("Started main coroutine")
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

-Dkotlinx.coroutines.debugオプションをつけて実行してみます。

Started main coroutine [2021-12-27T17:31:51.331218300Z] [main @coroutine#1]
Computing v1 [2021-12-27T17:31:51.917218400Z] [main @v1coroutine#2]
Computing v2 [2021-12-27T17:31:52.412247300Z] [main @v2coroutine#3]
The answer for v1 / v2 = 42 [2021-12-27T17:31:52.417217900Z] [main @coroutine#1]

Combining context elements

coroutine contextに複数の要素を定義したい場合があります。
その場合+を使用します。
例えば明示的に指定したdispatcherと明示的に指定した名前を持つcoroutineを起動出来ます。

CombiningContextElements.kt

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread [${Instant.now()}] [${Thread.currentThread().name}]")
    }
}

-Dkotlinx.coroutines.debugオプションをつけて実行してみます。

I'm working in thread [2021-12-27T17:40:20.476381100Z] [DefaultDispatcher-worker-1 @test#2]

Coroutine scope

context、子、jobについてまとめてみます。
アプリケーションがライフサイクルを持ち、coroutineではないと仮定します。
例えばAndroidアプリケーションを書き、フェッチ、データ更新、アニメーションなどの非同期処理を行うために
様々なcroutineをAndroid activityのcontextで起動します。
それらのすべてのcoroutineはメモリリークを避けるためにactivityが終了する時にキャンセルさせる必要があります。
もちろんactivityのライフサイクルとそのcoroutineを紐付けて、手動でcontextとjobを手動で操作することも出来ます。
しかし、kotlinx.coroutinesはそれらをカプセル化する抽象化を提供します(CoroutineScope)
すべてのcoroutine builderはCoroutineScopeの拡張として定義されているので、すでにcoroutine scopeについては馴染みがあるはずです。

acitivityのライフサイクルに紐付けたCoroutineScopeインスタンスを作成することで、coroutineのライフサイクルを管理します。
CoroutineScopeインスタンスCoroutineScope()またはMainScope()というファクトリメソッドで作成出来ます。
前者は汎用目的のスコープを作成し、後者はUI アプリケーション用のスコープを作成し、Dispatchers.Mainをデフォルトのdispatcherとして使用します。

定義されたスコープを使用してこのactivityのスコープ内でcoroutineを起動できます。
下記の例では、10個のcoroutineを異なる時間遅延させて起動します。

CoroutineScope.kt

class Activity {
    private val mainScope = CoroutineScope(Dispatchers.Default)

    fun destroy() {
        mainScope.cancel()
    }

    fun doSomething() {
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L)
                println("Coroutine $i is done [${Instant.now()}] [${Thread.currentThread().name}]")
            }
        }
    }
}

main関数ではacitivityを作成し、doSomething()を呼び、500ms後にactivityをdestroy()します。
するとdoSomething()から起動されたすべてのcoroutineがキャンセルされます。
activityがdestroyされたあと、少し待機したあと何もメッセージが出力されないのが分かります。

fun main() = runBlocking {
    val activity = Activity()
    activity.doSomething()
    println("Launched coroutines [${Instant.now()}] [${Thread.currentThread().name}]")
    delay(500L)
    println("Destroying activity! [${Instant.now()}] [${Thread.currentThread().name}]")
    activity.destroy()
    delay(1000L)
}

実行結果

Launched coroutines [2021-12-28T17:12:48.702801200Z] [main]
Coroutine 0 is done [2021-12-28T17:12:48.911800400Z] [DefaultDispatcher-worker-1]
Coroutine 1 is done [2021-12-28T17:12:49.112800500Z] [DefaultDispatcher-worker-1]
Destroying activity! [2021-12-28T17:12:49.252801100Z] [main]

初めの2つのcoroutineだけがメッセージを出力し、それ以外はActivity.destroy()の中のjob.cancel()を呼ぶだけでキャンセルされます。

Thread-local data

coroutineに(もしくはcoroutine間)でthread localなデータを渡せると便利な場合があります。
しかし、coroutineは特定のthreadに束縛されてないため、手動で行うとボイラープレートになってします可能性があります。

ThreadLocalではasContextElementという拡張関数が役立ちます。
これは追加のcontext要素を作成し、与えられたThreadLocalのデータを保持し、coroutineがcontextを切り替えるたびにリストアします。

下記が例です。

ThreadLocalData.kt

val threadLocal = ThreadLocal<String?>()

fun main() = runBlocking {
    threadLocal.set("main")
    println("Pre-main, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'")
}

上の例ではDispatchers.Defaultを使ったバックグラウンドスレッドプールで新しいcoroutineを起動しているため
そのスレッドプールとは異なるthreadで動きますが、coroutineがどのthreadで実行されても
threadLocal.asContextElement(value = "launch")を使用して指定したthread localの変数の値は保持されています。

-Dkotlinx.coroutines.debugオプションをつけて実行してみます。

実行結果

Pre-main, current thread: [Thread[main @coroutine#1,5,main]], thread local value: 'main'
Launch start, current thread: [Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]], thread local value: 'launch'
After yield, current thread: [Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]], thread local value: 'launch'
Post-main, current thread: [Thread[main @coroutine#1,5,main]], thread local value: 'main'

対応するcontext要素を設定するのは忘れがちです。
もしそのcoroutineを実行しているthreadが異なる場合、coroutineからアクセスされるthread local変数が予期しない値を保持する場合があります。
このような状況を避けるために、ensurePresentを使用して不適切な使用に対してfail-fastすることが推奨されます。

ThreadLocalはfirst-classサポートでkotlinx.coroutinesが提供するどのprimitiveで使用できます。
ただし1つだけ制約があります。
thread localが変更されたとき、新しい値はcoroutineの呼び出しもとに伝搬されず(context要素がすべてのThreadLocalオブジェクトを追跡できないため)、更新された値は次のsuspensionで失われます。
coroutineでthread localの値を更新するにはwithContextを使用します。
(詳細はasContextElementを参照)

あるいは、class Counter(var i: Int)のようにミュータブルなclassに値を保持し、
それをthread local変数に保持することも出来ます。
しかし、この場合このミュータブルなclassの変数の並行更新に対する同期処理を行う必要があります。

高度な使い方として例えばlogging MDC、トランザクションcontextや他のライブラリのように
データ受け渡しのために内部でthread localを使用している場合、実装すべきThreadContextElementインターフェースのドキュメントを参照してください。

 
サンプルコードは下記にあげました。

github.com

おわり。