The problem
I am currently tasked with implementing a notification feature for my side project application. Whenever a user comments on or reacts to another user's post, an event is generated. Subsequently, the server sends a message to notify the owner of the post about this event.
TLDR: You can find the final code here
After exploring numerous solutions on the internet, I discovered that we can achieve the goal by using long polling, WebSockets or server-sent events. However, since only the server needs to push data to the client (the client never sends messages back over the same connection) and we require real-time notifications, server-sent events are sufficient and there is no need for long polling or WebSockets.
Getting started
According to Wikipedia:
Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via an HTTP connection, and describes how servers can initiate data transmission towards clients once an initial client connection has been established.
Talking is cheap, let's get our hands dirty by coding up this notification server.
// main.go
package main
import (
"fmt"
"log"
"net/http"
"time"
)
// sseHandler streams a "Ping" server-sent event to the client every five
// seconds until the client disconnects.
//
// It answers with 500 Internal Server Error when the ResponseWriter does not
// implement http.Flusher, because events cannot be pushed without flushing.
func sseHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Println("Client connected")

	// Check for streaming support before committing to the SSE headers.
	// Only logging the failure would lead to calling Flush on a nil interface.
	flusher, ok := w.(http.Flusher)
	if !ok {
		fmt.Println("Could not init http.Flusher")
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Access-Control-Allow-Origin", "*") // allow cross-origin EventSource clients
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// A ticker (instead of time.Sleep) lets the handler notice a disconnect
	// immediately rather than after the remaining sleep time.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	ctx := r.Context()
	for ctx.Err() == nil {
		fmt.Println("case message... sending message")
		fmt.Fprintf(w, "data: Ping\n\n")
		flusher.Flush()

		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	}
	fmt.Println("Connection closed")
}
// main registers the SSE handler on /sse and serves HTTP on port 3500.
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/sse", sseHandler)

	// ListenAndServe only returns when serving fails, so the error is fatal.
	if err := http.ListenAndServe(":3500", mux); err != nil {
		log.Fatal(err)
	}
}
You can conduct a quick test using the curl
command
curl http://localhost:3500/sse
You should see something similar to this:
data: Ping
data: Ping
...
In the code above, we define an HTTP handler sseHandler
that handles requests to the /sse
endpoint. Inside the handler, we set appropriate headers for SSE and send an event to the client every 5 seconds.
The headers play a vital role since the client relies on them to establish the connection. To gain a better understanding of the importance of each header, you can experiment by disabling one of them and observing how it impacts the client connection (spoiler alert: it seems that Content-Type: text/event-stream
affects the connection establishment).
For the client-side, we will generate a front-end project utilizing a framework of your choice or even pure/vanilla JavaScript
.
npx degit solidjs/templates/ts sse-client
cd sse-client
# yarn, npm i, pnpm i to install dependencies
import { Component, createEffect, createSignal, onCleanup } from "solid-js";
/**
 * Root component: opens an SSE connection to the local server and logs
 * every message event it receives.
 */
const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();

  createEffect(() => {
    const source = new EventSource("http://localhost:3500/sse");
    source.onmessage = (message) => {
      console.log({ e: message });
    };
    setEventSource(source);
  });

  // Close the stream when the component is disposed.
  onCleanup(() => {
    const source = eventSource();
    if (source) {
      source.close();
    }
  });

  return <main>Server-sent events</main>;
};

