KotlinのCoroutineを試す (Asynchronous Flow)

kotlinのcoroutineを試してみたメモです。

kotlin公式のcoroutineチュートリアルのAsynchronous Flowの写経とメモです。
公式を見たほうが最新で正確な情報が得られます。
https://kotlinlang.org/docs/flow.html

下記バージョンで試してみます。

  • kotlin 1.6.10
  • kotlinx-coroutines-core:1.6.0

Dependency

gradleを使って試してみます。

build.gradle

plugins {
    id 'org.jetbrains.kotlin.jvm' version '1.6.10'
}

group 'com.example.coroutine.kotlin'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'
}

compileKotlin {
    kotlinOptions.jvmTarget = "11"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "11"
}

Util

実行時間と実行スレッドを表示しながら標準出力するために下記のlog関数を準備しておきます。

Utils.kt

fun log(msg: String) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")

Asynchronous Flow

suspend関数は非同期で一つの値を返しますが、非同期で複数の計算した値を返すにはどうすればよいでしょうか?
そこでkotlinのFlowを利用します。

Representing multiple values

kotlinでは複数の値はcollectionsを使用して表現することが出来ます。例えば、3つの値を持つListを返すsimple関数があり、
forEachを利用してすべての値をprintします。

RepresentingMultipleValues.kt

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> log(value) }
}

実行結果

1 [2022-01-27T16:57:22.393380Z] [main]
2 [2022-01-27T16:57:22.440376200Z] [main]
3 [2022-01-27T16:57:22.440376200Z] [main]

Sequences

もしCPUを利用するブロッキングな処理で数を計算する場合(各計算に100msecかかる場合)、Sequenceを使用して数を表現することが出来ます。

RepresentingMultipleValuesSequences.kt

fun simpleSeq(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(500) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simpleSeq().forEach { value -> log(value) }
}

実行結果

1 [2022-01-27T16:56:49.377941600Z] [main]
2 [2022-01-27T16:56:49.942941400Z] [main]
3 [2022-01-27T16:56:50.447941900Z] [main]

このコードは同じ数を表示しますが、それぞれを表示する前に100msec待ちます。

Suspending functions

しかし、上記の計算はコードを実行しているmain threadをブロッキングします。
これらの数値が非同期で計算される場合、simple関数にsuspendをつけることでブロッキングなしで計算され、結果はリストで返されます。

RepresentingMultipleValuesSuspending.kt

suspend fun simpleSus(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking {
    simpleSus().forEach { value -> log(value) }
}

実行結果

1 [2022-01-27T16:58:24.114585700Z] [main]
2 [2022-01-27T16:58:24.161584100Z] [main]
3 [2022-01-27T16:58:24.162596500Z] [main]

1秒待った後、数字が表示されます。

Flows

List<Int>を戻り値にした場合、すべての値は一度に返されます。
非同期に計算されたストリームであることを表現するために、同期で計算される値にSequence<Int>を使うようにFlow<Int>を使うことが出来ます。

RepresentingMultipleValuesFlows.kt

fun simpleFlow(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(500) // pretend we are doing something useful here
        // If use Thread.sleep, the main thread is blocked
        // Thread.sleep(500)
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            log("I'm not blocked $k")
            delay(500)
        }
    }
    // Collect the flow
    simpleFlow().collect { value -> log(value) }
}

実行結果

I'm not blocked 1 [2022-01-27T16:59:17.077063400Z] [main]
1 [2022-01-27T16:59:17.566065500Z] [main]
I'm not blocked 2 [2022-01-27T16:59:17.661070900Z] [main]
2 [2022-01-27T16:59:18.084596800Z] [main]
I'm not blocked 3 [2022-01-27T16:59:18.164251100Z] [main]
3 [2022-01-27T16:59:18.598993Z] [main]

上記のFlowの例とそれ以前の例では下記の点が異なります

  • Flowのbuilder関数としてflowを呼んでいます
  • flow{}builderブロック内のコードはsuspend出来ます
  • simpleFlow関数はsuspendは付与されません
  • emit関数を使用して値がemitされます
  • collect関数を使用して値がcollectされます

flow{}内のdelayThread.sleepに変えることも出来ます。その場合main threadはblockingされます。

Flows are cold

Flowsequenceと同様にcold streamです。flow builderの中のコードはcollectされるまで実行されません。
下記のコードを実行すると分かります。

FlowsAreCold.kt

