Let's build a gRPC microservice that is closer to the real world, with tracing and monitoring.
You can find the source code here.
Core tools that will be used:
PostgreSQL as database
Redis for sessions and caching
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for composing observability dashboards with everything from Prometheus
First we need to start all necessary infrastructure containers:
run make local
UI interfaces will be available on ports:
Jaeger UI: http://localhost:16686
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3000
After sending some requests, you can monitor the metrics in the Prometheus dashboard.
For Prometheus, go to http://localhost:9090/graph, type gRPC, and choose one of the suggested metrics.
You can import Grafana dashboard templates from the grafana directory; the default login is admin and the default password is admin.
I like to use evans for simple gRPC testing.
In the cmd folder, let's initialize all dependencies and start the app.
Viper is a very good and common choice as a complete configuration solution for Go applications.
Here we use the config-local.yml file approach.
configPath := utils.GetConfigPath(os.Getenv("config"))
cfg, err := config.GetConfig(configPath)
if err != nil {
log.Fatalf("Loading config: %v", err)
}
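The implementation of utils.GetConfigPath is not shown in this article, so the following is only a guess at its shape: a small function that maps the value of the config environment variable to a file path. The function name configPathFor and both paths are hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// configPathFor maps a deployment name to a config file path.
// The names and paths are hypothetical; the real utils.GetConfigPath may differ.
func configPathFor(env string) string {
	switch env {
	case "docker":
		return "./config/config-docker"
	default:
		// Fall back to the local config when the variable is unset or unknown.
		return "./config/config-local"
	}
}

func main() {
	// The article reads the "config" environment variable.
	fmt.Println(configPathFor(os.Getenv("config")))
}
```

Defaulting to the local file keeps a plain go run working without any environment setup.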
Next, let's create a logger. Here I used Uber's Zap under the hood. The important part is to define a Logger interface so that the logger can be replaced in the future if needed.
type Logger interface {
InitLogger()
Debug(args ...interface{})
Debugf(template string, args ...interface{})
Info(args ...interface{})
Infof(template string, args ...interface{})
Warn(args ...interface{})
Warnf(template string, args ...interface{})
Error(args ...interface{})
Errorf(template string, args ...interface{})
DPanic(args ...interface{})
DPanicf(template string, args ...interface{})
Fatal(args ...interface{})
Fatalf(template string, args ...interface{})
}
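To show why the interface matters, here is a minimal sketch that backs a reduced version of it with the standard library logger instead of Zap. The stdLogger type and the greet and captureGreeting helpers are made up for this illustration; only two methods are kept to stay short.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger is a reduced version of the article's interface, kept small for the sketch.
type Logger interface {
	Infof(template string, args ...interface{})
	Errorf(template string, args ...interface{})
}

// stdLogger adapts the standard library logger to the Logger interface.
type stdLogger struct {
	l *log.Logger
}

func (s *stdLogger) Infof(template string, args ...interface{}) {
	s.l.Printf("INFO "+template, args...)
}

func (s *stdLogger) Errorf(template string, args ...interface{}) {
	s.l.Printf("ERROR "+template, args...)
}

// greet depends only on the interface, so the logging backend can change freely.
func greet(logger Logger, name string) {
	logger.Infof("hello, %s", name)
}

// captureGreeting runs greet against an in-memory logger and returns what was written.
func captureGreeting(name string) string {
	var buf bytes.Buffer
	greet(&stdLogger{l: log.New(&buf, "", 0)}, name)
	return buf.String()
}

func main() {
	fmt.Print(captureGreeting("gopher"))
}
```

Application code that accepts Logger never imports Zap directly, so swapping the backend touches only the adapter.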
Set up Postgres and Redis
These days the usual production solution for SQL databases in Go is a combination of sqlx and pgx.
Good Redis clients for Go are go-redis and redigo; I used the first.
func NewPsqlDB(c *config.Config) (*sqlx.DB, error) {
dataSourceName := fmt.Sprintf("host=%s port=%s user=%s dbname=%s sslmode=disable password=%s",
c.Postgres.PostgresqlHost,
c.Postgres.PostgresqlPort,
c.Postgres.PostgresqlUser,
c.Postgres.PostgresqlDbname,
c.Postgres.PostgresqlPassword,
)
db, err := sqlx.Connect(c.Postgres.PgDriver, dataSourceName)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(maxOpenConns)
db.SetConnMaxLifetime(connMaxLifetime * time.Second)
db.SetMaxIdleConns(maxIdleConns)
db.SetConnMaxIdleTime(connMaxIdleTime * time.Second)
if err = db.Ping(); err != nil {
return nil, err
}
return db, nil
}
func NewRedisClient(cfg *config.Config) *redis.Client {
redisHost := cfg.Redis.RedisAddr
if redisHost == "" {
redisHost = ":6379"
}
client := redis.NewClient(&redis.Options{
Addr: redisHost,
MinIdleConns: cfg.Redis.MinIdleConns,
PoolSize: cfg.Redis.PoolSize,
PoolTimeout: time.Duration(cfg.Redis.PoolTimeout) * time.Second,
Password: cfg.Redis.Password, // an empty string means no password
DB: cfg.Redis.DB, // 0 selects the default DB
})
return client
}
And let's set up Jaeger:
func InitJaeger(cfg *config.Config) (opentracing.Tracer, io.Closer, error) {
jaegerCfgInstance := jaegercfg.Configuration{
ServiceName: cfg.Jaeger.ServiceName,
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: cfg.Jaeger.LogSpans,
LocalAgentHostPort: cfg.Jaeger.Host,
},
}
return jaegerCfgInstance.NewTracer(
jaegercfg.Logger(jaegerlog.StdLogger),
jaegercfg.Metrics(metrics.NullFactory),
)
}
And add the global tracer to our application:
tracer, closer, err := jaegerTracer.InitJaeger(cfg)
if err != nil {
appLogger.Fatal("cannot create tracer", err)
}
appLogger.Info("Jaeger connected")
opentracing.SetGlobalTracer(tracer)
defer closer.Close()
Prometheus has 4 types of metrics: Counter, Gauge, Histogram, and Summary.
To expose Prometheus metrics in a Go application, you need to provide a /metrics HTTP endpoint.
You can use the HTTP handler from the prometheus/promhttp package as the handler function.
func CreateMetrics(address string, name string) (Metrics, error) {
var metr PrometheusMetrics
metr.HitsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: name + "_hits_total",
})
if err := prometheus.Register(metr.HitsTotal); err != nil {
return nil, err
}
metr.Hits = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: name + "_hits",
},
[]string{"status", "method", "path"},
)
if err := prometheus.Register(metr.Hits); err != nil {
return nil, err
}
metr.Times = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: name + "_times",
},
[]string{"status", "method", "path"},
)
if err := prometheus.Register(metr.Times); err != nil {
return nil, err
}
if err := prometheus.Register(prometheus.NewBuildInfoCollector()); err != nil {
return nil, err
}
go func() {
router := echo.New()
router.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
if err := router.Start(address); err != nil {
log.Fatal(err)
}
}()
return &metr, nil
}
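To make it concrete what a scrape of /metrics returns, here is a standard-library-only sketch that serves a single counter in the Prometheus text exposition format. It is not the Prometheus client library; the counter type, the demo_hits_total metric name, and the scrapeAfter helper are made up for the illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// counter only ever goes up, which is the contract of a Prometheus Counter.
type counter struct{ n int64 }

func (c *counter) Inc() { atomic.AddInt64(&c.n, 1) }

// handler serves the counter in the Prometheus text exposition format.
func (c *counter) handler(w http.ResponseWriter, _ *http.Request) {
	w.Header().Set("Content-Type", "text/plain; version=0.0.4")
	fmt.Fprintf(w, "# TYPE demo_hits_total counter\ndemo_hits_total %d\n", atomic.LoadInt64(&c.n))
}

// scrapeAfter records n hits, then performs one in-memory scrape of /metrics.
func scrapeAfter(n int) string {
	c := &counter{}
	for i := 0; i < n; i++ {
		c.Inc()
	}
	rec := httptest.NewRecorder()
	c.handler(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
	return rec.Body.String()
}

func main() {
	fmt.Print(scrapeAfter(3))
}
```

In the real service the client library takes care of this formatting, plus labels and histogram buckets.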
user.proto file
The gRPC documentation contains good practice recommendations and naming conventions for writing proto files.
As you can see, each field in the message definition has a unique number. These field numbers are used to identify your fields in the message binary format, and should not be changed once your message type is in use. Note that field numbers in the range 1 through 15 take one byte to encode, including the field number and the field's type (you can find out more about this in Protocol Buffer Encoding).
message LoginRequest {
string email = 1;
string password = 2;
}
message LoginResponse {
User user = 1;
string session_id = 2;
}
service UserService{
rpc Register(RegisterRequest) returns (RegisterResponse);
rpc FindByEmail(FindByEmailRequest) returns (FindByEmailResponse);
rpc FindByID(FindByIDRequest) returns (FindByIDResponse);
rpc Login(LoginRequest) returns (LoginResponse);
rpc GetMe(GetMeRequest) returns(GetMeResponse);
rpc Logout(LogoutRequest) returns(LogoutResponse);
}
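The one-byte claim for field numbers 1 through 15 can be checked by hand. On the wire, each field starts with a key equal to (field_number << 3) | wire_type, encoded as a varint. The sketch below computes the size of that key with the standard library; tagSize is a helper name made up for this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// tagSize returns how many bytes the field key (field number plus wire type) occupies.
func tagSize(fieldNumber, wireType uint64) int {
	key := fieldNumber<<3 | wireType
	buf := make([]byte, binary.MaxVarintLen64)
	// PutUvarint uses the same base-128 varint encoding as Protocol Buffers.
	return binary.PutUvarint(buf, key)
}

func main() {
	const lengthDelimited = 2 // wire type used for strings and nested messages
	for _, n := range []uint64{1, 15, 16, 2047, 2048} {
		fmt.Printf("field %d -> %d byte(s)\n", n, tagSize(n, lengthDelimited))
	}
}
```

Field 15 still fits in one byte, field 16 needs two, and field 2048 needs three, which is why the low numbers are worth reserving for frequently set fields.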
Generate the Go code from your user.proto file:
protoc --go_out=plugins=grpc:. *.proto
It creates the user.pb.go file with the server and client interfaces that we need to implement in our microservice:
// UserServiceServer is the service API for UserService service.
type UserServiceServer interface {
Register(context.Context, *RegisterRequest) (*RegisterResponse, error)
FindByEmail(context.Context, *FindByEmailRequest) (*FindByEmailResponse, error)
FindByID(context.Context, *FindByIDRequest) (*FindByIDResponse, error)
Login(context.Context, *LoginRequest) (*LoginResponse, error)
GetMe(context.Context, *GetMeRequest) (*GetMeResponse, error)
Logout(context.Context, *LogoutRequest) (*LogoutResponse, error)
}
Then in server.go we initialize the repository, use cases, metrics and so on, and then start the gRPC server:
func (s *Server) Run() error {
metrics, err := metric.CreateMetrics(s.cfg.Metrics.URL, s.cfg.Metrics.ServiceName)
if err != nil {
s.logger.Errorf("CreateMetrics Error: %s", err)
}
s.logger.Infof(
"Metrics available URL: %s, ServiceName: %s",
s.cfg.Metrics.URL,
s.cfg.Metrics.ServiceName,
)
im := interceptors.NewInterceptorManager(s.logger, s.cfg, metrics)
userRepo := userRepository.NewUserPGRepository(s.db)
sessRepo := sessRepository.NewSessionRepository(s.redisClient, s.cfg)
userRedisRepo := userRepository.NewUserRedisRepo(s.redisClient, s.logger)
userUC := userUseCase.NewUserUseCase(s.logger, userRepo, userRedisRepo)
sessUC := sessUseCase.NewSessionUseCase(sessRepo, s.cfg)
l, err := net.Listen("tcp", s.cfg.Server.Port)
if err != nil {
return err
}
defer l.Close()
server := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: s.cfg.Server.MaxConnectionIdle * time.Minute,
Timeout: s.cfg.Server.Timeout * time.Second,
MaxConnectionAge: s.cfg.Server.MaxConnectionAge * time.Minute,
Time: s.cfg.Server.Timeout * time.Minute,
}),
grpc.UnaryInterceptor(im.Logger),
grpc.ChainUnaryInterceptor(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpcrecovery.UnaryServerInterceptor(),
),
)
if s.cfg.Server.Mode != "Production" {
reflection.Register(server)
}
authGRPCServer := authServerGRPC.NewAuthServerGRPC(s.logger, s.cfg, userUC, sessUC)
userService.RegisterUserServiceServer(server, authGRPCServer)
grpc_prometheus.Register(server)
http.Handle("/metrics", promhttp.Handler())
go func() {
s.logger.Infof("Server is listening on port: %v", s.cfg.Server.Port)
if err := server.Serve(l); err != nil {
s.logger.Fatal(err)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
server.GracefulStop()
s.logger.Info("Server Exited Properly")
return nil
}
I found this very good gRPC Middleware repository, but we can easily create our own interceptors, for example a logger interceptor:
func (im *InterceptorManager) Logger(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
start := time.Now()
md, _ := metadata.FromIncomingContext(ctx)
reply, err := handler(ctx, req)
im.logger.Infof("Method: %s, Time: %v, Metadata: %v, Err: %v", info.FullMethod, time.Since(start), md, err)
return reply, err
}
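An interceptor is just a function that receives the next handler and decides when to call it. The sketch below reproduces that idea with plain Go types, without importing gRPC, to show how a chain runs: the first interceptor is the outermost, as with grpc.ChainUnaryInterceptor. The handler, interceptor, chain, tag, and runChain names are made up for this example.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// handler and interceptor mirror the shape of grpc.UnaryHandler and
// grpc.UnaryServerInterceptor, minus the gRPC-specific info argument.
type handler func(ctx context.Context, req string) (string, error)
type interceptor func(ctx context.Context, req string, next handler) (string, error)

// chain wraps h so that the first interceptor in the list runs outermost.
func chain(h handler, interceptors ...interceptor) handler {
	for i := len(interceptors) - 1; i >= 0; i-- {
		ic, next := interceptors[i], h
		h = func(ctx context.Context, req string) (string, error) {
			return ic(ctx, req, next)
		}
	}
	return h
}

// tag returns an interceptor that records when it runs before and after the handler.
func tag(name string, trace *[]string) interceptor {
	return func(ctx context.Context, req string, next handler) (string, error) {
		*trace = append(*trace, name+":before")
		resp, err := next(ctx, req)
		*trace = append(*trace, name+":after")
		return resp, err
	}
}

// runChain executes a two-interceptor chain and returns the recorded call order.
func runChain() string {
	var trace []string
	h := chain(func(ctx context.Context, req string) (string, error) {
		trace = append(trace, "handler")
		return "ok", nil
	}, tag("logger", &trace), tag("recovery", &trace))
	_, _ = h(context.Background(), "ping")
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(runChain())
}
```

Because the logger wraps everything else, its timing includes the work done by the inner interceptors and the handler.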
We can access gRPC metadata in service handlers too. For example, here we extract and validate the session_id, which the client must send in the request context:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Errorf(codes.Unauthenticated, "metadata.FromIncomingContext: %v", grpc_errors.ErrNoCtxMetaData)
}
sessionID := md.Get("session_id")
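Note that md.Get returns a slice of values, so the handler still has to check that the slice is not empty before using it. A minimal sketch of that check, using a local MD type as a stand-in for gRPC's metadata.MD (which is also a map[string][]string); the sessionIDFromMD helper and the errNoSessionID error are made up for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// MD stands in for grpc metadata.MD, which is also a map[string][]string.
type MD map[string][]string

// errNoSessionID is a hypothetical error for the missing-key case.
var errNoSessionID = errors.New("session_id is missing from metadata")

// sessionIDFromMD returns the first session_id value, or an error if none was sent.
func sessionIDFromMD(md MD) (string, error) {
	values := md["session_id"]
	if len(values) == 0 || values[0] == "" {
		return "", errNoSessionID
	}
	return values[0], nil
}

func main() {
	fmt.Println(sessionIDFromMD(MD{"session_id": {"abc"}}))
	fmt.Println(sessionIDFromMD(MD{}))
}
```

In a real handler the error case would be turned into a codes.Unauthenticated status, as in the snippet above.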
So let's create a unary service handler for creating a new user:
func (u *usersService) Register(ctx context.Context, r *userService.RegisterRequest) (*userService.RegisterResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "user.Create")
defer span.Finish()
user, err := u.registerReqToUserModel(r)
if err != nil {
u.logger.Errorf("registerReqToUserModel: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "registerReqToUserModel: %v", err)
}
if err := utils.ValidateStruct(ctx, user); err != nil {
u.logger.Errorf("ValidateStruct: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "ValidateStruct: %v", err)
}
createdUser, err := u.userUC.Register(ctx, user)
if err != nil {
u.logger.Errorf("userUC.Register: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "Register: %v", err)
}
return &userService.RegisterResponse{User: u.userModelToProto(createdUser)}, nil
}
The first lines start a tracing span. The "span" is the primary building block of a distributed trace.
Each component of the distributed system contributes a span - a named, timed operation representing a piece of the workflow.
opentracing
span, ctx := opentracing.StartSpanFromContext(ctx, "user.Create")
defer span.Finish()
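The reason StartSpanFromContext returns a new ctx is that the span travels inside the context, so a span started further down the call chain becomes a child of the current one. The toy sketch below shows only that parent/child mechanism; it is not the opentracing API, it records no timings, and the span type, startSpanFromContext, and tracePath are made up for the illustration.

```go
package main

import (
	"context"
	"fmt"
)

type spanKey struct{}

// span is a toy stand-in for an opentracing.Span: a named operation with a parent.
type span struct {
	name   string
	parent *span
}

// startSpanFromContext creates a child of the span stored in ctx (if any)
// and returns a new context that carries the child.
func startSpanFromContext(ctx context.Context, name string) (*span, context.Context) {
	parent, _ := ctx.Value(spanKey{}).(*span)
	s := &span{name: name, parent: parent}
	return s, context.WithValue(ctx, spanKey{}, s)
}

// path renders the chain from the root span down to s.
func (s *span) path() string {
	if s.parent == nil {
		return s.name
	}
	return s.parent.path() + " > " + s.name
}

// tracePath simulates handler -> use case -> repository, each starting its own span.
func tracePath() string {
	_, ctx := startSpanFromContext(context.Background(), "user.Create")
	_, ctx = startSpanFromContext(ctx, "UserUseCase.Register")
	repoSpan, _ := startSpanFromContext(ctx, "UserRepository.Create")
	return repoSpan.path()
}

func main() {
	fmt.Println(tracePath())
}
```

This is why every layer must pass the returned ctx on: a call made with the original context would start a new root span instead of a child.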
Let's check how it looks in Jaeger:
open http://localhost:16686/
Then we usually have to validate the request input. For errors, gRPC provides the status and codes packages.
I find it good practice to parse and log errors in the handler layer. Here I use the ParseGRPCErrStatusCode method, which parses the error and returns the matching gRPC code.
Validator is a good solution for validation.
user, err := u.registerReqToUserModel(r)
if err != nil {
u.logger.Errorf("registerReqToUserModel: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "registerReqToUserModel: %v", err)
}
if err := utils.ValidateStruct(ctx, user); err != nil {
u.logger.Errorf("ValidateStruct: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "ValidateStruct: %v", err)
}
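The body of ParseGRPCErrStatusCode is not shown here, so the following is only a sketch of what such a mapping might look like. To stay runnable without the gRPC module it defines a local Code type with the same numeric values as a few of the constants in google.golang.org/grpc/codes; parseErrCode, errEmailExists, and wrappedNoRows are made-up names.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// Code mirrors a few numeric values from google.golang.org/grpc/codes.
type Code uint32

const (
	OK               Code = 0
	DeadlineExceeded Code = 4
	NotFound         Code = 5
	AlreadyExists    Code = 6
	Internal         Code = 13
)

// errEmailExists is a hypothetical domain error for this sketch.
var errEmailExists = errors.New("email already exists")

// parseErrCode maps an error, even a wrapped one, to a status code.
func parseErrCode(err error) Code {
	switch {
	case err == nil:
		return OK
	case errors.Is(err, sql.ErrNoRows):
		return NotFound
	case errors.Is(err, errEmailExists):
		return AlreadyExists
	case errors.Is(err, context.DeadlineExceeded):
		return DeadlineExceeded
	default:
		// Anything unrecognized is reported as an internal error.
		return Internal
	}
}

// wrappedNoRows simulates a repository error that was wrapped on its way up.
func wrappedNoRows() error {
	return fmt.Errorf("UserRepository.FindByEmail: %w", sql.ErrNoRows)
}

func main() {
	fmt.Println(parseErrCode(wrappedNoRows()))
	fmt.Println(parseErrCode(errors.New("boom")))
}
```

Using errors.Is rather than == keeps the mapping working after lower layers have wrapped the error with extra context.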
After validating the request input, we call the use case method, which contains the business logic and works with the users repository:
createdUser, err := u.userUC.Register(ctx, user)
if err != nil {
u.logger.Errorf("userUC.Register: %v", err)
return nil, status.Errorf(grpc_errors.ParseGRPCErrStatusCode(err), "Register: %v", err)
}
Inside the u.userUC.Register(ctx, user) method we start a new tracing span and call the user repository methods:
func (u *userUseCase) Register(ctx context.Context, user *models.User) (*models.User, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "UserUseCase.Register")
defer span.Finish()
existsUser, err := u.userPgRepo.FindByEmail(ctx, user.Email)
if existsUser != nil || err == nil {
return nil, grpc_errors.ErrEmailExists
}
return u.userPgRepo.Create(ctx, user)
}
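Because the use case talks to the repository through its methods only, it is easy to exercise with an in-memory fake. The sketch below is a simplified variant, not the code above: it drops the context and tracing, and it treats only a "not found" lookup result as permission to create the user, returning any other lookup error to the caller. All names in it (userRepo, memRepo, register, errNotFound, errEmailExists) are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errNotFound    = errors.New("user not found")
	errEmailExists = errors.New("email already exists")
)

// User is a reduced model with only the field this sketch needs.
type User struct{ Email string }

// userRepo is the narrow interface the use case depends on.
type userRepo interface {
	FindByEmail(email string) (*User, error)
	Create(u *User) (*User, error)
}

// memRepo is an in-memory fake that stands in for the Postgres repository.
type memRepo struct{ users map[string]*User }

func (m *memRepo) FindByEmail(email string) (*User, error) {
	if u, ok := m.users[email]; ok {
		return u, nil
	}
	return nil, errNotFound
}

func (m *memRepo) Create(u *User) (*User, error) {
	m.users[u.Email] = u
	return u, nil
}

// register creates the user only when the lookup reports "not found";
// any other lookup error is returned instead of being treated as a free email.
func register(repo userRepo, u *User) (*User, error) {
	_, err := repo.FindByEmail(u.Email)
	switch {
	case err == nil:
		return nil, errEmailExists
	case !errors.Is(err, errNotFound):
		return nil, fmt.Errorf("register: FindByEmail: %w", err)
	}
	return repo.Create(u)
}

func main() {
	repo := &memRepo{users: map[string]*User{}}
	_, err1 := register(repo, &User{Email: "a@example.com"})
	_, err2 := register(repo, &User{Email: "a@example.com"})
	fmt.Println(err1, err2)
}
```

The same interface is what a gomock-generated mock would implement in a real test.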
In the user repository Create method we again start a new tracing span and run our query.
An important note here:
It is good practice to always wrap an error with some additional information; it will make debugging much easier in the future.
At the repository and use case levels we usually don't log errors. We only wrap them with a message and return them, because errors are logged at the top layer, in the handlers.
That way the same error is not logged multiple times, and it already carries a debug message describing what went wrong.
func (r *UserRepository) Create(ctx context.Context, user *models.User) (*models.User, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "UserRepository.Create")
defer span.Finish()
createdUser := &models.User{}
if err := r.db.QueryRowxContext(
ctx,
createUserQuery,
user.FirstName,
user.LastName,
user.Email,
user.Password,
user.Role,
user.Avatar,
).StructScan(createdUser); err != nil {
return nil, errors.Wrap(err, "Create.QueryRowxContext")
}
return createdUser, nil
}
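The repository above wraps errors with errors.Wrap from pkg/errors. The same layering can be sketched with the standard library's fmt.Errorf and the %w verb, which also keeps the original cause reachable through errors.Is. The findUser, getMe, and isNoRows functions are made up for this example.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// findUser simulates a repository call that fails with sql.ErrNoRows.
func findUser() error {
	return fmt.Errorf("UserRepository.FindByID: %w", sql.ErrNoRows)
}

// getMe simulates the use case layer adding its own context on top.
func getMe() error {
	if err := findUser(); err != nil {
		return fmt.Errorf("UserUseCase.GetMe: %w", err)
	}
	return nil
}

// isNoRows reports whether the original cause survived both wraps.
func isNoRows(err error) bool {
	return errors.Is(err, sql.ErrNoRows)
}

func main() {
	err := getMe()
	fmt.Println(err)           // the message reads like a call path
	fmt.Println(isNoRows(err)) // the cause is still detectable
}
```

The handler then logs this single message once, and it already shows which layer and which call failed.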
Finally, the service handler must return the response object generated from the proto file. Here we usually need helpers that map our internal business logic models to the response object.
return &userService.RegisterResponse{User: u.userModelToProto(createdUser)}, nil
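A mapping helper such as userModelToProto is typically a plain field-by-field copy. The sketch below uses two hypothetical local structs, User for the internal model and UserProto as a stand-in for the protoc-generated message, since neither real type is shown in this article; the field set is an assumption.

```go
package main

import "fmt"

// User is a hypothetical internal model; the article's models.User may differ.
type User struct {
	FirstName, LastName, Email, Password, Role string
}

// UserProto stands in for the protoc-generated message type.
type UserProto struct {
	FirstName, LastName, Email, Role string
}

// userModelToProto copies only the fields that are safe to expose;
// the password is deliberately left out of the response.
func userModelToProto(u *User) *UserProto {
	if u == nil {
		return nil
	}
	return &UserProto{
		FirstName: u.FirstName,
		LastName:  u.LastName,
		Email:     u.Email,
		Role:      u.Role,
	}
}

func main() {
	u := &User{FirstName: "Ada", LastName: "Lovelace", Email: "ada@example.com", Password: "secret", Role: "engineer"}
	fmt.Printf("%+v\n", *userModelToProto(u))
}
```

Keeping the mapping in one place also gives a single spot to control which internal fields ever leave the service.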
Every app must be covered by tests. I didn't cover all of the code in this one, but of course I wrote some tests.
For testing and mocking, testify and gomock are very good tools.
You can find the source code and the list of all the tools used here :)
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions :)
Top comments (2)
Hello,
Thanks for this great post. I ran "make local" and "make run".
When I run the evans app, I get the error below when calling Register:
userService.UserService@127.0.0.1:5000> call Register
email (TYPE_STRING) => test@gmail.com
first_name (TYPE_STRING) => test
last_name (TYPE_STRING) => test
password (TYPE_STRING) => password
role (TYPE_STRING) => engineer
avatar (TYPE_STRING) => Falcon
command call: rpc error: code = Internal desc = Register: Create.QueryRowxContext: ERROR: relation "users" does not exist (SQLSTATE 42P01)
What am I missing ?
On my way to try it in prod 0/ Great job!