KotlinのCoroutineを試す (Shared mutable state and concurrency)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのShared mutable state and concurrencyの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html

下記バージョンで試してみます。

  • kotlin 1.6.10
  • kotlinx-coroutines-core:1.6.0

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.6.10'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Util

実行時間と実行スレッドを表示しながら標準出力するために下記のlog関数を準備しておきます。

Utils.kt

fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")

Shared mutable state and concurrency

croutineはDispatchers.Defaultのようなマルチスレッドディスパッチャを使うとパラレルに実行することが出来ます。
これはすべての通常の並行問題を引き起こします。主な問題は共有された変更可能(mutable)な状態へのアクセスの同期です。
coroutineでのこの問題のいくつかの解決法はマルチスレッドでの解決法とほとんど同じですが、いくつかはユニークなものがあります。

The problem

100個のcoroutineを起動して、1000回同じアクションを実行してみましょう。
さらに比較のために完了を計測してみます。

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    log("Completed ${n * k} actions in $time ms")
}

マルチスレッドのDispatchers.Defaultを使用して共有された変更可能な変数をインクリメントするシンプルなアクションを実行します。

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    log("Counter = $counter")
}

実行結果

Completed 100000 actions in 35 ms [2022-03-02T16:49:56.238475400Z] [DefaultDispatcher-worker-3]
Counter = 78484 [2022-03-02T16:49:56.281475700Z] [main]

最後に何が表示されましたか? "Counter = 100000"が表示されることはないはずです、
なぜなら100個のcoroutineは何も同期化をせずに複数のスレッドから並行でcounterをインクリメントするためです。

Volatiles are of no help

変数をvolatileにすると並行の問題を解決出来るというのは一般的な誤解です。見てみましょう。

@Volatile
var volatileCounter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            volatileCounter++
        }
    }
    log("Counter = $volatileCounter")
}

このコードは遅くなりました、しかし"Counter = 100000"は表示されません。
なぜなら、volatile変数は対応する変数の読み書きの線形化を保証しますが(アトミックの意味)、
大きなアクションのアトミック性は提供しません。(ここではインクリメント)

実行結果

Completed 100000 actions in 22 ms [2022-03-02T16:50:29.835904800Z] [DefaultDispatcher-worker-3]
Counter = 43396 [2022-03-02T16:50:29.889907900Z] [main]

Thread-safe data structures

threadとcoroutineで動く一般的な解決法は、共有された状態に対して実行する必要がある対応する演算に対して
必要な同期を提供するスレッドセーフなデータ構造を使用することです。
単純なカウンターのケースでは、アトミックなincrementAndGet操作を持つAtomicIntegerクラスを使用できます。

val atomicCounter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            atomicCounter.incrementAndGet()
        }
    }
    log("Counter = $atomicCounter")
}

実行結果

Completed 100000 actions in 20 ms [2022-03-02T17:02:46.184104100Z] [DefaultDispatcher-worker-3]
Counter = 100000 [2022-03-02T17:02:46.238105300Z] [main]

これはこの特定の問題に対する最も早い解決法です。これはプレーンなカウンター、コレクション、キュー、他の標準のデータ構造と
それらの基本的な操作に対して動作します。
しかし、ready-to-usethread-safeな実装を持っていない複雑な状態や複雑な演算子では簡単にスケールしません。

Thread confinement fine-grained

Thread confinement(スレッド制限)はすべての特定の共有状態へのアクセスを一つのスレッドに制限する、共有の可変状態の問題へのアプローチです。
これは一般的にUI状態が一つのイベントディスパッチまたはアプリケーションに制限されるUIアプリケーションで使われます。
これはシングルスレッドコンテキスを使用するとcoroutineに簡単に適用できます。

val counterContext = newSingleThreadContext("CounterContext")
var fineGrainedCounter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                fineGrainedCounter++
            }
        }
    }
    log("Counter = $fineGrainedCounter")
}

実行結果

Completed 100000 actions in 788 ms [2022-03-03T15:28:04.567275600Z] [DefaultDispatcher-worker-2]
Counter = 100000 [2022-03-03T15:28:04.596278500Z] [main]

このコードをとても遅く動作します、なぜならこれはfine-grained(細かい粒度)のスレッド制限を行ってるためです。
それぞれのインクリメントはマルチスレッドのDispatchers.DefaultコンテキストからwithContext(counterContext)ブロックを使用して
シングルスレッドコンテキストへ切り替えます。