fun coldFlow(): Flow<Int> = flow {
    log("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    log("Calling coldFlow...")
    val flow = coldFlow()
    log("Calling collect...")
    flow.collect { value ->
        log(value)
    }
    log("Calling collect again...")
    flow.collect { value ->
        log(value)
    }
}

実行結果

Calling coldFlow... [2022-01-27T17:00:49.365531700Z] [main]
Calling collect... [2022-01-27T17:00:49.420530900Z] [main]
Flow started [2022-01-27T17:00:49.424531100Z] [main]
1 [2022-01-27T17:00:50.443529300Z] [main]
2 [2022-01-27T17:00:51.459554700Z] [main]
3 [2022-01-27T17:00:52.464383500Z] [main]
Calling collect again... [2022-01-27T17:00:52.464906300Z] [main]
Flow started [2022-01-27T17:00:52.464906300Z] [main]
1 [2022-01-27T17:00:53.466936900Z] [main]
2 [2022-01-27T17:00:54.477829Z] [main]
3 [2022-01-27T17:00:55.490575100Z] [main]

これはcoldFlow関数にsuspendをつけていない主な理由です。
coldFlow関数は即座に返り、何も待ちません。
flowはcollectで毎回開始されるので、collectが再度呼ばれた後はFlow startedが表示されます。

Flow cancellation basics

Flowはcoroutineの一般的な協調的なキャンセルを遵守します。
delayのようなキャンセル可能なsuspend関数内でflowがsusupendされている場合、flowのcollectionはキャンセルされることがあります。
下記の例はwithTimeoutOrNull内でコードを実行し停止した場合、どのようにflowがキャンセルされるかを示しています。

FlowCancellationBasics.kt

fun simpleCancellation(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        log("Emitting $i.")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(2500) { // Timeout after 2500ms
        simpleCancellation().collect { value ->
            log(value)
        }
        log("Done")
    }
}

simpleCancellation関数内で2つの値のみエミットされ表示されています。

実行結果

Emitting 1. [2022-01-27T17:02:16.554302200Z] [main]
1 [2022-01-27T17:02:16.591268800Z] [main]
Emitting 2. [2022-01-27T17:02:17.605413900Z] [main]
2 [2022-01-27T17:02:17.605413900Z] [main]

詳細はFlow cancellation checksセクションを参照。

Flow builders

前の例のflow{}builderは一番基本的なものです。
簡単にflowを定義する他のbuilderも存在します。

  • flowOfbuilderは固定の値のセットをemitするflowを定義します。
  • 様々なcollectionやsequenceは.asFlow()拡張関数を使用してflowに変換できます

flowから1から3の数値を出力する例は下記のように書けます。

FlowBuilders.kt

fun main() = runBlocking {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value ->
        log(value)
    }
}

実行結果

1 [2022-01-27T17:05:49.256194300Z] [main]
2 [2022-01-27T17:05:49.328190100Z] [main]
3 [2022-01-27T17:05:49.328190100Z] [main]

Intermediate flow operators

flowはcollectionやsequenceのように演算子で変換することができます。
中間演算子は上流のflowに適用され、下流のflowを返します。
これらの演算子はflowがそうであるようにcoldです。
演算子それ自身はsuspend関数ではありません。即座に実行され、新しい変換されたflowの定義を返します。

基本的な演算子mapfilterのようなよく知られている名前です。
sequenceとの重要な違いは、これらの演算子内のコードブロックはsuspend関数を呼ぶことが出来ます。

例として、入力リクエストのflowは、suspend関数を実装した長い時間処理が行われる場合でも、map演算子にマップすることができます。

IntermediateFlowOperators.kt

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> log(response) }
}

実行結果

response 1 [2022-01-27T17:06:45.534434300Z] [main]
response 2 [2022-01-27T17:06:46.575061100Z] [main]
response 3 [2022-01-27T17:06:47.575205200Z] [main]

Transform operator

flowの変換演算子の中で最も一般的なものはtransformと呼ばれています。
これはmapfilterのように単純な変換のように使用したり、もっと複雑な変換をすることが出来ます。
transform演算子を使うと、任意の値を任意の回数emitすることが出来ます。

例として、transform演算子を使うと長時間非同期で処理されるリクエストが実行される前に文字列をemitすることができ、
レスポンスを出力出来ます。

TransformOperator.kt

fun main() = runBlocking {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> log(response) }
}

実行結果

Making request 1 [2022-01-27T17:07:58.020659100Z] [main]
response 1 [2022-01-27T17:07:59.112840900Z] [main]
Making request 2 [2022-01-27T17:07:59.112840900Z] [main]
response 2 [2022-01-27T17:08:00.121133500Z] [main]
Making request 3 [2022-01-27T17:08:00.121133500Z] [main]
response 3 [2022-01-27T17:08:01.125523600Z] [main]

Size-limiting operators

takeのようなサイズ制限中間演算子は制限に到達した際に、flowの実行をキャンセルします。
coroutineのキャンセルは常にexceptionがthrowされて実行されます。
そのため、(try { } finally { }のような)リソース管理する関数はキャンセルされた場合に正常に動作します。

SizeLimitingOperators.kt

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        log("This line will not execute.")
        emit(3)
    } finally {
        log("Finally in numbers.")
    }
}

fun main() = runBlocking {
    numbers()
        .take(2) // take only the first two
        .collect { response ->
            log(response)
        }
}

実行結果は2つ目の値をemitしたあとに、numbers()関数内のflow{}の実行が停止しているのが分かります。

実行結果

1 [2022-01-27T17:09:27.683229600Z] [main]
2 [2022-01-27T17:09:27.736233200Z] [main]
Finally in numbers. [2022-01-27T17:09:27.738230800Z] [main]

Terminal flow operators

