読者です 読者をやめる 読者になる 読者になる

RxJavaでReactive StreamsのBack Pressureを試す

RxJavaでReactive StreamsのBackPressureを試したメモです。

RxJavaでReactive StreamsのBackPressureを試してみました。
RxJavaは2.0からReactive Streamsに対応しています。

PublisherとSubscriber

RxJava2のReactive Streamsに対応したPublisherとSubscriberはそれぞれ、
Flowable、Subscriberとなっています。
(RxJava1のReactive Streams未対応なものはそれぞれObservableとObserver)

BackPressure

BackPressureは流量制御の仕組みです。
データを生産する側より、データを受信して処理する側の処理性能が低い場合、データがオーバーフローしてしまいます。
そのためデータを受信する側が、自分の処理可能なデータ量をデータを生産する側に伝えることで、
データを生産する側が必要な数だけデータを提供することが可能になります。

RxJavaではBackPressureを指定する方法として、
Flowable.create()時にBackpressureStrategyを指定する方法と、
Flowable.onBackpressureXXX()メソッドで後から指定する方法があるようです。

BackpressureStrategy onBackpressureXXX 概要
BUFFER onBackpressureBuffer() 生成されたデータを無制限でバッファする
ERROR onBackpressureBuffer(int capacity) capacityまで生成されたデータをバッファする。バッファの容量を超過するとBufferOverflowExceptionが発生する
DROP onBackpressureDrop() 受け取る準備が出来るまでに生成されたデータを破棄する
LATEST onBackpressureLatest() 最新のデータ以外破棄する

ReactiveXのサイトの図
・onBackpressureBuffer
f:id:pppurple:20170425024333p:plain:w600

・onBackpressureDrop
f:id:pppurple:20170425024344p:plain:w600

・onBackpressureLatest
f:id:pppurple:20170425024349p:plain:w600

Maven Dependency

依存関係としてio.reactivex.rxjava2を追加しました。

pom.xml

<dependencies>
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.0.6</version>
    </dependency>
</dependencies>
Subscriber

テスト用にログを出力する下記のSubscriberを用意。
requestにはLong.MAX_VALUEを指定して無制限にし、流量制御はobserveOnで指定するようにします。

onNextではThread.sleep()で擬似的な重い処理を入れています。

MySubscriber.java

public class MySubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("  --> onSubscribe");
        this.subscription = subscription;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T data) {
        System.out.println("  --> onNext: " + data);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError!!!");
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("  --> onComplete");
    }
}
onBackpressureBuffer()

Flowable側はintervalで100msec毎に0~9のデータを生成して、
Subscriber側は1,000msec毎にデータを処理するようにしてます。

observeOnで以降の処理を別スレッドで行うようにして、Flowableに対して2ずつrequestを行うようにしてみます。

BackpressureBuffer.java

public class BackpressureBuffer {
    public static void main(String[] args) throws InterruptedException {

        Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS)
                .take(10)
                .doOnSubscribe(subscription -> System.out.println("<-- subscribe"))
                .doOnNext(data -> System.out.println("Flowable generated data:" + data))
                .onBackpressureBuffer();

        flowable.doOnRequest(req -> System.out.println("<-- request: " + req))
                .observeOn(Schedulers.computation(), false, 2)
                .doOnRequest(req -> System.out.println("  <-- request: " + req))
                .subscribe(new MySubscriber<>());

        Thread.sleep(11_000L);
    }
}

実行結果

<-- subscribe
  --> onSubscribe
  <-- request: 9223372036854775807
<-- request: 2
Flowable generated data:0
  --> onNext: 0
Flowable generated data:1
Flowable generated data:2
Flowable generated data:3
Flowable generated data:4
Flowable generated data:5
Flowable generated data:6
Flowable generated data:7
Flowable generated data:8
Flowable generated data:9
  --> onNext: 1
<-- request: 2
  --> onNext: 2
  --> onNext: 3
<-- request: 2
  --> onNext: 4
  --> onNext: 5
<-- request: 2
  --> onNext: 6
  --> onNext: 7
<-- request: 2
  --> onNext: 8
  --> onNext: 9
<-- request: 2
  --> onComplete

Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
Flowableで生成されたデータが全部バッファされて2件ずつ通知されています。

  <-- request: 9223372036854775807

となっているのはobserveOnでLong.MAX_VALUEを指定している箇所です。

onBackpressureBuffer(int capacity)

上記のonBackpressureBufferと同じコードでキャパシティとして3を設定してみます。
さらにバッファがオーバーフローした際に"overflow!!!!"とメッセージを表示するようにしてみました。

BackpressureBufferWithCapacity.java

public class BackpressureBufferWithCapacity {
    public static void main(String[] args) throws InterruptedException {

        Flowable<Long> flowable = Flowable.interval(400L, TimeUnit.MILLISECONDS)
                .take(10)
                .doOnSubscribe(subscription -> System.out.println("<-- subscribe"))
                .doOnNext(data -> System.out.println("Flowable generated data:" + data))
                .onBackpressureBuffer(3, () -> System.out.println("overflow!!!!"));

        flowable.doOnRequest(req -> System.out.println("<-- request: " + req))
                .observeOn(Schedulers.computation(), false, 2)
                .doOnRequest(req -> System.out.println("  <-- request: " + req))
                .subscribe(new MySubscriber<>());

        Thread.sleep(11_000L);
    }
}