Thread confinement coarse-grained

実際にはスレッド制限は大きなチャンクで動作します。例えば状態を更新するビジネスロジックの大きな塊はシングルスレッドに閉じ込められます。
下記の例はそれに似たようなもので、最初にシングルスレッドコンテキストで各coroutineを実行します。

val counterContext2 = newSingleThreadContext("CounterContext")
var coarseGrainedCounter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext2) {
        massiveRun {
            coarseGrainedCounter++
        }
    }
    log("Counter = $coarseGrainedCounter")
}

実行結果

Completed 100000 actions in 30 ms [2022-03-03T15:47:55.392836100Z] [CounterContext]
Counter = 100000 [2022-03-03T15:47:55.438842200Z] [main]

これはより速く動作し、正しい結果を生成します。

Mutual exclusion

この問題の相互排他な解決法は並行では決して実行されないクリティカルセクションで共有状態のすべての変更を保護することです。
そのためにブロッキングの世界では通常はsynchronizedReentrantLockを使います。
coroutineでの代替はMutexと呼ばれます。これはクリティカルセクションを区切るためにlockunlock関数を持ちます。
大きな違いはMutex.lock()サスペンド関数です。これはスレッドをブロックしません。

mutex.lock(); try { ... } finally { mutex.unlock() }のパターンを簡単に表現するwithLock拡張関数もあります。

val mutex = Mutex()
var mutexCounter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                mutexCounter++
            }
        }
    }
    log("Counter = $mutexCounter")
}

実行結果

Completed 100000 actions in 334 ms [2022-03-03T16:30:17.495632100Z] [DefaultDispatcher-worker-7]
Counter = 100000 [2022-03-03T16:30:17.544634700Z] [main]

この例でのロックはfine-grained(細かい粒度)です、そのため代償を払います。
しかし、これは定期的に共有状態を必ず変更する必要があるが、その状態を閉じ込めるためのスレッドがない場合には良い選択です。

Actors

actorはcoroutine、このcoroutineに閉じ込められカプセル化された状態、
他のcoroutineのコミュニケーションするためのチャネル、の組み合わせで構成されるエンティティです。
単純なactorは関数として書くことができますが、複雑な状態をもつactorはクラスのほうが適しています。

actorcoroutineビルダーがあり、これはactorのメールボックスチェネルをメッセージを受け取るためのそのスコープに結合し、
送信チャネルを結果のジョブオブジェクトに結合し、actorの単一の参照をそのハンドルとして運ぶことに便利です。

actorを使う最初のステップはactorが処理するメッセージのクラスを定義することです。
kotlinのsealed classはこの目的に適しています。
カウンターをインクリメントするIncCounterメッセージとその値を取得するGetCounterメッセージを持つCounterMsgsealed classを定義します。
後者はレスポンスを送信する必要があります。CompletableDeferredプリミティブは後で知ることが出来るひとつの値を表し
ここではその目的として使用されます。

// Message types for counterActor
sealed class CounterMsg
object IntCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

actorcoroutineビルダーを使用してactorを起動する関数を定義します。

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IntCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

mainコードは簡単です。

fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IntCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    log("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}

実行結果

Completed 100000 actions in 726 ms [2022-03-04T16:31:47.830474300Z] [DefaultDispatcher-worker-4]
Counter = 100000 [2022-03-04T16:31:47.901472Z] [main]

actor自身が実行しているコンテキストが何かは関係ありません。
actorはcoroutineでcoroutineはシーケンシャルに実行されます、そのため特定のcoroutineの状態へ限定することは
変更可能な共有状態の問題の解決法として機能します。実際にactorは自身のプライベートな状態を変更できますが、
メッセージを通してのみ互いに作用する事ができます。(あらゆるロックの必要性を除外します)

actorは負荷のある状態ではロックよりもより効率的です。なぜなら、このケースではactorは常に仕事があり、異なるコンテキストへのスイッチが全く必要ないためです。

actorcoroutineビルダーとproducecoroutineビルダーは対をなします。actorはメッセージを受信するチャネルと関連付けられ、
一方producerはエレメントを送信するチャネルと関連付けられます。

 
サンプルコードは下記にあげました。

github.com

おわり。