In this post, we will use Server Sent Events instead of WebSocket to broadcast messages to clients.
If you have missed the posts about the implementation of the WebSocket version, check here and part 2.
Unlike WebSocket, SSE (Server Sent Events) is a one-way protocol used for server-to-client messaging, and it only carries text-based messages. A good use case is sending notifications to clients, similar to the push notification feature of mobile apps. In this demo, we will use it to update the message list in the client.
From a developer's perspective, there is not much difference between Server Sent Events and HTTP-based RESTful APIs. SSE only requires a specific media type, text/event-stream, when emitting messages to clients. In the client application, use the browser's built-in EventSource to subscribe to it.
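To make the text/event-stream format more concrete, here is a minimal sketch of how a stream body breaks down into events. It is only an illustration of the wire format, not a spec-complete parser and not what the demo uses (the browser's EventSource does this for you). The names SseEvent and parseEventStream are hypothetical.

```typescript
// Hypothetical helper for illustration only: splits a text/event-stream
// body into events. It handles the "data", "event" and "id" fields and
// ignores everything else (for example "retry").
interface SseEvent {
  event: string;           // event type; "message" when no "event:" field is sent
  data: string;            // all "data:" lines of one event joined with "\n"
  id: string | undefined;  // value of the "id:" field, if any
}

function parseEventStream(body: string): SseEvent[] {
  const events: SseEvent[] = [];
  // A blank line terminates an event.
  for (const block of body.split(/\r?\n\r?\n/)) {
    let event = "message";
    let id: string | undefined = undefined;
    const data: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      // Lines starting with ":" are comments, often used as keep-alives.
      if (line === "" || line.startsWith(":")) continue;
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      // A single leading space after the colon is not part of the value.
      if (value.startsWith(" ")) value = value.slice(1);
      if (field === "data") data.push(value);
      else if (field === "event") event = value;
      else if (field === "id") id = value;
    }
    // Blocks without any data line do not produce an event.
    if (data.length > 0) events.push({ event, data: data.join("\n"), id });
  }
  return events;
}
```

In the browser, EventSource delivers events without an explicit type to onmessage, and the joined data lines are what you read from event.data.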
If you have used SSE in Spring MVC or on the standard Jakarta EE platform, you may have had to emit the events manually. The SSE concept is a good match for Reactor's Flux, which can serve an infinite stream to clients. Spring WebFlux simplifies the work: an SSE endpoint is no different from a general RESTful API in a Controller.
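To give a rough idea of what "emitting manually" involves at the protocol level, the sketch below serializes one event into the frame that is written to a text/event-stream response. It is written in TypeScript for illustration and is not the Spring MVC or Jakarta EE API; OutgoingEvent and formatSseEvent are hypothetical names.

```typescript
// Hypothetical helper for illustration only: builds the text frame for a
// single server-sent event.
interface OutgoingEvent {
  data: string;    // payload; may span several lines
  event?: string;  // optional event type
  id?: string;     // optional event id
}

function formatSseEvent(e: OutgoingEvent): string {
  const lines: string[] = [];
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  if (e.event !== undefined) lines.push(`event: ${e.event}`);
  // Each line of the payload needs its own "data:" field.
  for (const part of e.data.split(/\r?\n/)) lines.push(`data: ${part}`);
  // A blank line marks the end of the event.
  return lines.join("\n") + "\n\n";
}
```

With WebFlux, returning a Flux from an endpoint that produces text/event-stream lets the framework take care of this framing.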
First, let's create the server side. Generate a project skeleton using Spring Initializr.
- Project type: Gradle
- Language: Kotlin
- Spring Boot version: 2.4.0M1
- Project Metadata/Java: 14
- Dependencies: Reactive Web
Hit the Generate button to download the generated archive, and extract it to your local disk.
Make sure you have installed the latest JDK 14 (AdoptOpenJDK is highly recommended), then import the source code into your IDE, e.g. IntelliJ IDEA, and start implementing the server side.
We will skip the demo of Reactor's Sinks here, and directly use a MongoDB capped collection as the backend message queue.
Create a Message document definition and a Repository for it.
interface MessageRepository : ReactiveMongoRepository<Message, String> {
    // Tailable cursor: the Flux stays open and emits documents
    // as they are inserted into the capped collection.
    @Tailable
    fun getMessagesBy(): Flux<Message>
}

@Document(collection = "messages")
data class Message(@Id var id: String? = null, var body: String, var sentAt: Instant = Instant.now())
Create a @RestController to handle messages.
@RestController
@RequestMapping(value = ["messages"])
@CrossOrigin(origins = ["http://localhost:4200"])
class MessageController(private val messages: MessageRepository) {

    @PostMapping
    fun hello(@RequestBody p: String) =
        this.messages.save(Message(body = p, sentAt = Instant.now())).log().then()

    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun messageStream(): Flux<Message> = this.messages.getMessagesBy().log()
}
Here we use a @CrossOrigin annotation to accept requests from the origin of the Angular client application.
Start a MongoDB service as follows. A tailable cursor only works on a capped collection, so you have to prepare a capped messages collection first; check this post for more details.
docker-compose up mongodb
Run the following command to start the server side application.
./gradlew bootRun
I also wrote a simple integration test for this SSE endpoint; check here and try it yourself if you are interested.
Let's move to the frontend application. Create a new Angular project, or refactor the code from the former post.
import { NgZone, OnDestroy, OnInit } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subscription } from 'rxjs';

export class AppComponent implements OnInit, OnDestroy {
  title = 'client';
  message = '';
  messages: any[] = [];
  sub?: Subscription;

  constructor(private zone: NgZone, private http: HttpClient) {
  }

  getMessages(): Observable<any> {
    // Observable.create is deprecated, so use the constructor instead.
    return new Observable(observer => {
      const source = new EventSource("http://localhost:8080/messages");
      // zone.run makes sure Angular change detection sees the update.
      source.onmessage = event => {
        this.zone.run(() => observer.next(event.data));
      };
      source.onerror = event => {
        this.zone.run(() => observer.error(event));
      };
      // Teardown: close the connection when the subscriber unsubscribes.
      return () => source.close();
    });
  }

  ngOnInit(): void {
    this.sub = this.getMessages().subscribe({
      next: data => {
        console.log(data);
        this.addMessage(data);
      },
      error: err => console.error(err)
    });
  }

  addMessage(msg: any) {
    this.messages = [...this.messages, msg];
  }

  ngOnDestroy(): void {
    if (this.sub) {
      this.sub.unsubscribe();
    }
  }

  sendMessage() {
    console.log("sending message:" + this.message);
    this.http
      .post("http://localhost:8080/messages", this.message)
      .subscribe({
        next: (data) => console.log(data),
        error: (error) => console.log(error),
        complete: () => {
          console.log('complete');
          this.message = '';
        }
      });
  }
}
Here we open an EventSource connection in the ngOnInit method and listen for MessageEvent through the onmessage hook to receive new messages from the server side. The sendMessage method uses Angular HttpClient to send a message to the server side.
For more details about the EventSource API, see the MDN EventSource page.
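Note that event.data is always a string. In this demo the server streams Message documents, so the string is expected to be the JSON form of a message. If you want to render only the body instead of the raw JSON, a small parsing step could look like the sketch below. The names ChatMessage and parseMessage are hypothetical, and the shape of sentAt depends on how the server serializes Instant, so it is simply kept as a string here.

```typescript
// Hypothetical helper for illustration only: turns the string received in
// event.data into a typed object, or null when it is not usable.
interface ChatMessage {
  id: string | null;
  body: string;
  sentAt: string; // kept as text; the exact format depends on the server
}

function parseMessage(data: string): ChatMessage | null {
  try {
    const raw = JSON.parse(data);
    // Reject anything that is not an object with a string body.
    if (raw === null || typeof raw !== "object" || typeof raw.body !== "string") {
      return null;
    }
    return {
      id: typeof raw.id === "string" ? raw.id : null,
      body: raw.body,
      sentAt: String(raw.sentAt ?? ""),
    };
  } catch {
    // JSON.parse throws on malformed input.
    return null;
  }
}
```

The onmessage hook could then pass parseMessage(event.data) to the observer and skip null results, and the template could display m.body.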
There are no changes in app.component.html compared with the former WebSocket version.
<div fxFlex>
  <p *ngFor="let m of messages">
    {{m}}
  </p>
</div>
<div>
  <form fxLayout="row baseline" #messageForm="ngForm" (ngSubmit)="sendMessage()">
    <mat-form-field fxFlex>
      <input name="message" fxFill matInput #messageCtrl="ngModel" [(ngModel)]="message" required />
      <mat-error fxLayoutAlign="start" *ngIf="messageCtrl.hasError('required')">
        Message body can not be empty.
      </mat-error>
    </mat-form-field>
    <div>
      <button mat-button mat-icon-button type="submit" [disabled]="messageForm.invalid || messageForm.pending">
        <mat-icon>send</mat-icon>
      </button>
    </div>
  </form>
</div>
Next, run the client application.
npm run start
Open two browser windows (or two different browsers), type some messages in each window, and watch them show up in both.
Get a copy of the complete code from my GitHub and try it yourself.