The Go client from NATS by default has support for reconnection logic, so that whenever it disconnects from a server it will retry for a number of times before giving up and closing the connection.
// Allow at most 10 reconnect attempts before the client gives up and closes the connection.
nc, err := nats.Connect("demo.nats.io", nats.MaxReconnects(10))
// ...
Besides limiting the maximum number of reconnect attempts,
the client has a number of knobs to customize how it should reconnect,
such as backing off in between connection attempts, or whether to
disable reconnecting altogether without buffering
enabled for example.
Furthermore, it also supports event callbacks to be able to trace
whenever the client disconnects, reconnects or gives up and closes the
connection to the server.
// Connect with a reconnect wait setting and callbacks that log each
// connection state change (reconnected, disconnected, closed).
nc, err := nats.Connect("demo.nats.io",
// Wait setting applied to reconnect attempts (2 seconds here).
nats.ReconnectWait(2 * time.Second),
// Logs the URL reported by the connection once it has reconnected.
nats.ReconnectHandler(func(c *nats.Conn) {
log.Println("Reconnected to", c.ConnectedUrl())
}),
// Logs every disconnection from the server.
nats.DisconnectHandler(func(c *nats.Conn) {
log.Println("Disconnected from NATS")
}),
// Logs when the client has given up and the connection is closed.
nats.ClosedHandler(func(c *nats.Conn) {
log.Println("Closed connection to NATS")
}),
)
// ...
On the other hand, there aren't as many knobs for the initial
connection attempt done by the client. Also, if the initial connect to
a NATS server fails, then nats.Connect
will just return an error and
abort.
// The initial connect is not retried: if it fails, nats.Connect returns an error right away.
// NOTE: nc and err are assumed to be declared earlier; this is only a fragment.
nc, err = nats.Connect("127.0.0.1:4223")
if err != nil {
// Error reported here: "nats: no servers available for connection"
log.Fatal(err)
}
Instead of exposing a number of options to manage how to Dial to NATS
as part of the client, it is possible to define a CustomDialer that
can be used to replace the default dialer in order to customize how to
dial without having to do further internal changes to the library.
// customDialer is a stateless placeholder type; its Dial method is what the
// NATS client calls once the value is registered with nats.SetCustomDialer.
type customDialer struct{}
// Dial is the method the NATS client calls in place of its default dialer
// once cd is registered with nats.SetCustomDialer. It returns the connection
// to use, or an error.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
// ... (elided: the custom dialing logic that produces conn goes here)
return conn, nil
}
// main shows how a custom dialer is handed to the client through the
// nats.SetCustomDialer option.
func main() {
cd := &customDialer{}
opts := []nats.Option{
// Make the client dial through cd instead of its default dialer.
nats.SetCustomDialer(cd),
// ...
}
// NOTE: nc and err are not declared in this fragment; declare them before use.
nc, err = nats.Connect("demo.nats.io", opts...)
// ...
}
This usage of Go interfaces makes things very flexible as now the
client could add custom logic as part of the dialer implementation and
everything will work transparently.
For example, let's say that you want to make the client use the
context package so that it dials with DialContext and is able to cancel
connecting to NATS altogether with a deadline. You could then
define a Dialer implementation as follows:
package main
import (
	"context"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/go-nats"
)
// customDialer is a dialer for the NATS client that keeps retrying the TCP
// dial until it succeeds or its context expires.
type customDialer struct {
	// ctx is the parent context; Dial derives its per-call timeout context from it.
	ctx context.Context
	// nc is the connection handle copied in by main; Dial does not read it.
	nc *nats.Conn
	// connectTimeout bounds one whole Dial call (all attempts included);
	// connectTimeWait is the pause between two failed attempts.
	connectTimeout, connectTimeWait time.Duration
}
// Dial is called by the NATS client (see nats.SetCustomDialer) to open the
// connection to address. It keeps dialing until a connection is established,
// pausing cd.connectTimeWait between failed attempts, and gives up with the
// context's error once cd.ctx is cancelled or cd.connectTimeout has elapsed.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
	ctx, cancel := context.WithTimeout(cd.ctx, cd.connectTimeout)
	defer cancel()

	// One dialer is enough for every attempt of this call.
	d := &net.Dialer{}
	for {
		log.Println("Attempting to connect to", address)
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		conn, err := d.DialContext(ctx, network, address)
		if err == nil {
			log.Println("Connected to NATS successfully")
			return conn, nil
		}

		// Back off before the next attempt, but wake up immediately when the
		// context is done instead of sleeping through a cancellation.
		backoff := time.NewTimer(cd.connectTimeWait)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return nil, ctx.Err()
		case <-backoff.C:
		}
	}
}
With the dialer implementation above, the NATS client will retry a
number of times to connect to the NATS server until the context is no
longer valid:
func main() {
// Parent context cancels connecting/reconnecting altogether.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
var nc *nats.Conn
cd := &customDialer{
ctx: ctx,
nc: nc,
connectTimeout: 5 * time.Second,
connectTimeWait: 1 * time.Second,
}
opts := []nats.Option{
nats.SetCustomDialer(cd),
nats.ReconnectWait(2 * time.Second),
nats.ReconnectHandler(func(c *nats.Conn) {
log.Println("Reconnected to", c.ConnectedUrl())
}),
nats.DisconnectHandler(func(c *nats.Conn) {
log.Println("Disconnected from NATS")
}),
nats.ClosedHandler(func(c *nats.Conn) {
log.Println("NATS connection is closed.")
}),
nats.MaxReconnects(2),
}
go func() {
nc, err = nats.Connect("127.0.0.1:4222", opts...)
}()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
go func() {
for {
select {
case sig := <-c:
switch sig {
case syscall.SIGINT:
log.Println("Shutting down...")
cancel()
return
}
}
}
}()
WaitForEstablishedConnection:
for {
if err != nil {
log.Fatal(err)
}
select {
case <-ctx.Done():
break WaitForEstablishedConnection
default:
}
if nc == nil || !nc.IsConnected() {
log.Println("Connection not ready yet...")
time.Sleep(500 * time.Millisecond)
continue
}
break WaitForEstablishedConnection
}
if ctx.Err() != nil {
log.Fatal(ctx.Err())
}
// Run until receiving signal
for range time.NewTicker(1 * time.Second).C {
select {
case <-ctx.Done():
// Disconnect and flush pending messages
if err := nc.Drain(); err != nil {
log.Println(err)
}
log.Println("Closed connection to NATS")
return
default:
}
if nc.IsClosed() {
break
}
if err := nc.Publish("hello", []byte("world")); err != nil {
log.Println(err)
continue
}
log.Println("Published message")
}
}
Using DialContext as in the example above also opens the door to
hooking into some of the tracing features that are part of the standard
library, for example to check when a connection attempt will happen or
when a TCP connection is established.
In the example below, the httptrace.ClientTrace type from the
net/http/httptrace package is being used in order to activate some of the
built-in tracing instrumentation that is part of the standard library via
context and DialContext.
// customDialer is a dialer for the NATS client that keeps retrying the TCP
// dial, with connection tracing enabled, until it succeeds or its context
// expires.
type customDialer struct {
	// ctx is the parent context; Dial attaches the trace hooks and the
	// per-call timeout to a context derived from it.
	ctx context.Context
	// nc is a connection handle slot; Dial does not read it.
	nc *nats.Conn
	// connectTimeout bounds one whole Dial call (all attempts included);
	// connectTimeWait is the pause between two failed attempts.
	connectTimeout, connectTimeWait time.Duration
}
// Dial is called by the NATS client to open the connection to address. Each
// attempt is traced through httptrace hooks that log when a connect starts
// and when the TCP connection is established. Dial keeps retrying, pausing
// cd.connectTimeWait between failed attempts, and gives up with the context's
// error once cd.ctx is cancelled or cd.connectTimeout has elapsed.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
	trace := &httptrace.ClientTrace{
		ConnectStart: func(network, addr string) {
			log.Println("Connecting to server at", addr)
		},
		ConnectDone: func(network, addr string, err error) {
			if err == nil {
				log.Println("Established TCP connection")
			}
		},
	}

	// The trace hooks travel with the context handed to DialContext.
	ctx, cancel := context.WithTimeout(httptrace.WithClientTrace(cd.ctx, trace), cd.connectTimeout)
	defer cancel()

	d := &net.Dialer{}
	for {
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		conn, err := d.DialContext(ctx, network, address)
		if err == nil {
			log.Println("Connected to NATS successfully")
			return conn, nil
		}

		// Back off before the next attempt, but wake up immediately when the
		// context is done instead of sleeping through a cancellation.
		backoff := time.NewTimer(cd.connectTimeWait)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return nil, ctx.Err()
		case <-backoff.C:
		}
	}
}
Result:
go run custom-connect.go
2018/09/05 09:02:17 Connection not ready yet...
2018/09/05 09:02:17 Attempting to connect to server at 127.0.0.1:4222
2018/09/05 09:02:17 Established TCP connection
2018/09/05 09:02:17 Connected to NATS successfully
2018/09/05 09:02:19 Published message
2018/09/05 09:02:20 Published message
2018/09/05 09:02:21 Published message
2018/09/05 09:02:22 Published message
2018/09/05 09:02:23 Published message
^C2018/09/05 09:02:23 Shutting down...
You can find the full example here. Interested in learning more? Join the NATS Slack channel, or you can also find more examples of advanced NATS usage in my book Practical NATS.
Top comments (0)