Spring WebFlux is a new module introduced in Spring 5.0 that provides a new programming model for developers. Most of the features that exist in Spring WebMVC have been ported to the WebFlux stack, including WebSocket support.
WebSocket is a standalone specification defined in RFC 6455, which provides bidirectional, non-blocking communication between clients and the server side.
In this post, we will create a simple chat application that uses the Spring WebFlux based WebSocket APIs to build the server side and Angular as the client to communicate with it. Initially we will use a Reactor-specific Sink as the message queue, and then we will switch to a tailable cursor on a capped collection in MongoDB to simplify the work.
As introduced in my original post, Spring WebFlux embraces the Reactive Streams spec and depends heavily on Project Reactor. The WebSocket API in Spring WebFlux is not as rich as the one in Spring WebMVC, e.g. it lacks general controller support and there is no way to adapt the STOMP protocol. If you search Google for "spring webflux websocket", you will find that most of the solutions are based on Reactor's Processor, e.g. How To Build a Chat App Using WebFlux, WebSockets & React is a great article that introduces the usage of WebSocket in Spring WebFlux. For more info about the UnicastProcessor and other processors in Reactor, check How to use Processor in Reactor Java from Manh Phan.
Firstly let's create the server side. Generate a project skeleton using Spring Initializr.
- Project type: Gradle
- Language: Kotlin
- Spring Boot version: 2.4.0-M1
- Project Metadata/Java: 14
- Dependencies: Reactive Web
Hit the Generate button to download the generated archive, and extract it to your local disk.
Make sure you have installed JDK 14 (AdoptOpenJDK is highly recommended), then import the source code into your favorite IDE, e.g. IntelliJ IDEA. IDEA will resolve the dependencies and build the project automatically.
To enable WebSocket in a Spring WebFlux application, just declare a simple WebSocketHandlerAdapter bean.
@Bean
fun handlerAdapter(): WebSocketHandlerAdapter = WebSocketHandlerAdapter()
And set up the WebSocket endpoints in a HandlerMapping bean.
@Bean
fun webSocketMapping(mapper: ObjectMapper): HandlerMapping {
    // Route the WebSocket endpoint to our handler.
    val map = mapOf("/ws/messages" to ChatSocketHandler(mapper))
    return SimpleUrlHandlerMapping().apply {
        urlMap = map
        order = 10
    }
}
Here we use a custom ChatSocketHandler to receive messages from and send messages to the endpoint /ws/messages. WebSocket supports both text and binary payloads; here we only use text messages, and we convert our message to a JSON string with the Jackson ObjectMapper.
Let's have a look at the complete code of ChatSocketHandler.
class ChatSocketHandler(val mapper: ObjectMapper) : WebSocketHandler {
    // Replay sink with a history of 100 messages.
    val sink = Sinks.replay<Message>(100)
    val outputMessages: Flux<Message> = sink.asFlux()

    override fun handle(session: WebSocketSession): Mono<Void> {
        println("handling WebSocketSession...")
        // Inbound: wrap each text frame in a Message and push it into the sink.
        session.receive()
            .map { it.payloadAsText }
            .map { Message(id = UUID.randomUUID().toString(), body = it, sentAt = Instant.now()) }
            .doOnNext { println(it) }
            .subscribe(
                { message: Message -> sink.next(message) },
                { error: Throwable -> sink.error(error) }
            )

        // Outbound: after a short delay, send the sink's messages to this client as JSON text.
        return session.send(
            Mono.delay(Duration.ofMillis(100))
                .thenMany(outputMessages.map { session.textMessage(toJson(it)) })
        )
    }

    fun toJson(message: Message): String = mapper.writeValueAsString(message)
}
The ChatSocketHandler implements the WebSocketHandler interface. Its handle method is called for each WebSocket client that connects. When a message arrives from a client, we cache it in a replayable Sinks.StandaloneFluxSink; we then read the messages back from that sink and send them to the WebSocket client. Because the mapping creates a single ChatSocketHandler instance, the sink is shared by all sessions, so every connected client receives every message.
The UnicastProcessor, ReplayProcessor, etc. are marked as deprecated in the latest version of Reactor, so here we use the newest Sinks instead.
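To make the replay behaviour described above more concrete, here is a small TypeScript sketch of the idea. It is only an in-memory analogue and not Reactor's API: the ReplayBuffer class, its limit parameter and the MessageListener type are hypothetical names introduced for illustration. It assumes that a late subscriber first receives the cached history and then the live messages.

```typescript
// Hypothetical analogue of a replaying sink; this is NOT Reactor's API.
type MessageListener<T> = (item: T) => void;

class ReplayBuffer<T> {
  private history: T[] = [];
  private listeners: MessageListener<T>[] = [];
  private limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Cache the item (dropping the oldest one beyond the limit) and push it to current subscribers.
  next(item: T): void {
    this.history.push(item);
    if (this.history.length > this.limit) {
      this.history.shift();
    }
    this.listeners.forEach((listener) => listener(item));
  }

  // Replay the cached history to the new subscriber, then register it for live items.
  subscribe(listener: MessageListener<T>): void {
    this.history.forEach((item) => listener(item));
    this.listeners.push(listener);
  }
}

// Usage: a subscriber that joins late still sees the last two messages.
const replayBuffer = new ReplayBuffer<string>(2);
replayBuffer.next("a");
replayBuffer.next("b");
replayBuffer.next("c");
const lateSubscriberSaw: string[] = [];
replayBuffer.subscribe((m) => lateSubscriberSaw.push(m));
replayBuffer.next("d");
console.log(lateSubscriberSaw.join(",")); // prints "b,c,d"
```

This is why a browser window opened later in the demo can still show earlier chat messages, within the configured history size.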
Declare a simple data class to represent the WebSocket message payload in the chat application.
data class Message @JsonCreator constructor(
    @JsonProperty("id") var id: String? = null,
    @JsonProperty("body") var body: String,
    @JsonProperty("sentAt") var sentAt: Instant = Instant.now()
)
Jackson ObjectMapper requires a no-argument constructor or a JsonCreator annotated constructor when deserializing an object.
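On the client side, the JSON string produced from this class could be turned back into a typed object. The following is a minimal sketch under some assumptions: the ChatMessage interface and the parseChatMessage function are hypothetical names, and sentAt is assumed to arrive as an ISO-8601 string, which depends on how your ObjectMapper is configured.

```typescript
// Hypothetical client-side shape mirroring the server's Message class.
interface ChatMessage {
  id: string | null;
  body: string;
  sentAt: string; // assumption: serialized as an ISO-8601 string
}

// Parse a raw WebSocket text payload and check the one required field.
function parseChatMessage(raw: string): ChatMessage {
  const data = JSON.parse(raw);
  if (typeof data !== "object" || data === null || typeof data.body !== "string") {
    throw new Error("Not a chat message: " + raw);
  }
  return {
    id: typeof data.id === "string" ? data.id : null,
    body: data.body,
    sentAt: String(data.sentAt),
  };
}
```

With such a helper, a template could render only the body field instead of the whole JSON string.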
Now run the server application by clicking the Run icon beside the main function in the IDEA editor, or by executing the following command.
./gradlew bootRun
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :bootRun
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.4.0-M1)
2020-07-18 13:57:57.002 INFO 14776 --- [ main] c.e.demo.WebSocketServerApplicationKt
...
<==========---> 80% EXECUTING [1m 1s]
> :bootRun
Let's move on to the frontend and create a simple Angular app to communicate with the server side.
I assume you have installed the latest Node.js and Angular CLI.
Follow the official Getting started guide to set up the Angular environment and initialize a new project. Then open the project in your favorite IDE, e.g. VS Code.
To make things simple, we will put the code directly in the top-level AppComponent. In a real world application, you should follow the official Angular coding Style Guide to structure your project.
import { NgZone, OnDestroy, OnInit } from '@angular/core';

export class AppComponent implements OnInit, OnDestroy {
  title = 'client';
  message = '';
  messages: string[] = [];
  socket: WebSocket;

  constructor(private zone: NgZone) {
  }

  ngOnInit(): void {
    this.socket = new WebSocket("ws://localhost:8080/ws/messages");
    this.socket.onmessage = event => {
      console.log('onmessage:' + event.data);
      // Run inside the Angular zone so that change detection picks up the new message.
      this.zone.run(() => {
        this.addMessage(event.data);
      });
    };
  }

  addMessage(msg: string) {
    this.messages = [...this.messages, msg];
  }

  ngOnDestroy(): void {
    // Close the connection when the component is destroyed.
    this.socket.close();
  }

  sendMessage() {
    console.log("sending message:" + this.message);
    this.socket.send(this.message);
  }
}
Here we initialize a WebSocket connection in the ngOnInit method and listen to the onmessage event to receive messages from the server side; each received message is appended to the list in the addMessage method. The sendMessage method calls WebSocket.send to send messages to the server side.
For more details about the WebSocket API, see the MDN WebSocket page.
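Calling send on a socket that is not open yet (or is already closed) fails, so a slightly more defensive sendMessage could check the connection state first. The sketch below is one possible way to do it, not part of the original demo: sendIfOpen, SocketLike and SOCKET_OPEN are hypothetical names, and SocketLike only lists the members of the browser WebSocket that are used here.

```typescript
// Minimal subset of the browser WebSocket interface used by this sketch.
interface SocketLike {
  readyState: number;
  send(data: string): void;
}

// Same numeric value as WebSocket.OPEN in the browser API.
const SOCKET_OPEN = 1;

// Send the trimmed text only when the socket is open and the text is not blank.
// Returns true if the message was handed to the socket.
function sendIfOpen(socket: SocketLike, text: string): boolean {
  const body = text.trim();
  if (socket.readyState !== SOCKET_OPEN || body.length === 0) {
    return false;
  }
  socket.send(body);
  return true;
}
```

In the component, sendMessage could then call sendIfOpen(this.socket, this.message) and, for example, clear the input only when it returns true.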
Let's move to the AppComponent template file, app.component.html.
<div fxFlex>
  <p *ngFor="let m of messages">
    {{m}}
  </p>
</div>
<div>
  <form fxLayout="row baseline" #messageForm="ngForm" (ngSubmit)="sendMessage()">
    <mat-form-field fxFlex>
      <input name="message" fxFill matInput #messageCtrl="ngModel" [(ngModel)]="message" required />
      <mat-error fxLayoutAlign="start" *ngIf="messageCtrl.hasError('required')">
        Message body can not be empty.
      </mat-error>
    </mat-form-field>
    <div>
      <button mat-button mat-icon-button type="submit" [disabled]="messageForm.invalid || messageForm.pending">
        <mat-icon>send</mat-icon>
      </button>
    </div>
  </form>
</div>
It includes a simple form to submit messages, and uses an ngFor directive to display the received messages.
Next run the client application.
npm run start
Open two browser windows, type some words in the input box, and hit the send button.
Awesome, it works.
In the built-in developer tools panel in Firefox, you can track the details of the handshake of a WebSocket connection request between the client and the server side.
The request headers include some WebSocket-specific items; Connection: keep-alive, Upgrade and Upgrade: websocket are required to start a WebSocket connection.
GET /ws/messages HTTP/1.1
Host: localhost:8080
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:78.0) Gecko/20100101 Firefox/78.0
Accept: */*
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Sec-WebSocket-Version: 13
Origin: http://localhost:4200
Sec-WebSocket-Extensions: permessage-deflate
Sec-WebSocket-Key: tsHaoQIeDW0eAk/fHn5kqw==
Connection: keep-alive, Upgrade
Cookie: PLAY_SESSION=eyJhbGciOiJIUzI1NiJ9.eyJkYXRhIjp7ImNzcmZUb2tlbiI6IjBmODcyYzUyMDM3YmJjN2UwZTI4YWRiNjQ4YTA0MGYyNjBiMDVmNzYtMTU2Njg4OTU4MjM4NS0wOThjNDQ5ZjVhOTg4Nzk1YmU0NjQ4ZmUifSwibmJmIjoxNTY2ODg5NTgyLCJpYXQiOjE1NjY4ODk1ODJ9.F4lTngIoAlp8F_vVvLsmw4XSYBtpIGd9yNxlff-8Iuo
Pragma: no-cache
Cache-Control: no-cache
Upgrade: websocket
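As a rough illustration of what a server looks for in such a request, the following TypeScript sketch checks the handshake-related headers. It is a simplified reading of RFC 6455, not how Spring WebFlux implements the check, and isWebSocketUpgrade is a hypothetical helper name.

```typescript
// Simplified check of the headers that mark an HTTP request as a WebSocket handshake.
function isWebSocketUpgrade(headers: Record<string, string>): boolean {
  // Header names are case-insensitive, so normalize them first.
  const lower: Record<string, string> = {};
  Object.keys(headers).forEach((name) => {
    lower[name.toLowerCase()] = headers[name] ?? "";
  });
  // Connection may carry several comma-separated tokens, e.g. "keep-alive, Upgrade".
  const connectionTokens = (lower["connection"] ?? "")
    .toLowerCase()
    .split(",")
    .map((token) => token.trim());
  return (
    (lower["upgrade"] ?? "").toLowerCase() === "websocket" &&
    connectionTokens.indexOf("upgrade") !== -1 &&
    lower["sec-websocket-version"] === "13" &&
    (lower["sec-websocket-key"] ?? "").length > 0
  );
}
```

Applied to the request above, all four conditions hold; a plain HTTP request without the Upgrade header would be rejected.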
And the response headers look like this.
HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade
sec-websocket-accept: zq1b1kR56+jGNQ4v1bDr37jfTBA=
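The sec-websocket-accept value is not random: RFC 6455 defines it as the Base64-encoded SHA-1 hash of the client's Sec-WebSocket-Key concatenated with a fixed GUID. Here is a small sketch of that calculation, assuming it runs on Node.js (it uses Node's crypto module); computeAcceptKey is a hypothetical helper name, and the sample key and expected value are the ones given in the RFC.

```typescript
import { createHash } from "crypto";

// Fixed GUID defined by RFC 6455 for the opening handshake.
const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Derive the Sec-WebSocket-Accept value from the client's Sec-WebSocket-Key.
function computeAcceptKey(secWebSocketKey: string): string {
  return createHash("sha1")
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest("base64");
}

// Sample key and expected result taken from RFC 6455, section 1.3.
console.log(computeAcceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
// prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

If the request and response headers shown above come from the same handshake, passing the Sec-WebSocket-Key from the request to this function is expected to yield the sec-websocket-accept value in the response.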
The complete code is hosted on my GitHub account; check the feat/reactor-sinks branch for this demo.