SpringBootでSpring WebFluxのAnnotation-based Programming Modelを試す

SpringBootでSpring WebFluxのAnnotation-based Programming Modelを試してみたメモです。

Spring WebFlux

SpringFramewark5(SpringBoot2)からSpring WebFluxが追加されます。
Spring WebFluxはリアクティブプログラミング(非同期でイベント駆動なノンブロッキングアプリケーション)のフレームワークです。23. WebFlux framework


WebFluxではAnnotation-based Programming ModelとFunctional Programming Modelがあり、
Annotation-based Programming ModelはSpringMVCと同じようにアノテーションベースでプログラミングを行います。
今回はAnnotation-based Programming Modelで単純なREST APIでのCRUDを試そうと思います。
(Functional Programming Modelは別で試して書こうと思います)

Maven

parentとしてspring-boot-starter-parentを指定しました。
SpringBoot2のRELEASEがまだなので、最新の2.0.0.M7で試してみます。

dependencyにはspring-boot-starter-webfluxを指定しました。

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.M7</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
</dependencies>

後はutilityとしてlombokを追加しました。

pom.xml

<!-- utility -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
    <scope>provided</scope>
</dependency>
Flux, Mono

Spring WebFluxはReactive Streamsの実装として、Reactorを使用しています。
Project Reactor

Reactive StreamsのJava実装だとRxJava2のほうがメジャーだと思いますが、
RxJavaを知っている人はRxJavaのクラスがReactorのどれに相当するかを当てはめていけばよい気がします。

ReactorのPublisherにはFluxとMonoがあります。
Fluxは0からN個の要素を持つPublisherで、
Monoは0または1個の要素を持つPublisherです。

RxJavaではFlowableとMaybeに相当します。

Model

RESTのリソースとして下記のシンプルなUserクラスを定義。

@AllArgsConstructor
@Data
public static class User {
    private long id;
    private String name;
    private int age;
}
Repository

リポジトリCRUDの各メソッドを定義。

UserRepository.java

public interface UserRepository {
    Mono<User> getById(long id);
    Flux<User> getAll();
    Mono<Void> save(Mono<User> user);
    Mono<Void> saveAll(Flux<User> users);
    Mono<User> update(long id, Mono<User> user);
    Mono<Void> delete(long id);
}

リポジトリの各メソッドを実装。
簡単なデータストアとしてMapにUserを保持するようにしました。
(ブロッキングになっちゃっているかも??)

UserRepositoryImpl.java

@Repository
public class UserRepositoryImpl implements UserRepository {
    private Map<Long, User> users = new HashMap<>();

    @PostConstruct
    public void init() {
        users.put(1L, new User(1, "Alice", 20));
        users.put(2L, new User(2, "Bobby", 32));
        users.put(3L, new User(3, "Cindy", 41));
    }

     :
     :

getById()でidをキーにUserを1件取得。
justOrEmpty()で引数のデータのMonoを生成します。
justOrEmpty()ではnullの場合onComplete()が実行されます。
log()でReactive Streamの各シグナルがトレース出来ます。

    @Override
    public Mono<User> getById(long id) {
        return Mono.justOrEmpty(users.get(id)).log();
    }

getAll()で全Userを取得。
fromIterable()でIterableの各要素からFluxを生成します。

    @Override
    public Flux<User> getAll() {
        return Flux.fromIterable(users.values()).log();
    }

save()でUserを登録。
map()でMonoの要素に対する処理を記述。
then()で要素を持たないMonoを返します。

    @Override
    public Mono<Void> save(Mono<User> user) {
        return user.map(u -> {
            System.out.println("saved:" + u.getId());
            users.put(u.getId(), u);
            return u;
        }).log().then();
    }

saveAll()で複数のUserを登録。
処理内容は上記のsave()と同じです。

    @Override
    public Mono<Void> saveAll(Flux<User> users) {
        return users.map(u -> {
            System.out.println("saved:" + u.getId());
            this.users.put(u.getId(), u);
            return u;
        }).log().then();
    }

update()でUserを更新。
map()で更新し、更新後のUserを返します。

