In this post, we’ll see how to write workers in Go and how to gracefully shut them down. We’ll also look at how to coordinate multiple workers.
Finally, I’ll introduce Orchestra, a package I built which makes it a lot easier to manage and gracefully shut down workers.
What are workers?
When we write programs, we often have a function that runs until it either encounters an error or we explicitly tell it to stop. A good example of this is http.Server. When we start a web server in Go, the program will block until we ask it to stop or it encounters an error.
Here’s a simple webserver:
package main

import (
	"io"
	"net/http"
)

func main() {
	// Hello world, the web server
	helloHandler := func(w http.ResponseWriter, req *http.Request) {
		io.WriteString(w, "Hello, world!\n")
	}

	http.HandleFunc("/hello", helloHandler)

	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		panic(err)
	}
}
We can write a function that behaves the same way:
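package main

func main() {
	err := myWorker()
	if err != nil {
		panic(err)
	}
}

// myWorker loops until doSomethingRepeatedly returns an error.
// doSomethingRepeatedly stands in for one unit of real work.
func myWorker() error {
	for {
		err := doSomethingRepeatedly()
		if err != nil {
			return err
		}
	}
}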
Both of these programs can be stopped by killing the main process (ctrl + C), which forcefully closes the program.
Gracefully Shutting Down Workers
To shut down a worker gracefully, we should listen for signals passed to the program. The main ones to look out for are:
- SIGINT: The interrupt signal. The terminal sends it to the foreground process when the user presses ctrl-c. The default behavior is to terminate the process, but it can be caught or ignored. The intention is to provide a mechanism for an orderly, graceful shutdown.
- SIGTERM: The termination signal. The default behavior is to terminate the process, but it can also be caught or ignored. The intention is to kill the process, gracefully or not, but to first allow it a chance to clean up.
Credit for the explanation of what the signals mean
For the webserver, since Go 1.8 (which was a long time ago), the http.Server type has a Shutdown method which gracefully closes the connections.
Here’s an example of how to use it:
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	var srv http.Server

	idleConnsClosed := make(chan struct{})
	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
		<-signals

		// We received an interrupt signal, shut down.
		if err := srv.Shutdown(context.Background()); err != nil {
			// Error from closing listeners, or context timeout:
			log.Printf("HTTP server Shutdown: %v", err)
		}
		close(idleConnsClosed)
	}()

	if err := srv.ListenAndServe(); err != http.ErrServerClosed {
		// Error starting or closing listener:
		log.Fatalf("HTTP server ListenAndServe: %v", err)
	}

	<-idleConnsClosed
}
Let’s create something similar for our worker:
package main

import (
	"log"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
)

// shouldStop is written by the signal goroutine and read by the worker,
// so it is an atomic.Bool (Go 1.19+) rather than a plain bool.
var shouldStop atomic.Bool

func main() {
	complete := make(chan struct{})

	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
		<-signals

		shouldStop.Store(true)
		close(complete)
	}()

	if err := myWorker(); err != nil {
		// Error in our worker:
		log.Fatalf("Worker error: %v", err)
	}

	<-complete
}

func myWorker() error {
	for !shouldStop.Load() {
		err := doSomethingRepeatedly()
		if err != nil {
			return err
		}
	}
	return nil
}
Multiple Workers
This is fairly straightforward with just a single long-running process. However, it becomes trickier when several of them, say our worker and the web server, run in the same program.
At first glance, it seems we can just do something like this:
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
)

// shouldStop is shared between goroutines, so it is an atomic.Bool (Go 1.19+).
var shouldStop atomic.Bool

func main() {
	var srv http.Server

	complete := make(chan struct{})
	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
		<-signals

		// Ask our worker to stop
		shouldStop.Store(true)

		// Shutdown the server
		if err := srv.Shutdown(context.Background()); err != nil {
			// Error from closing listeners, or context timeout:
			log.Printf("HTTP server Shutdown: %v", err)
		}
		close(complete)
	}()

	go func() {
		if err := myWorker(); err != nil {
			// Error in our worker:
			log.Fatalf("Worker error: %v", err)
		}
	}()

	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			// Error starting or closing listener:
			log.Fatalf("HTTP server ListenAndServe: %v", err)
		}
	}()

	<-complete
}

func myWorker() error {
	for !shouldStop.Load() {
		err := doSomethingRepeatedly()
		if err != nil {
			return err
		}
	}
	return nil
}
But that does not work as expected. Here’s why:
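First, we stop the workers one after the other rather than concurrently: the signal handler flips shouldStop and then blocks on srv.Shutdown. It does not really matter in this particular example, so let’s leave that problem for now.
More importantly, in the example above, if any of the workers encounters an error, the program rudely exits! log.Fatalf calls os.Exit(1), so the other worker is killed mid-task and our entire shutdown sequence is skipped.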
Coordinating with Context
In idiomatic Go, the ideal way would be to pass a context.Context to our worker, and ask it to shut itself down once the context is done.
Using this method, instead of calling log.Fatalf() when a worker encounters an error, we can simply cancel the context, which then initiates the shutdown sequence.
Since we are not calling the shutdown functions directly, let’s use sync.WaitGroup to know when all our workers have exited.
This means we’d have to rework our code to look like this:
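package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	// Cancelled on a signal or when any worker fails. A timeout here would
	// stop healthy workers after 10 seconds, so the timeout is applied to
	// the server shutdown below instead.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup

	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
		<-signals
		cancel()
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := myWorker(ctx); err != nil {
			log.Printf("Worker error: %v", err)
			cancel()
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := startServer(ctx); err != nil {
			log.Print(err)
			cancel()
		}
	}()

	wg.Wait()
}

func myWorker(ctx context.Context) error {
	// ctx.Err() is non-nil once the context is done, and it is safe to
	// call from any goroutine, so no shared flag is needed.
	for ctx.Err() == nil {
		err := doSomethingRepeatedly()
		if err != nil {
			return err
		}
	}
	return nil
}

func startServer(ctx context.Context) error {
	var srv http.Server

	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)
		<-ctx.Done() // Wait for the context to be done

		// Shutdown the server, giving open connections up to 10 seconds
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			// Error from closing listeners, or context timeout:
			log.Printf("HTTP server Shutdown: %v", err)
		}
	}()

	if err := srv.ListenAndServe(); err != http.ErrServerClosed {
		// Error starting or closing listener:
		return fmt.Errorf("HTTP server ListenAndServe: %w", err)
	}

	// ListenAndServe returns as soon as Shutdown is called,
	// so wait for Shutdown itself to finish.
	<-shutdownDone
	return nil
}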
This neatly satisfies all our conditions. Yay!
Using Orchestra
To reduce all this boilerplate, I created a package to do most of this.
Orchestra is a library to manage long-running Go processes.
At the heart of the library is an interface called Player:
// Player is a long running background worker
type Player interface {
	Play(context.Context) error
}
All a type needs to do to satisfy the interface is to have a Play method that will gracefully shut down when the context is done.
It can also return an error if it encounters a problem when playing.
Next, there’s the Conductor type (which itself is a Player):
// Conductor is a group of workers. It is also a Player itself **evil laugh**
type Conductor struct {
	Timeout time.Duration
	Players map[string]Player
}
With the conductor, you add Players to it, and when you call the Play method on the conductor, it will start the Players under it and gracefully shut them all down when the main context is done.
The timeout is there in case there is a Player that refuses to stop.
Orchestra also contains a few helper functions:
- PlayUntilSignal(p Player, sig ...os.Signal): This will start a player with a context, and cancel the context once it receives any of the signals provided.
- PlayerFunc(func(context.Context) error): This is a quick way to convert a standalone function into a type that satisfies the Player interface.
- ServerPlayer{*http.Server}: This is a type that embeds the *http.Server and extends it to satisfy the Player interface.
More details can be found in the README.
With these, we can simplify our example code into this:
package main

import (
	"context"
	"net/http"
	"os"
	"syscall"
	"time"

	"github.com/stephenafamo/orchestra"
)

func main() {
	var srv = &http.Server{}

	// creates a player from a myWorker function
	workerPlayer := orchestra.PlayerFunc(myWorker)

	// A player from a server
	serverPlayer := orchestra.ServerPlayer{srv}

	// A conductor to control them all
	conductor := &orchestra.Conductor{
		Timeout: 5 * time.Second,
		Players: map[string]orchestra.Player{
			// the names are used to identify the players
			// both in logs and the returned errors
			"worker": workerPlayer,
			"server": serverPlayer,
		},
	}

	// Use the conductor as a Player
	err := orchestra.PlayUntilSignal(conductor, os.Interrupt, syscall.SIGTERM)
	if err != nil {
		panic(err)
	}
}

func myWorker(ctx context.Context) error {
	// ctx.Err() is non-nil once the context is done, and it is safe to
	// call from any goroutine, so no shared flag is needed.
	for ctx.Err() == nil {
		err := doSomethingRepeatedly()
		if err != nil {
			return err
		}
	}
	return nil
}
Conclusion
As stated in the first paragraph, we’ve seen how to write workers in Go, how to gracefully shut them down, and how to coordinate several of them. I’ve also introduced Orchestra, a package I built which makes it a lot easier to manage and gracefully shut down workers.
The post Gracefully shutting down multiple workers in Go appeared first on Stephen AfamO's Blog.