This post is the second part of the "Learning Microservices with Go" series. I'm writing these posts as I learn the concepts. If you haven't read part 1 yet, here's the link for it: Part 1.
Link for Git Repo: Github
In part 1, we created the directory structure for our movie app. We created three services: metadata, rating, and movie.
The problem we faced was that these services interacted with each other using hard-coded addresses and ports. This works only as long as all the services run on the same machine. We also need to be able to run multiple instances of any service, which is currently not possible. (This matters if we want to build a real application that can scale: multiple instances give us higher availability and better fault tolerance.)
To tackle this, we'll use service discovery. Service discovery addresses several problems:
1. How to find instance(s) of a particular service.
2. How to dynamically add or remove an instance of any service.
3. How to handle unresponsive instances of any service.
Put simply, service discovery can be viewed as a registry that stores information about the available services. It supports the following operations:
1. Registering (adding) an instance of a service.
2. Deregistering (removing) an instance of a service.
3. Getting a list of all instances of a service, in the form of their network addresses.
Service Discovery Models
Service discovery can be implemented in two ways.
- Client-side service discovery: The application accesses the registry directly using a registry client.
- Server-side service discovery: The application accesses the registry via a load balancer, which forwards requests to the available instances.
Some differences:
With client-side service discovery, the logic for choosing and calling an instance lives in the client and has to be implemented in every application. If this logic is misconfigured, the application may end up calling only one instance of the service, overloading that instance and leaving the others under-utilized.
With server-side service discovery, that logic lives in the load balancer. The load balancer gets the list of instances from the service discovery, decides which instance should receive the request, and forwards the request to it. This decouples the registry interaction from the application.
In this series, we'll cover only the client-side discovery model, as it's simpler to implement and doesn't require setting up a load balancer.
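To make the client-side pitfall concrete, here is a minimal sketch of the instance-selection logic a client has to carry. The names pickFirst, roundRobin and countCalls are hypothetical and not part of the movie app, and the addresses are made up. Later in this post we pick a random instance instead; round robin is used here only because its effect is easy to count.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pickFirst is the misconfigured strategy: it always returns the first
// address, so a single instance receives all the traffic.
func pickFirst(addrs []string) string {
	return addrs[0]
}

// roundRobin spreads calls across instances by cycling through the list.
type roundRobin struct {
	next uint64
}

// pick returns the next address in the cycle. It is safe for concurrent use.
func (r *roundRobin) pick(addrs []string) string {
	n := atomic.AddUint64(&r.next, 1) - 1
	return addrs[n%uint64(len(addrs))]
}

// countCalls runs pick n times and counts how often each address is chosen.
func countCalls(n int, addrs []string, pick func([]string) string) map[string]int {
	counts := make(map[string]int)
	for i := 0; i < n; i++ {
		counts[pick(addrs)]++
	}
	return counts
}

func main() {
	// Hypothetical addresses of three instances of the same service.
	addrs := []string{"localhost:8081", "localhost:8091", "localhost:8092"}

	fmt.Println("always first:", countCalls(9, addrs, pickFirst))

	rr := &roundRobin{}
	fmt.Println("round robin: ", countCalls(9, addrs, rr.pick))
}
```

With pickFirst, all nine calls land on the first instance, while the round-robin picker sends three calls to each. With server-side discovery, this choice would be made by the load balancer instead of the application.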
Tools available for service discovery
- HashiCorp Consul: This has been a simple solution for service discovery for some years. It has a simple API, which includes:
  - PUT /catalog/register: Registers a service instance.
  - PUT /catalog/deregister: Deregisters a service instance.
  - GET /catalog/services: Lists the services that are registered.
- Kubernetes: Kubernetes is a very widely used container orchestration tool. It's used to deploy, manage, and scale containers (which often contain microservices). It has a feature to register and find services running on it. It allows for a client-side discovery model as well as plugging in a load balancer for a server-side discovery model.
We'll use HashiCorp Consul for this tutorial. We'll learn to use Kubernetes in future parts of the series.
Coding the service discovery
We'll add the service discovery code to the pkg folder, as all the services will use it to discover other services.
Create a file pkg/discovery/discovery.go and add the following code to it.
package discovery

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// Registry defines a service registry.
type Registry interface {
	// Register creates a service instance record in the registry.
	Register(ctx context.Context, instanceID string, serviceName string, hostPort string) error
	// Deregister removes a service instance record from the registry.
	Deregister(ctx context.Context, instanceID string, serviceName string) error
	// Discover returns the addresses of the active instances of a service.
	Discover(ctx context.Context, serviceName string) ([]string, error)
	// HealthCheck reports a service instance as healthy to the registry.
	HealthCheck(instanceID string, serviceName string) error
}

// ErrNotFound is returned when no service addresses are found.
var ErrNotFound = errors.New("no service addresses found")

// GenerateInstanceID generates a pseudo-random service instance identifier
// in the form "<serviceName>-<number>".
func GenerateInstanceID(serviceName string) string {
	return fmt.Sprintf("%s-%d", serviceName, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
}
- What we added is an interface for service discovery. With it, if we switch our service discovery tool in the future (e.g. from Consul to some other tool), we won't have to make changes all over the code. The new implementation only needs to satisfy the same interface.
- Also, similar to the in-memory repository from the last part, we can have an in-memory discovery implementation. It gives us the ability to test code that depends on discovery.
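The benefit of coding against an interface can be sketched in a few lines. The example below is self-contained, so it redeclares things instead of importing our package: Discoverer is a hypothetical, trimmed-down interface with only the Discover method, and staticRegistry is a hypothetical stub. In the real code, the caller would accept discovery.Registry and receive the Consul or in-memory implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrNotFound mirrors the error declared in discovery.go.
var ErrNotFound = errors.New("no service addresses found")

// Discoverer is a hypothetical, trimmed-down version of the registry
// interface: just enough for a caller that only looks services up.
type Discoverer interface {
	Discover(ctx context.Context, serviceName string) ([]string, error)
}

// staticRegistry is a hypothetical stub backed by a fixed map.
// Any other type with the same Discover method could replace it.
type staticRegistry map[string][]string

func (s staticRegistry) Discover(_ context.Context, serviceName string) ([]string, error) {
	addrs := s[serviceName]
	if len(addrs) == 0 {
		return nil, ErrNotFound
	}
	return addrs, nil
}

// firstAddress depends only on the interface, so it does not change
// when the implementation behind it is swapped.
func firstAddress(ctx context.Context, d Discoverer, serviceName string) (string, error) {
	addrs, err := d.Discover(ctx, serviceName)
	if err != nil {
		return "", fmt.Errorf("discover %q: %w", serviceName, err)
	}
	if len(addrs) == 0 {
		return "", ErrNotFound
	}
	return addrs[0], nil
}

func main() {
	reg := staticRegistry{"rating": {"localhost:8082"}}

	addr, err := firstAddress(context.Background(), reg, "rating")
	fmt.Println(addr, err)

	_, err = firstAddress(context.Background(), reg, "metadata")
	fmt.Println(errors.Is(err, ErrNotFound))
}
```

Because firstAddress never names a concrete type, replacing staticRegistry with another implementation requires no change to it, which is the property we want from the interface above.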
In memory service discovery
Create the file pkg/discovery/memory/memory.go and add the following code.
package memory

import (
	"context"
	"errors"
	"sync"
	"time"

	"movieexample.com/pkg/discovery"
)

type serviceNameType string
type instanceIDType string

// Registery defines an in-memory service registry.
type Registery struct {
	sync.RWMutex
	serviceAddrs map[serviceNameType]map[instanceIDType]*serviceInstance
}

type serviceInstance struct {
	hostPort   string
	lastActive time.Time
}

// NewRegistry creates a new in-memory registry instance.
func NewRegistry() *Registery {
	return &Registery{
		serviceAddrs: make(map[serviceNameType]map[instanceIDType]*serviceInstance),
	}
}

// Register creates a service record in the registry
func (r *Registery) Register(ctx context.Context, instanceID string, serviceName string, hostPort string) error {
	r.Lock()
	defer r.Unlock()
	if _, ok := r.serviceAddrs[serviceNameType(serviceName)]; !ok {
		r.serviceAddrs[serviceNameType(serviceName)] = map[instanceIDType]*serviceInstance{}
	}
	r.serviceAddrs[serviceNameType(serviceName)][instanceIDType(instanceID)] = &serviceInstance{
		hostPort:   hostPort,
		lastActive: time.Now(),
	}
	return nil
}

// Deregister removes a service record from the registry
func (r *Registery) Deregister(ctx context.Context, instanceID string, serviceName string) error {
	r.Lock()
	defer r.Unlock()
	if _, ok := r.serviceAddrs[serviceNameType(serviceName)]; !ok {
		return discovery.ErrNotFound
	}
	delete(r.serviceAddrs[serviceNameType(serviceName)], instanceIDType(instanceID))
	return nil
}

// HealthCheck marks a service instance as active
func (r *Registery) HealthCheck(instanceID string, serviceName string) error {
	r.Lock()
	defer r.Unlock()
	if _, ok := r.serviceAddrs[serviceNameType(serviceName)]; !ok {
		return errors.New("service not registered yet")
	}
	if _, ok := r.serviceAddrs[serviceNameType(serviceName)][instanceIDType(instanceID)]; !ok {
		return errors.New("service instance not registered yet")
	}
	r.serviceAddrs[serviceNameType(serviceName)][instanceIDType(instanceID)].lastActive = time.Now()
	return nil
}

// Discover returns a list of service instances from the registry
func (r *Registery) Discover(ctx context.Context, serviceName string) ([]string, error) {
	r.RLock()
	defer r.RUnlock()
	if len(r.serviceAddrs[serviceNameType(serviceName)]) == 0 {
		return nil, discovery.ErrNotFound
	}
	var res []string
	for _, v := range r.serviceAddrs[serviceNameType(serviceName)] {
		// Skip instances that have not reported a health check recently.
		if time.Since(v.lastActive) > 5*time.Second {
			continue
		}
		res = append(res, v.hostPort)
	}
	return res, nil
}
We can see that the in-memory service discovery implements all the methods of the Registry interface we defined earlier. We used a map of maps to store the service addresses, keyed first by service name and then by instance ID. The embedded sync.RWMutex guards the map so that concurrent reads and writes are safe.
Now we can use this implementation wherever we want.
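To see how the lastActive check drops instances that stop reporting, here is a small self-contained sketch. It is a trimmed-down copy of the idea, not the code above: it tracks a single service, and the ttl field, newRegistry and the backdate helper are assumptions made for the example (memory.go hard-codes 5 seconds and has no such helper). backdate only exists so the example doesn't have to sleep.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// instance mirrors serviceInstance from memory.go.
type instance struct {
	hostPort   string
	lastActive time.Time
}

// registry is a trimmed-down registry for a single service.
type registry struct {
	sync.RWMutex
	ttl       time.Duration // assumption: configurable instead of a fixed 5s
	instances map[string]*instance
}

func newRegistry(ttl time.Duration) *registry {
	return &registry{ttl: ttl, instances: make(map[string]*instance)}
}

func (r *registry) register(id, hostPort string) {
	r.Lock()
	defer r.Unlock()
	r.instances[id] = &instance{hostPort: hostPort, lastActive: time.Now()}
}

// healthCheck refreshes the instance's lastActive timestamp.
func (r *registry) healthCheck(id string) {
	r.Lock()
	defer r.Unlock()
	if inst, ok := r.instances[id]; ok {
		inst.lastActive = time.Now()
	}
}

// backdate simulates an instance that went silent for d by moving its
// lastActive timestamp into the past.
func (r *registry) backdate(id string, d time.Duration) {
	r.Lock()
	defer r.Unlock()
	if inst, ok := r.instances[id]; ok {
		inst.lastActive = inst.lastActive.Add(-d)
	}
}

// discover returns only the instances seen within the last ttl.
func (r *registry) discover() []string {
	r.RLock()
	defer r.RUnlock()
	var res []string
	for _, inst := range r.instances {
		if time.Since(inst.lastActive) > r.ttl {
			continue
		}
		res = append(res, inst.hostPort)
	}
	return res
}

func main() {
	r := newRegistry(5 * time.Second)
	r.register("rating-1", "localhost:8082")
	r.register("rating-2", "localhost:8092")
	fmt.Println("active after registering:", len(r.discover()))

	// rating-2 stops reporting for longer than the TTL.
	r.backdate("rating-2", 2*r.ttl)
	fmt.Println("active after rating-2 goes silent:", r.discover())

	// A fresh health check brings it back.
	r.healthCheck("rating-2")
	fmt.Println("active after rating-2 reports again:", len(r.discover()))
}
```

The silent instance stays in the map, but discover stops returning it until it reports again. This is why each service has to keep calling HealthCheck after registering, as we'll do in the main functions later.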
Consul based service discovery
- Create a file pkg/discovery/consul/consul.go
- Add the following code to it. ```go
package consul

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"

	consul "github.com/hashicorp/consul/api"
)

// Registery defines a consul based service registry
type Registery struct {
	client *consul.Client
}

// NewRegistery creates a new consul registry instance.
// Addr is the address where the consul agent is running
func NewRegistery(addr string) (*Registery, error) {
	config := consul.DefaultConfig()
	config.Address = addr
	client, err := consul.NewClient(config)
	if err != nil {
		return nil, err
	}
	return &Registery{client: client}, nil
}

// Register creates a service record in the registry
func (r *Registery) Register(ctx context.Context, instanceID string, serviceName string, hostPort string) error {
	parts := strings.Split(hostPort, ":")
	if len(parts) != 2 {
		return errors.New("invalid host:port format. Eg: localhost:8081")
	}
	port, err := strconv.Atoi(parts[1])
	if err != nil {
		return err
	}
	host := parts[0]
	return r.client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
		Address: host,
		Port:    port,
		ID:      instanceID,
		Name:    serviceName,
		// The instance must report in within the TTL to stay healthy.
		Check: &consul.AgentServiceCheck{
			CheckID: instanceID,
			TTL:     "5s",
		},
	})
}

// Deregister removes a service record from the registry
func (r *Registery) Deregister(ctx context.Context, instanceID string, _ string) error {
	return r.client.Agent().ServiceDeregister(instanceID)
}

// HealthCheck is a push mechanism to update the health status of a service instance
func (r *Registery) HealthCheck(instanceID string, _ string) error {
	return r.client.Agent().UpdateTTL(instanceID, "", "pass")
}

// Discover returns a list of addresses of active instances of the given service
func (r *Registery) Discover(ctx context.Context, serviceName string) ([]string, error) {
	// The third argument (passingOnly) filters out unhealthy instances.
	entries, _, err := r.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		return nil, err
	}
	var instances []string
	for _, entry := range entries {
		instances = append(instances, fmt.Sprintf("%s:%d", entry.Service.Address, entry.Service.Port))
	}
	return instances, nil
}
Since the Consul client is an external library, remember to run `go mod tidy` in the *src* directory.
#### Modifying files to use service discovery
Change the two gateways that we defined earlier for the metadata and rating services so that they hold a registry:
```go
type Gateway struct {
	registry discovery.Registry
}
Then change the New function:
func New(registry discovery.Registry) *Gateway {
	return &Gateway{registry}
}
The gateways now require a registry when they are created. Change the Get function of the rating gateway as follows:
	// Ask the registry for the currently active rating instances.
	addrs, err := g.registry.Discover(ctx, "rating")
	if err != nil {
		return 0, err
	}
	if len(addrs) == 0 {
		return 0, fmt.Errorf("no rating service instances available")
	}
	// Pick one of the instances at random.
	url := "http://" + addrs[rand.Intn(len(addrs))] + "/rating"
	log.Printf("Calling rating service. Request: GET %s\n", url)
	req, err := http.NewRequest(http.MethodGet, url, nil)
Earlier, we were calling a pre-configured address. Now we get a list of addresses from the service discovery and choose a random instance from it.
Update the metadata gateway in the same way.
Next, we'll update the main functions of our services so that they register with and deregister from the service discovery.
Update the main function of the rating service as follows:
func main() {
	var port int
	flag.IntVar(&port, "port", 8082, "API Handler port")
	flag.Parse()
	fmt.Printf("Starting the movie rating service on port %d\n", port)

	// Register this instance with Consul.
	registery, err := consul.NewRegistery("localhost:8500")
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	instanceID := discovery.GenerateInstanceID(serviceName)
	if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
		panic(err)
	}

	// Report a healthy state every second, well within the 5s TTL.
	go func() {
		for {
			if err := registery.HealthCheck(instanceID, serviceName); err != nil {
				log.Println("Failed to report healthy state:", err)
			}
			time.Sleep(1 * time.Second)
		}
	}()
	defer registery.Deregister(ctx, instanceID, serviceName)

	repo := memory.New()
	ctrl := rating.New(repo)
	h := httpHandler.New(ctrl)
	http.Handle("/rating", http.HandlerFunc(h.Handle))
	if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
		panic(err)
	}
}
Do the same for the metadata service.
The final thing that we need to do is to update the main function of the movie service.
func main() {
	var port int
	flag.IntVar(&port, "port", 8083, "Port to listen on")
	flag.Parse()

	// Register with consul
	registery, err := consul.NewRegistery("localhost:8500")
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	instanceID := discovery.GenerateInstanceID(serviceName)
	if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
		panic(err)
	}

	// Report a healthy state every second, well within the 5s TTL.
	go func() {
		for {
			if err := registery.HealthCheck(instanceID, serviceName); err != nil {
				log.Println("Failed to report healthy state:", err)
			}
			time.Sleep(1 * time.Second)
		}
	}()
	defer registery.Deregister(ctx, instanceID, serviceName)

	log.Printf("Starting the movie service at port: %d\n", port)
	// Both gateways share the same registry to look up their services.
	metadataGateway := metadataGateway.New(registery)
	ratingGateway := ratingGateway.New(registery)
	ctrl := movie.New(ratingGateway, metadataGateway)
	h := httpHandler.New(ctrl)
	http.Handle("/movie", http.HandlerFunc(h.GetMovieDetails))
	if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
		panic(err)
	}
}
We've completed the code for implementing service discovery.
Testing
1. Now we need to run Consul so that our services can use it. We can do this with Docker. The following command spins up Consul in a Docker container. ```
docker run \
-d \
-p 8500:8500 \
-p 8600:8600/udp \
--name=dev-consul \
hashicorp/consul agent -server -ui \
-node=server-1 -bootstrap-expect=1 -client=0.0.0.0
2. Now run each microservice from inside its `cmd` directory.
`go run *.go`
3. Open the Consul web UI at `localhost:8500`. You'll see the list of services that are active.
4. Try running multiple instances of any of the services with:
`go run *.go --port <Port>`
Remember to use ports that are not already in use.
5. Test the API by sending a request to the movie service. The URL is quoted so that the shell doesn't try to interpret the `?`.
`curl -v "localhost:8083/movie?id=1"`
You'll see a line like the following in the shell running the movie service.
2024/01/28 10:19:55 Calling metadata service. Request: GET http://localhost:8081/metadata
This means that the movie service was able to interact with the service discovery and then direct the request to the correct address for the service.
**Tadaaa**. We've completed the implementation of service discovery.
In the next part, we'll learn about serialization (used to encode the data transferred between the services).
Do like the post if you found it helpful and learned something.
Check out my Twitter at: [manavkush](https://twitter.com/manavkush)
Link for Git Repo: [Github](https://github.com/manavkush/microservices-go)
Link for Reference Book: [Book](https://www.packtpub.com/product/microservices-with-go/9781804617007)
See you next post.