    @Override
    public Mono<User> update(long id, Mono<User> user) {
        return user.map(u -> users.put(id, u)).log();
    }

delete()でUserを削除。
empty()で要素を持たないMonoを返します。

    @Override
    public Mono<Void> delete(long id) {
        users.remove(id);
        return Mono.empty();
    }
Controller

各エンドポイントを定義します。
SpringMVCと同様のアノテーションで定義してます。
異なるのは引数と戻り値にMonoやFluxがあるところです。

@RestController
public class WebfluxUserController {
    private final UserRepository repository;

    public WebfluxUserController(UserRepository repository) {
        this.repository = repository;
    }

    @GetMapping("/users/{id}")
    public Mono<User> getById(@PathVariable int id) {
        return repository.getById(id);
    }

    @GetMapping("/users")
    public Flux<User> getAll() {
        return repository.getAll();
    }

    @PostMapping("/users")
    Mono<Void> save(@RequestBody Mono<User> user) {
        return repository.save(user);
    }

    @PostMapping("/users/bulk")
    Mono<Void> saveBulk(@RequestBody Flux<User> users) {
        return repository.saveAll(users);
    }

    @PutMapping("/users/{id}")
    Mono<User> update(@PathVariable int id, @RequestBody Mono<User> user) {
        return repository.update(id, user);
    }

    @DeleteMapping("/users/{id}")
    Mono<Void> remove(@PathVariable int id) {
        return repository.delete(id);
    }

