RxJavaでReactive StreamsのBack Pressureを試す
RxJavaでReactive StreamsのBackPressureを試したメモです。
RxJavaでReactive StreamsのBackPressureを試してみました。
RxJavaは2.0からReactive Streamsに対応しています。
PublisherとSubscriber
RxJava2のReactive Streamsに対応したPublisherとSubscriberはそれぞれ、
Flowable、Subscriberとなっています。
(RxJava1のReactive Streams未対応なものはそれぞれObservableとObserver)
BackPressure
BackPressureは流量制御の仕組みです。
データを生産する側より、データを受信して処理する側の処理性能が低い場合、データがオーバーフローしてしまいます。
そのためデータを受信する側が、自分の処理可能なデータ量をデータを生産する側に伝えることで、
データを生産する側が必要な数だけデータを提供することが可能になります。
RxJavaではBackPressureを指定する方法として、
Flowable.create()時にBackpressureStrategyを指定する方法と、
Flowable.onBackpressureXXX()メソッドで後から指定する方法があるようです。
BackpressureStrategy | onBackpressureXXX | 概要 |
---|---|---|
BUFFER | onBackpressureBuffer() | 生成されたデータを無制限でバッファする |
ERROR | onBackpressureBuffer(int capacity) | capacityまで生成されたデータをバッファする。バッファの容量を超過するとBufferOverflowExceptionが発生する |
DROP | onBackpressureDrop() | 受け取る準備が出来るまでに生成されたデータを破棄する |
LATEST | onBackpressureLatest() | 最新のデータ以外破棄する |
ReactiveXのサイトの図
・onBackpressureBuffer
・onBackpressureDrop
・onBackpressureLatest
Maven Dependency
依存関係としてio.reactivex.rxjava2を追加しました。
pom.xml
<dependencies> <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.0.6</version> </dependency> </dependencies>
Subscriber
テスト用にログを出力する下記のSubscriberを用意。
requestにはLong.MAX_VALUEを指定して無制限にし、流量制御はobserveOnで指定するようにします。
onNextではThread.sleep()で擬似的な重い処理を入れています。
MySubscriber.java
public class MySubscriber<T> implements Subscriber<T> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println(" --> onSubscribe"); this.subscription = subscription; this.subscription.request(Long.MAX_VALUE); } @Override public void onNext(T data) { System.out.println(" --> onNext: " + data); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onError(Throwable throwable) { System.out.println("onError!!!"); throwable.printStackTrace(); } @Override public void onComplete() { System.out.println(" --> onComplete"); } }
onBackpressureBuffer()
Flowable側はintervalで100msec毎に0~9のデータを生成して、
Subscriber側は1,000msec毎にデータを処理するようにしてます。
observeOnで以降の処理を別スレッドで行うようにして、Flowableに対して2ずつrequestを行うようにしてみます。
BackpressureBuffer.java
public class BackpressureBuffer { public static void main(String[] args) throws InterruptedException { Flowable<Long> flowable = Flowable.interval(100L, TimeUnit.MILLISECONDS) .take(10) .doOnSubscribe(subscription -> System.out.println("<-- subscribe")) .doOnNext(data -> System.out.println("Flowable generated data:" + data)) .onBackpressureBuffer(); flowable.doOnRequest(req -> System.out.println("<-- request: " + req)) .observeOn(Schedulers.computation(), false, 2) .doOnRequest(req -> System.out.println(" <-- request: " + req)) .subscribe(new MySubscriber<>()); Thread.sleep(11_000L); } }
実行結果
<-- subscribe --> onSubscribe <-- request: 9223372036854775807 <-- request: 2 Flowable generated data:0 --> onNext: 0 Flowable generated data:1 Flowable generated data:2 Flowable generated data:3 Flowable generated data:4 Flowable generated data:5 Flowable generated data:6 Flowable generated data:7 Flowable generated data:8 Flowable generated data:9 --> onNext: 1 <-- request: 2 --> onNext: 2 --> onNext: 3 <-- request: 2 --> onNext: 4 --> onNext: 5 <-- request: 2 --> onNext: 6 --> onNext: 7 <-- request: 2 --> onNext: 8 --> onNext: 9 <-- request: 2 --> onComplete
Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
Flowableで生成されたデータが全部バッファされて2件ずつ通知されています。
<-- request: 9223372036854775807
となっているのはobserveOnでLong.MAX_VALUEを指定している箇所です。
onBackpressureBuffer(int capacity)
上記のonBackpressureBufferと同じコードでキャパシティとして3を設定してみます。
さらにバッファがオーバーフローした際に"overflow!!!!"とメッセージを表示するようにしてみました。
BackpressureBufferWithCapacity.java
public class BackpressureBufferWithCapacity { public static void main(String[] args) throws InterruptedException { Flowable<Long> flowable = Flowable.interval(400L, TimeUnit.MILLISECONDS) .take(10) .doOnSubscribe(subscription -> System.out.println("<-- subscribe")) .doOnNext(data -> System.out.println("Flowable generated data:" + data)) .onBackpressureBuffer(3, () -> System.out.println("overflow!!!!")); flowable.doOnRequest(req -> System.out.println("<-- request: " + req)) .observeOn(Schedulers.computation(), false, 2) .doOnRequest(req -> System.out.println(" <-- request: " + req)) .subscribe(new MySubscriber<>()); Thread.sleep(11_000L); } }
実行結果
<-- subscribe --> onSubscribe <-- request: 9223372036854775807 <-- request: 2 Flowable generated data:0 --> onNext: 0 Flowable generated data:1 Flowable generated data:2 --> onNext: 1 Flowable generated data:3 Flowable generated data:4 Flowable generated data:5 <-- request: 2 --> onNext: 2 Flowable generated data:6 Flowable generated data:7 --> onNext: 3 Flowable generated data:8 overflow!!!! <-- request: 2 onError!!! io.reactivex.exceptions.MissingBackpressureException: Buffer is full at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:98) at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:90) at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79) at io.reactivex.internal.operators.flowable.FlowableTake$TakeSubscriber.onNext(FlowableTake.java:64) at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:84) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
Flowableで生成されたデータがバッファされていきますが、8が生成された時点でMissingBackpressureExceptionが発生しています。
onNextで3まで通知されていて、キャパシティに3を指定しているので、6までをバッファで持てると思っていたのですが、
どうやらobserveOnのring bufferで2つ値をバッファしてるので、結果8までバッファを保持できるようです。(たぶん)
onBackpressureDrop()
Flowable側はintervalで300msec毎に0~19のデータを生成して、
Subscriber側は1,000msec毎にデータを処理するようにしてます。
observeOnで以降の処理を別スレッドで行うようにして、Flowableに対して2ずつrequestを行うようにしてみます。
BackpressureDrop.java
public class BackpressureDrop { public static void main(String[] args) throws InterruptedException { Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS) .take(20) .doOnSubscribe(subscription -> System.out.println("<-- subscribe")) .doOnNext(data -> System.out.println("Flowable generated data:" + data)) .onBackpressureDrop(); flowable.doOnRequest(req -> System.out.println("<-- request: " + req)) .observeOn(Schedulers.computation(), false, 2) .doOnRequest(req -> System.out.println(" <-- request: " + req)) .subscribe(new MySubscriber<>()); Thread.sleep(11_000L); } }
実行結果
<-- subscribe --> onSubscribe <-- request: 9223372036854775807 <-- request: 2 Flowable generated data:0 --> onNext: 0 Flowable generated data:1 Flowable generated data:2 Flowable generated data:3 --> onNext: 1 Flowable generated data:4 Flowable generated data:5 Flowable generated data:6 <-- request: 2 Flowable generated data:7 --> onNext: 7 Flowable generated data:8 Flowable generated data:9 Flowable generated data:10 --> onNext: 8 Flowable generated data:11 Flowable generated data:12 Flowable generated data:13 <-- request: 2 Flowable generated data:14 --> onNext: 14 Flowable generated data:15 Flowable generated data:16 Flowable generated data:17 --> onNext: 15 Flowable generated data:18 Flowable generated data:19 <-- request: 2 --> onComplete
Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
requestが来る直前までのデータは破棄され、requestの次に生成されたデータがonNextで通知されています。
SubscriberでonNextで受信したデータ以外は破棄されて通知されずにonCompleteで終了しています。
onBackpressureLatest()
Flowable側はintervalで300msec毎に0~19のデータを生成して、
Subscriber側は1,000msec毎にデータを処理するようにしてます。
observeOnで以降の処理を別スレッドで行うようにして、Flowableに対して2ずつrequestを行うようにしてみます。
BackpressureLatest.java
public class BackpressureLatest { public static void main(String[] args) throws InterruptedException { Flowable<Long> flowable = Flowable.interval(300L, TimeUnit.MILLISECONDS) .take(20) .doOnSubscribe(subscription -> System.out.println("<-- subscribe")) .doOnNext(data -> System.out.println("Flowable generated data:" + data)) .onBackpressureLatest(); flowable.doOnRequest(req -> System.out.println("<-- request: " + req)) .observeOn(Schedulers.computation(), false, 2) .doOnRequest(req -> System.out.println(" <-- request: " + req)) .subscribe(new MySubscriber<>()); Thread.sleep(11_000L); } }
実行結果
<-- subscribe --> onSubscribe <-- request: 9223372036854775807 <-- request: 2 Flowable generated data:0 --> onNext: 0 Flowable generated data:1 Flowable generated data:2 Flowable generated data:3 --> onNext: 1 Flowable generated data:4 Flowable generated data:5 Flowable generated data:6 <-- request: 2 --> onNext: 6 Flowable generated data:7 Flowable generated data:8 Flowable generated data:9 Flowable generated data:10 --> onNext: 7 Flowable generated data:11 Flowable generated data:12 Flowable generated data:13 <-- request: 2 --> onNext: 13 Flowable generated data:14 Flowable generated data:15 Flowable generated data:16 --> onNext: 14 Flowable generated data:17 Flowable generated data:18 Flowable generated data:19 <-- request: 2 --> onNext: 19 --> onComplete
Flowableはrequestに関係なくデータを生成し、observeOnからのrequestが来ると2件データを通知しています。
onBackpressureDrop()とは異なり、requestが来る直前のデータがキャシュされてonNextで通知されています。
終わり。
ソースは下記にあげました。
github.com
【参考】
https://codezine.jp/article/detail/9570
http://qiita.com/yuya_presto/items/0e95271bc85efe7f768e