flowの終端演算子はflowの収集を開始するsuspend関数です。collect演算子は最も一般的なものですが、
他にも多数の演算子があります。

  • toListtoSetのように様々なcollectionに変換します
  • firstのように初めの値を取得するものや、singleのような必ず一つの値をemitする演算子があります
  • reducefoldでflowから値に変換します

TerminalFlowOperators.kt

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    log(sum)
}

実行結果

55 [2022-01-27T17:10:28.989438900Z] [main]

Flows are sequential

それぞれのflowのcollectionはシーケンシャルに動作します(複数のflow上で動作する特別な演算子を除き)。
collectionは終端演算子を呼ぶcoroutineで直接動きます。デフォルトでは新しいcoroutineは起動されません。
それぞれのemitされた値は上流ストリームから下流ストリームの中間演算子で処理され、その後終端演算子へ流れます。

下記の例は、偶数でfilterし、stringにマップする例です。

FlowsAreSequential.kt

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            log("Filter $it")
            it % 2 == 0
        }
        .map {
            log("Map $it")
            "string $it"
        }
        .collect {
            log("Collect $it")
        }
}

実行結果

Filter 1 [2022-01-27T17:11:30.153900400Z] [main]
Filter 2 [2022-01-27T17:11:30.199899200Z] [main]
Map 2 [2022-01-27T17:11:30.200903400Z] [main]
Collect string 2 [2022-01-27T17:11:30.207899400Z] [main]
Filter 3 [2022-01-27T17:11:30.207899400Z] [main]
Filter 4 [2022-01-27T17:11:30.207899400Z] [main]
Map 4 [2022-01-27T17:11:30.207899400Z] [main]
Collect string 4 [2022-01-27T17:11:30.207899400Z] [main]
Filter 5 [2022-01-27T17:11:30.207899400Z] [main]

Flow context

flowのcollectionは常に呼び出し元のcoroutineのcontextで実行されます。
例えば、simpleのflowがある場合、下記のコードはsimpleflowの実装の詳細にかかわらず
コードを書いてる人によって指定されるcontext上で実行されます。

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}

このflowの特性はcontext preservationと呼ばれます。

よって、デフォルトではflow{}builder内のコードは対応するflowのcollectorが提供するcontextで実行されます。
例えば、呼び出したthreadを表示し、3つの値をemitするsimple関数の実装を考えた場合、

FlowContext.kt

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking {
    simple().collect() { value -> log("Collected $value") }
}

実行結果は下記のようになります。