実行結果

<-- subscribe
  --> onSubscribe
  <-- request: 9223372036854775807
<-- request: 2
Flowable generated data:0
  --> onNext: 0
Flowable generated data:1
Flowable generated data:2
  --> onNext: 1
Flowable generated data:3
Flowable generated data:4
Flowable generated data:5
<-- request: 2
  --> onNext: 2
Flowable generated data:6
Flowable generated data:7
  --> onNext: 3
Flowable generated data:8
overflow!!!!
<-- request: 2
onError!!!
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:98)
    at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:90)
    at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
    at io.reactivex.internal.operators.flowable.FlowableTake$TakeSubscriber.onNext(FlowableTake.java:64)
    at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:84)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
Flowableで生成されたデータがバッファされていきますが、8が生成された時点でMissingBackpressureExceptionが発生しています。

onNextで3まで通知されていて、キャパシティに3を指定しているので、6までをバッファで持てると思っていたのですが、
どうやらobserveOnのring bufferで2つ値をバッファしてるので、結果8までバッファを保持できるようです。(たぶん)

onBackpressureDrop()

Flowable側はintervalで300msec毎に0~19のデータを生成して、
Subscriber側は1,000msec毎にデータを処理するようにしてます。

observeOnで以降の処理を別スレッドで行うようにして、Flowableに対して2ずつrequestを行うようにしてみます。

BackpressureDrop.java

public class BackpressureDrop {
    public static void main(String[] args) throws InterruptedException {

        Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .take(20)
                .doOnSubscribe(subscription -> System.out.println("<-- subscribe"))
                .doOnNext(data -> System.out.println("Flowable generated data:" + data))
                .onBackpressureDrop();

        flowable.doOnRequest(req -> System.out.println("<-- request: " + req))
                .observeOn(Schedulers.computation(), false, 2)
                .doOnRequest(req -> System.out.println("  <-- request: " + req))
                .subscribe(new MySubscriber<>());

        Thread.sleep(11_000L);
    }
}

実行結果

<-- subscribe
  --> onSubscribe
  <-- request: 9223372036854775807
<-- request: 2
Flowable generated data:0
  --> onNext: 0
Flowable generated data:1
Flowable generated data:2
Flowable generated data:3
  --> onNext: 1
Flowable generated data:4
Flowable generated data:5
Flowable generated data:6
<-- request: 2
Flowable generated data:7
  --> onNext: 7
Flowable generated data:8
Flowable generated data:9
Flowable generated data:10
  --> onNext: 8
Flowable generated data:11
Flowable generated data:12
Flowable generated data:13
<-- request: 2
Flowable generated data:14
  --> onNext: 14
Flowable generated data:15
Flowable generated data:16
Flowable generated data:17
  --> onNext: 15
Flowable generated data:18
Flowable generated data:19
<-- request: 2
  --> onComplete

Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
requestが来る直前までのデータは破棄され、requestの次に生成されたデータがonNextで通知されています。
SubscriberでonNextで受信したデータ以外は破棄されて通知されずにonCompleteで終了しています。

onBackpressureLatest()

Flowable側はintervalで300msec毎に0~19のデータを生成して、
Subscriber側は1,000msec毎にデータを処理するようにしてます。

observeOnで以降の処理を別スレッドで行うようにして、Flowableに対して2ずつrequestを行うようにしてみます。

BackpressureLatest.java

public class BackpressureLatest {
    public static void main(String[] args) throws InterruptedException {

        Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .take(20)
                .doOnSubscribe(subscription -> System.out.println("<-- subscribe"))
                .doOnNext(data -> System.out.println("Flowable generated data:" + data))
                .onBackpressureLatest();

        flowable.doOnRequest(req -> System.out.println("<-- request: " + req))
                .observeOn(Schedulers.computation(), false, 2)
                .doOnRequest(req -> System.out.println("  <-- request: " + req))
                .subscribe(new MySubscriber<>());

        Thread.sleep(11_000L);
    }
}

実行結果

<-- subscribe
  --> onSubscribe
  <-- request: 9223372036854775807
<-- request: 2
Flowable generated data:0
  --> onNext: 0
Flowable generated data:1
Flowable generated data:2
Flowable generated data:3
  --> onNext: 1
Flowable generated data:4
Flowable generated data:5
Flowable generated data:6
<-- request: 2
  --> onNext: 6
Flowable generated data:7
Flowable generated data:8
Flowable generated data:9
Flowable generated data:10
  --> onNext: 7
Flowable generated data:11
Flowable generated data:12
Flowable generated data:13
<-- request: 2
  --> onNext: 13
Flowable generated data:14
Flowable generated data:15
Flowable generated data:16
  --> onNext: 14
Flowable generated data:17
Flowable generated data:18
Flowable generated data:19
<-- request: 2
  --> onNext: 19
  --> onComplete

Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
onBackpressureDrop()とは異なり、requestが来る直前のデータがキャシュされてonNextで通知されています。


終わり。

ソースは下記にあげました。
github.com


【参考】

https://codezine.jp/article/detail/9570
http://qiita.com/yuya_presto/items/0e95271bc85efe7f768e