Spring WebfluxのWebsocketを使ってサーバー間通信
やること
Webfluxを導入してみたので、既存のspring-bootに入っているwebsocketからwebfluxのサポートするwebsocket実装に切り替える。
サーバー間通信で使用するので、通信としてのServerもClientもJavaで実装する(ブラウザは使用しない)
事前準備
SPRING INITIALIZR を使ってテンプレートプロジェクトを作成します。
SpringBootは2.0系を選択して、ReactiveWebをDependenciesに入れておきます。
以下の環境で作業を行います。
- Java9
- SpringBoot2.0.0M6
Websocketクラスの実装
Serverサイドのマッピングの設定
FluxWebSocketConfig.java
Mappingを設定することで指定したパスでwebsocket通信が受付できるようになります。
@Configuration public class FluxWebSocketConfig { @Autowired private FluxWebSocketHandler handler; @Bean public HandlerMapping webSocketMapping() { Map<String, WebSocketHandler> map = new HashMap<>(); map.put("/path", handler); SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setUrlMap(map); mapping.setOrder(-1); // before annotated controllers return mapping; } @Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
FluxWebSocketHandler.java
Handlerでは通信後の処理を記述しています。最初の通信でhandle()がコールされるので、ここでEmitterProcessorを返却するようにしています。
セッション確立後にsendMessageを使ってメッセージを配信するようにしていますが、ここでEmitterProcessorに対してMessageを渡すことでClient側に伝播されます。
@Slf4j @Component public class FluxWebSocketHandler implements WebSocketHandler { private final EmitterProcessor<String> in = EmitterProcessor.create(); public void sendMessage(String message) { log.info("Send Message:" + message); in.onNext(message); } @Override public Mono<Void> handle(WebSocketSession session) { session.receive() .map(WebSocketMessage::getPayloadAsText) .log() .subscribe(); return session.send(Mono.just(session.textMessage("connected!"))).then(session.send(in.map(session::textMessage))); } }
session.receive()の処理はClient側からのメッセージを受けないのであれば不要です。
Clientサイドのリクエストの設定
FluxWebSocketClientSample.java
リクエスト処理は別スレッドで起動しています。(本当はWebSocketClientの機能で別スレッド起動ができるはずです。。。)
@Slf4j @Component public class FluxWebSocketClientSample { @PostConstruct private void init() { ExecutorService executor = Executors.newSingleThreadExecutor(); Runnable runner = this::connect; executor.execute(runner); } private void connect() { try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } WebSocketClient client = new ReactorNettyWebSocketClient(); client.execute(URI.create("ws://localhost:8080/path"), session -> session.receive() .map(WebSocketMessage::getPayloadAsText) .log() .then() ) .subscribe(); } }
↑のサンプルではServer→Clientの送信だけですが、双方向でメッセージを送りたい場合はこちらでもEmitterProcessorを設定します。
動作確認
./gradlew bootRunでプロセスを起動して.logメソッドによるメッセージが表示されたら成功です。
適当なクラスを用意してFluxWebSocketHandler.sendMessage()を呼び出すことで任意のメッセージを伝播することができるはずです。
最後に
全般的にReactorNettyWebSocketClientやEmitterProcessorの仕様・使い方がわかりません。。。まとまっている資料やサイトがあれば教えてください。。。