export default App;
Visit http://localhost:3000; you should see something similar to the provided GIF:
That is it: we have successfully created the SSE endpoint and established a connection to the server using JavaScript
. However, you might ask, "Okay, but how can I implement notification with this". Do you remember our requirement?
Whenever a user comments on or reacts to another user's post, an event is generated. Subsequently, the server sends a message to notify the owner of the post about this event.
We have completely implemented the server sends message to notify
part. Therefore, we are left with only a solitary task remaining, that is to dispatch a message when a user initiates an event. We will execute this functionality by leveraging the Go
channel.
Flushing messages upon triggering an event
We need an effective way to communicate between functions and goroutines, and this is where Go channels shine. Our plan is to create a channel for each SSE connection and keep it in a global store. Additionally, we will introduce a new endpoint, /time
, to trigger an event. When a user calls this endpoint, we will broadcast the message to all the channels in the store.
// CHANNEL_STORE holds a pointer to the message channel of every SSE
// connection that is currently registered.
var CHANNEL_STORE = make([]*chan string, 0)
// removeChannel drops ch from CHANNEL_STORE by overwriting its slot with the
// last entry and shrinking the slice by one. It does nothing when ch is not
// registered.
//
// NOTE(review): CHANNEL_STORE is accessed here without any synchronization.
func removeChannel(ch *chan string) {
	last := len(CHANNEL_STORE) - 1

	// Scan from the back so that, as before, the last matching slot wins.
	for idx := last; idx >= 0; idx-- {
		if CHANNEL_STORE[idx] != ch {
			continue
		}
		CHANNEL_STORE[idx] = CHANNEL_STORE[last]
		CHANNEL_STORE = CHANNEL_STORE[:last]
		fmt.Println("Connection remains: ", len(CHANNEL_STORE))
		return
	}
}
// broadcast sends msg to every channel registered in CHANNEL_STORE, in
// store order. Each send blocks until the receiving side takes the message.
func broadcast(msg string) {
	subscribers := CHANNEL_STORE
	for i := 0; i < len(subscribers); i++ {
		*subscribers[i] <- msg
	}
}
// getTime handles /time: it broadcasts the current server time, formatted
// as HH:MM:SS, to every connected SSE client.
func getTime(w http.ResponseWriter, r *http.Request) {
	headers := w.Header()
	headers.Set("Access-Control-Allow-Origin", "*")

	broadcast(time.Now().Format("15:04:05"))
}
// sseHandler registers a message channel for the connecting client and
// streams every message received on it as a server-sent event until the
// client disconnects.
//
// It answers with 500 Internal Server Error when the ResponseWriter does not
// implement http.Flusher, because events cannot be pushed without flushing.
//
// NOTE(review): CHANNEL_STORE is modified here without synchronization
// although handlers run concurrently; guard it with a mutex in production.
func sseHandler(w http.ResponseWriter, r *http.Request) {
	// Check for streaming support before registering anything.
	// Only logging the failure would lead to calling Flush on a nil interface.
	flusher, ok := w.(http.Flusher)
	if !ok {
		fmt.Println("Could not init http.Flusher")
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	ch := make(chan string)
	CHANNEL_STORE = append(CHANNEL_STORE, &ch)
	fmt.Println("Client connected: ", len(CHANNEL_STORE))

	defer func() {
		// Unregister before closing so a later broadcast cannot find a
		// closed channel in the store.
		removeChannel(&ch)
		close(ch)
		fmt.Println("Client closed connection")
	}()

	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	for {
		select {
		case message := <-ch:
			fmt.Println("case message... sending message")
			fmt.Println(message)
			fmt.Fprintf(w, "data: %s\n\n", message)
			flusher.Flush()
		case <-r.Context().Done():
			// The deferred cleanup logs the disconnect.
			return
		}
	}
}
// main wires the /sse stream and the /time trigger endpoint and serves
// HTTP on port 3500.
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/sse", sseHandler)
	mux.HandleFunc("/time", getTime)

	// ListenAndServe only returns when serving fails, so the error is fatal.
	if err := http.ListenAndServe(":3500", mux); err != nil {
		log.Fatal(err)
	}
}
Last but not least, we need to update our client code.
import { Component, createEffect, createSignal, onCleanup } from "solid-js";
/**
 * Root component: subscribes to the server's SSE stream, shows the most
 * recently received time, and offers a button that asks the server to
 * broadcast the current time.
 */
const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  const [time, setTime] = createSignal<string>("");

  createEffect(() => {
    const ev = new EventSource("http://localhost:3500/sse");
    ev.onmessage = (e) => {
      console.log({ e });
      setTime(e.data);
    };
    setEventSource(ev);
  });

  /** Triggers a broadcast by calling the /time endpoint. */
  async function handleGetTime(): Promise<void> {
    try {
      const res = await fetch("http://localhost:3500/time");
      if (res.status !== 200) {
        console.log("Could not connect to the server");
      } else {
        console.log("OK");
      }
    } catch (err: unknown) {
      // fetch rejects on network failure; without this catch the click
      // handler would produce an unhandled promise rejection.
      console.log("Could not connect to the server", err);
    }
  }

  // Close the stream when the component is disposed.
  onCleanup(() => {
    eventSource()?.close();
  });

  return (
    <main>
      Time: {time()}
      <button onClick={handleGetTime}>Get time</button>
    </main>
  );
};