    @AllArgsConstructor
    @Data
    public static class User {
        long id;
        String name;
        int age;
    }
}

実行

curlで実行して確認してみます。

GET

id:1のユーザを取得

$ curl "http://localhost:8080/users/1" -H 'Accept: application/stream+json'
{"id":1,"name":"Alice","age":20}

ログ

2018-01-13 21:38:24.015  INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1  : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2018-01-13 21:38:24.016  INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1  : | request(1)
2018-01-13 21:38:24.016  INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1  : | onNext(WebfluxUserController.User(id=1, name=Alice, age=20))
2018-01-13 21:38:24.040  INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1  : | request(31)
2018-01-13 21:38:24.041  INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1  : | onComplete()

存在しないidのユーザを取得

$ curl "http://localhost:8080/users/5"

ログ

2018-01-13 20:45:47.886  INFO 59813 --- [ctor-http-nio-2] reactor.Mono.Empty.1  : onSubscribe([Fuseable] Operators.EmptySubscription)
2018-01-13 20:45:47.887  INFO 59813 --- [ctor-http-nio-2] reactor.Mono.Empty.1  : request(1)
2018-01-13 20:45:47.887  INFO 59813 --- [ctor-http-nio-2] reactor.Mono.Empty.1  : onComplete()

全ユーザを取得
ヘッダに"Accept: application/stream+json"を指定してjson stream形式で取得する。
この場合、backpressureが有効。

$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json'
{"id":1,"name":"Alice","age":20}
{"id":2,"name":"Bobby","age":32}
{"id":3,"name":"Cindy","age":41}

ログ
request(1),request(31)のようにbackpressureが効いている。

2018-01-13 21:40:45.917  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
2018-01-13 21:40:45.918  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | request(1)
2018-01-13 21:40:45.918  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | onNext(WebfluxUserController.User(id=1, name=Alice, age=20))
2018-01-13 21:40:45.919  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | request(31)
2018-01-13 21:40:45.919  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | onNext(WebfluxUserController.User(id=2, name=Bobby, age=32))
2018-01-13 21:40:45.919  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | onNext(WebfluxUserController.User(id=3, name=Cindy, age=41))
2018-01-13 21:40:45.920  INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4  : | onComplete()

全ユーザを取得
Acceptヘッダを指定せずにコールすると、jsonで(配列)で返ってくる。
この場合、backpressureは無効。

$ curl "http://localhost:8080/users"
[{"id":1,"name":"Alice","age":20},{"id":2,"name":"Bobby","age":32},{"id":3,"name":"Cindy","age":41}]

ログ
request(unbounded)になっている。

2018-01-13 21:39:41.138  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
2018-01-13 21:39:41.138  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | request(unbounded)
2018-01-13 21:39:41.138  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | onNext(WebfluxUserController.User(id=1, name=Alice, age=20))
2018-01-13 21:39:41.138  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | onNext(WebfluxUserController.User(id=2, name=Bobby, age=32))
2018-01-13 21:39:41.138  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | onNext(WebfluxUserController.User(id=3, name=Cindy, age=41))
2018-01-13 21:39:41.138  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | onComplete()
2018-01-13 21:39:41.146  INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3  : | cancel()
POST

id:4のユーザを登録

$ curl "http://localhost:8080/users" -d '{"id":4,"name":"Dan","age":44}' -H 'Content-Type: application/stream+json'

ログ

2018-01-13 20:07:46.928  INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1  : onSubscribe(FluxMap.MapSubscriber)
2018-01-13 20:07:46.930  INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1  : request(unbounded)
saved:4
2018-01-13 20:07:46.941  INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1  : onNext(WebfluxUserController.User(id=4, name=Dan, age=44))
2018-01-13 20:07:46.942  INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1  : onComplete()

確認

$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json'
{"id":1,"name":"Alice","age":20}
{"id":2,"name":"Bobby","age":32}
{"id":3,"name":"Cindy","age":41}
{"id":4,"name":"Dan","age":44}

一度に複数のUserを登録

$ curl "http://localhost:8080/users/bulk" -d '{"id":5,"name":"Emily","age":55}{"id":6,"name":"Felna","age":62}' -H 'Content-Type: application/stream+json'

ログ

2018-01-13 20:15:01.649  INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1  : onSubscribe(FluxMap.MapSubscriber)
2018-01-13 20:15:01.649  INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1  : request(unbounded)
saved:5
2018-01-13 20:15:01.657  INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1  : onNext(WebfluxUserController.User(id=5, name=Emily, age=55))
saved:6
2018-01-13 20:15:01.658  INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1  : onNext(WebfluxUserController.User(id=6, name=Felna, age=62))
2018-01-13 20:15:01.658  INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1  : onComplete()

確認

$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json'
{"id":1,"name":"Alice","age":20}
{"id":2,"name":"Bobby","age":32}
{"id":3,"name":"Cindy","age":41}
{"id":4,"name":"Dan","age":44}
{"id":5,"name":"Emily","age":55}
{"id":6,"name":"Felna","age":62}

PUT

id:1のUserを更新。
レスポンスとして更新されたUser情報が返ってくる。

$ curl -X PUT "http://localhost:8080/users/1" -d '{"id":1,"name":"Abcde","age":111}' -H 'Content-Type: application/stream+json'
{"id":1,"name":"Abcde","age":111}

ログ

2018-01-13 20:20:26.911  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : onSubscribe(FluxMap.MapSubscriber)
2018-01-13 20:20:26.911  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : request(1)
2018-01-13 20:20:26.911  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : onNext(WebfluxUserController.User(id=1, name=Abcde, age=111))
2018-01-13 20:20:26.912  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : request(1)
2018-01-13 20:20:26.912  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : request(31)
2018-01-13 20:20:26.912  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : onComplete()
2018-01-13 20:20:26.913  INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2  : cancel()

確認

$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json'
{"id":1,"name":"Abcde","age":111}
{"id":2,"name":"Bobby","age":32}
{"id":3,"name":"Cindy","age":41}

DELETE

id:2のUserを削除。

$ curl -X DELETE "http://localhost:8080/users/2"

確認

$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json'
{"id":1,"name":"Alice","age":20}
{"id":3,"name":"Cindy","age":41}

おわり。


【参考】
https://blog.ik.am/entries/417
https://www.slideshare.net/makingx/introduction-to-spring-webflux-jsug-sfa1
http://javasampleapproach.com/spring-framework/spring-webflux/springboot-webflux-annotation-based-programming-model



ソースコードは下記にあげました。
github.com