KotlinのCoroutineを試す (Shared mutable state and concurrency)
kotlinのcoroutineを試してみたメモです。
kotlin公式のcoroutineチュートリアルのShared mutable state and concurrencyの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html
下記バージョンで試してみます。
- kotlin 1.6.10
- kotlinx-coroutines-core:1.6.0
Dependency
gradleを使って試してみます。
build.gradle
plugins { id 'org.jetbrains.kotlin.jvm' version '1.6.10' } group 'com.example.coroutine.kotlin' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0' } compileKotlin { kotlinOptions.jvmTarget = "11" } compileTestKotlin { kotlinOptions.jvmTarget = "11" }
Util
実行時間と実行スレッドを表示しながら標準出力するために下記のlog
関数を準備しておきます。
Utils.kt
fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")
Shared mutable state and concurrency
croutineはDispatchers.Default
のようなマルチスレッドディスパッチャを使うとパラレルに実行することが出来ます。
これはすべての通常の並行問題を引き起こします。主な問題は共有された変更可能(mutable)な状態へのアクセスの同期です。
coroutineでのこの問題のいくつかの解決法はマルチスレッドでの解決法とほとんど同じですが、いくつかはユニークなものがあります。
The problem
100個のcoroutineを起動して、1000回同じアクションを実行してみましょう。
さらに比較のために完了を計測してみます。
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { coroutineScope { // scope for coroutines repeat(n) { launch { repeat(k) { action() } } } } } log("Completed ${n * k} actions in $time ms") }
マルチスレッドのDispatchers.Default
を使用して共有された変更可能な変数をインクリメントするシンプルなアクションを実行します。
var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter++ } } log("Counter = $counter") }
実行結果
Completed 100000 actions in 35 ms [2022-03-02T16:49:56.238475400Z] [DefaultDispatcher-worker-3] Counter = 78484 [2022-03-02T16:49:56.281475700Z] [main]
最後に何が表示されましたか? "Counter = 100000"が表示されることはないはずです、
なぜなら100個のcoroutineは何も同期化をせずに複数のスレッドから並行でcounter
をインクリメントするためです。
Volatiles are of no help
変数をvolatile
にすると並行の問題を解決出来るというのは一般的な誤解です。見てみましょう。
@Volatile var volatileCounter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { volatileCounter++ } } log("Counter = $volatileCounter") }
このコードは遅くなりました、しかし"Counter = 100000"は表示されません。
なぜなら、volatile変数は対応する変数の読み書きの線形化を保証しますが(アトミックの意味)、
大きなアクションのアトミック性は提供しません。(ここではインクリメント)
実行結果
Completed 100000 actions in 22 ms [2022-03-02T16:50:29.835904800Z] [DefaultDispatcher-worker-3] Counter = 43396 [2022-03-02T16:50:29.889907900Z] [main]
Thread-safe data structures
threadとcoroutineで動く一般的な解決法は、共有された状態に対して実行する必要がある対応する演算に対して
必要な同期を提供するスレッドセーフなデータ構造を使用することです。
単純なカウンターのケースでは、アトミックなincrementAndGet
操作を持つAtomicInteger
クラスを使用できます。
val atomicCounter = AtomicInteger() fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { atomicCounter.incrementAndGet() } } log("Counter = $atomicCounter") }
実行結果
Completed 100000 actions in 20 ms [2022-03-02T17:02:46.184104100Z] [DefaultDispatcher-worker-3] Counter = 100000 [2022-03-02T17:02:46.238105300Z] [main]
これはこの特定の問題に対する最も早い解決法です。これはプレーンなカウンター、コレクション、キュー、他の標準のデータ構造と
それらの基本的な操作に対して動作します。
しかし、ready-to-use
でthread-safe
な実装を持っていない複雑な状態や複雑な演算子では簡単にスケールしません。
Thread confinement fine-grained
Thread confinement
(スレッド制限)はすべての特定の共有状態へのアクセスを一つのスレッドに制限する、共有の可変状態の問題へのアプローチです。
これは一般的にUI状態が一つのイベントディスパッチまたはアプリケーションに制限されるUIアプリケーションで使われます。
これはシングルスレッドコンテキスを使用するとcoroutineに簡単に適用できます。
val counterContext = newSingleThreadContext("CounterContext") var fineGrainedCounter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // confine each increment to a single-threaded context withContext(counterContext) { fineGrainedCounter++ } } } log("Counter = $fineGrainedCounter") }
実行結果
Completed 100000 actions in 788 ms [2022-03-03T15:28:04.567275600Z] [DefaultDispatcher-worker-2] Counter = 100000 [2022-03-03T15:28:04.596278500Z] [main]
このコードをとても遅く動作します、なぜならこれはfine-grained
(細かい粒度)のスレッド制限を行ってるためです。
それぞれのインクリメントはマルチスレッドのDispatchers.Default
コンテキストからwithContext(counterContext)
ブロックを使用して
シングルスレッドコンテキストへ切り替えます。
Thread confinement coarse-grained
実際にはスレッド制限は大きなチャンクで動作します。例えば状態を更新するビジネスロジックの大きな塊はシングルスレッドに閉じ込められます。
下記の例はそれに似たようなもので、最初にシングルスレッドコンテキストで各coroutineを実行します。
val counterContext2 = newSingleThreadContext("CounterContext") var coarseGrainedCounter = 0 fun main() = runBlocking { // confine everything to a single-threaded context withContext(counterContext2) { massiveRun { coarseGrainedCounter++ } } log("Counter = $coarseGrainedCounter") }
実行結果
Completed 100000 actions in 30 ms [2022-03-03T15:47:55.392836100Z] [CounterContext] Counter = 100000 [2022-03-03T15:47:55.438842200Z] [main]
これはより速く動作し、正しい結果を生成します。
Mutual exclusion
この問題の相互排他な解決法は並行では決して実行されないクリティカルセクションで共有状態のすべての変更を保護することです。
そのためにブロッキングの世界では通常はsynchronized
かReentrantLock
を使います。
coroutineでの代替はMutex
と呼ばれます。これはクリティカルセクションを区切るためにlock
とunlock
関数を持ちます。
大きな違いはMutex.lock()
はサスペンド関数です。これはスレッドをブロックしません。
mutex.lock(); try { ... } finally { mutex.unlock() }
のパターンを簡単に表現するwithLock
拡張関数もあります。
val mutex = Mutex() var mutexCounter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // protect each increment with lock mutex.withLock { mutexCounter++ } } } log("Counter = $mutexCounter") }
実行結果
Completed 100000 actions in 334 ms [2022-03-03T16:30:17.495632100Z] [DefaultDispatcher-worker-7] Counter = 100000 [2022-03-03T16:30:17.544634700Z] [main]
この例でのロックはfine-grained
(細かい粒度)です、そのため代償を払います。
しかし、これは定期的に共有状態を必ず変更する必要があるが、その状態を閉じ込めるためのスレッドがない場合には良い選択です。
Actors
actor
はcoroutine、このcoroutineに閉じ込められカプセル化された状態、
他のcoroutineのコミュニケーションするためのチャネル、の組み合わせで構成されるエンティティです。
単純なactorは関数として書くことができますが、複雑な状態をもつactorはクラスのほうが適しています。
actor
coroutineビルダーがあり、これはactorのメールボックスチェネルをメッセージを受け取るためのそのスコープに結合し、
送信チャネルを結果のジョブオブジェクトに結合し、actorの単一の参照をそのハンドルとして運ぶことに便利です。
actorを使う最初のステップはactorが処理するメッセージのクラスを定義することです。
kotlinのsealed class
はこの目的に適しています。
カウンターをインクリメントするIncCounter
メッセージとその値を取得するGetCounter
メッセージを持つCounterMsg
sealed classを定義します。
後者はレスポンスを送信する必要があります。CompletableDeferred
プリミティブは後で知ることが出来るひとつの値を表し
ここではその目的として使用されます。
// Message types for counterActor sealed class CounterMsg object IntCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
actor
coroutineビルダーを使用してactorを起動する関数を定義します。
// This function launches a new counter actor fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor state for (msg in channel) { // iterate over incoming messages when (msg) { is IntCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } }
mainコードは簡単です。
fun main() = runBlocking<Unit> { val counter = counterActor() // create the actor withContext(Dispatchers.Default) { massiveRun { counter.send(IntCounter) } } // send a message to get a counter value from an actor val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) log("Counter = ${response.await()}") counter.close() // shutdown the actor }
実行結果
Completed 100000 actions in 726 ms [2022-03-04T16:31:47.830474300Z] [DefaultDispatcher-worker-4] Counter = 100000 [2022-03-04T16:31:47.901472Z] [main]
actor自身が実行しているコンテキストが何かは関係ありません。
actorはcoroutineでcoroutineはシーケンシャルに実行されます、そのため特定のcoroutineの状態へ限定することは
変更可能な共有状態の問題の解決法として機能します。実際にactorは自身のプライベートな状態を変更できますが、
メッセージを通してのみ互いに作用する事ができます。(あらゆるロックの必要性を除外します)
actorは負荷のある状態ではロックよりもより効率的です。なぜなら、このケースではactorは常に仕事があり、異なるコンテキストへのスイッチが全く必要ないためです。
actor
coroutineビルダーとproduce
coroutineビルダーは対をなします。actorはメッセージを受信するチャネルと関連付けられ、
一方producerはエレメントを送信するチャネルと関連付けられます。
サンプルコードは下記にあげました。
おわり。