export default App;
Your result should resemble this GIF:
As you can see, I have launched two browser tabs in the demo. Upon triggering the /time
endpoint, both tabs receive the updated time immediately. This is achieved by broadcasting the message to all channels. We can further enhance the implementation by utilizing a Go
map, associating the user identity as the key and a slice of channels as the corresponding value, enabling targeted message broadcasts.
// main.go
// SSEConn keeps track of the open SSE channels, grouped by client ID.
type SSEConn struct {
// mu is locked by addClient, removeClient and broadcast while they access clients.
mu sync.Mutex
// clients maps a client ID to the channels registered for that ID.
clients map[string][]chan string
}
// NewSSEConn returns an SSEConn with an empty, ready-to-use client map.
func NewSSEConn() *SSEConn {
	conn := new(SSEConn)
	conn.clients = make(map[string][]chan string)
	return conn
}
// addClient creates a new unbuffered channel, registers it under id and
// returns a pointer to it. The first channel for an id also creates the
// map entry.
func (p *SSEConn) addClient(id string) *chan string {
	p.mu.Lock()
	// Deferred calls run last-in-first-out: the debug dump below runs
	// while the lock is still held, then the lock is released.
	defer p.mu.Unlock()
	defer func() {
		fmt.Println("Clients in add: ", p.clients)
		for key, chans := range p.clients {
			fmt.Printf("Key: %s, value: %d\n", key, len(chans))
			fmt.Println("Channels from id=", id, chans)
		}
	}()

	// Appending to a missing (nil) entry creates it, so new and existing
	// IDs share one code path.
	ch := make(chan string)
	p.clients[id] = append(p.clients[id], ch)
	return &ch
}
// removeClient closes conn and unregisters it from the channels stored
// under id. The map entry for id is deleted once its last channel is gone.
// It does nothing when id is unknown or conn is not registered under it.
func (p *SSEConn) removeClient(id string, conn chan string) {
	p.mu.Lock()
	// Deferred calls run last-in-first-out: the debug dump below runs
	// while the lock is still held, then the lock is released.
	defer p.mu.Unlock()
	defer func() {
		fmt.Println("Clients in remove: ", p.clients)
		for k, v := range p.clients {
			fmt.Printf("Key: %s, value: %d\n", k, len(v))
		}
	}()

	chans, ok := p.clients[id]
	if !ok {
		return
	}

	for i, ch := range chans {
		if ch != conn {
			continue
		}
		close(ch)

		// The three-index slice forces append to copy, so the old backing
		// array is left untouched.
		remaining := append(chans[:i:i], chans[i+1:]...)

		// Store the shortened slice back. Otherwise the closed channel
		// stays registered and the next broadcast panics on it.
		if len(remaining) == 0 {
			delete(p.clients, id)
		} else {
			p.clients[id] = remaining
		}
		return
	}
}
// broadcast formats data as a server-sent event named event and delivers it
// to every channel registered under id. Unknown IDs are ignored.
//
// A send that is not received within sendTimeout is dropped. An unbounded
// send while holding p.mu would deadlock against a handler that has already
// stopped receiving and is waiting for p.mu inside removeClient.
func (p *SSEConn) broadcast(id string, data, event string) {
	const sendTimeout = time.Second

	p.mu.Lock()
	defer p.mu.Unlock()

	chans, ok := p.clients[id]
	if !ok {
		return
	}

	// The frame is identical for every receiver, so build it once.
	frame := fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)

	for _, ch := range chans {
		timeout := time.NewTimer(sendTimeout)
		select {
		case ch <- frame:
		case <-timeout.C:
			fmt.Println("Dropped message for unresponsive client: ", id)
		}
		timeout.Stop()
	}
}
In the provided code snippet, I define the SSEConn
struct and implement methods such as addClient
, removeClient
and broadcast
. The SSEConn
consists of two fields: clients
and mu
(Mutex). The mu
field plays a crucial role in preventing race conditions, you can read more about that here and here. The clients
field stores the clients' IDs along with their corresponding channels. Also, it is important to remember to update the handler implementation to accept the user ID.
// sseConn is the package-level SSEConn used by the getTime and sseHandler handlers.
var sseConn = NewSSEConn()
// getTime handles /time/{id}: it sends the current server time (HH:MM:SS)
// as a "timeEvent" to the channels registered under the ID taken from the
// URL path.
func getTime(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Access-Control-Allow-Origin", "*")

	clientID := strings.TrimPrefix(r.URL.Path, "/time/")
	now := time.Now().Format("15:04:05")
	sseConn.broadcast(clientID, now, "timeEvent")
}
// sseHandler handles /sse/{id}: it registers a channel for the ID taken from
// the URL path and streams every frame received on it to the client until
// the client disconnects, at which point the channel is unregistered.
//
// It answers with 500 Internal Server Error when the ResponseWriter does not
// implement http.Flusher, because events cannot be pushed without flushing.
func sseHandler(w http.ResponseWriter, r *http.Request) {
	// Check for streaming support before registering the client.
	// Only logging the failure would lead to calling Flush on a nil interface.
	flusher, ok := w.(http.Flusher)
	if !ok {
		fmt.Println("Could not init http.Flusher")
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	id := strings.TrimPrefix(r.URL.Path, "/sse/")
	ch := sseConn.addClient(id)
	defer sseConn.removeClient(id, *ch)

	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	for {
		select {
		case message := <-*ch:
			fmt.Println("case message... sending message")
			fmt.Println(message)
			// The message is written verbatim. Passing it to Fprintf as the
			// format string would mangle any '%' it contains.
			fmt.Fprint(w, message)
			flusher.Flush()
		case <-r.Context().Done():
			fmt.Println("Client closed connection")
			return
		}
	}
}
// main wires the per-client /sse/ stream and the /time/ trigger endpoint
// and serves HTTP on port 3500.
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/sse/", sseHandler)
	mux.HandleFunc("/time/", getTime)

	// ListenAndServe only returns when serving fails, so the error is fatal.
	if err := http.ListenAndServe(":3500", mux); err != nil {
		log.Fatal(err)
	}
}
Final client code:
import {
Component,
createEffect,
createSignal,
JSX,
onCleanup,
} from "solid-js";
/**
 * Root component: lets the user enter an ID, connect to that ID's SSE
 * stream, and ask the server to send the current time to that ID.
 */
