KotlinのCoroutineを試す (Asynchronous Flow)
kotlinのcoroutineを試してみたメモです。
kotlin公式のcoroutineチュートリアルのAsynchronous Flowの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/flow.html
- Dependency
- Util
- Asynchronous Flow
- Representing multiple values
- Flows are cold
- Flow cancellation basics
- Flow builders
- Intermediate flow operators
- Terminal flow operators
- Flows are sequential
- Flow context
- Buffering
- Composing multiple flows
- Flattening flows
- Flow exceptions
- Exception transparency
- Flow completion
- Imperative versus declarative
- Launching flow
- Flow and Reactive Streams
下記バージョンで試してみます。
- kotlin 1.6.10
- kotlinx-coroutines-core:1.6.0
Dependency
gradleを使って試してみます。
build.gradle
plugins { id 'org.jetbrains.kotlin.jvm' version '1.6.10' } group 'com.example.coroutine.kotlin' version '1.0-SNAPSHOT' repositories { mavenCentral() } dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0' } compileKotlin { kotlinOptions.jvmTarget = "11" } compileTestKotlin { kotlinOptions.jvmTarget = "11" }
Util
実行時間と実行スレッドを表示しながら標準出力するために下記のlog
関数を準備しておきます。
Utils.kt
fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")
Asynchronous Flow
suspend関数は非同期で一つの値を返しますが、非同期で複数の計算した値を返すにはどうすればよいでしょうか?
そこでkotlinのFlow
を利用します。
Representing multiple values
kotlinでは複数の値はcollectionsを使用して表現することが出来ます。例えば、3つの値を持つList
を返すsimple
関数があり、
forEach
を利用してすべての値をprintします。
RepresentingMultipleValues.kt
fun simple(): List<Int> = listOf(1, 2, 3) fun main() { simple().forEach { value -> log(value) } }
実行結果
1 [2022-01-27T16:57:22.393380Z] [main] 2 [2022-01-27T16:57:22.440376200Z] [main] 3 [2022-01-27T16:57:22.440376200Z] [main]
Sequences
もしCPUを利用するブロッキングな処理で数を計算する場合(各計算に100msecかかる場合)、Sequence
を使用して数を表現することが出来ます。
RepresentingMultipleValuesSequences.kt
fun simpleSeq(): Sequence<Int> = sequence { // sequence builder for (i in 1..3) { Thread.sleep(500) // pretend we are computing it yield(i) // yield next value } } fun main() { simpleSeq().forEach { value -> log(value) } }
実行結果
1 [2022-01-27T16:56:49.377941600Z] [main] 2 [2022-01-27T16:56:49.942941400Z] [main] 3 [2022-01-27T16:56:50.447941900Z] [main]
このコードは同じ数を表示しますが、それぞれを表示する前に100msec待ちます。
Suspending functions
しかし、上記の計算はコードを実行しているmain threadをブロッキングします。
これらの数値が非同期で計算される場合、simple
関数にsuspend
をつけることでブロッキングなしで計算され、結果はリストで返されます。
RepresentingMultipleValuesSuspending.kt
suspend fun simpleSus(): List<Int> { delay(1000) // pretend we are doing something asynchronous here return listOf(1, 2, 3) } fun main() = runBlocking { simpleSus().forEach { value -> log(value) } }
実行結果
1 [2022-01-27T16:58:24.114585700Z] [main] 2 [2022-01-27T16:58:24.161584100Z] [main] 3 [2022-01-27T16:58:24.162596500Z] [main]
1秒待った後、数字が表示されます。
Flows
List<Int>
を戻り値にした場合、すべての値は一度に返されます。
非同期に計算されたストリームであることを表現するために、同期で計算される値にSequence<Int>
を使うようにFlow<Int>
を使うことが出来ます。
RepresentingMultipleValuesFlows.kt
fun simpleFlow(): Flow<Int> = flow { // flow builder for (i in 1..3) { delay(500) // pretend we are doing something useful here // If use Thread.sleep, the main thread is blocked // Thread.sleep(500) emit(i) // emit next value } } fun main() = runBlocking { // Launch a concurrent coroutine to check if the main thread is blocked launch { for (k in 1..3) { log("I'm not blocked $k") delay(500) } } // Collect the flow simpleFlow().collect { value -> log(value) } }
実行結果
I'm not blocked 1 [2022-01-27T16:59:17.077063400Z] [main] 1 [2022-01-27T16:59:17.566065500Z] [main] I'm not blocked 2 [2022-01-27T16:59:17.661070900Z] [main] 2 [2022-01-27T16:59:18.084596800Z] [main] I'm not blocked 3 [2022-01-27T16:59:18.164251100Z] [main] 3 [2022-01-27T16:59:18.598993Z] [main]
上記のFlow
の例とそれ以前の例では下記の点が異なります
Flow
のbuilder関数としてflowを呼んでいますflow{}
builderブロック内のコードはsuspend出来ますsimpleFlow
関数はsuspendは付与されませんemit
関数を使用して値がemitされますcollect
関数を使用して値がcollectされます
flow{}
内のdelay
をThread.sleep
に変えることも出来ます。その場合main threadはblockingされます。
Flows are cold
Flow
はsequence
と同様にcold streamです。flow
builderの中のコードはcollectされるまで実行されません。
下記のコードを実行すると分かります。
FlowsAreCold.kt
fun coldFlow(): Flow<Int> = flow { log("Flow started") for (i in 1..3) { delay(1000) emit(i) } } fun main() = runBlocking<Unit> { log("Calling coldFlow...") val flow = coldFlow() log("Calling collect...") flow.collect { value -> log(value) } log("Calling collect again...") flow.collect { value -> log(value) } }
実行結果
Calling coldFlow... [2022-01-27T17:00:49.365531700Z] [main] Calling collect... [2022-01-27T17:00:49.420530900Z] [main] Flow started [2022-01-27T17:00:49.424531100Z] [main] 1 [2022-01-27T17:00:50.443529300Z] [main] 2 [2022-01-27T17:00:51.459554700Z] [main] 3 [2022-01-27T17:00:52.464383500Z] [main] Calling collect again... [2022-01-27T17:00:52.464906300Z] [main] Flow started [2022-01-27T17:00:52.464906300Z] [main] 1 [2022-01-27T17:00:53.466936900Z] [main] 2 [2022-01-27T17:00:54.477829Z] [main] 3 [2022-01-27T17:00:55.490575100Z] [main]
これはcoldFlow
関数にsuspendをつけていない主な理由です。
coldFlow
関数は即座に返り、何も待ちません。
flowはcollectで毎回開始されるので、collectが再度呼ばれた後はFlow started
が表示されます。
Flow cancellation basics
Flowはcoroutineの一般的な協調的なキャンセルを遵守します。
delay
のようなキャンセル可能なsuspend関数内でflowがsusupendされている場合、flowのcollectionはキャンセルされることがあります。
下記の例はwithTimeoutOrNull
内でコードを実行し停止した場合、どのようにflowがキャンセルされるかを示しています。
FlowCancellationBasics.kt
fun simpleCancellation(): Flow<Int> = flow { for (i in 1..3) { delay(1000) log("Emitting $i.") emit(i) } } fun main() = runBlocking<Unit> { withTimeoutOrNull(2500) { // Timeout after 2500ms simpleCancellation().collect { value -> log(value) } log("Done") } }
simpleCancellation
関数内で2つの値のみエミットされ表示されています。
実行結果
Emitting 1. [2022-01-27T17:02:16.554302200Z] [main] 1 [2022-01-27T17:02:16.591268800Z] [main] Emitting 2. [2022-01-27T17:02:17.605413900Z] [main] 2 [2022-01-27T17:02:17.605413900Z] [main]
詳細はFlow cancellation checks
セクションを参照。
Flow builders
前の例のflow{}
builderは一番基本的なものです。
簡単にflowを定義する他のbuilderも存在します。
flowOf
builderは固定の値のセットをemitするflowを定義します。- 様々なcollectionやsequenceは
.asFlow()
拡張関数を使用してflowに変換できます
flowから1から3の数値を出力する例は下記のように書けます。
FlowBuilders.kt
fun main() = runBlocking { // Convert an integer range to a flow (1..3).asFlow().collect { value -> log(value) } }
実行結果
1 [2022-01-27T17:05:49.256194300Z] [main] 2 [2022-01-27T17:05:49.328190100Z] [main] 3 [2022-01-27T17:05:49.328190100Z] [main]
Intermediate flow operators
flowはcollectionやsequenceのように演算子で変換することができます。
中間演算子は上流のflowに適用され、下流のflowを返します。
これらの演算子はflowがそうであるようにcoldです。
演算子それ自身はsuspend関数ではありません。即座に実行され、新しい変換されたflowの定義を返します。
基本的な演算子はmap
やfilter
のようなよく知られている名前です。
sequenceとの重要な違いは、これらの演算子内のコードブロックはsuspend関数を呼ぶことが出来ます。
例として、入力リクエストのflowは、suspend関数を実装した長い時間処理が行われる場合でも、map
演算子にマップすることができます。
IntermediateFlowOperators.kt
suspend fun performRequest(request: Int): String { delay(1000) // imitate long-running asynchronous work return "response $request" } fun main() = runBlocking { (1..3).asFlow() // a flow of requests .map { request -> performRequest(request) } .collect { response -> log(response) } }
実行結果
response 1 [2022-01-27T17:06:45.534434300Z] [main] response 2 [2022-01-27T17:06:46.575061100Z] [main] response 3 [2022-01-27T17:06:47.575205200Z] [main]
Transform operator
flowの変換演算子の中で最も一般的なものはtransform
と呼ばれています。
これはmap
やfilter
のように単純な変換のように使用したり、もっと複雑な変換をすることが出来ます。
transform
演算子を使うと、任意の値を任意の回数emit
することが出来ます。
例として、transform
演算子を使うと長時間非同期で処理されるリクエストが実行される前に文字列をemitすることができ、
レスポンスを出力出来ます。
TransformOperator.kt
fun main() = runBlocking { (1..3).asFlow() // a flow of requests .transform { request -> emit("Making request $request") emit(performRequest(request)) } .collect { response -> log(response) } }
実行結果
Making request 1 [2022-01-27T17:07:58.020659100Z] [main] response 1 [2022-01-27T17:07:59.112840900Z] [main] Making request 2 [2022-01-27T17:07:59.112840900Z] [main] response 2 [2022-01-27T17:08:00.121133500Z] [main] Making request 3 [2022-01-27T17:08:00.121133500Z] [main] response 3 [2022-01-27T17:08:01.125523600Z] [main]
Size-limiting operators
take
のようなサイズ制限中間演算子は制限に到達した際に、flowの実行をキャンセルします。
coroutineのキャンセルは常にexceptionがthrowされて実行されます。
そのため、(try { } finally { }
のような)リソース管理する関数はキャンセルされた場合に正常に動作します。
SizeLimitingOperators.kt
fun numbers(): Flow<Int> = flow { try { emit(1) emit(2) log("This line will not execute.") emit(3) } finally { log("Finally in numbers.") } } fun main() = runBlocking { numbers() .take(2) // take only the first two .collect { response -> log(response) } }
実行結果は2つ目の値をemitしたあとに、numbers()
関数内のflow{}
の実行が停止しているのが分かります。
実行結果
1 [2022-01-27T17:09:27.683229600Z] [main] 2 [2022-01-27T17:09:27.736233200Z] [main] Finally in numbers. [2022-01-27T17:09:27.738230800Z] [main]
Terminal flow operators
flowの終端演算子はflowの収集を開始するsuspend関数です。collect
演算子は最も一般的なものですが、
他にも多数の演算子があります。
toList
やtoSet
のように様々なcollectionに変換しますfirst
のように初めの値を取得するものや、single
のような必ず一つの値をemitする演算子がありますreduce
やfold
でflowから値に変換します
TerminalFlowOperators.kt
fun main() = runBlocking { val sum = (1..5).asFlow() .map { it * it } // squares of numbers from 1 to 5 .reduce { a, b -> a + b } // sum them (terminal operator) log(sum) }
実行結果
55 [2022-01-27T17:10:28.989438900Z] [main]
Flows are sequential
それぞれのflowのcollectionはシーケンシャルに動作します(複数のflow上で動作する特別な演算子を除き)。
collectionは終端演算子を呼ぶcoroutineで直接動きます。デフォルトでは新しいcoroutineは起動されません。
それぞれのemitされた値は上流ストリームから下流ストリームの中間演算子で処理され、その後終端演算子へ流れます。
下記の例は、偶数でfilterし、stringにマップする例です。
FlowsAreSequential.kt
fun main() = runBlocking { (1..5).asFlow() .filter { log("Filter $it") it % 2 == 0 } .map { log("Map $it") "string $it" } .collect { log("Collect $it") } }
実行結果
Filter 1 [2022-01-27T17:11:30.153900400Z] [main] Filter 2 [2022-01-27T17:11:30.199899200Z] [main] Map 2 [2022-01-27T17:11:30.200903400Z] [main] Collect string 2 [2022-01-27T17:11:30.207899400Z] [main] Filter 3 [2022-01-27T17:11:30.207899400Z] [main] Filter 4 [2022-01-27T17:11:30.207899400Z] [main] Map 4 [2022-01-27T17:11:30.207899400Z] [main] Collect string 4 [2022-01-27T17:11:30.207899400Z] [main] Filter 5 [2022-01-27T17:11:30.207899400Z] [main]
Flow context
flowのcollectionは常に呼び出し元のcoroutineのcontextで実行されます。
例えば、simple
のflowがある場合、下記のコードはsimple
flowの実装の詳細にかかわらず
コードを書いてる人によって指定されるcontext上で実行されます。
withContext(context) { simple().collect { value -> println(value) // run in the specified context } }
このflowの特性はcontext preservation
と呼ばれます。
よって、デフォルトではflow{}
builder内のコードは対応するflowのcollectorが提供するcontextで実行されます。
例えば、呼び出したthreadを表示し、3つの値をemitするsimple
関数の実装を考えた場合、
FlowContext.kt
fun simple(): Flow<Int> = flow { log("Started simple flow") for (i in 1..3) { emit(i) } } fun main() = runBlocking { simple().collect() { value -> log("Collected $value") } }
実行結果は下記のようになります。
Started simple flow [2022-01-08T20:45:18.735591500Z] [main @coroutine#1] Collected 1 [2022-01-08T20:45:18.797592500Z] [main @coroutine#1] Collected 2 [2022-01-08T20:45:18.797592500Z] [main @coroutine#1] Collected 3 [2022-01-08T20:45:18.797592500Z] [main @coroutine#1]
simple().collect
がmain threadから呼ばれるため、simple
のflowのbodyもまたmain threadで呼ばれています。
これはfast-runningや実行contextを気にしない呼び出し元をブロッキングしない非同期コードには最適なデフォルトです。
Wrong emission withContext
しかし、長時間実行されるCPUを消費するコードはDispatchers.Default
のcontextで実行される必要があり、
UIを更新するコードはDispatchers.Main
のcontextで実行される必要があります。
通常withContextはKotlin coroutineを使用するコードでcontextを変更するために使用されますが、
flow{}
builderのコードはcontext preservation 特性を守る必要があり、異なるcontextからはemit
することは出来ません。
下記のコードを実行してみます。
WrongEmissionWithContext.kt
fun simple3(): Flow<Int> = flow { // The WRONG way to change context for CPU-consuming code in flow builder withContext(Dispatchers.Default) { for (i in 1..3) { Thread.sleep(100) // pretend we are computing it in CPU-consuming way emit(i) // emit next value } } } fun main() = runBlocking { simple3().collect { value -> println(value) } }
実行すると下記のようなエラーになります
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [BlockingCoroutine{Active}@737a3e52, BlockingEventLoop@42e1ea79], but emission happened in [DispatchedCoroutine{Active}@5edc2e34, Dispatchers.Default]. Please refer to 'flow' documentation or use 'flowOn' instead
flowOn operator
exceptionはflow emissionのcotextを変更するために使われるflowOn
関数を参照しています。
flowのcotextを変更する正しい方法は下記の例を見てください。
どのように動くかを見せるために対応するスレッド名を表示しています。
FlowOnOperator.kt
fun simple4(): Flow<Int> = flow { for (i in 1..3) { Thread.sleep(100) log("Emitting $i") emit(i) } }.flowOn(Dispatchers.Default) fun main() = runBlocking { simple4().collect { value -> log("Collected $value") } }
実行結果
Emitting 1 [2022-01-11T17:09:55.967504400Z] [DefaultDispatcher-worker-1 @coroutine#2] Collected 1 [2022-01-11T17:09:56.010500100Z] [main @coroutine#1] Emitting 2 [2022-01-11T17:09:56.124531700Z] [DefaultDispatcher-worker-1 @coroutine#2] Collected 2 [2022-01-11T17:09:56.124531700Z] [main @coroutine#1] Emitting 3 [2022-01-11T17:09:56.236530900Z] [DefaultDispatcher-worker-1 @coroutine#2] Collected 3 [2022-01-11T17:09:56.236530900Z] [main @coroutine#1]
main threadの中でcollectionが発生している間、flow{}
がバックグラウンドthreadでどのように動くか注意してください。
ここで気づく別の事は、flowOn
演算子はflowのデフォルトのシーケンシャルの性質が変更されたことです。
collectionは一つのcoroutine"coroutine#1"で発生し、emissionはcollection coroutineと並行で実行されている別のcoroutine"coroutine#2"で発生します。
flowOn
演算子はそのcontextのCoroutineDispather
を変更する必要がある場合、上流ストリームのための別のcoroutineを生成します。
Buffering
異なるcoroutineで異なるflowの部分を実行することは、flowのcollectするための全体の時間、
とりわけ長時間実行する非同期演算が含まれる場合に役に立ちます。
例えば、simple5
flowによるemissionが遅く(一つの要素を生成するのに100msecかかる)
collectorもまた遅く一つの要素を処理するのに300msecかかる場合を考えてみます。
このようなflowで3つの数字でどのくらい時間がかかるか見てみます。
Buffering.kt
fun simple5(): Flow<Int> = flow { for (i in 1..3) { delay(100) // pretend we are asynchronously waiting 100 ms emit(i) // emit next value } } fun main() = runBlocking { val time = measureTimeMillis { simple5().collect { value -> delay(300) // pretend we are processing it for 300 ms log(value) } } log("Collected in $time ms") } fun log(msg: Int) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")
collection全体を処理するのに約1300msec(それぞれに400msec)かかっています。
実行結果
1 [2022-01-11T17:32:25.891781200Z] [main] 2 [2022-01-11T17:32:26.358782900Z] [main] 3 [2022-01-11T17:32:26.782780500Z] [main] Collected in 1317 ms [2022-01-11T17:32:26.795784400Z] [main]
flowでbuffer
演算子を使うことが出来ます。
シーケンシャルに実行するのとは対照的に、simple5
flow上でemitするコードと並行でcollecting処理を実行することが出来ます。
fun main() = runBlocking { val time = measureTimeMillis { simple5().buffer() .collect { value -> // buffer emissions, don't wait delay(300) // pretend we are processing it for 300 ms log(value) } } log("Collected in $time ms") }
処理するパイプラインを効果的に生成し、最初の数値に100msecだけ待機し、その後それぞれの数値の処理に300msecを費やすだけで
同じ数字を少し早く生成します。
この方法は実行に約1000msecかかります。
実行結果
1 [2022-01-12T17:39:24.206261100Z] [main] 2 [2022-01-12T17:39:24.579888500Z] [main] 3 [2022-01-12T17:39:24.893857400Z] [main] Collected in 1168 ms [2022-01-12T17:39:24.912849Z] [main]
flowOn演算子はCoroutineDispatherを変更する必要がある場合同じバッファリングメカニズムを使用します。
しかしここでは実行contextを変更せずに明示的にbufferingをリクエストしています。
Conflation
flowが操作の一部の結果か操作状態の更新を表す場合、それぞれの値を操作する必要がないかもしれません。
しかし代わりに最新の値を処理します。
この場合、collectorが処理で非常に遅い場合、conflate
演算子は中間値をスキップするために使うことが出来ます。
前の例を元にすると、
Conflation.kt
fun main() = runBlocking { val time = measureTimeMillis { simple5() .conflate() // conflate emissions, don't process each one .collect { value -> delay(300) // pretend we are processing it for 300 ms log(value) } } log("Collected in $time ms") }
初めの値が処理されている間に2つめと3つめの値がすでに生成されています。
そのため、2つ目の値は合成され、最新の値(3つ目)だけがcollectorに届けられます。
実行結果
1 [2022-01-14T19:00:09.291166900Z] [main] 3 [2022-01-14T19:00:09.666164300Z] [main] Collected in 860 ms [2022-01-14T19:00:09.678163Z] [main]
Processing the latest value
Conflationはemitterとcollectorが両方遅い場合に処理をスピードアップする一つの方法です。
これはemitされた値を捨てることで行ってます。
他の方法としては、遅いcollectorをキャンセルし、新しい値がemitされるごとにrestartすることです。
xxx
演算子と本質的に同じロジックとして動作するが新しい値でブロックのコードをキャンセルするxxxLatest
演算子ファミリーがあります。
先程の例をconflate
をcollectLatest
に変更して試してみます。
ProcessingTheLatestValue.kt
fun main() = runBlocking { val time = measureTimeMillis { simple5() .collectLatest { value -> // cancel & restart on the latest value log("Collecting $value") delay(300) // pretend we are processing it for 300 ms log("Done $value") } } log("Collected in $time ms") }
collectLatest
のbodyは300msecかかりますが、新しい値は100msecごとにemitされます。
blockは各値で実行されますが、最後の値のみ完了しています。
実行結果
Collecting 1 [2022-01-16T13:18:12.465516600Z] [main] Collecting 2 [2022-01-16T13:18:12.620517Z] [main] Collecting 3 [2022-01-16T13:18:12.727545100Z] [main] Done 3 [2022-01-16T13:18:13.041528300Z] [main] Collected in 765 ms [2022-01-16T13:18:13.059519Z] [main]
Composing multiple flows
複数のflowを構成するための多くの方法があります。
Zip
kotlin標準ライブラリのSequence.zip
拡張関数のように、2つのflowの対応する値を合成するzip
演算子があります。
Zip.kt
fun main() = runBlocking { val nums = (1..3).asFlow() // numbers 1..3 val strs = flowOf("one", "two", "three") // strings nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string .collect { log(it) } // collect and print }
実行結果
1 -> one [2022-01-16T15:20:41.147086700Z] [main] 2 -> two [2022-01-16T15:20:41.179085100Z] [main] 3 -> three [2022-01-16T15:20:41.181086200Z] [main]
Combine
flowが変数か演算子の最新の値を表す場合、対応するflowの最新の値による計算と、
いずれかの上流のflowが値をemitするたびに再計算が必要となる場合があります。
その対応する演算子のファミリーはcombine
と呼ばれます。
例えば、前の例の数字が300msecごとに更新されると、文字列は400msecごとに更新され、zip
演算子を使っての圧縮は
400msecごとに出力されるにもかかわらず、同じ結果となります。
この例では各要素を遅延させ、sample flowをemitするコードをより宣言的で短くするためにonEach
中間演算子を使用しています。
Combine.kt
fun main() = runBlocking { val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip" .collect { value -> // collect and print log("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
実行結果
1 -> one at 466 ms from start [2022-01-16T16:30:21.517337400Z] [main] 2 -> two at 843 ms from start [2022-01-16T16:30:21.874084400Z] [main] 3 -> three at 1259 ms from start [2022-01-16T16:30:22.290649300Z] [main]
zip
のかわりにcombine
演算子を使ってみます
fun main() = runBlocking { val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms val startTime = System.currentTimeMillis() // remember the start time nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine" .collect { value -> // collect and print log("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
かなり違う結果となります。nums
とstrs
のflowからそれぞれemissionが出力されるごとに1行ずつ出力されます。
実行結果
1 -> one at 492 ms from start [2022-01-16T16:29:28.372165Z] [main] 2 -> one at 684 ms from start [2022-01-16T16:29:28.542163100Z] [main] 2 -> two at 964 ms from start [2022-01-16T16:29:28.822677Z] [main] 3 -> two at 996 ms from start [2022-01-16T16:29:28.855684900Z] [main] 3 -> three at 1371 ms from start [2022-01-16T16:29:29.229045Z] [main]
Flattening flows
flowは非同期で受信した値のシーケンスを表します、
そのため各値が別のシーケンス値のためのリクエストをトリガーする状況になるのはとても簡単です。
例えば、500msec離れた2つの文字列のflowを返す下記の関数があり、
FlatteningFlows.kt
fun requestFlow(i: Int): Flow<String> = flow { emit("$i: First") delay(500) // wait 500 ms emit("$i: Second") }
3つの数値のflowがあり、下記のようにそれぞれの値でrequestFlow
を呼ぶ場合
(1..3).asFlow().map { requestFlow(it) }
すると、さらなる処理のために一つのflowへflattenさせる必要があるflowのflow(Flow<Flow<String>>
)となります。
このためにcollectionとsequenceにはflatten
とflatMap
演算子があります。
しかし、flowの非同期の性質があり、flattenの異なるモードが必要になるため
flowにはflatten演算子のファミリーがあります。
flatMapConcat
Concatenating モードはflatMapConcat
とflattenConcat
演算子で実装されています。
これらは対応するシーケンス演算子の最も直接的でアナログなものです。
これらは下記の例のように、次の値のcollectionを開始する前に内側のflowが完了するまで待ちます。
FlatMapConcat.kt
fun main() = runBlocking { val startTime = System.currentTimeMillis() // remember the start time (1..3).asFlow().onEach { delay(100) } // a number every 100 ms .flatMapConcat { requestFlow(it) } .collect { value -> // collect and print log("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
結果を見ると、flatMapConcat
のシーケンシャルな性質が明らかです。
実行結果
1: First at 142 ms from start [2022-01-16T19:34:41.178374600Z] [main] 1: Second at 700 ms from start [2022-01-16T19:34:41.721341700Z] [main] 2: First at 806 ms from start [2022-01-16T19:34:41.827339500Z] [main] 2: Second at 1310 ms from start [2022-01-16T19:34:42.331340100Z] [main] 3: First at 1422 ms from start [2022-01-16T19:34:42.443340Z] [main] 3: Second at 1923 ms from start [2022-01-16T19:34:42.944370900Z] [main]
flatMapMerge
別のflattenモードはすべての入ってくるflowを並行にcollectし、それらの値を1つのflowにマージすることで
可能な限り早くemitします。
これはflatMapMerge
とflattenMerge
で実装されています。
どちらでもオプショナルでconcurrency
パラメータを取り、同時にcollectされる並行のflowの数を制限します。
(デフォルトはDEFAULT_CONCURRENCY
です)
FlatMapMerge.kt
fun main() = runBlocking { val startTime = System.currentTimeMillis() // remember the start time (1..3).asFlow().onEach { delay(100) } // a number every 100 ms .flatMapMerge { requestFlow(it) } .collect { value -> // collect and print log("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
flatMapMerge
の並行な性質は明らかです。
実行結果
1: First at 182 ms from start [2022-01-16T19:50:42.276387300Z] [main] 2: First at 266 ms from start [2022-01-16T19:50:42.344387100Z] [main] 3: First at 378 ms from start [2022-01-16T19:50:42.456399500Z] [main] 1: Second at 696 ms from start [2022-01-16T19:50:42.774386900Z] [main] 2: Second at 773 ms from start [2022-01-16T19:50:42.851389900Z] [main] 3: Second at 883 ms from start [2022-01-16T19:50:42.961391400Z] [main]
flatMapMerge
はコードブロック(この例では{ requestFlow(it) }
)をシーケンシャルに呼びますが、
結果のflowは並行でcollectすることに注意してください。
この動作はシーケンシャルにmap { requestFlow(it) }
を実行し、そのあと結果に対してflattenMerge
を呼ぶのと同等です。
flatMapLatest
"Processing the latest value"の項で示したcollectLatest
と似た方法として、
新しいflowがemitされるとすぐに以前のflowのcollectionがキャンセルされる"Latest"flattenモードがあります。
これはflatMapLatest
演算子で実装されています。
FlatMapLatest.kt
fun main() = runBlocking { val startTime = System.currentTimeMillis() // remember the start time (1..3).asFlow().onEach { delay(100) } // a number every 100 ms .flatMapLatest { requestFlow(it) } .collect { value -> // collect and print log("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
この例の出力はflatMapLatest
がどのように動いているかが分かるよい実例です。
実行結果
1: First at 182 ms from start [2022-01-16T19:59:11.651387400Z] [main] 2: First at 298 ms from start [2022-01-16T19:59:11.750382500Z] [main] 3: First at 409 ms from start [2022-01-16T19:59:11.861384600Z] [main] 3: Second at 921 ms from start [2022-01-16T19:59:12.373383200Z] [main]
flatMapLatest
は新しい値のブロック(この例では{ requestFlow(it) }
)のすべてのコードをキャンセルすることに注意してください。
この例では違いはありませんが、なぜならrequestFlow
の呼び出し自身は速く、suspendではなく、キャンセル出来ないためです。
しかし、そこでdelay
のようなsuspend関数を使った場合これが発生します。
Flow exceptions
Flowのcollectionはemitterや演算子内のコードが例外を投げる時に、例外とともに完了することがあります。
これらの例外を扱ういくつかの方法です。
Collector try and catch
collectorは例外を扱うためにkotlinのtry/catch
を使うことが出来ます。
CollectorTryAndCatch.kt
fun simple6(): Flow<Int> = flow { for (i in 1..3) { log("Emitting $i") emit(i) // emit next value } } fun main() = runBlocking { try { simple6().collect { value -> log(value) check(value <= 1) { "Collected $value" } } } catch (e: Throwable) { log("Caught $e") } }
このコードはcollect
終端演算子内の例外を正常にキャッチし、その後、それ以上の値はemitされてません。
実行結果
Emitting 1 [2022-01-19T16:43:35.716721600Z] [main] 1 [2022-01-19T16:43:35.782721500Z] [main] Emitting 2 [2022-01-19T16:43:35.793725300Z] [main] 2 [2022-01-19T16:43:35.793725300Z] [main] Caught java.lang.IllegalStateException: Collected 2 [2022-01-19T16:43:35.801721700Z] [main]
Everything is caught
前の例ではemitterか中間・終端演算子内でどのような例外が発生しても実際にキャッチします。
例えば、emitされた値をstringにmap
するように変更してみます。
しかし、対応するコードは例外を生成します。
EverythingIsCaught.kt
fun simple7(): Flow<String> = flow { for (i in 1..3) { log("Emitting $i") emit(i) // emit next value } }.map { value -> check(value <= 1) { "Crashed on $value" } "string $value" } fun main() = runBlocking { try { simple7().collect { value -> log(value) } } catch (e: Throwable) { log("Caught $e") } }
実行結果
Emitting 1 [2022-01-19T16:59:30.879808100Z] [main] string 1 [2022-01-19T16:59:30.928812100Z] [main] Emitting 2 [2022-01-19T16:59:30.928812100Z] [main] Caught java.lang.IllegalStateException: Crashed on 2 [2022-01-19T16:59:30.934807600Z] [main]
Exception transparency
しかし、emitterのコードはどのようにして例外ハンドリングの振る舞いをカプセル化出来るのでしょう。
Flowは例外に対して透過的でなくてはならず、try catch
グロックの中のflow{}
builder内で値をemitすることは例外の透過性に違反します。
これは例外を投げるcollectorは前の例のようにtry catch
と使うことで常にcatch出来るということを保証します。
emitterは例外の透過性を守るためにcatch
演算子を使うことが出来、例外ハンドリグのカプセル化することが出来ます。
catch演算子の本体では例外を分析することが出来、どの例外をキャッチしたかによって異なる方法で対応できます。
- 例外は
throw
を使って再スロー出来ます - 例外は
catch
の本体からemit
を使うことで、値をemitすることが出来ます - 例外は無視、ロギング、他のコードによって処理することが出来ます
例として、例外のキャッチ内でテキストをemitしてみます
ExceptionTransparency.kt
fun main() = runBlocking { simple7() .catch { e -> emit("Caught $e") } // emit on exception .collect { value -> log(value) } }
try catch
を使わないですが、結果は同じです。
実行結果
Emitting 1 [2022-01-24T16:39:31.454981900Z] [main] string 1 [2022-01-24T16:39:31.501983300Z] [main] Emitting 2 [2022-01-24T16:39:31.501983300Z] [main] Caught java.lang.IllegalStateException: Crashed on 2 [2022-01-24T16:39:31.510982500Z] [main]
Transparent catch
catch
中間演算子は例外の透過性にもとづき上流の例外のみキャッチすることが出来ます
(catch
より上のすべての演算子からの例外で、catch
の下ではないです)
もしcollect{}
内のブロック(catch
の下に配置されている)が例外をスローする場合、それはキャッチされません。
TransparentCatch.kt
fun simple8(): Flow<Int> = flow { for (i in 1..3) { log("Emitting $i") emit(i) } } fun main() = runBlocking { simple8() .catch { e -> log("Caught $e") } .collect { value -> check(value <= 1) { "Collected $value" } log(value) } }
"Caught ..."のメッセージはcatch演算子があるにもかかわらず表示されません。
実行結果
Emitting 1 [2022-01-24T16:51:46.300799900Z] [main] 1 [2022-01-24T16:51:46.359794800Z] [main] Emitting 2 [2022-01-24T16:51:46.374794Z] [main] Exception in thread "main" java.lang.IllegalStateException: Collected 2
Catching declaratively
onEach
のcollect演算子の本体を移動し、catch
演算子の前に置くことで、
catch
演算子の宣言的な性質とすべての例外を処理する要求を組み合わせることが出来ます。
このflowのcollectionはパラメータなしのcollect()
の呼び出しでトリガーされます。
CatchingDeclaratively.kt
fun main() = runBlocking { simple8() .onEach { value -> check(value <= 1) { "Collected $value" } log(value) } .catch { e -> log("Caught $e") } .collect() }
"Caught ..."メッセージが表示され、try catch
を明示的に使用することなくすべての例外をキャッチすることが出来ます。
実行結果
Emitting 1 [2022-01-24T17:02:56.482095700Z] [main] 1 [2022-01-24T17:02:56.527096700Z] [main] Emitting 2 [2022-01-24T17:02:56.544101300Z] [main] Caught java.lang.IllegalStateException: Collected 2 [2022-01-24T17:02:56.552100200Z] [main]
Flow completion
flowのcollectionが完了するとき(正常または例外で)、アクションの実行が必要な場合があります。
すでに気づいてるかもしれませんが、2つの方法があります。命令形か宣言型です。
Imperative finally block
try catch
にはさらに、collect
の完了にあたりcollectorはアクションを実行するためにfinally
ブロックを使うことが出来ます。
ImperativeFinallyBlock.kt
fun simple9(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking { try { simple9().collect { value -> log(value) } } finally { log("Done") } }
simple9
によって3つの数字が生成され、"Done"を出力します。
実行結果
1 [2022-01-24T17:15:20.824561400Z] [main] 2 [2022-01-24T17:15:20.874559900Z] [main] 3 [2022-01-24T17:15:20.874559900Z] [main] Done [2022-01-24T17:15:20.875558600Z] [main]
Declarative handling
宣言型のアプローチとして、flowにはcollect完了した際に発動されるonCompletion
中間演算子があります。
前の例はonCompletion
を使用して書き換える事ができ、同じ出力となります。
DeclarativeHandling.kt
fun main() = runBlocking { simple9() .onCompletion { log("Done") } .collect { value -> log(value) } }
実行結果
1 [2022-01-25T15:06:29.449730600Z] [main] 2 [2022-01-25T15:06:29.498730500Z] [main] 3 [2022-01-25T15:06:29.498730500Z] [main] Done [2022-01-25T15:06:29.501738700Z] [main]
onCompletion
の有利な点はラムダのnullableのThrowable
パラメータです。
これでflowのcollectionが正常に完了したか、例外が発生したかを判定することが出来ます。
下記の例ではsimple10
flowは1をemitしたあとに例外をスローします
fun simple10(): Flow<Int> = flow { emit(1) throw RuntimeException() } fun main() = runBlocking { simple10() .onCompletion { cause -> if (cause != null) { log("Flow completed exceptionally") } } .catch { cause -> log("Caught exception. $cause") } .collect { value -> log(value) } }
予想したとおり、出力は下記です。
実行結果
1 [2022-01-25T15:25:22.812039400Z] [main] Flow completed exceptionally [2022-01-25T15:25:22.903040Z] [main] Caught exception. java.lang.RuntimeException [2022-01-25T15:25:22.939040Z] [main]
onCompletion
演算子はcatch
とは違い、例外を処理しません。
上記の例の通り、例外はまだ下流に流れます。これはさらにonCompletion
演算子へ運ばれ、catch
演算子で処理できます。
Successful completion
catch
演算子との他の異なる点は、onCompletion
はすべての例外を見て、上流のflowのcompletionが成功した場合のみnull
の例外を受け取ります。
(キャンセルと失敗を除き)
SuccessfulCompletion.kt
fun main() = runBlocking { simple9() .onCompletion { cause -> log("Flow completed with $cause") } .collect { value -> check(value <= 1) { "Collected $value" } log(value) } }
下流の例外が原因でflowが中止され、completionのcause
がnullでないことが分かります。
実行結果
1 [2022-01-25T15:38:47.171168300Z] [main] Flow completed with java.lang.IllegalStateException: Collected 2 [2022-01-25T15:38:47.241171100Z] [main] Exception in thread "main" java.lang.IllegalStateException: Collected 2
Imperative versus declarative
flowのcollectの方法や、命令型、宣言型の両方で完了と例外の処理を学びました。
ここで疑問として、どの方法がよくてそれはなぜかということです。
ライブラリーとして、特定のアプローチかは主張しませんし、どちらの方法もあなたの好みはコードスタイルによって選択することが出来、有効です。
Launching flow
何らかのソースからの非同期のイベントを表すためにflowを使うのは簡単です。
この場合、入ってくるイベントへの反応とコードの一部を登録し、さらに動き続き得るaddEventListener
関数に似たものが必要です。
onEach
演算子はこの役割を務めます。しかし、onEach
は中間演算子です。
flowをcollectするための終端演算子も必要です。でなければonEach
を呼んでも何も結果が起きません。
もしonEach
の後にcollect
終端演算子を使うと、その後のコードはflowがcollectされるまで待ちます。
LaunchingFlow.kt
// Imitate a flow of events fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) } fun main() = runBlocking { events() .onEach { event -> log("Event: $event") } .collect() // <--- Collecting the flow waits log("Done") }
実行結果
Event: 1 [2022-01-25T16:14:06.240358600Z] [main] Event: 2 [2022-01-25T16:14:06.400353300Z] [main] Event: 3 [2022-01-25T16:14:06.509352200Z] [main] Done [2022-01-25T16:14:06.510353500Z] [main]
launchIn
終端演算子がここで役に立ちます。
collect
をlaunchIn
に置き換えると別々のcoroutineでflowのcollectionを起動でき、その後のコードの実行は即継続させることが出来ます。
fun main() = runBlocking { events() .onEach { event -> log("Event: $event") } .launchIn(this) // <--- Launching the flow in a separate coroutine log("Done") }
実行結果
Done [2022-01-25T16:23:53.801660900Z] [main] Event: 1 [2022-01-25T16:23:54.024658400Z] [main] Event: 2 [2022-01-25T16:23:54.127657200Z] [main] Event: 3 [2022-01-25T16:23:54.234274Z] [main]
launchIn
への必須のパラメータは、flowをcollectするcoroutineが起動するCoroutineScope
を指定します。
上記の例ではこのスコープはrunBlocking
coroutine builderからきています。
そのためflowが動いている間、このrunBlocking
スコープはその子のcoroutineの完了を待ち、main関数が戻らないようにし、この例が終了しないようにします。
実際のアプリケーションではスコープ宣言されたライフタイムとともにentityから来ます。
このentityのライフタイムが終了するとすぐに、対応するスコープはキャンセルされ、対応するflowのcollectionはキャンセルされます。
onEach { ... }.launchIn(scope)
のペアの方法はaddEventListener
のように働きます。
しかし、キャンセルと構造化された並列がこの目的をはたすので、対応するremoveEventListener
関数は必要ありません。
注意として、launchIn
もまたJob
を返します。
これはスコープ全体をキャンセルせずに対応するflow collectionのcoroutineだけをキャンセルしたり、join
させることが出来ます。
Flow cancellation checks
参考までにflow
builderはそれぞれのemitされた値でキャンセルに対してensureActive
チェックを追加で実施します。
これはflow{}
からemitされるビジーループがキャンセル可能なことを意味します。
FlowCancellationChecks.kt
fun foo(): Flow<Int> = flow { for (i in 1..5) { log("Emitting $i") emit(i) } } fun main() = runBlocking { foo().collect { value -> if (value == 3) cancel() log(value) } }
3まで数字が上がっていき4をemitしようとした後にCancellationException
が投げられます
実行結果
Emitting 1 [2022-01-26T15:49:27.166112300Z] [main] 1 [2022-01-26T15:49:27.212112600Z] [main] Emitting 2 [2022-01-26T15:49:27.224111500Z] [main] 2 [2022-01-26T15:49:27.224111500Z] [main] Emitting 3 [2022-01-26T15:49:27.224111500Z] [main] 3 [2022-01-26T15:49:27.228110500Z] [main] Emitting 4 [2022-01-26T15:49:27.228110500Z] [main] Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@517cd4b
しかし、ほとんどの他のflow演算子はそれぞれのパフォーマンス的な理由から追加のキャンセルチェックをしません。
例えば、同じビジーループを書き込むためにIntRange.asFlow
拡張を使い、どこでもsuspendしない場合、キャンセルはチェックされません。
fun main() = runBlocking { (1..5).asFlow().collect { value -> if (value == 3) cancel() log(value) } }
1から5までのすべての数字がcollectされ、runBlocking
から戻る前にのみキャンセルが検出されます。
実行結果
1 [2022-01-26T15:57:28.425460100Z] [main] 2 [2022-01-26T15:57:28.494454500Z] [main] 3 [2022-01-26T15:57:28.499456100Z] [main] 4 [2022-01-26T15:57:28.499456100Z] [main] 5 [2022-01-26T15:57:28.499456100Z] [main] Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@4516af24
Making busy flow cancellable
coroutineによるビジーループがある場合、明示的にキャンセルをチェックする必要があります。
.onEach { currentCoroutineContext().ensureActive() }
を追加することが出来ますが、
それを提供するcancellable
演算子をすぐに使うことが出来ます。
MakingBusyFlowCancellable.kt
fun main() = runBlocking { (1..5).asFlow().cancellable() .collect { value -> if (value == 3) cancel() log(value) } }
cancellable
演算子を使うと1から3までの数字だけがcollectされます。
実行結果
1 [2022-01-26T16:11:35.059531800Z] [main] 2 [2022-01-26T16:11:35.126530300Z] [main] 3 [2022-01-26T16:11:35.131528900Z] [main] Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@5fcd892a
Flow and Reactive Streams
Reactive Streams
かRxJavaやReactorのようなreactiveフレームワークに精通している人にとって、Flowのデザインはとても似ています。
実際、Reactive Streamsと様々な実装にインスパイアされています。
しかしFlowの主なゴールはてきる限りシンプルなデザインを持ち、kotlinとsuspensionにフレンドリーで、構造化された並列性を尊重することです。
このゴールを達成することはreactiveのパイオニアと彼らの多大な働きなしでは不可能でした。
Reactive Streams and Kotlin Flows
記事で完全なストーリーを読むことが出来ます。
異なっていても、概念的にFlowはreactive streamであり、reactive Publisherに変換可能であり、逆も同様です。
これらのコンバーターは型にはまらないkotlinx.coroutines
によって提供されており、対応するreactiveモジュールを見つけることが出来ます。
(Reactive Streamsにはcoroutines-reactive
、Reactor にはkotlinx-coroutines-reactor
、RxJava2/RxJava3にはkotlinx-coroutines-rx2/kotlinx-coroutines-rx3
)
統合モジュールは、Flow
からまたはFlow
への変換、ReactorのContext
の統合、様々なreactiveエンティティと連携するsuspensionフレンドリーな方法が含まれます。
サンプルコードは下記にあげました。
おわり。