SpringBootでSpring WebFluxのAnnotation-based Programming Modelを試す
SpringBootでSpring WebFluxのAnnotation-based Programming Modelを試してみたメモです。
Spring WebFlux
SpringFramewark5(SpringBoot2)からSpring WebFluxが追加されます。
Spring WebFluxはリアクティブプログラミング(非同期でイベント駆動なノンブロッキングアプリケーション)のフレームワークです。23. WebFlux framework
WebFluxではAnnotation-based Programming ModelとFunctional Programming Modelがあり、
Annotation-based Programming ModelはSpringMVCと同じようにアノテーションベースでプログラミングを行います。
今回はAnnotation-based Programming Modelで単純なREST APIでのCRUDを試そうと思います。
(Functional Programming Modelは別で試して書こうと思います)
Maven
parentとしてspring-boot-starter-parentを指定しました。
SpringBoot2のRELEASEがまだなので、最新の2.0.0.M7で試してみます。
dependencyにはspring-boot-starter-webfluxを指定しました。
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.M7</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> </dependencies>
後はutilityとしてlombokを追加しました。
pom.xml
<!-- utility --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> <scope>provided</scope> </dependency>
Flux, Mono
Spring WebFluxはReactive Streamsの実装として、Reactorを使用しています。
Project Reactor
Reactive StreamsのJava実装だとRxJava2のほうがメジャーだと思いますが、
RxJavaを知っている人はRxJavaのクラスがReactorのどれに相当するかを当てはめていけばよい気がします。
ReactorのPublisherにはFluxとMonoがあります。
Fluxは0からN個の要素を持つPublisherで、
Monoは0または1個の要素を持つPublisherです。
RxJavaではFlowableとMaybeに相当します。
Model
RESTのリソースとして下記のシンプルなUserクラスを定義。
@AllArgsConstructor @Data public static class User { private long id; private String name; private int age; }
Repository
UserRepository.java
public interface UserRepository { Mono<User> getById(long id); Flux<User> getAll(); Mono<Void> save(Mono<User> user); Mono<Void> saveAll(Flux<User> users); Mono<User> update(long id, Mono<User> user); Mono<Void> delete(long id); }
リポジトリの各メソッドを実装。
簡単なデータストアとしてMapにUserを保持するようにしました。
(ブロッキングになっちゃっているかも??)
UserRepositoryImpl.java
@Repository public class UserRepositoryImpl implements UserRepository { private Map<Long, User> users = new HashMap<>(); @PostConstruct public void init() { users.put(1L, new User(1, "Alice", 20)); users.put(2L, new User(2, "Bobby", 32)); users.put(3L, new User(3, "Cindy", 41)); } : :
getById()でidをキーにUserを1件取得。
justOrEmpty()で引数のデータのMonoを生成します。
justOrEmpty()ではnullの場合onComplete()が実行されます。
log()でReactive Streamの各シグナルがトレース出来ます。
@Override public Mono<User> getById(long id) { return Mono.justOrEmpty(users.get(id)).log(); }
getAll()で全Userを取得。
fromIterable()でIterableの各要素からFluxを生成します。
@Override public Flux<User> getAll() { return Flux.fromIterable(users.values()).log(); }
save()でUserを登録。
map()でMonoの要素に対する処理を記述。
then()で要素を持たないMono
@Override public Mono<Void> save(Mono<User> user) { return user.map(u -> { System.out.println("saved:" + u.getId()); users.put(u.getId(), u); return u; }).log().then(); }
saveAll()で複数のUserを登録。
処理内容は上記のsave()と同じです。
@Override public Mono<Void> saveAll(Flux<User> users) { return users.map(u -> { System.out.println("saved:" + u.getId()); this.users.put(u.getId(), u); return u; }).log().then(); }
update()でUserを更新。
map()で更新し、更新後のUserを返します。
@Override public Mono<User> update(long id, Mono<User> user) { return user.map(u -> users.put(id, u)).log(); }
delete()でUserを削除。
empty()で要素を持たないMonoを返します。
@Override public Mono<Void> delete(long id) { users.remove(id); return Mono.empty(); }
Controller
各エンドポイントを定義します。
SpringMVCと同様のアノテーションで定義してます。
異なるのは引数と戻り値にMonoやFluxがあるところです。
@RestController public class WebfluxUserController { private final UserRepository repository; public WebfluxUserController(UserRepository repository) { this.repository = repository; } @GetMapping("/users/{id}") public Mono<User> getById(@PathVariable int id) { return repository.getById(id); } @GetMapping("/users") public Flux<User> getAll() { return repository.getAll(); } @PostMapping("/users") Mono<Void> save(@RequestBody Mono<User> user) { return repository.save(user); } @PostMapping("/users/bulk") Mono<Void> saveBulk(@RequestBody Flux<User> users) { return repository.saveAll(users); } @PutMapping("/users/{id}") Mono<User> update(@PathVariable int id, @RequestBody Mono<User> user) { return repository.update(id, user); } @DeleteMapping("/users/{id}") Mono<Void> remove(@PathVariable int id) { return repository.delete(id); } @AllArgsConstructor @Data public static class User { long id; String name; int age; } }
実行
curlで実行して確認してみます。
GET
id:1のユーザを取得
$ curl "http://localhost:8080/users/1" -H 'Accept: application/stream+json' {"id":1,"name":"Alice","age":20}
ログ
2018-01-13 21:38:24.015 INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription) 2018-01-13 21:38:24.016 INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1 : | request(1) 2018-01-13 21:38:24.016 INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1 : | onNext(WebfluxUserController.User(id=1, name=Alice, age=20)) 2018-01-13 21:38:24.040 INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1 : | request(31) 2018-01-13 21:38:24.041 INFO 61176 --- [ctor-http-nio-2] reactor.Mono.Just.1 : | onComplete()
存在しないidのユーザを取得
$ curl "http://localhost:8080/users/5"
ログ
2018-01-13 20:45:47.886 INFO 59813 --- [ctor-http-nio-2] reactor.Mono.Empty.1 : onSubscribe([Fuseable] Operators.EmptySubscription) 2018-01-13 20:45:47.887 INFO 59813 --- [ctor-http-nio-2] reactor.Mono.Empty.1 : request(1) 2018-01-13 20:45:47.887 INFO 59813 --- [ctor-http-nio-2] reactor.Mono.Empty.1 : onComplete()
全ユーザを取得
ヘッダに"Accept: application/stream+json"を指定してjson stream形式で取得する。
この場合、backpressureが有効。
$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json' {"id":1,"name":"Alice","age":20} {"id":2,"name":"Bobby","age":32} {"id":3,"name":"Cindy","age":41}
ログ
request(1),request(31)のようにbackpressureが効いている。
2018-01-13 21:40:45.917 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) 2018-01-13 21:40:45.918 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | request(1) 2018-01-13 21:40:45.918 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | onNext(WebfluxUserController.User(id=1, name=Alice, age=20)) 2018-01-13 21:40:45.919 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | request(31) 2018-01-13 21:40:45.919 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | onNext(WebfluxUserController.User(id=2, name=Bobby, age=32)) 2018-01-13 21:40:45.919 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | onNext(WebfluxUserController.User(id=3, name=Cindy, age=41)) 2018-01-13 21:40:45.920 INFO 61176 --- [ctor-http-nio-5] reactor.Flux.Iterable.4 : | onComplete()
全ユーザを取得
Acceptヘッダを指定せずにコールすると、jsonで(配列)で返ってくる。
この場合、backpressureは無効。
$ curl "http://localhost:8080/users" [{"id":1,"name":"Alice","age":20},{"id":2,"name":"Bobby","age":32},{"id":3,"name":"Cindy","age":41}]
ログ
request(unbounded)になっている。
2018-01-13 21:39:41.138 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) 2018-01-13 21:39:41.138 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | request(unbounded) 2018-01-13 21:39:41.138 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | onNext(WebfluxUserController.User(id=1, name=Alice, age=20)) 2018-01-13 21:39:41.138 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | onNext(WebfluxUserController.User(id=2, name=Bobby, age=32)) 2018-01-13 21:39:41.138 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | onNext(WebfluxUserController.User(id=3, name=Cindy, age=41)) 2018-01-13 21:39:41.138 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | onComplete() 2018-01-13 21:39:41.146 INFO 61176 --- [ctor-http-nio-4] reactor.Flux.Iterable.3 : | cancel()
POST
id:4のユーザを登録
$ curl "http://localhost:8080/users" -d '{"id":4,"name":"Dan","age":44}' -H 'Content-Type: application/stream+json'
ログ
2018-01-13 20:07:46.928 INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1 : onSubscribe(FluxMap.MapSubscriber) 2018-01-13 20:07:46.930 INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1 : request(unbounded) saved:4 2018-01-13 20:07:46.941 INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1 : onNext(WebfluxUserController.User(id=4, name=Dan, age=44)) 2018-01-13 20:07:46.942 INFO 64700 --- [ctor-http-nio-2] reactor.Mono.Map.1 : onComplete()
確認
$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json' {"id":1,"name":"Alice","age":20} {"id":2,"name":"Bobby","age":32} {"id":3,"name":"Cindy","age":41} {"id":4,"name":"Dan","age":44}
一度に複数のUserを登録
$ curl "http://localhost:8080/users/bulk" -d '{"id":5,"name":"Emily","age":55}{"id":6,"name":"Felna","age":62}' -H 'Content-Type: application/stream+json'
ログ
2018-01-13 20:15:01.649 INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber) 2018-01-13 20:15:01.649 INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1 : request(unbounded) saved:5 2018-01-13 20:15:01.657 INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onNext(WebfluxUserController.User(id=5, name=Emily, age=55)) saved:6 2018-01-13 20:15:01.658 INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onNext(WebfluxUserController.User(id=6, name=Felna, age=62)) 2018-01-13 20:15:01.658 INFO 64815 --- [ctor-http-nio-2] reactor.Flux.Map.1 : onComplete()
確認
$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json' {"id":1,"name":"Alice","age":20} {"id":2,"name":"Bobby","age":32} {"id":3,"name":"Cindy","age":41} {"id":4,"name":"Dan","age":44} {"id":5,"name":"Emily","age":55} {"id":6,"name":"Felna","age":62}
PUT
id:1のUserを更新。
レスポンスとして更新されたUser情報が返ってくる。
$ curl -X PUT "http://localhost:8080/users/1" -d '{"id":1,"name":"Abcde","age":111}' -H 'Content-Type: application/stream+json' {"id":1,"name":"Abcde","age":111}
ログ
2018-01-13 20:20:26.911 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : onSubscribe(FluxMap.MapSubscriber) 2018-01-13 20:20:26.911 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : request(1) 2018-01-13 20:20:26.911 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : onNext(WebfluxUserController.User(id=1, name=Abcde, age=111)) 2018-01-13 20:20:26.912 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : request(1) 2018-01-13 20:20:26.912 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : request(31) 2018-01-13 20:20:26.912 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : onComplete() 2018-01-13 20:20:26.913 INFO 66211 --- [ctor-http-nio-3] reactor.Mono.Map.2 : cancel()
確認
$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json' {"id":1,"name":"Abcde","age":111} {"id":2,"name":"Bobby","age":32} {"id":3,"name":"Cindy","age":41}
DELETE
id:2のUserを削除。
$ curl -X DELETE "http://localhost:8080/users/2"
確認
$ curl "http://localhost:8080/users" -H 'Accept: application/stream+json' {"id":1,"name":"Alice","age":20} {"id":3,"name":"Cindy","age":41}
おわり。
【参考】
https://blog.ik.am/entries/417
https://www.slideshare.net/makingx/introduction-to-spring-webflux-jsug-sfa1
http://javasampleapproach.com/spring-framework/spring-webflux/springboot-webflux-annotation-based-programming-model
ソースコードは下記にあげました。
github.com