KotlinのCoroutineを試す (Channels)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのChannelsの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/channels.html

下記バージョンで試してみます。

  • kotlin 1.6.10
  • kotlinx-coroutines-core:1.6.0

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.6.10'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Util

実行時間と実行スレッドを表示しながら標準出力するために下記のlog関数を準備しておきます。

Utils.kt

fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")

Channels

遅延された値(deferred)はcoroutine間で一つの値を転送する便利な方法を提供します。
Channelは値のstreamを転送する方法を提供します。

Channel basics

ChannelBlockingQueueととても似たコンセプトです。
大きく異なる点はブロッキングputの代わりにsuspendのsendがあり、ブロッキングtakeの代わりにsuspendのreceiveがあります。

ChannelBasics.kt

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { log(channel.receive()) }
    log("Done!")
}

実行結果

1 [2022-01-28T15:13:19.197916300Z] [main]
4 [2022-01-28T15:13:19.261916200Z] [main]
9 [2022-01-28T15:13:19.261916200Z] [main]
16 [2022-01-28T15:13:19.261916200Z] [main]
25 [2022-01-28T15:13:19.262917100Z] [main]
Done! [2022-01-28T15:13:19.262917100Z] [main]

Closing and iteration over channels

キューとは違い、これ以上要素が来ないことを示すためにクローズすることが出来ます。
受信側ではchannelから要素を受信するために通常のforループを使うと便利です。

概念的に closeは特別なクローズトークンをchannelへ送るようなものです。
iterationはクローズトークンを受け取るとすぐに停止し、クローズする前に送られたすべての送信された要素は受信されることを保証します。

ClosingAndIterationOverChannels.kt

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) log(y)
    log("Done!")
}

実行結果

1 [2022-01-28T15:31:53.502137900Z] [main]
2 [2022-01-28T15:31:53.564137100Z] [main]
3 [2022-01-28T15:31:53.564137100Z] [main]
4 [2022-01-28T15:31:53.564137100Z] [main]
5 [2022-01-28T15:31:53.566136800Z] [main]
Done! [2022-01-28T15:31:53.566136800Z] [main]

Building channel producers

要素のシーケンスを生成するcoroutineのパターンはとてもよくあるパターンです。
これはproducer-consumerパターンの一部で、並行なコードでよく見つけることが出来ます。
このようなproducerをchannelをパラメータとして受け取る関数に抽象化することが出来ます。
しかしこれは結果は関数から戻される必要があるという常識に反しています。

produceという名前の便利なcoroutine builderがあり、これはproducer側でこれを簡単に書くことが出来、
consumeEach拡張関数はconsumer側でforloopを置き換えます。

BuildingChannelProducers.kt

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { log(it) }
    log("Done!")
}

実行結果

1 [2022-01-28T15:58:58.226226700Z] [main]
4 [2022-01-28T15:58:58.297224200Z] [main]
9 [2022-01-28T15:58:58.298225300Z] [main]
16 [2022-01-28T15:58:58.298225300Z] [main]
25 [2022-01-28T15:58:58.300229900Z] [main]
Done! [2022-01-28T15:58:58.304226800Z] [main]

Pipelines

pipelineは値のストリームを可能な限り無限に生成するcoroutineのパターンです。

Pipelines.kt

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

そして、他のcoroutineはストリームをconsumeし、なにかの処理をして、他の結果を生成します。
下記の例では値は2乗されます

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

メインコードを開始し全体のpipelineをつなぎます。

fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        log(squares.receive()) // print first five
    }
    log("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

実行結果

1 [2022-01-28T16:18:08.915881500Z] [main]
4 [2022-01-28T16:18:08.984878800Z] [main]
9 [2022-01-28T16:18:08.985881Z] [main]
16 [2022-01-28T16:18:08.985881Z] [main]
25 [2022-01-28T16:18:08.985881Z] [main]
Done! [2022-01-28T16:18:08.985881Z] [main]

coroutineを生成するすべての関数はCoroutineScopeに拡張として定義されており、
アプリケーションで消えないグローバルなcoroutineを持たないように、structured concurrencyに頼る事ができます。

Prime numbers with pipeline

coroutineのpipelineを使って素数を生成する、pipelineの極端な例を見てましょう。
まず数値の無限シーケンスから始めます。

PrimeNumbersWithPipeline.kt

fun CoroutineScope.numberFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

下記のpipelineは数値のストリームをfilterし、与えられた素数で割り切れるすべての数字を取り除きます。

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int): ReceiveChannel<Int> = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