const App: Component = () => {
  const [eventSource, setEventSource] = createSignal<EventSource | undefined>();
  const [time, setTime] = createSignal("");
  const [id, setId] = createSignal("");

  /** Asks the server to emit a "timeEvent" for the current ID. */
  async function handleGetTime(): Promise<void> {
    try {
      // Encode the ID so characters such as "/", "?" or "#" stay inside
      // the path segment.
      const res = await fetch(
        `http://localhost:3500/time/${encodeURIComponent(id())}`
      );
      if (res.status !== 200) {
        console.log("Could not connect to the server");
      } else {
        console.log("OK");
      }
    } catch (err: unknown) {
      // fetch rejects on network failure; without this catch the click
      // handler would produce an unhandled promise rejection.
      console.log("Could not connect to the server", err);
    }
  }

  function handleChange(e: InputEvent) {
    setId((e.currentTarget as HTMLInputElement).value);
  }

  /** Opens the SSE stream for the current ID, replacing any previous one. */
  function handleConnect() {
    // Close the previous stream first. Otherwise every click leaves another
    // open connection behind.
    eventSource()?.close();

    const ev = new EventSource(
      `http://localhost:3500/sse/${encodeURIComponent(id())}`
    );
    ev.addEventListener("timeEvent", (e) => {
      console.log({ event: e.type });
      console.log({ data: e.data });
      setTime(e.data);
    });
    setEventSource(ev);
  }

  // Close the stream when the component is disposed.
  onCleanup(() => {
    eventSource()?.close();
  });

  return (
    <main>
      Time: {time()}
      <button onClick={handleGetTime}>Get time</button>
      <input type="text" onInput={handleChange} value={id()} />
      <button onClick={handleConnect}>Connect</button>
    </main>
  );
};

export default App;
The final result:
By the end of this article, you have gained insight about SSE, Go channels, and the implementation of notification functionality using Go
. I trust that you have found this article enjoyable and useful. Phew, considering the article's length, I will bring it to a close here.
Top comments (1)
Thanks, I look forward to more distributed cases!