KotlinのCoroutineを試す (Channels)
kotlinのcoroutineを試してみたメモです。
kotlin公式のcoroutineチュートリアルのChannelsの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/channels.html
下記バージョンで試してみます。
- kotlin 1.6.10
- kotlinx-coroutines-core:1.6.0
Dependency
gradleを使って試してみます。
build.gradle
plugins { id 'org.jetbrains.kotlin.jvm' version '1.6.10' } group 'com.example.coroutine.kotlin' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0' } compileKotlin { kotlinOptions.jvmTarget = "11" } compileTestKotlin { kotlinOptions.jvmTarget = "11" }
Util
実行時間と実行スレッドを表示しながら標準出力するために下記のlog
関数を準備しておきます。
Utils.kt
fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")
Channels
遅延された値(deferred)はcoroutine間で一つの値を転送する便利な方法を提供します。
Channelは値のstreamを転送する方法を提供します。
Channel basics
Channel
はBlockingQueue
ととても似たコンセプトです。
大きく異なる点はブロッキングのput
の代わりにsuspendのsend
があり、ブロッキングのtake
の代わりにsuspendのreceive
があります。
ChannelBasics.kt
fun main() = runBlocking { val channel = Channel<Int>() launch { // this might be heavy CPU-consuming computation or async logic, we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: repeat(5) { log(channel.receive()) } log("Done!") }
実行結果
1 [2022-01-28T15:13:19.197916300Z] [main] 4 [2022-01-28T15:13:19.261916200Z] [main] 9 [2022-01-28T15:13:19.261916200Z] [main] 16 [2022-01-28T15:13:19.261916200Z] [main] 25 [2022-01-28T15:13:19.262917100Z] [main] Done! [2022-01-28T15:13:19.262917100Z] [main]
Closing and iteration over channels
キューとは違い、これ以上要素が来ないことを示すためにクローズすることが出来ます。
受信側ではchannelから要素を受信するために通常のfor
ループを使うと便利です。
概念的に close
は特別なクローズトークンをchannelへ送るようなものです。
iterationはクローズトークンを受け取るとすぐに停止し、クローズする前に送られたすべての送信された要素は受信されることを保証します。
ClosingAndIterationOverChannels.kt
fun main() = runBlocking { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x) channel.close() // we're done sending } // here we print received values using `for` loop (until the channel is closed) for (y in channel) log(y) log("Done!") }
実行結果
1 [2022-01-28T15:31:53.502137900Z] [main] 2 [2022-01-28T15:31:53.564137100Z] [main] 3 [2022-01-28T15:31:53.564137100Z] [main] 4 [2022-01-28T15:31:53.564137100Z] [main] 5 [2022-01-28T15:31:53.566136800Z] [main] Done! [2022-01-28T15:31:53.566136800Z] [main]
Building channel producers
要素のシーケンスを生成するcoroutineのパターンはとてもよくあるパターンです。
これはproducer-consumerパターンの一部で、並行なコードでよく見つけることが出来ます。
このようなproducerをchannelをパラメータとして受け取る関数に抽象化することが出来ます。
しかしこれは結果は関数から戻される必要があるという常識に反しています。
produceという名前の便利なcoroutine builderがあり、これはproducer
側でこれを簡単に書くことが出来、
consumeEach
拡張関数はconsumer側でfor
loopを置き換えます。
BuildingChannelProducers.kt
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { for (x in 1..5) send(x * x) } fun main() = runBlocking { val squares = produceSquares() squares.consumeEach { log(it) } log("Done!") }
実行結果
1 [2022-01-28T15:58:58.226226700Z] [main] 4 [2022-01-28T15:58:58.297224200Z] [main] 9 [2022-01-28T15:58:58.298225300Z] [main] 16 [2022-01-28T15:58:58.298225300Z] [main] 25 [2022-01-28T15:58:58.300229900Z] [main] Done! [2022-01-28T15:58:58.304226800Z] [main]
Pipelines
pipelineは値のストリームを可能な限り無限に生成するcoroutineのパターンです。
Pipelines.kt
fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) // infinite stream of integers starting from 1 }
そして、他のcoroutineはストリームをconsumeし、なにかの処理をして、他の結果を生成します。
下記の例では値は2乗されます
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (x in numbers) send(x * x) }
メインコードを開始し全体のpipelineをつなぎます。
fun main() = runBlocking { val numbers = produceNumbers() // produces integers from 1 and on val squares = square(numbers) // squares integers repeat(5) { log(squares.receive()) // print first five } log("Done!") // we are done coroutineContext.cancelChildren() // cancel children coroutines }
実行結果
1 [2022-01-28T16:18:08.915881500Z] [main] 4 [2022-01-28T16:18:08.984878800Z] [main] 9 [2022-01-28T16:18:08.985881Z] [main] 16 [2022-01-28T16:18:08.985881Z] [main] 25 [2022-01-28T16:18:08.985881Z] [main] Done! [2022-01-28T16:18:08.985881Z] [main]
coroutineを生成するすべての関数はCoroutineScope
に拡張として定義されており、
アプリケーションで消えないグローバルなcoroutineを持たないように、structured concurrency
に頼る事ができます。
Prime numbers with pipeline
coroutineのpipelineを使って素数を生成する、pipelineの極端な例を見てましょう。
まず数値の無限シーケンスから始めます。
PrimeNumbersWithPipeline.kt
fun CoroutineScope.numberFrom(start: Int) = produce<Int> { var x = start while (true) send(x++) // infinite stream of integers from start }
下記のpipelineは数値のストリームをfilterし、与えられた素数で割り切れるすべての数字を取り除きます。
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int): ReceiveChannel<Int> = produce<Int> { for (x in numbers) if (x % prime != 0) send(x) }
2から数値のストリームを開始し、現在のchannelから素数を取得し、見つかったそれぞれの素数に対して新しいpipelineステージを起動するpipelineを作成します。
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
下記の例は最初の10個の素数を出力し、メインスレッドのコンテキストでpipeline全体を実行します。
すべてのcoroutineはmainのrunBlocking
coroutineのスコープで起動されたので、
開始したすべてのcoroutineの明示的なリストを保持する必要はありません。
最初の10個の素数を出力した後、すべての子coroutineをキャンセルするためにcancelChildren
拡張関数を使用します。
fun main() = runBlocking { var cur = numberFrom(2) repeat(10) { val prime = cur.receive() log(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren() // cancel all children to let main finish }
実行結果
2 [2022-01-28T16:58:25.287202800Z] [main] 3 [2022-01-28T16:58:25.349209100Z] [main] 5 [2022-01-28T16:58:25.349209100Z] [main] 7 [2022-01-28T16:58:25.350207700Z] [main] 11 [2022-01-28T16:58:25.351204700Z] [main] 13 [2022-01-28T16:58:25.351204700Z] [main] 17 [2022-01-28T16:58:25.352206800Z] [main] 19 [2022-01-28T16:58:25.353205800Z] [main] 23 [2022-01-28T16:58:25.354207100Z] [main] 29 [2022-01-28T16:58:25.355204900Z] [main]
標準ライブラリのiterator
coroutine builderを使うことで同じpipelineを生成することが出来ます。
produce
をiterator
、send
をyield
、receive
をnext
、ReceiveChannel
をIterator
に置き換えて、coroutine scopeを取り除けます。
runBlocking
も必要ありません。しかし、上記のようにchannelを使ったpipelineの利点は、Dispatchers.Default
コンテキストで実行する場合、
複数のCPUを使うことが出来ます。
いずれにせよ、これは素数を探すための非常に非実用的な方法です。
実際にはpipelineは他のsuspendの呼び出しがあり(リモートサービスの非同期呼び出しのような)、
これらのpipelineはsequence
やiterator
を使って作ることが出来ません。
なぜなら完全非同期のproduce
とは異なり、これらは任意のsuspendを許可しないためです。
Fan-out
複数のcoroutineは同じchannelから受信することができ、それぞれの間で分担します。
定期的に数値を生成するcoroutine(1秒ごとに10の数字)を開始してみます
FanOut.kt
fun CoroutineScope.produceNumbers2() = produce<Int> { var x = 1 while (true) { send(x++) delay(100) } }
そして、複数のcoroutineプロセッサーを持つことができます。この例では、それらはidを出力し数値を受信するだけです。
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { log("Processor #$id received $msg") } }
それでは、5つのプロセッサーを起動し、約1秒動かしてみます。
fun main() = runBlocking { val producer = produceNumbers2() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() }
実行結果
Processor #0 received 1 [2022-02-02T16:56:47.533826200Z] [main] Processor #0 received 2 [2022-02-02T16:56:47.706824400Z] [main] Processor #1 received 3 [2022-02-02T16:56:47.816824500Z] [main] Processor #2 received 4 [2022-02-02T16:56:47.925825400Z] [main] Processor #3 received 5 [2022-02-02T16:56:48.033823300Z] [main] Processor #4 received 6 [2022-02-02T16:56:48.142822600Z] [main] Processor #0 received 7 [2022-02-02T16:56:48.250414Z] [main] Processor #1 received 8 [2022-02-02T16:56:48.358377400Z] [main]
producer coroutineのキャンセルはchannelをクローズします。
そのため、最終的にはプロセッサーのcoroutineが実行しているchannel上でiterationが終了することに注意してください。
そのうえ、launchProcessor
のコードでfan-out
を実行するため、for
ループでchannel上で明示的に繰り返されることに注意してください。
consumeEach
とは異なり、このfor
ループパターンは複数のcoroutineから使われても完全に安全です。
もしプロセッサーcoroutineの一つが失敗した場合、他のcoroutineはchannelを処理し続け、
consumerEach
から書かれたプロセッサーは常に正常/異常終了の基となるchannelを消費(キャンセル)します。
Fan-in
複数のcoroutineは同じchannelに送信することができます。たとえば、stringのchannelと
指定した時間delayしてこのchannelに指定した文字列を繰り返し送信するsuspend関数があるとします。
FanIn.kt
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } }
文字列を送信する2つのcoroutineを起動して何が起こるか見てみましょう。
(この例ではmain coroutineの子としてmainスレッドのコンテキストで実行します)
fun main() = runBlocking { val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { // receive first six log(channel.receive()) } coroutineContext.cancelChildren() // cancel all children to let main finish }
実行結果
foo [2022-02-03T15:37:09.619993500Z] [main] foo [2022-02-03T15:37:09.820991800Z] [main] BAR! [2022-02-03T15:37:09.915012500Z] [main] foo [2022-02-03T15:37:10.024063500Z] [main] foo [2022-02-03T15:37:10.225077300Z] [main] BAR! [2022-02-03T15:37:10.416076100Z] [main]
Buffered channels
これまで見てきたchannelはbufferを持ちませんでした。
senderとreceiverが出会ったとき(ランデブー)、bufferなしのchannelは要素を移動します。
もし最初に送信が発動されると、受信が発動されるまでsuspendされ、
もし最初に受信が発動されると、送信が発動されるまでsuspendされます。
Channel()
ファクトリ関数とproduce
ビルダーはどちらもoptionalでバッファサイズを指定するためのcapacity
パラメータをとります。
bufferはsenderがsuspendする前に複数の要素を送信することを可能にします。
これはbufferがいっぱいになるとblockする、capacityを指定したBlockingQueue
と同様です。
下記のコードで動きを見てみます。
BufferedChannels.kt
fun main() = runBlocking { val channel = Channel<Int>(4) // create buffered channel val sender = launch { // launch sender coroutine repeat(10) { log("Sending $it") // print before sending each element channel.send(it) // will suspend when buffer is full } } // don't receive anything... just wait.... delay(1000) sender.cancel() // cancel sender coroutine }
capacity 4のバッファーchannelを使って、Sending
を5回表示します
実行結果
Sending 0 [2022-02-03T16:02:42.563411Z] [main] Sending 1 [2022-02-03T16:02:42.610413200Z] [main] Sending 2 [2022-02-03T16:02:42.610413200Z] [main] Sending 3 [2022-02-03T16:02:42.610413200Z] [main] Sending 4 [2022-02-03T16:02:42.610413200Z] [main]
はじめの4つのがbufferに追加され、5つ目を送信しようとするとsenderはsuspendします。
Channels are fair
chaneelへの送信と受信の操作は、複数のcoroutineからの呼び出しの順番に関しては公平です。
これらはfirst-in、first-outの順番で提供され、例えばreceive
を呼び出す最初のcoroutineが要素を取得します。
下記の例では2つのcoroutine ping
とpong
は共有されたtable
channelからball
オブジェクトを受信しています。
ChannelsAreFair.kt
data class Ball(var hits: Int) fun main() = runBlocking { val table = Channel<Ball>() // a shared table launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them } suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ log("$name $ball") delay(300) // wait a bit table.send(ball) // send the ball back } }
ping
coroutineが最初に開始し、最初にballを受信します。
ballをtableへ返した後、ping
coroutineは直ちにballの受信を再開しますが、
ballはpong
coroutineによって受信されます。なぜならすでに待機していたためです。
実行結果
ping Ball(hits=1) [2022-02-03T16:34:02.204584800Z] [main] pong Ball(hits=2) [2022-02-03T16:34:02.554583800Z] [main] ping Ball(hits=3) [2022-02-03T16:34:02.866104600Z] [main] pong Ball(hits=4) [2022-02-03T16:34:03.179647100Z] [main]
時々channelは使われているexecutorの性質によって公平ではないように見える実行をすることがあります。
詳細はこちらのissueを見てください。
https://github.com/Kotlin/kotlinx.coroutines/issues/111
Ticker channels
Ticker channelは特別なランデブーchannelです。
これはこのchannelから最後に消費されてから与えられた時間が経過するたびにUnit
を送信します。
しかし、単体では実用的ではないように見えるかもしれませんが、
複雑なタイムベースのproduce
pipelineや、ウィンドウ処理や、他の時間に依存する処理を行う演算を作成するのに
便利なbuildingブロックです。
Ticker channelはon tick
アクションの実行のためにselect
で使うことが出来ます。
これらのchannelを作成するためにticker
ファクトリメッソッドを使用します。
これ以上の要素がないことを示すために、その上でReceiveChannel.cancel
を使う必要があります。
どのように動くか見てみます。
TickerChannels.kt
fun main() = runBlocking { val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // no initial delay nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay println("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") // Emulate large consumption delays println("Consumer pauses for 150ms") delay(150) // Next element is available immediately nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // Note that the pause between `receive` calls is taken into account and next element arrives faster nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // indicate that no more elements are needed }
実行結果
Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 150ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
ticker
はconsumerの停止の可能性を認識していて、デフォルトでは停止が発生していれば次の送信する要素の遅延を調整し、
生成された要素の一定のrateを維持しようとします。
オプショナルで、mode
パラメータがTickerMode.FIXED_DELAY
を指定して、各要素の遅延を一定に維持するようにすることが出来ます。
サンプルコードは下記にあげました。
おわり。