Things of interesting

プログラミングに関する技術ネタの備忘録

Spring WebfluxのWebsocketを使ってサーバー間通信

やること

Webfluxを導入してみたので、既存のspring-bootに入っているwebsocketからwebfluxのサポートするwebsocket実装に切り替える。
サーバー間通信で使用するので、通信としてのServerもClientもJavaで実装する(ブラウザは使用しない)

事前準備

SPRING INITIALIZR を使ってテンプレートプロジェクトを作成します。
SpringBootは2.0系を選択して、ReactiveWebをDependenciesに入れておきます。

以下の環境で作業を行います。

  • Java9
  • SpringBoot2.0.0M6

Websocketクラスの実装

Serverサイドのマッピングの設定

FluxWebSocketConfig.java
Mappingを設定することで指定したパスでwebsocket通信が受付できるようになります。

@Configuration
public class FluxWebSocketConfig {
    
    @Autowired
    private FluxWebSocketHandler handler;

    @Bean
    public HandlerMapping webSocketMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", handler);

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

FluxWebSocketHandler.java
Handlerでは通信後の処理を記述しています。最初の通信でhandle()がコールされるので、ここでEmitterProcessorを返却するようにしています。

セッション確立後にsendMessageを使ってメッセージを配信するようにしていますが、ここでEmitterProcessorに対してMessageを渡すことでClient側に伝播されます。

@Slf4j
@Component
public class FluxWebSocketHandler implements WebSocketHandler {
    private final EmitterProcessor<String> in = EmitterProcessor.create();

    public void sendMessage(String message) {
        log.info("Send Message:" + message);
        in.onNext(message);
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .log()
        .subscribe();
    return session.send(Mono.just(session.textMessage("connected!"))).then(session.send(in.map(session::textMessage)));
    }
}

session.receive()の処理はClient側からのメッセージを受けないのであれば不要です。

Clientサイドのリクエストの設定

FluxWebSocketClientSample.java
リクエスト処理は別スレッドで起動しています。(本当はWebSocketClientの機能で別スレッド起動ができるはずです。。。)

@Slf4j
@Component
public class FluxWebSocketClientSample {

    @PostConstruct
    private void init() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Runnable runner = this::connect;
        executor.execute(runner);
    }

    private void connect() {
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        WebSocketClient client = new ReactorNettyWebSocketClient();
        
        client.execute(URI.create("ws://localhost:8080/path"),
                session -> session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .log()
                        .then()
                )
                .subscribe();

    }
}

↑のサンプルではServer→Clientの送信だけですが、双方向でメッセージを送りたい場合はこちらでもEmitterProcessorを設定します。

動作確認

./gradlew bootRunでプロセスを起動して.logメソッドによるメッセージが表示されたら成功です。
適当なクラスを用意してFluxWebSocketHandler.sendMessage()を呼び出すことで任意のメッセージを伝播することができるはずです。

最後に

全般的にReactorNettyWebSocketClientやEmitterProcessorの仕様・使い方がわかりません。。。まとまっている資料やサイトがあれば教えてください。。。