2から数値のストリームを開始し、現在のchannelから素数を取得し、見つかったそれぞれの素数に対して新しいpipelineステージを起動するpipelineを作成します。

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

下記の例は最初の10個の素数を出力し、メインスレッドのコンテキストでpipeline全体を実行します。
すべてのcoroutineはmainのrunBlocking coroutineのスコープで起動されたので、
開始したすべてのcoroutineの明示的なリストを保持する必要はありません。
最初の10個の素数を出力した後、すべての子coroutineをキャンセルするためにcancelChildren拡張関数を使用します。

fun main() = runBlocking {
    var cur = numberFrom(2)
    repeat(10) {
        val prime = cur.receive()
        log(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

実行結果

2 [2022-01-28T16:58:25.287202800Z] [main]
3 [2022-01-28T16:58:25.349209100Z] [main]
5 [2022-01-28T16:58:25.349209100Z] [main]
7 [2022-01-28T16:58:25.350207700Z] [main]
11 [2022-01-28T16:58:25.351204700Z] [main]
13 [2022-01-28T16:58:25.351204700Z] [main]
17 [2022-01-28T16:58:25.352206800Z] [main]
19 [2022-01-28T16:58:25.353205800Z] [main]
23 [2022-01-28T16:58:25.354207100Z] [main]
29 [2022-01-28T16:58:25.355204900Z] [main]

標準ライブラリのiterator coroutine builderを使うことで同じpipelineを生成することが出来ます。
produceiteratorsendyieldreceivenextReceiveChannelIteratorに置き換えて、coroutine scopeを取り除けます。
runBlockingも必要ありません。しかし、上記のようにchannelを使ったpipelineの利点は、Dispatchers.Defaultコンテキストで実行する場合、
複数のCPUを使うことが出来ます。

いずれにせよ、これは素数を探すための非常に非実用的な方法です。
実際にはpipelineは他のsuspendの呼び出しがあり(リモートサービスの非同期呼び出しのような)、
これらのpipelineはsequenceiteratorを使って作ることが出来ません。
なぜなら完全非同期のproduceとは異なり、これらは任意のsuspendを許可しないためです。

Fan-out

複数のcoroutineは同じchannelから受信することができ、それぞれの間で分担します。
定期的に数値を生成するcoroutine(1秒ごとに10の数字)を開始してみます

FanOut.kt

fun CoroutineScope.produceNumbers2() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100)
    }
}

そして、複数のcoroutineプロセッサーを持つことができます。この例では、それらはidを出力し数値を受信するだけです。

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        log("Processor #$id received $msg")
    }
}

それでは、5つのプロセッサーを起動し、約1秒動かしてみます。

fun main() = runBlocking {
    val producer = produceNumbers2()
    repeat(5) {
        launchProcessor(it, producer)
    }
    delay(950)
    producer.cancel()
}

実行結果

Processor #0 received 1 [2022-02-02T16:56:47.533826200Z] [main]
Processor #0 received 2 [2022-02-02T16:56:47.706824400Z] [main]
Processor #1 received 3 [2022-02-02T16:56:47.816824500Z] [main]
Processor #2 received 4 [2022-02-02T16:56:47.925825400Z] [main]
Processor #3 received 5 [2022-02-02T16:56:48.033823300Z] [main]
Processor #4 received 6 [2022-02-02T16:56:48.142822600Z] [main]
Processor #0 received 7 [2022-02-02T16:56:48.250414Z] [main]
Processor #1 received 8 [2022-02-02T16:56:48.358377400Z] [main]

producer coroutineのキャンセルはchannelをクローズします。
そのため、最終的にはプロセッサーのcoroutineが実行しているchannel上でiterationが終了することに注意してください。

そのうえ、launchProcessorのコードでfan-outを実行するため、forループでchannel上で明示的に繰り返されることに注意してください。
consumeEachとは異なり、このforループパターンは複数のcoroutineから使われても完全に安全です。
もしプロセッサーcoroutineの一つが失敗した場合、他のcoroutineはchannelを処理し続け、
consumerEachから書かれたプロセッサーは常に正常/異常終了の基となるchannelを消費(キャンセル)します。

Fan-in

