前回は、リアクティブプログラミングの概念を紹介しました。リアクティブプログラミングは
そこで、今回はReactorやSpringといったリアクティブプログラミングのためのライブラリを使って実際に触れてみることで、データストリームやバックプレッッシャのイメージを膨らませていきます。プログラミングに不慣れな方でも簡易に実装できますので、ぜひ試してみてください。
PublisherとSubscriber - Subscriptionの仕組み
前回はPublisher(生産者)とSubscriber(消費者)の概念を説明しましたが、今回はもう少し詳しく説明しましょう。
Reactive Streamsの実装としては、Akka StreamsやMongoDB、RxJavaといったものがあります。今回は後述のSpringとの相性が良いReactor Coreを使って実装します。
まずは、依存ライブラリをMavenで設定しましょう。pom.xmlのリポジトリ及び依存関係の設定は以下の通りです。
<repositories>
<repository>
<id>spring-snapshot</id>
<name>Spring Snapshot Repository</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.0.M1</version>
</dependency>
</dependencies>
ちなみにIDEは何でも構いませんが、筆者はNetbeansで実施しており、「Maven」カテゴリの「Javaアプリケーション」プロジェクトを使っています。
HelloReactorクラスを作成し、Fluxと呼ばれるPublisherに対し赤・青・黄を設定し、大文字に変換した上で登録(subscribe)します。
public class HelloReactor {
public static void main(String[] args) {
Flux.just("red", "blue", "yellow")
.map(String::toUpperCase)
.log()
.subscribe();
}
}
「ファイルの実行」によりHelloReactorを実行します。
ロガー(logメソッド)により、以下の通り標準出力されます。onNextイベントにより順次、赤・青・黄の順に処理されていることがわかります。
6 01, 2017 2:05:40 午前 reactor.util.Loggers$JdkLogger info 情報: | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
6 01, 2017 2:05:40 午前 reactor.util.Loggers$JdkLogger info 情報: | request(unbounded)
6 01, 2017 2:05:40 午前 reactor.util.Loggers$JdkLogger info 情報: | onNext(RED)
6 01, 2017 2:05:40 午前 reactor.util.Loggers$JdkLogger info 情報: | onNext(BLUE)
6 01, 2017 2:05:40 午前 reactor.util.Loggers$JdkLogger info 情報: | onNext(YELLOW)
6 01, 2017 2:05:40 午前 reactor.util.Loggers$JdkLogger info 情報: | onComplete()
デフォルトのSubscriberの挙動はrequest(unbound)、つまりPublisher(生産者)が流そうとする赤・青・黄「全て」のデータが流れます。今度は、バックプレッシャによりストリームに流せる情報を2つまでに限定してみましょう。イメージは以下です。
先ほどのコードのsubscribe処理をオーバーライドすることで、バックプレッシャを実現します。
Flux.just("red", "blue", "yellow")
.map(String::toUpperCase)
.log()
.subscribe(new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(2); // データの上限を2に設定
}
@Override
public void onNext(String t) {
if("BLUE".equals(t)){
subscription.request(2); // 一定条件を満たす場合に追加で実行
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
実行結果は以下の通りです。バックプレッシャにより赤・青の二つのデータに処理が限定され、その後黄色が処理されていることがわかります。
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | request(2)
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | onNext(RED)
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | onNext(BLUE)
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | request(2)
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | onNext(YELLOW)
6 01, 2017 2:57:36 午前 reactor.util.Loggers$JdkLogger info 情報: | onComplete()
Subscriber側でリクエストの上限を設定できるだけでなく、すべてのデータを流しきった後の完了処理(onComplete)や、エラー時処理(onError)を実装することも可能です。
Spring Initializrで簡単にリアクティブプログラミングによるWebアプリ構築
前章ではあくまでJavaのスタンドアロンアプリケーションでしたが、本章ではWebアプリを構築します。
ブラウザからSpring Initializrにアクセスし、以下の通り入力します。Spring Initializrは連載第六回でも紹介しましたが、Hello Worldを実行するまでの手順を可能な限り簡易にしてくれる、いわゆるnutshell(ごく小さな単位)です。今回はSearch for dependenciesにWebではなく、Reactive Webを設定します。
- プロジェクト種別 : Maven Project
- バージョン : 2.0.0(SNAPSHOT)
- Group : com.example
- Artifact : demo
- Search for dependencies : Reactive Web
Generate Projectをクリックすると、資材一式(demo.zip)がダウンロードされます。
資材の中にあるDemoApplication.javaをテキストエディタ等で開き、以下の「追加」の5箇所を編集します。最小限のアノテーション設定などのみで、REST APIおよびリアクティブプログラムを構築できます。
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping; // 追加(1)
import org.springframework.web.bind.annotation.RestController; // 追加(2)
import reactor.core.publisher.Mono; // 追加(3)
@SpringBootApplication
@RestController // 追加(4)
public class DemoApplication {
// 追加(5)
@GetMapping("/")
Mono hello() {
return Mono.just("Hello World!");
}
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
ターミナルから以下を実行します。8080ポートでTomcatが起動されます。
$ ./mvnw spring-boot:run
[INFO] Scanning for projects...
[INFO]
[INFO] -----------------------------------
[INFO] Building demo 0.0.1-SNAPSHOT
<中略>
2017-05-22 01:02:50.789 INFO 21387 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 80802017-05-22 01:02:50.795 INFO 21387 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 2.588 seconds (JVM running for 6.938)
ブラウザから「http://localhost:8080/」にアクセスすると、以下の通り表示されます。
![]() |
Spring Bootを使ったHello World |
ブラウザからのリクエストパラメータを受け付けるには以下の通りhelloメソッドに引数を追加します。
//前略
@GetMapping("/hello")
Mono hello(Mono<User> user) {
return user.map(u -> "Hello " + u.getId());
}
public class User {
private String id;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
再度Spring Bootアプリケーションを起動し、「http://localhost:8080/hello3/?id=yujishono」にアクセスすると以下の通り表示されます。
![]() |
GETでのリクエストパラメータを使ったHello World |
ここまでのSpring Initializrを使った説明では、Subscriptionが出てこないことに違和感を感じた方もいらっしゃるのではないでしょうか。実はSpring InitializrのReactive WebではSubscriptionはSpringのServletHttpHandlerAdapterクラスが実施しているのです。興味がある方は実装 を確認してみるのも良いでしょう。
* * *
Reactor CoreやSpringを使ってリアクティブプログラミングに触ることで、データストリームやバックプレッシャのイメージを掴んでいただきました。
今回は、データストリームの処理として、大文字に変換する処理のみでしたが、特定条件に当てはまった処理のみをフィルタ(filterメソッド利用)したり、複数のデータをまとめ(zipメソッド)たりするなど、さまざまなAPIが提供されています。
Java8でStream APIが導入されるなどストリーム処理は今後のプログラミングにおいて大きな役割を果たしていくことになります。今後、ビジネスでの適用を想定される方は、併せて確認してみてください。
著者紹介
![]() |
正野 勇嗣 (SHONO Yuji ) - NTTデータ シニアスペシャリスト
2011年頃まで開発自動化技術のR&Dに従事。その後、開発プロジェクト支援やトラブルシューティング等に主戦場を移す。「ソースコード自動生成」に加えて、JenkinsやMaven等の「ビルド自動化」、JsTestDriverやSelenium等の「テスト自動化」を扱うようになり、多様化する開発自動化技術動向に興味。
最近は第四の自動化であるInfrastructure as Code等の「基盤自動化」の魅力に惹かれている。開発自動化技術に関する雑誌・記事執筆も行う。2児のパパ。