KotlinのCoroutineを試す (Coroutine context and dispatchers)
kotlinのcoroutineを試してみたメモです。
kotlin公式のcoroutineチュートリアルのCoroutine context and dispatchersの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html
- Dependency
- Coroutine context and dispatchers
下記バージョンで試してみます。
- kotlin 1.6.10
- kotlinx-coroutines-core:1.6.0
Dependency
gradleを使って試してみます。
build.gradle
plugins { id 'org.jetbrains.kotlin.jvm' version '1.6.10' } group 'com.example.coroutine.kotlin' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0' } compileKotlin { kotlinOptions.jvmTarget = "11" } compileTestKotlin { kotlinOptions.jvmTarget = "11" }
Coroutine context and dispatchers
Dispatchers and threads
coroutineは常にkotlinの標準のライブラリで定義されているCoroutineContext
型のコンテキストで実行されます。
コンテキストはいくつかの要素で集合で、主にJobとdispatcherです。
coroutineのコンテキストには、coroutineがどのスレッドで実行するかを決定するcoroutine dispatcherが含まれています。
coroutine dispatcherはcoroutineの実行を特定のスレッドに限定したり、スレッドプールにdispatchしたり、
特定のスレッドに限定されないように(Unconfined)することが出来ます。
launch
やasync
のようなすべてのcroutine builderはオプショナルでCoroutineContext
をパラメータで取ることが出来ます。
CoroutineContext
パラメータで新しいcoroutineと他のコンテキストを明示的に指定することが出来ます。
DispatchersAndThreads.kt
@ObsoleteCoroutinesApi fun main() = runBlocking<Unit> { launch { // context of the parent, main runBlocking coroutine println("main runBlocking: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]") } launch(Dispatchers.Unconfined) { // not confined -- will work with main thread println("Unconfined: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]") } launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher println("Default: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]") } launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread println("newSingleThreadContext: I'm working in thread [${Thread.currentThread().name}] [${Instant.now()}]") } }
実行
Unconfined: I'm working in thread [2021-12-26T18:43:18.376325900Z] [main] Default: I'm working in thread [2021-12-26T18:43:18.441323700Z] [DefaultDispatcher-worker-1] newSingleThreadContext: I'm working in thread [2021-12-26T18:43:18.451323900Z] [MyOwnThread] main runBlocking: I'm working in thread [2021-12-26T18:43:18.451323900Z] [main]
launch{}
がパラメータ無しの場合、実行したCoroutineScope
のコンテキスト(とdispatcher)を継承します。
上の例ではmain threadのrunBlocking
coroutineのコンテキストを継承します。
Dispatchers.Unconfined
は特別なdispatcherで、これはmain threadで実行されてるように見えますが、実際は異なるメカニズムです。
(詳細は後述)
GlobalScope
でcoroutineが実行された時のデフォルトのdispatcherは、Dispatchers.Default
で表され、
共有バックグラウンドスレッドプールのthreadが使用されます。
newSingleThreadContext
はcoroutineが実行するための専用threadを生成します。
専用threadはとても高コストのリソースなので、実際のアプリケーションでは不要になったらclose()
で解放するか、
top-level変数に保持してアプリケーション全体で再利用する必要があります。
Unconfined vs confined dispatcher
Dispatchers.Unconfined
dispatcherは呼び出し元のthreadでcoroutineを開始します。
しかし途中のsuspend関数までで、再開後はsuspend関数で決定したthreadでcoroutineを再開します。
Unconfined dispatcherはCPUを消費しないcoroutineやUIのような特定のthreadに限定された共有データを更新しないcroutineに適してます。
一方dispatcherはデフォルトで外側のCoroutineScope
から継承されます。
特にrunBlocking
coroutineのデフォルトのdispatcherは呼び出し元のthreadに限定されます。
それを継承するとFIFOスケジューリングを使用して実行をこのthreadに制限することが出来ます。
つまりrunBlocking{}
から継承されたコンテキストを持つcoroutineはmain threadで実行され、
unconfinedはdelayが使用しているデフォルトの実行threadで再開されます。
(Unconfined dispatcherは特定のコーナーケースで役立ちますが、一般的なコードではunconfined dispatcherは使用しないでください)
UnconfinedVsConfinedDispatcher.kt
fun main() = runBlocking<Unit> { launch(Dispatchers.Unconfined) { // not confined -- will work with main thread println("Unconfined: I'm working in thread [${Instant.now()}] [${Thread.currentThread().name}]") delay(500) println("Unconfined: After delay in thread [${Instant.now()}] [${Thread.currentThread().name}]") } launch { // context of the parent, main runBlocking coroutine println("main runBlocking: I'm working in thread [${Instant.now()}] [${Thread.currentThread().name}]") delay(1000) println("main runBlocking: After delay in thread [${Instant.now()}] [${Thread.currentThread().name}]") } }
実行
Unconfined: I'm working in thread [2021-12-24T18:27:45.925559600Z] [main] main runBlocking: I'm working in thread [2021-12-24T18:27:45.980559800Z] [main] Unconfined: After delay in thread [2021-12-24T18:27:46.482559100Z] [kotlinx.coroutines.DefaultExecutor] main runBlocking: After delay in thread [2021-12-24T18:27:46.985561100Z] [main]
Debugging coroutines and threads
Debugging with IDEA
IntelliJでkotlin pluginを利用することでcoroutineのデバッグが出来ます。
Debug tool windowのCoroutinesタブで実行中とsuspend中のcoroutineの情報を確認出来ます。
coroutineは実行中のdispatcherごとにグルーピングされてます。
Debugging using logging
別な方法は、各logでthread名を表示することです。
この機能はロギングフレームワークでサポートされています。
thread名だけではcontextが分からないので、coroutineにはそれを簡単にするための機能があります。
-Dkotlinx.coroutines.debug
のオプションを指定して実行します。
(IntelliJの場合、RunかdebugのEdit Run ConfigurationでVM optionsに設定)
DebuggingCoroutinesAndThreads.kt
fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]") fun main() = runBlocking { val a = async { log("I'm computing a piece of the answer") 6 } val b = async { log("I'm computing another piece of the answer") 7 } log("The answer is ${a.await() * b.await()}") }
3つのcoroutineがあり、runBlockin
内のmain coroutineとa,bのdeferredを計算する2つのcoroutineです。
これらはすべてrunBlocking
のcontextで実行され、main threadに束縛されます。
実行
I'm computing a piece of the answer [2021-12-26T19:08:17.020922300Z] [main @coroutine#2] I'm computing another piece of the answer [2021-12-26T19:08:17.084920100Z] [main @coroutine#3] The answer is 42 [2021-12-26T19:08:17.091919800Z] [main @coroutine#1]
log関数にはthreadと実行中のcoroutineの識別子が付与されます。
これはdebug modeがonの時に作成されたすべてのcoroutineに連番で割り当てられます。
Jumping between threads
下記のコードを-Dkotlinx.coroutines.debug
オプションを指定して実行してみます。
いくつかの新しいテクニックが含まれています。
1つ目は明示的にrunBlocking
にcontextを指定してます。
もう1つはwithContext
を使って同じcoroutineの中でcontextを変更しています。
JumpingBetweenThreads.kt
@ObsoleteCoroutinesApi fun main() { newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { log("Started in ctx1") withContext(ctx2) { log("Working in ctx2") } log("Back to ctx1") } } } }
この例ではkotlin標準関数のuse
を使ってnewSingleThreadContext
で生成したthreadが不要になったときに開放しています。
Started in ctx1 [2021-12-27T16:02:33.339543600Z] [Ctx1 @coroutine#1] Working in ctx2 [2021-12-27T16:02:33.428543200Z] [Ctx2 @coroutine#1] Back to ctx1 [2021-12-27T16:02:33.430542500Z] [Ctx1 @coroutine#1]
Job in the context
coroutineのjobはそのcontextの一部で、coroutineContext[Job]
として取得することが出来ます。
JobInTheContext.kt
fun main() = runBlocking { println("My job is ${coroutineContext[Job]}") println(coroutineContext[Job]?.isActive) }
実行結果
My job is BlockingCoroutine{Active}@fad74ee true
Children of a coroutine
coroutineが他のcoroutineのCoroutineScope
で起動されると、CoroutineScope.coroutineContext
経由でcontextを継承し、
新しいcoroutineのjobは親coroutineの子coroutineとなります。
親coroutineがキャンセルされるとすべての子coroutineも再帰的にキャンセルされます。
この親子関係は下記の2つの方法で上書きすることが出来ます。
- coroutine起動時に明示的に異なるスコープが指定された場合(
GlobalScope.launch
など)、親スコープからJob
を継承しません。 - 新しいcoroutienのcontextとして異なる
Job
objectが渡された場合、親スコープのJob
を上書きします。(下記の例)
どちらの場合でも起動されたcoroutineはそのスコープに束縛されず、独立して動きます。
ChildrenOfACoroutine.kt
fun main() = runBlocking { // launch a coroutine to process some kind of incoming request val request = launch { // it spawns two other jobs launch(Job()) { println("job1: I run in my own Job and execute independently! [${Instant.now()}] [${Thread.currentThread().name}]") delay(1000) println("job1: I am not affected by cancellation of the request [${Instant.now()}] [${Thread.currentThread().name}]") } // and the other inherits the parent context launch { delay(100) println("job2: I am a child of the request coroutine [${Instant.now()}] [${Thread.currentThread().name}]") delay(1000) println("job2: I will not execute this line if my parent request is cancelled [${Instant.now()}] [${Thread.currentThread().name}]") } } delay(500) request.cancel() // cancel processing of the request delay(1000) // delay a second to see what happens println("main: Who has survived request cancellation? [${Instant.now()}] [${Thread.currentThread().name}]") }
実行結果
job1: I run in my own Job and execute independently! [2021-12-27T16:54:58.344898600Z] [main] job2: I am a child of the request coroutine [2021-12-27T16:54:58.498931900Z] [main] job1: I am not affected by cancellation of the request [2021-12-27T16:54:59.393901700Z] [main] main: Who has survived request cancellation? [2021-12-27T16:54:59.857902700Z] [main]
Parental responsibilities
親coroutineは常にすべての子coroutineの完了を待ちます。
親coroutineは起動したすべての子coroutineを明示的に追跡する必要はなく、
待つためにJob.join
を使用する必要もありません。
ParentalResponsibilities.kt
fun main() = runBlocking { val request = launch { repeat(3) { i -> launch { delay((i + 1) * 200L) println("Coroutine $i is done [${Instant.now()}] [${Thread.currentThread().name}]") } } println("request: I'm done and I don't explicitly join my children that are still active [${Instant.now()}] [${Thread.currentThread().name}]") } request.join() println("Now processing of the request is complete [${Instant.now()}] [${Thread.currentThread().name}]") }
実行結果
request: I'm done and I don't explicitly join my children that are still active [2021-12-27T17:13:39.104911800Z] [main] Coroutine 0 is done [2021-12-27T17:13:39.366907100Z] [main] Coroutine 1 is done [2021-12-27T17:13:39.571309200Z] [main] Coroutine 2 is done [2021-12-27T17:13:39.775935100Z] [main] Now processing of the request is complete [2021-12-27T17:13:39.776897100Z] [main]
Naming coroutines for debugging
coroutineが頻繁にログを出力する場合や、同じcoroutineのログ出力を関連付ける必要がある場合、自動的にアサインされるidはよいものです。
しかし、coroutineが特定のリクエストや特定のバックグラウンドタスクに紐づく場合、デバッグのために明示的に名前をつけるのがよいです。
CoroutineName
context要素はthread名と同じ役割です。debugモードONの場合にcoroutineを実行しているthread名に含まれます。
下記のコードはこのコンセプトの例です。
NamingCoroutinesForDebugging.kt
fun main() = runBlocking { log("Started main coroutine") val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1") 252 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2") 6 } log("The answer for v1 / v2 = ${v1.await() / v2.await()}") }
-Dkotlinx.coroutines.debug
オプションをつけて実行してみます。
Started main coroutine [2021-12-27T17:31:51.331218300Z] [main @coroutine#1] Computing v1 [2021-12-27T17:31:51.917218400Z] [main @v1coroutine#2] Computing v2 [2021-12-27T17:31:52.412247300Z] [main @v2coroutine#3] The answer for v1 / v2 = 42 [2021-12-27T17:31:52.417217900Z] [main @coroutine#1]
Combining context elements
coroutine contextに複数の要素を定義したい場合があります。
その場合+
を使用します。
例えば明示的に指定したdispatcherと明示的に指定した名前を持つcoroutineを起動出来ます。
CombiningContextElements.kt
fun main() = runBlocking<Unit> { launch(Dispatchers.Default + CoroutineName("test")) { println("I'm working in thread [${Instant.now()}] [${Thread.currentThread().name}]") } }
-Dkotlinx.coroutines.debug
オプションをつけて実行してみます。
I'm working in thread [2021-12-27T17:40:20.476381100Z] [DefaultDispatcher-worker-1 @test#2]
Coroutine scope
context、子、jobについてまとめてみます。
アプリケーションがライフサイクルを持ち、coroutineではないと仮定します。
例えばAndroidアプリケーションを書き、フェッチ、データ更新、アニメーションなどの非同期処理を行うために
様々なcroutineをAndroid activityのcontextで起動します。
それらのすべてのcoroutineはメモリリークを避けるためにactivityが終了する時にキャンセルさせる必要があります。
もちろんactivityのライフサイクルとそのcoroutineを紐付けて、手動でcontextとjobを手動で操作することも出来ます。
しかし、kotlinx.coroutines
はそれらをカプセル化する抽象化を提供します(CoroutineScop
e)
すべてのcoroutine builderはCoroutineScope
の拡張として定義されているので、すでにcoroutine scopeについては馴染みがあるはずです。
acitivityのライフサイクルに紐付けたCoroutineScope
のインスタンスを作成することで、coroutineのライフサイクルを管理します。
CoroutineScope
のインスタンスはCoroutineScope()
またはMainScope()
というファクトリメソッドで作成出来ます。
前者は汎用目的のスコープを作成し、後者はUI アプリケーション用のスコープを作成し、Dispatchers.Main
をデフォルトのdispatcherとして使用します。
定義されたスコープを使用してこのactivityのスコープ内でcoroutineを起動できます。
下記の例では、10個のcoroutineを異なる時間遅延させて起動します。
CoroutineScope.kt
class Activity { private val mainScope = CoroutineScope(Dispatchers.Default) fun destroy() { mainScope.cancel() } fun doSomething() { repeat(10) { i -> mainScope.launch { delay((i + 1) * 200L) println("Coroutine $i is done [${Instant.now()}] [${Thread.currentThread().name}]") } } } }
main関数ではacitivityを作成し、doSomething()
を呼び、500ms後にactivityをdestroy()
します。
するとdoSomething()
から起動されたすべてのcoroutineがキャンセルされます。
activityがdestroyされたあと、少し待機したあと何もメッセージが出力されないのが分かります。
fun main() = runBlocking { val activity = Activity() activity.doSomething() println("Launched coroutines [${Instant.now()}] [${Thread.currentThread().name}]") delay(500L) println("Destroying activity! [${Instant.now()}] [${Thread.currentThread().name}]") activity.destroy() delay(1000L) }
実行結果
Launched coroutines [2021-12-28T17:12:48.702801200Z] [main] Coroutine 0 is done [2021-12-28T17:12:48.911800400Z] [DefaultDispatcher-worker-1] Coroutine 1 is done [2021-12-28T17:12:49.112800500Z] [DefaultDispatcher-worker-1] Destroying activity! [2021-12-28T17:12:49.252801100Z] [main]
初めの2つのcoroutineだけがメッセージを出力し、それ以外はActivity.destroy()
の中のjob.cancel()
を呼ぶだけでキャンセルされます。
Thread-local data
coroutineに(もしくはcoroutine間)でthread localなデータを渡せると便利な場合があります。
しかし、coroutineは特定のthreadに束縛されてないため、手動で行うとボイラープレートになってします可能性があります。
ThreadLocal
ではasContextElement
という拡張関数が役立ちます。
これは追加のcontext要素を作成し、与えられたThreadLocal
のデータを保持し、coroutineがcontextを切り替えるたびにリストアします。
下記が例です。
ThreadLocalData.kt
val threadLocal = ThreadLocal<String?>() fun main() = runBlocking { threadLocal.set("main") println("Pre-main, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'") val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) { println("Launch start, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'") yield() println("After yield, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'") } job.join() println("Post-main, current thread: [${Thread.currentThread()}]," + " thread local value: '${threadLocal.get()}'") }
上の例ではDispatchers.Default
を使ったバックグラウンドスレッドプールで新しいcoroutineを起動しているため
そのスレッドプールとは異なるthreadで動きますが、coroutineがどのthreadで実行されても
threadLocal.asContextElement(value = "launch")
を使用して指定したthread localの変数の値は保持されています。
-Dkotlinx.coroutines.debug
オプションをつけて実行してみます。
実行結果
Pre-main, current thread: [Thread[main @coroutine#1,5,main]], thread local value: 'main' Launch start, current thread: [Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]], thread local value: 'launch' After yield, current thread: [Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]], thread local value: 'launch' Post-main, current thread: [Thread[main @coroutine#1,5,main]], thread local value: 'main'
対応するcontext要素を設定するのは忘れがちです。
もしそのcoroutineを実行しているthreadが異なる場合、coroutineからアクセスされるthread local変数が予期しない値を保持する場合があります。
このような状況を避けるために、ensurePresent
を使用して不適切な使用に対してfail-fastすることが推奨されます。
ThreadLocal
はfirst-classサポートでkotlinx.coroutines
が提供するどのprimitiveで使用できます。
ただし1つだけ制約があります。
thread localが変更されたとき、新しい値はcoroutineの呼び出しもとに伝搬されず(context要素がすべてのThreadLocal
オブジェクトを追跡できないため)、更新された値は次のsuspensionで失われます。
coroutineでthread localの値を更新するにはwithContext
を使用します。
(詳細はasContextElement
を参照)
あるいは、class Counter(var i: Int)
のようにミュータブルなclassに値を保持し、
それをthread local変数に保持することも出来ます。
しかし、この場合このミュータブルなclassの変数の並行更新に対する同期処理を行う必要があります。
高度な使い方として例えばlogging MDC、トランザクションcontextや他のライブラリのように
データ受け渡しのために内部でthread localを使用している場合、実装すべきThreadContextElement
インターフェースのドキュメントを参照してください。
サンプルコードは下記にあげました。
おわり。