複数のcoroutineは同じchannelに送信することができます。たとえば、stringのchannelと
指定した時間delayしてこのchannelに指定した文字列を繰り返し送信するsuspend関数があるとします。

FanIn.kt

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

文字列を送信する2つのcoroutineを起動して何が起こるか見てみましょう。
(この例ではmain coroutineの子としてmainスレッドのコンテキストで実行します)

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        log(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

実行結果

foo [2022-02-03T15:37:09.619993500Z] [main]
foo [2022-02-03T15:37:09.820991800Z] [main]
BAR! [2022-02-03T15:37:09.915012500Z] [main]
foo [2022-02-03T15:37:10.024063500Z] [main]
foo [2022-02-03T15:37:10.225077300Z] [main]
BAR! [2022-02-03T15:37:10.416076100Z] [main]

Buffered channels

これまで見てきたchannelはbufferを持ちませんでした。
senderとreceiverが出会ったとき(ランデブー)、bufferなしのchannelは要素を移動します。
もし最初に送信が発動されると、受信が発動されるまでsuspendされ、
もし最初に受信が発動されると、送信が発動されるまでsuspendされます。

Channel()ファクトリ関数とproduceビルダーはどちらもoptionalでバッファサイズを指定するためのcapacityパラメータをとります。
bufferはsenderがsuspendする前に複数の要素を送信することを可能にします。
これはbufferがいっぱいになるとblockする、capacityを指定したBlockingQueueと同様です。

下記のコードで動きを見てみます。

BufferedChannels.kt

fun main() = runBlocking {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            log("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
}

capacity 4のバッファーchannelを使って、Sendingを5回表示します

実行結果

Sending 0 [2022-02-03T16:02:42.563411Z] [main]
Sending 1 [2022-02-03T16:02:42.610413200Z] [main]
Sending 2 [2022-02-03T16:02:42.610413200Z] [main]
Sending 3 [2022-02-03T16:02:42.610413200Z] [main]
Sending 4 [2022-02-03T16:02:42.610413200Z] [main]

はじめの4つのがbufferに追加され、5つ目を送信しようとするとsenderはsuspendします。

Channels are fair

chaneelへの送信と受信の操作は、複数のcoroutineからの呼び出しの順番に関しては公平です。
これらはfirst-in、first-outの順番で提供され、例えばreceiveを呼び出す最初のcoroutineが要素を取得します。
下記の例では2つのcoroutine pingpongは共有されたtablechannelからballオブジェクトを受信しています。

ChannelsAreFair.kt

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        log("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

pingcoroutineが最初に開始し、最初にballを受信します。
ballをtableへ返した後、pingcoroutineは直ちにballの受信を再開しますが、
ballはpongcoroutineによって受信されます。なぜならすでに待機していたためです。

実行結果

ping Ball(hits=1) [2022-02-03T16:34:02.204584800Z] [main]
pong Ball(hits=2) [2022-02-03T16:34:02.554583800Z] [main]
ping Ball(hits=3) [2022-02-03T16:34:02.866104600Z] [main]
pong Ball(hits=4) [2022-02-03T16:34:03.179647100Z] [main]

時々channelは使われているexecutorの性質によって公平ではないように見える実行をすることがあります。
詳細はこちらのissueを見てください。
https://github.com/Kotlin/kotlinx.coroutines/issues/111

Ticker channels

Ticker channelは特別なランデブーchannelです。
これはこのchannelから最後に消費されてから与えられた時間が経過するたびにUnitを送信します。
しかし、単体では実用的ではないように見えるかもしれませんが、
複雑なタイムベースのproducepipelineや、ウィンドウ処理や、他の時間に依存する処理を行う演算を作成するのに
便利なbuildingブロックです。
Ticker channelはon tickアクションの実行のためにselectで使うことが出来ます。

これらのchannelを作成するためにtickerファクトリメッソッドを使用します。
これ以上の要素がないことを示すために、その上でReceiveChannel.cancelを使う必要があります。

どのように動くか見てみます。

TickerChannels.kt

fun main() = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

実行結果

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

tickerはconsumerの停止の可能性を認識していて、デフォルトでは停止が発生していれば次の送信する要素の遅延を調整し、
生成された要素の一定のrateを維持しようとします。

オプショナルで、modeパラメータがTickerMode.FIXED_DELAYを指定して、各要素の遅延を一定に維持するようにすることが出来ます。

 
サンプルコードは下記にあげました。

github.com

おわり。