Started simple flow [2022-01-08T20:45:18.735591500Z] [main @coroutine#1]
Collected 1 [2022-01-08T20:45:18.797592500Z] [main @coroutine#1]
Collected 2 [2022-01-08T20:45:18.797592500Z] [main @coroutine#1]
Collected 3 [2022-01-08T20:45:18.797592500Z] [main @coroutine#1]

simple().collectがmain threadから呼ばれるため、simpleのflowのbodyもまたmain threadで呼ばれています。
これはfast-runningや実行contextを気にしない呼び出し元をブロッキングしない非同期コードには最適なデフォルトです。

Wrong emission withContext

しかし、長時間実行されるCPUを消費するコードはDispatchers.Defaultのcontextで実行される必要があり、
UIを更新するコードはDispatchers.Mainのcontextで実行される必要があります。
通常withContextはKotlin coroutineを使用するコードでcontextを変更するために使用されますが、
flow{}builderのコードはcontext preservation 特性を守る必要があり、異なるcontextからはemitすることは出来ません。

下記のコードを実行してみます。

WrongEmissionWithContext.kt

fun simple3(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking {
    simple3().collect { value -> println(value) }
}

実行すると下記のようなエラーになります

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [BlockingCoroutine{Active}@737a3e52, BlockingEventLoop@42e1ea79],
        but emission happened in [DispatchedCoroutine{Active}@5edc2e34, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead

flowOn operator

exceptionはflow emissionのcotextを変更するために使われるflowOn関数を参照しています。
flowのcotextを変更する正しい方法は下記の例を見てください。
どのように動くかを見せるために対応するスレッド名を表示しています。

FlowOnOperator.kt

fun simple4(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    simple4().collect { value ->
        log("Collected $value")
    }
}

実行結果

Emitting 1 [2022-01-11T17:09:55.967504400Z] [DefaultDispatcher-worker-1 @coroutine#2]
Collected 1 [2022-01-11T17:09:56.010500100Z] [main @coroutine#1]
Emitting 2 [2022-01-11T17:09:56.124531700Z] [DefaultDispatcher-worker-1 @coroutine#2]
Collected 2 [2022-01-11T17:09:56.124531700Z] [main @coroutine#1]
Emitting 3 [2022-01-11T17:09:56.236530900Z] [DefaultDispatcher-worker-1 @coroutine#2]
Collected 3 [2022-01-11T17:09:56.236530900Z] [main @coroutine#1]

main threadの中でcollectionが発生している間、flow{}がバックグラウンドthreadでどのように動くか注意してください。

ここで気づく別の事は、flowOn演算子はflowのデフォルトのシーケンシャルの性質が変更されたことです。
collectionは一つのcoroutine"coroutine#1"で発生し、emissionはcollection coroutineと並行で実行されている別のcoroutine"coroutine#2"で発生します。
flowOn演算子はそのcontextのCoroutineDispatherを変更する必要がある場合、上流ストリームのための別のcoroutineを生成します。

Buffering

異なるcoroutineで異なるflowの部分を実行することは、flowのcollectするための全体の時間、
とりわけ長時間実行する非同期演算が含まれる場合に役に立ちます。
例えば、simple5 flowによるemissionが遅く(一つの要素を生成するのに100msecかかる)
collectorもまた遅く一つの要素を処理するのに300msecかかる場合を考えてみます。
このようなflowで3つの数字でどのくらい時間がかかるか見てみます。

Buffering.kt

fun simple5(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple5().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            log(value)
        }
    }
    log("Collected in $time ms")
}

fun log(msg: Int) = println("$msg [${Instant.now()}] [${Thread.currentThread().name}]")

collection全体を処理するのに約1300msec(それぞれに400msec)かかっています。

実行結果

1 [2022-01-11T17:32:25.891781200Z] [main]
2 [2022-01-11T17:32:26.358782900Z] [main]
3 [2022-01-11T17:32:26.782780500Z] [main]
Collected in 1317 ms [2022-01-11T17:32:26.795784400Z] [main]

flowでbuffer演算子を使うことが出来ます。
シーケンシャルに実行するのとは対照的に、simple5 flow上でemitするコードと並行でcollecting処理を実行することが出来ます。

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple5().buffer()
            .collect { value -> // buffer emissions, don't wait
                delay(300) // pretend we are processing it for 300 ms
                log(value)
            }
    }
    log("Collected in $time ms")
}

処理するパイプラインを効果的に生成し、最初の数値に100msecだけ待機し、その後それぞれの数値の処理に300msecを費やすだけで
同じ数字を少し早く生成します。
この方法は実行に約1000msecかかります。

実行結果

1 [2022-01-12T17:39:24.206261100Z] [main]
2 [2022-01-12T17:39:24.579888500Z] [main]
3 [2022-01-12T17:39:24.893857400Z] [main]
Collected in 1168 ms [2022-01-12T17:39:24.912849Z] [main]

flowOn演算子はCoroutineDispatherを変更する必要がある場合同じバッファリングメカニズムを使用します。
しかしここでは実行contextを変更せずに明示的にbufferingをリクエストしています。

Conflation

flowが操作の一部の結果か操作状態の更新を表す場合、それぞれの値を操作する必要がないかもしれません。
しかし代わりに最新の値を処理します。
この場合、collectorが処理で非常に遅い場合、conflate演算子は中間値をスキップするために使うことが出来ます。
前の例を元にすると、

Conflation.kt

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple5()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                log(value)
            }
    }
    log("Collected in $time ms")
}

初めの値が処理されている間に2つめと3つめの値がすでに生成されています。
そのため、2つ目の値は合成され、最新の値(3つ目)だけがcollectorに届けられます。

実行結果

1 [2022-01-14T19:00:09.291166900Z] [main]
3 [2022-01-14T19:00:09.666164300Z] [main]
Collected in 860 ms [2022-01-14T19:00:09.678163Z] [main]

Processing the latest value

Conflationはemitterとcollectorが両方遅い場合に処理をスピードアップする一つの方法です。
これはemitされた値を捨てることで行ってます。
他の方法としては、遅いcollectorをキャンセルし、新しい値がemitされるごとにrestartすることです。
xxx演算子と本質的に同じロジックとして動作するが新しい値でブロックのコードをキャンセルするxxxLatest演算子ファミリーがあります。
先程の例をconflatecollectLatestに変更して試してみます。

ProcessingTheLatestValue.kt

fun main() = runBlocking {
    val time = measureTimeMillis {
        simple5()
            .collectLatest { value -> // cancel & restart on the latest value
                log("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                log("Done $value")
            }
    }
    log("Collected in $time ms")
}

collectLatestのbodyは300msecかかりますが、新しい値は100msecごとにemitされます。
blockは各値で実行されますが、最後の値のみ完了しています。

実行結果

Collecting 1 [2022-01-16T13:18:12.465516600Z] [main]
Collecting 2 [2022-01-16T13:18:12.620517Z] [main]
Collecting 3 [2022-01-16T13:18:12.727545100Z] [main]
Done 3 [2022-01-16T13:18:13.041528300Z] [main]
Collected in 765 ms [2022-01-16T13:18:13.059519Z] [main]

Composing multiple flows

複数のflowを構成するための多くの方法があります。

Zip

kotlin標準ライブラリのSequence.zip拡張関数のように、2つのflowの対応する値を合成するzip演算子があります。

Zip.kt

fun main() = runBlocking {
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { log(it) } // collect and print
}

実行結果

1 -> one [2022-01-16T15:20:41.147086700Z] [main]
2 -> two [2022-01-16T15:20:41.179085100Z] [main]
3 -> three [2022-01-16T15:20:41.181086200Z] [main]

Combine

flowが変数か演算子の最新の値を表す場合、対応するflowの最新の値による計算と、
いずれかの上流のflowが値をemitするたびに再計算が必要となる場合があります。
その対応する演算子のファミリーはcombineと呼ばれます。

例えば、前の例の数字が300msecごとに更新されると、文字列は400msecごとに更新され、zip演算子を使っての圧縮は
400msecごとに出力されるにもかかわらず、同じ結果となります。

この例では各要素を遅延させ、sample flowをemitするコードをより宣言的で短くするためにonEach中間演算子を使用しています。

Combine.kt

fun main() = runBlocking {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            log("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

実行結果

1 -> one at 466 ms from start [2022-01-16T16:30:21.517337400Z] [main]
2 -> two at 843 ms from start [2022-01-16T16:30:21.874084400Z] [main]
3 -> three at 1259 ms from start [2022-01-16T16:30:22.290649300Z] [main]

zipのかわりにcombine演算子を使ってみます

fun main() = runBlocking {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            log("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

かなり違う結果となります。numsstrsのflowからそれぞれemissionが出力されるごとに1行ずつ出力されます。

実行結果

1 -> one at 492 ms from start [2022-01-16T16:29:28.372165Z] [main]
2 -> one at 684 ms from start [2022-01-16T16:29:28.542163100Z] [main]
2 -> two at 964 ms from start [2022-01-16T16:29:28.822677Z] [main]
3 -> two at 996 ms from start [2022-01-16T16:29:28.855684900Z] [main]
3 -> three at 1371 ms from start [2022-01-16T16:29:29.229045Z] [main]

Flattening flows

flowは非同期で受信した値のシーケンスを表します、
そのため各値が別のシーケンス値のためのリクエストをトリガーする状況になるのはとても簡単です。
例えば、500msec離れた2つの文字列のflowを返す下記の関数があり、

FlatteningFlows.kt

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

3つの数値のflowがあり、下記のようにそれぞれの値でrequestFlowを呼ぶ場合

(1..3).asFlow().map { requestFlow(it) }

すると、さらなる処理のために一つのflowへflattenさせる必要があるflowのflow(Flow<Flow<String>>)となります。
このためにcollectionとsequenceにはflattenflatMap演算子があります。
しかし、flowの非同期の性質があり、flattenの異なるモードが必要になるため
flowにはflatten演算子のファミリーがあります。

flatMapConcat

Concatenating モードはflatMapConcatflattenConcat演算子で実装されています。
これらは対応するシーケンス演算子の最も直接的でアナログなものです。
これらは下記の例のように、次の値のcollectionを開始する前に内側のflowが完了するまで待ちます。

FlatMapConcat.kt

fun main() = runBlocking {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            log("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

結果を見ると、flatMapConcatのシーケンシャルな性質が明らかです。

実行結果

1: First at 142 ms from start [2022-01-16T19:34:41.178374600Z] [main]
1: Second at 700 ms from start [2022-01-16T19:34:41.721341700Z] [main]
2: First at 806 ms from start [2022-01-16T19:34:41.827339500Z] [main]
2: Second at 1310 ms from start [2022-01-16T19:34:42.331340100Z] [main]
3: First at 1422 ms from start [2022-01-16T19:34:42.443340Z] [main]
3: Second at 1923 ms from start [2022-01-16T19:34:42.944370900Z] [main]

flatMapMerge

別のflattenモードはすべての入ってくるflowを並行にcollectし、それらの値を1つのflowにマージすることで
可能な限り早くemitします。
これはflatMapMergeflattenMergeで実装されています。
どちらでもオプショナルでconcurrencyパラメータを取り、同時にcollectされる並行のflowの数を制限します。
(デフォルトはDEFAULT_CONCURRENCYです)

FlatMapMerge.kt

fun main() = runBlocking {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

flatMapMergeの並行な性質は明らかです。

実行結果

1: First at 182 ms from start [2022-01-16T19:50:42.276387300Z] [main]
2: First at 266 ms from start [2022-01-16T19:50:42.344387100Z] [main]
3: First at 378 ms from start [2022-01-16T19:50:42.456399500Z] [main]
1: Second at 696 ms from start [2022-01-16T19:50:42.774386900Z] [main]
2: Second at 773 ms from start [2022-01-16T19:50:42.851389900Z] [main]
3: Second at 883 ms from start [2022-01-16T19:50:42.961391400Z] [main]

flatMapMergeはコードブロック(この例では{ requestFlow(it) })をシーケンシャルに呼びますが、
結果のflowは並行でcollectすることに注意してください。
この動作はシーケンシャルにmap { requestFlow(it) } を実行し、そのあと結果に対してflattenMergeを呼ぶのと同等です。

flatMapLatest

"Processing the latest value"の項で示したcollectLatestと似た方法として、
新しいflowがemitされるとすぐに以前のflowのcollectionがキャンセルされる"Latest"flattenモードがあります。
これはflatMapLatest演算子で実装されています。

FlatMapLatest.kt

fun main() = runBlocking {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            log("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

この例の出力はflatMapLatestがどのように動いているかが分かるよい実例です。

実行結果

1: First at 182 ms from start [2022-01-16T19:59:11.651387400Z] [main]
2: First at 298 ms from start [2022-01-16T19:59:11.750382500Z] [main]
3: First at 409 ms from start [2022-01-16T19:59:11.861384600Z] [main]
3: Second at 921 ms from start [2022-01-16T19:59:12.373383200Z] [main]

flatMapLatestは新しい値のブロック(この例では{ requestFlow(it) })のすべてのコードをキャンセルすることに注意してください。
この例では違いはありませんが、なぜならrequestFlowの呼び出し自身は速く、suspendではなく、キャンセル出来ないためです。
しかし、そこでdelayのようなsuspend関数を使った場合これが発生します。

Flow exceptions

Flowのcollectionはemitterや演算子内のコードが例外を投げる時に、例外とともに完了することがあります。
これらの例外を扱ういくつかの方法です。

Collector try and catch

collectorは例外を扱うためにkotlinのtry/catchを使うことが出来ます。

CollectorTryAndCatch.kt

fun simple6(): Flow<Int> = flow {
    for (i in 1..3) {
        log("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    try {
        simple6().collect { value ->
            log(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        log("Caught $e")
    }
}

このコードはcollect終端演算子内の例外を正常にキャッチし、その後、それ以上の値はemitされてません。

実行結果

Emitting 1 [2022-01-19T16:43:35.716721600Z] [main]
1 [2022-01-19T16:43:35.782721500Z] [main]
Emitting 2 [2022-01-19T16:43:35.793725300Z] [main]
2 [2022-01-19T16:43:35.793725300Z] [main]
Caught java.lang.IllegalStateException: Collected 2 [2022-01-19T16:43:35.801721700Z] [main]

Everything is caught

前の例ではemitterか中間・終端演算子内でどのような例外が発生しても実際にキャッチします。
例えば、emitされた値をstringにmapするように変更してみます。
しかし、対応するコードは例外を生成します。

EverythingIsCaught.kt

fun simple7(): Flow<String> =
    flow {
        for (i in 1..3) {
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

fun main() = runBlocking {
    try {
        simple7().collect { value -> log(value) }
    } catch (e: Throwable) {
        log("Caught $e")
    }
}

実行結果

Emitting 1 [2022-01-19T16:59:30.879808100Z] [main]
string 1 [2022-01-19T16:59:30.928812100Z] [main]
Emitting 2 [2022-01-19T16:59:30.928812100Z] [main]
Caught java.lang.IllegalStateException: Crashed on 2 [2022-01-19T16:59:30.934807600Z] [main]

Exception transparency

しかし、emitterのコードはどのようにして例外ハンドリングの振る舞いをカプセル化出来るのでしょう。

Flowは例外に対して透過的でなくてはならず、try catchグロックの中のflow{}builder内で値をemitすることは例外の透過性に違反します。
これは例外を投げるcollectorは前の例のようにtry catchと使うことで常にcatch出来るということを保証します。

emitterは例外の透過性を守るためにcatch演算子を使うことが出来、例外ハンドリグのカプセル化することが出来ます。
catch演算子の本体では例外を分析することが出来、どの例外をキャッチしたかによって異なる方法で対応できます。

  • 例外はthrowを使って再スロー出来ます
  • 例外はcatchの本体からemitを使うことで、値をemitすることが出来ます
  • 例外は無視、ロギング、他のコードによって処理することが出来ます

例として、例外のキャッチ内でテキストをemitしてみます

ExceptionTransparency.kt

fun main() = runBlocking {
    simple7()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> log(value) }
}

try catchを使わないですが、結果は同じです。

実行結果

Emitting 1 [2022-01-24T16:39:31.454981900Z] [main]
string 1 [2022-01-24T16:39:31.501983300Z] [main]
Emitting 2 [2022-01-24T16:39:31.501983300Z] [main]
Caught java.lang.IllegalStateException: Crashed on 2 [2022-01-24T16:39:31.510982500Z] [main]

Transparent catch

catch中間演算子は例外の透過性にもとづき上流の例外のみキャッチすることが出来ます
(catchより上のすべての演算子からの例外で、catchの下ではないです)
もしcollect{}内のブロック(catchの下に配置されている)が例外をスローする場合、それはキャッチされません。

TransparentCatch.kt

fun simple8(): Flow<Int> = flow {
    for (i in 1..3) {
        log("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    simple8()
        .catch { e -> log("Caught $e") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            log(value)
        }
}

"Caught ..."のメッセージはcatch演算子があるにもかかわらず表示されません。

実行結果

Emitting 1 [2022-01-24T16:51:46.300799900Z] [main]
1 [2022-01-24T16:51:46.359794800Z] [main]
Emitting 2 [2022-01-24T16:51:46.374794Z] [main]
Exception in thread "main" java.lang.IllegalStateException: Collected 2

Catching declaratively

onEachのcollect演算子の本体を移動し、catch演算子の前に置くことで、
catch演算子の宣言的な性質とすべての例外を処理する要求を組み合わせることが出来ます。
このflowのcollectionはパラメータなしのcollect()の呼び出しでトリガーされます。

CatchingDeclaratively.kt

fun main() = runBlocking {
    simple8()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            log(value)
        }
        .catch { e -> log("Caught $e") }
        .collect()
}

"Caught ..."メッセージが表示され、try catchを明示的に使用することなくすべての例外をキャッチすることが出来ます。

実行結果

Emitting 1 [2022-01-24T17:02:56.482095700Z] [main]
1 [2022-01-24T17:02:56.527096700Z] [main]
Emitting 2 [2022-01-24T17:02:56.544101300Z] [main]
Caught java.lang.IllegalStateException: Collected 2 [2022-01-24T17:02:56.552100200Z] [main]

Flow completion

flowのcollectionが完了するとき(正常または例外で)、アクションの実行が必要な場合があります。
すでに気づいてるかもしれませんが、2つの方法があります。命令形か宣言型です。

Imperative finally block

try catchにはさらに、collectの完了にあたりcollectorはアクションを実行するためにfinallyブロックを使うことが出来ます。

ImperativeFinallyBlock.kt

fun simple9(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    try {
        simple9().collect { value -> log(value) }
    } finally {
        log("Done")
    }
}

simple9によって3つの数字が生成され、"Done"を出力します。

実行結果

1 [2022-01-24T17:15:20.824561400Z] [main]
2 [2022-01-24T17:15:20.874559900Z] [main]
3 [2022-01-24T17:15:20.874559900Z] [main]
Done [2022-01-24T17:15:20.875558600Z] [main]

Declarative handling

宣言型のアプローチとして、flowにはcollect完了した際に発動されるonCompletion中間演算子があります。

前の例はonCompletionを使用して書き換える事ができ、同じ出力となります。

DeclarativeHandling.kt

fun main() = runBlocking {
    simple9()
        .onCompletion { log("Done") }
        .collect { value -> log(value) }
}

実行結果

1 [2022-01-25T15:06:29.449730600Z] [main]
2 [2022-01-25T15:06:29.498730500Z] [main]
3 [2022-01-25T15:06:29.498730500Z] [main]
Done [2022-01-25T15:06:29.501738700Z] [main]

onCompletionの有利な点はラムダのnullableのThrowableパラメータです。
これでflowのcollectionが正常に完了したか、例外が発生したかを判定することが出来ます。
下記の例ではsimple10 flowは1をemitしたあとに例外をスローします

fun simple10(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking {
    simple10()
        .onCompletion { cause ->
            if (cause != null) {
                log("Flow completed exceptionally")
            }
        }
        .catch { cause -> log("Caught exception. $cause") }
        .collect { value -> log(value) }
}

予想したとおり、出力は下記です。

実行結果

1 [2022-01-25T15:25:22.812039400Z] [main]
Flow completed exceptionally [2022-01-25T15:25:22.903040Z] [main]
Caught exception. java.lang.RuntimeException [2022-01-25T15:25:22.939040Z] [main]

onCompletion演算子catchとは違い、例外を処理しません。
上記の例の通り、例外はまだ下流に流れます。これはさらにonCompletion演算子へ運ばれ、catch演算子で処理できます。

Successful completion

catch演算子との他の異なる点は、onCompletionはすべての例外を見て、上流のflowのcompletionが成功した場合のみnullの例外を受け取ります。
(キャンセルと失敗を除き)

SuccessfulCompletion.kt

fun main() = runBlocking {
    simple9()
        .onCompletion { cause -> log("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            log(value)
        }
}

下流の例外が原因でflowが中止され、completionのcauseがnullでないことが分かります。

実行結果

1 [2022-01-25T15:38:47.171168300Z] [main]
Flow completed with java.lang.IllegalStateException: Collected 2 [2022-01-25T15:38:47.241171100Z] [main]
Exception in thread "main" java.lang.IllegalStateException: Collected 2

Imperative versus declarative

flowのcollectの方法や、命令型、宣言型の両方で完了と例外の処理を学びました。
ここで疑問として、どの方法がよくてそれはなぜかということです。
ライブラリーとして、特定のアプローチかは主張しませんし、どちらの方法もあなたの好みはコードスタイルによって選択することが出来、有効です。

Launching flow

何らかのソースからの非同期のイベントを表すためにflowを使うのは簡単です。
この場合、入ってくるイベントへの反応とコードの一部を登録し、さらに動き続き得るaddEventListener関数に似たものが必要です。
onEach演算子はこの役割を務めます。しかし、onEachは中間演算子です。
flowをcollectするための終端演算子も必要です。でなければonEachを呼んでも何も結果が起きません。

もしonEachの後にcollect終端演算子を使うと、その後のコードはflowがcollectされるまで待ちます。

LaunchingFlow.kt

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking {
    events()
        .onEach { event -> log("Event: $event") }
        .collect() // <--- Collecting the flow waits
    log("Done")
}

実行結果

Event: 1 [2022-01-25T16:14:06.240358600Z] [main]
Event: 2 [2022-01-25T16:14:06.400353300Z] [main]
Event: 3 [2022-01-25T16:14:06.509352200Z] [main]
Done [2022-01-25T16:14:06.510353500Z] [main]

launchIn終端演算子がここで役に立ちます。
collectlaunchInに置き換えると別々のcoroutineでflowのcollectionを起動でき、その後のコードの実行は即継続させることが出来ます。

fun main() = runBlocking {
    events()
        .onEach { event -> log("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    log("Done")
}

実行結果

Done [2022-01-25T16:23:53.801660900Z] [main]
Event: 1 [2022-01-25T16:23:54.024658400Z] [main]
Event: 2 [2022-01-25T16:23:54.127657200Z] [main]
Event: 3 [2022-01-25T16:23:54.234274Z] [main]

launchInへの必須のパラメータは、flowをcollectするcoroutineが起動するCoroutineScopeを指定します。
上記の例ではこのスコープはrunBlocking coroutine builderからきています。
そのためflowが動いている間、このrunBlockingスコープはその子のcoroutineの完了を待ち、main関数が戻らないようにし、この例が終了しないようにします。

実際のアプリケーションではスコープ宣言されたライフタイムとともにentityから来ます。
このentityのライフタイムが終了するとすぐに、対応するスコープはキャンセルされ、対応するflowのcollectionはキャンセルされます。
onEach { ... }.launchIn(scope)のペアの方法はaddEventListenerのように働きます。
しかし、キャンセルと構造化された並列がこの目的をはたすので、対応するremoveEventListener関数は必要ありません。

注意として、launchInもまたJobを返します。
これはスコープ全体をキャンセルせずに対応するflow collectionのcoroutineだけをキャンセルしたり、joinさせることが出来ます。

Flow cancellation checks

参考までにflow builderはそれぞれのemitされた値でキャンセルに対してensureActiveチェックを追加で実施します。
これはflow{}からemitされるビジーループがキャンセル可能なことを意味します。

FlowCancellationChecks.kt

fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        log("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    foo().collect { value ->
        if (value == 3) cancel()
        log(value)
    }
}

3まで数字が上がっていき4をemitしようとした後にCancellationExceptionが投げられます

実行結果

Emitting 1 [2022-01-26T15:49:27.166112300Z] [main]
1 [2022-01-26T15:49:27.212112600Z] [main]
Emitting 2 [2022-01-26T15:49:27.224111500Z] [main]
2 [2022-01-26T15:49:27.224111500Z] [main]
Emitting 3 [2022-01-26T15:49:27.224111500Z] [main]
3 [2022-01-26T15:49:27.228110500Z] [main]
Emitting 4 [2022-01-26T15:49:27.228110500Z] [main]
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@517cd4b

しかし、ほとんどの他のflow演算子はそれぞれのパフォーマンス的な理由から追加のキャンセルチェックをしません。
例えば、同じビジーループを書き込むためにIntRange.asFlow拡張を使い、どこでもsuspendしない場合、キャンセルはチェックされません。

fun main() = runBlocking {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        log(value)
    }
}

1から5までのすべての数字がcollectされ、runBlockingから戻る前にのみキャンセルが検出されます。

実行結果

1 [2022-01-26T15:57:28.425460100Z] [main]
2 [2022-01-26T15:57:28.494454500Z] [main]
3 [2022-01-26T15:57:28.499456100Z] [main]
4 [2022-01-26T15:57:28.499456100Z] [main]
5 [2022-01-26T15:57:28.499456100Z] [main]
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@4516af24

Making busy flow cancellable

coroutineによるビジーループがある場合、明示的にキャンセルをチェックする必要があります。
.onEach { currentCoroutineContext().ensureActive() }を追加することが出来ますが、
それを提供するcancellable演算子をすぐに使うことが出来ます。

MakingBusyFlowCancellable.kt

fun main() = runBlocking {
    (1..5).asFlow().cancellable()
        .collect { value ->
            if (value == 3) cancel()
            log(value)
        }
}

cancellable演算子を使うと1から3までの数字だけがcollectされます。

実行結果

1 [2022-01-26T16:11:35.059531800Z] [main]
2 [2022-01-26T16:11:35.126530300Z] [main]
3 [2022-01-26T16:11:35.131528900Z] [main]
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@5fcd892a

Flow and Reactive Streams

Reactive StreamsかRxJavaやReactorのようなreactiveフレームワークに精通している人にとって、Flowのデザインはとても似ています。

実際、Reactive Streamsと様々な実装にインスパイアされています。
しかしFlowの主なゴールはてきる限りシンプルなデザインを持ち、kotlinとsuspensionにフレンドリーで、構造化された並列性を尊重することです。
このゴールを達成することはreactiveのパイオニアと彼らの多大な働きなしでは不可能でした。
Reactive Streams and Kotlin Flows記事で完全なストーリーを読むことが出来ます。

異なっていても、概念的にFlowはreactive streamであり、reactive Publisherに変換可能であり、逆も同様です。
これらのコンバーターは型にはまらないkotlinx.coroutinesによって提供されており、対応するreactiveモジュールを見つけることが出来ます。
(Reactive Streamsにはcoroutines-reactive、Reactor にはkotlinx-coroutines-reactor、RxJava2/RxJava3にはkotlinx-coroutines-rx2/kotlinx-coroutines-rx3)
統合モジュールは、FlowからまたはFlowへの変換、ReactorのContextの統合、様々なreactiveエンティティと連携するsuspensionフレンドリーな方法が含まれます。

 
サンプルコードは下記にあげました。

github.com

おわり。