In this article, we will look at different communication patterns we can implement in our distributed applications using NATS.
We will be using Go in this article, but NATS has over 40 client language implementations such as Java, C#, Python, Rust, Elixir, and many more. As always, all the code will be available in this repository.
Setup
Let's initialize a simple Go project.
Note: This example is Linux/MacOS based but NATS is also supported on Windows.
$ go mod init example
Install the nats.go package
$ go get github.com/nats-io/nats.go@latest
We will use this kind of folder structure
.
├── cmd
│   ├── publish-subscribe
│   │   └── main.go
│   ├── request-reply
│   │   └── main.go
│   └── queue-groups
│       └── main.go
├── go.mod
└── go.sum
The final step is to start our local nats-server
$ nats-server
[18661] 2022/03/02 00:41:46.170012 [INF] Starting nats-server
[18661] 2022/03/02 00:41:46.170585 [INF] Version: 2.7.3
[18661] 2022/03/02 00:41:46.170589 [INF] Git: [not set]
[18661] 2022/03/02 00:41:46.170592 [INF] Name: NAIAIQUT4426EPG4BEPKOUGMAVWS52GIFPWFZEO7ELXF3FVI5EVSLMTY
[18661] 2022/03/02 00:41:46.170595 [INF] ID: NAIAIQUT4426EPG4BEPKOUGMAVWS52GIFPWFZEO7ELXF3FVI5EVSLMTY
[18661] 2022/03/02 00:41:46.171426 [INF] Listening for client connections on 0.0.0.0:4222
[18661] 2022/03/02 00:41:46.171920 [INF] Server is ready
Publish-Subscribe
NATS implements a publish-subscribe message distribution model for one-to-many communication. A publisher sends a message on a subject and any active subscriber listening on that subject receives the message. This 1:N (one-to-many) pattern is sometimes called a fan-out.
Subscribers can also register interest in wildcard subjects that work a bit like a regular expression (but only a bit). For example:
- foo.* matches foo.bar and foo.baz.
- foo.*.bar matches foo.a.bar and foo.b.bar.
- foo.> matches any of the above.
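To make the wildcard rules concrete, here is a minimal sketch of token-based subject matching in plain Go. It is not how the NATS server implements matching, and subjectMatches is a hypothetical helper that does not exist in nats.go. It assumes subjects are split into tokens on ".", that * stands for exactly one token, and that > stands for one or more trailing tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern.
// Hypothetical helper for illustration; not part of nats.go.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at
			// least one subject token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject ran out of tokens
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without ">", both sides must have the same number of tokens.
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("foo.*", "foo.bar"))       // true
	fmt.Println(subjectMatches("foo.*.bar", "foo.a.bar")) // true
	fmt.Println(subjectMatches("foo.>", "foo.a.bar"))     // true
	fmt.Println(subjectMatches("foo.*", "foo.a.bar"))     // false
}
```

Note that foo.* does not match foo.a.bar, because * covers a single token only. This is the main way subject wildcards differ from a regular expression.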
Messages have a maximum size (set in the server configuration with max_payload). The size is set to 1 MB by default but can be increased up to 64 MB if needed (though the NATS team recommends keeping the max message size to something more reasonable like 8 MB).
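As a small illustration of the size limit, the sketch below checks a payload against a limit before publishing. The names checkPayload and defaultMaxPayload are hypothetical, and the limit is passed in as a plain number so the example runs without a server. In a real client the limit could be read from the connection instead (nats.go exposes nc.MaxPayload() for this).

```go
package main

import (
	"errors"
	"fmt"
)

// defaultMaxPayload mirrors the 1 MB server default mentioned above.
const defaultMaxPayload int64 = 1024 * 1024

var errPayloadTooLarge = errors.New("payload exceeds max_payload")

// checkPayload returns an error if data is larger than maxPayload bytes.
// Hypothetical helper for illustration; not part of nats.go.
func checkPayload(data []byte, maxPayload int64) error {
	if int64(len(data)) > maxPayload {
		return fmt.Errorf("%w: %d > %d bytes", errPayloadTooLarge, len(data), maxPayload)
	}
	return nil
}

func main() {
	small := make([]byte, 512)
	large := make([]byte, defaultMaxPayload+1)

	fmt.Println(checkPayload(small, defaultMaxPayload) == nil) // true
	fmt.Println(checkPayload(large, defaultMaxPayload) == nil) // false
}
```

Checking up front lets an application split or reject oversized data with its own error message rather than relying on the publish call to fail.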
Why do we need this?
Publish-Subscribe is a pretty common use case. As the name suggests, we can use it to fan out a message to different services.
Code
Let's write some code in cmd/publish-subscribe/main.go to understand this better, starting with initializing our NATS client.
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatalln(err)
}
defer nc.Close()
Subscribe to our foo subject using 3 subscribers, so that we can see a message fan-out in action.
nc.Subscribe("foo", func(msg *nats.Msg) {
	log.Println("Subscriber 1:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
	log.Println("Subscriber 2:", string(msg.Data))
})
nc.Subscribe("foo", func(msg *nats.Msg) {
	log.Println("Subscriber 3:", string(msg.Data))
})
Publish our message to the foo subject and wait.
if err := nc.Publish("foo", []byte("Message")); err != nil {
	log.Fatalln(err)
}
time.Sleep(2 * time.Second)
The complete example should look like this. Yes, distributed messaging with NATS is that simple!
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalln(err)
	}
	defer nc.Close()

	nc.Subscribe("foo", func(msg *nats.Msg) {
		log.Println("Subscriber 1:", string(msg.Data))
	})
	nc.Subscribe("foo", func(msg *nats.Msg) {
		log.Println("Subscriber 2:", string(msg.Data))
	})
	nc.Subscribe("foo", func(msg *nats.Msg) {
		log.Println("Subscriber 3:", string(msg.Data))
	})

	if err := nc.Publish("foo", []byte("Message")); err != nil {
		log.Fatalln(err)
	}

	// Give the asynchronous subscribers time to log before exiting.
	time.Sleep(2 * time.Second)
}
Output
As we can see, our message was distributed to all the subscribers.
$ go run cmd/publish-subscribe/main.go
2022/03/01 22:42:56 Subscriber 1: Message
2022/03/01 22:42:56 Subscriber 3: Message
2022/03/01 22:42:56 Subscriber 2: Message
Request-Reply
Request-Reply is a common pattern in modern distributed systems. A request is sent, and the application either waits on the response with a certain timeout or receives a response asynchronously.
NATS makes request-reply simple and enables powerful features like location transparency, scale-up and scale-down, observability, and more.
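The "wait for a response with a timeout" behaviour can be modeled with plain Go channels. The sketch below is not NATS code: request, fastHandler, slowHandler, and errTimeout are hypothetical names, and the handler runs in the same process instead of behind a NATS subject. It only illustrates the shape of the pattern, where the caller either gets a reply in time or gets a timeout error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("request timed out")

const (
	shortTimeout = 20 * time.Millisecond
	longTimeout  = 500 * time.Millisecond
)

// fastHandler replies immediately.
func fastHandler(req string) string { return "Here you go!" }

// slowHandler replies only after the short timeout has already passed.
func slowHandler(req string) string {
	time.Sleep(200 * time.Millisecond)
	return "too late"
}

// request hands req to handler on its own goroutine and waits up to
// timeout for the reply. Illustrative model only; not the nats.go API.
func request(handler func(string) string, req string, timeout time.Duration) (string, error) {
	reply := make(chan string, 1) // buffered so a late responder never blocks
	go func() { reply <- handler(req) }()

	select {
	case r := <-reply:
		return r, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func main() {
	r, err := request(fastHandler, "Give me data", longTimeout)
	fmt.Println(r, err)

	r, err = request(slowHandler, "Give me data", shortTimeout)
	fmt.Println(r, err)
}
```

The first call returns the reply with a nil error. The second gives up after the short timeout and returns errTimeout, even though the slow handler eventually finishes.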
Why do we need this?
Sometimes one-to-one communication is required between services, and Request-Reply is a great pattern for this.
Code
Let's start by writing some code in cmd/request-reply/main.go. Same as before, let's initialize our NATS client.
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatalln(err)
}
defer nc.Close()
Subscribe to our subject foo and add some logging.
nc.Subscribe("foo", func(msg *nats.Msg) {
	log.Println("Request received:", string(msg.Data))
	msg.Respond([]byte("Here you go!"))
})
Requests use unique reply subjects, so a service can respond only to the requestor, creating a 1-to-1 relationship. Inside the handler, we can also publish to that reply subject directly.
nc.Publish(msg.Reply, []byte("Here you go!"))
Now, we will use the Request method on the NATS client. It takes three arguments: the subject, the data as a byte slice, and the timeout for the request.
reply, err := nc.Request("foo", []byte("Give me data"), 4*time.Second)
if err != nil {
	log.Fatalln(err)
}
log.Println("Got Reply:", string(reply.Data))
So, our complete example should look like this.
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalln(err)
	}
	defer nc.Close()

	nc.Subscribe("foo", func(msg *nats.Msg) {
		log.Println("Request received:", string(msg.Data))
		msg.Respond([]byte("Here you go!"))
	})

	reply, err := nc.Request("foo", []byte("Give me data"), 4*time.Second)
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("Got Reply:", string(reply.Data))
}
Output
As expected, our request was received and our subscriber responded with some data.
$ go run cmd/request-reply/main.go
2022/03/01 20:14:53 Request received: Give me data
2022/03/01 20:14:53 Got Reply: Here you go!
Queue Groups
NATS provides a built-in load balancing feature called distributed queues. Using queue subscribers will balance message delivery across a group of subscribers which can be used to provide application fault tolerance and scale workload processing.
Why do we need this?
Queue subscribers are ideal for scaling services. Scaling up is as simple as running another instance of the application; scaling down is terminating an instance with a signal that drains the in-flight requests. This flexibility and the lack of any configuration changes make NATS an excellent service communication technology that can work with all platform technologies. One of the great features of NATS is that queue groups are defined by the application and its queue subscribers, not by the server configuration.
Code
To create a queue subscription, subscribers register a queue name. All subscribers with the same queue name form the queue group. This requires no configuration. As messages on the registered subject are published, one member of the group is chosen randomly to receive the message. Although queue groups have multiple subscribers, each message is consumed by only one.
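The delivery rule described above can be captured in a small toy model. This is not how the NATS server is implemented, and subscriber and deliver are hypothetical names. The model assumes that each queue group receives exactly one copy of a message, handed to a randomly chosen member, while a plain subscriber on the same subject (the "Auditor" below, added purely for contrast) still receives every message.

```go
package main

import (
	"fmt"
	"math/rand"
)

// subscriber is a toy stand-in for a subscription on one subject.
type subscriber struct {
	name  string
	queue string // empty means a plain (non-queue) subscriber
}

// deliver returns the names of the subscribers that receive one message.
// pick(n) must return an index in [0, n) and selects the group member.
func deliver(subs []subscriber, pick func(n int) int) []string {
	var receivers []string
	groups := map[string][]string{}
	var order []string // remember group order for stable output

	for _, s := range subs {
		if s.queue == "" {
			receivers = append(receivers, s.name) // plain subscribers always receive
			continue
		}
		if _, seen := groups[s.queue]; !seen {
			order = append(order, s.queue)
		}
		groups[s.queue] = append(groups[s.queue], s.name)
	}
	for _, q := range order {
		members := groups[q]
		receivers = append(receivers, members[pick(len(members))]) // one per group
	}
	return receivers
}

func main() {
	subs := []subscriber{
		{"Subscriber 1", "queue.foo"},
		{"Subscriber 2", "queue.foo"},
		{"Subscriber 3", "queue.foo"},
		{"Auditor", ""},
	}
	for i := 1; i <= 3; i++ {
		fmt.Printf("Message %d -> %v\n", i, deliver(subs, rand.Intn))
	}
}
```

Each line of output lists the Auditor plus exactly one of the three queue members. Which member is chosen varies from run to run.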
We will start with our code in cmd/queue-groups/main.go and, like before, our client connection code is the same.
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatalln(err)
}
defer nc.Close()
Next, we will create 3 queue subscribers with the subject foo and queue name of queue.foo
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
	log.Println("Subscriber 1:", string(msg.Data))
})
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
	log.Println("Subscriber 2:", string(msg.Data))
})
nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
	log.Println("Subscriber 3:", string(msg.Data))
})
Lastly, we will create a loop and publish distinct messages to the foo subject so that we can see how they are received.
for i := 1; i <= 3; i++ {
	message := fmt.Sprintf("Message %d", i)
	if err := nc.Publish("foo", []byte(message)); err != nil {
		log.Fatalln(err)
	}
}
time.Sleep(2 * time.Second)
Here's our complete example
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalln(err)
	}
	defer nc.Close()

	nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
		log.Println("Subscriber 1:", string(msg.Data))
	})
	nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
		log.Println("Subscriber 2:", string(msg.Data))
	})
	nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
		log.Println("Subscriber 3:", string(msg.Data))
	})

	for i := 1; i <= 3; i++ {
		message := fmt.Sprintf("Message %d", i)
		if err := nc.Publish("foo", []byte(message)); err != nil {
			log.Fatalln(err)
		}
	}

	// Give the asynchronous subscribers time to log before exiting.
	time.Sleep(2 * time.Second)
}
Output
As we can see, our messages got randomly distributed to the subscribers. So, in a way NATS can act as a layer 7 load balancer for the services.
$ go run cmd/queue-groups/main.go
2022/03/01 22:53:59 Subscriber 3: Message 2
2022/03/01 22:53:59 Subscriber 1: Message 3
2022/03/01 22:53:59 Subscriber 2: Message 1
Conclusion
In this article, we looked at different communication patterns that demonstrate the real-time distributed messaging power of NATS. JetStream can also be used in conjunction with these patterns where durable messaging and at-least-once delivery are required.
I hope this article was helpful. If you have any questions, feel free to reach out or post them in the NATS Slack community.