DEV Community

Cover image for Go EventSourcing and CQRS microservice using EventStoreDB πŸ‘‹βš‘οΈπŸ’«
Alexander
Alexander

Posted on • Edited on

Go EventSourcing and CQRS microservice using EventStoreDB πŸ‘‹βš‘οΈπŸ’«

In this article let's try to create closer to real world Event Sourcing CQRS microservice using: πŸš€πŸ‘¨β€πŸ’»πŸ™Œ
EventStoreDB The database built for Event Sourcing
gRPC Go implementation of gRPC
MongoDB Web and API based SMTP testing
Elasticsearch Elasticsearch client for Go.
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for to compose observability dashboards with everything from Prometheus
swag Swagger for Go
Echo web framework
Kibana Kibana is user interface that lets you visualize your Elasticsearch

Source code you can find in GitHub repository

The main idea of this project is the implementation of Event Sourcing and CQRS using Go, EventStoreDB, gRPC, MongoDB and ElasticSearch.
Didn't write in this article about Event Sourcing and CQRS patterns, because it makes the article huge and the best place to read is [microservices.io (https://microservices.io/patterns/data/cqrs.html), found this article is very good too, and highly recommend Alexey Zimarev "Hands-on Domain-Driven Design with .NET Core" book and also his blog.

In this project we have microservice working with EventStoreDB using oficial go client, for [projections (https://zimarev.com/blog/event-sourcing/projections/) used MongoDB and Elasticsearch for search,
and communicate by gRPC and REST.
Did not implement here any interesting business logic and didn't cover tests, because don't have enough time,
the events list is very simple: create a new order, update shopping cart, pay, submit, cancel, change the delivery address, complete order, and of course in real-world better use more concrete and meaningfully events, but the target here is to show the idea and how it works.
Event Sourcing can be implemented in different ways, used here EventStoreDB, but we can do it with PostgreSQL and Kafka for example.
After trying both approaches, found EventStoreDB is a better solution because all required features are implemented out of the box, it is optimized and really very good engineers developing it.

For local development:

make local or docker_dev // for run docker compose
make run_es // run es microservice
Enter fullscreen mode Exit fullscreen mode

or

make dev // run all in docker compose with hot reload
Enter fullscreen mode Exit fullscreen mode

All UI interfaces will be available on ports:

EventStoreDB UI: http://localhost:2113

EventStoreDB

Jaeger UI: http://localhost:16686

Jaeger

Prometheus UI: http://localhost:9090

Prometheus

Grafana UI: http://localhost:3005

Grafana

Swagger UI: http://localhost:5007/swagger/index.html

Swagger

Kibana UI: http://localhost:5601/app/home#/

Kibana

Docker compose file for this project,
depending on your environment, use image for eventstoredb: eventstore/eventstore:21.6.0-buster-slim,
and for M1: ghcr.io/eventstore/eventstore@sha256:ab30bf2a2629e2c8710f8f7fdcb74df5466c6b3b2200c8e9ad8a797ed138012a

version: "3.8"

services:
  eventstore.db:
    image: eventstore/eventstore:21.6.0-buster-slim
    environment:
      - EVENTSTORE_CLUSTER_SIZE=1
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
      - EVENTSTORE_EXT_TCP_PORT=1113
      - EVENTSTORE_HTTP_PORT=2113
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_ENABLE_EXTERNAL_TCP=true
      - EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP=true
    ports:
      - "1113:1113"
      - "2113:2113"
    volumes:
      - type: volume
        source: eventstore-volume-data
        target: /var/lib/eventstore
      - type: volume
        source: eventstore-volume-logs
        target: /var/log/eventstore
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: products
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data_container:/data/db
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.21
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

  node01:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1
    container_name: node01
    restart: always
    environment:
      - node.name=node01
      - cluster.name=es-cluster-7
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms128m -Xmx128m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - es-data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks: [ "microservices" ]

  kibana:
    image: docker.elastic.co/kibana/kibana:7.11.1
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:
  eventstore-volume-data:
  eventstore-volume-logs:
  es-data01:
    driver: local

networks:
  microservices:
    name: microservices
Enter fullscreen mode Exit fullscreen mode

First, we look at EventStoreDB and then add some code for working with it, we can look at streams and events
EventStoreDB ui:

AggregateRoot can be implemented in different ways, for this project and Go specific, the main aggregate root methods - load events and apply changes.
When we fetch the aggregate from the database, instead of reading its state as one record in a table or document, we read all events that were saved before and call the When method for each.
After all these steps, we will recover all the history of a given aggregate. By doing this, we will be bringing our aggregate to its latest state.

type AggregateBase struct {
    ID                string
    Version           int64
    AppliedEvents     []Event
    UncommittedEvents []Event
    Type              AggregateType
    when              when
}

func (a *AggregateBase) Apply(event Event) error {
    if event.GetAggregateID() != a.GetID() {
        return ErrInvalidAggregateID
    }

    event.SetAggregateType(a.GetType())

    if err := a.when(event); err != nil {
        return err
    }

    a.Version++
    event.SetVersion(a.GetVersion())
    a.UncommittedEvents = append(a.UncommittedEvents, event)
    return nil
}

// RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore
func (a *AggregateBase) RaiseEvent(event Event) error {
    if event.GetAggregateID() != a.GetID() {
        return ErrInvalidAggregateID
    }
    if a.GetVersion() >= event.GetVersion() {
        return ErrInvalidEventVersion
    }

    event.SetAggregateType(a.GetType())

    if err := a.when(event); err != nil {
        return err
    }

    a.Version = event.GetVersion()
    return nil
}
Enter fullscreen mode Exit fullscreen mode

The Event struct is abstraction under esdb.RecordedEvent and esdb.EventData for comfortable work with it.
An event represents a fact that took place in the domain. They are the source of truth; your current state is derived from the events.
They are immutable and represent the business facts.
It means that we never change or remove anything in the database, we only append new events.

// EventType is the type of any event, used as its unique identifier.
type EventType string

// Event is an internal representation of an event, returned when the Aggregate
// uses NewEvent to create a new event. The events loaded from the db is
// represented by each DBs internal event type, implementing Event.
type Event struct {
    EventID       string
    EventType     string
    Data          []byte
    Timestamp     time.Time
    AggregateType AggregateType
    AggregateID   string
    Version       int64
    Metadata      []byte
}
Enter fullscreen mode Exit fullscreen mode

In this example we don't use snapshots, so the AggregateStore interface:

// AggregateStore is responsible for loading and saving aggregates.
type AggregateStore interface {
    Load(ctx context.Context, aggregate Aggregate) error
    Save(ctx context.Context, aggregate Aggregate) error
    Exists(ctx context.Context, streamID string) error
}
Enter fullscreen mode Exit fullscreen mode

Implementation of AggregateStore is Load, Save and Exists methods,
Load and Save accept aggregate then load or apply events using EventStoreDB client.
The Load method: find out the stream name for an aggregate, read all of the events from the aggregate stream,
loop through all of the events, and call the RaiseEvent handler for each of them.
And the Save method persists aggregates by saving the history of changes, handling concurrency, when you retrieve a stream from EventStoreDB, you take note of the current version number, then when you save it back you can determine if somebody else has modified the record in the meantime.

func (a *aggregateStore) Load(ctx context.Context, aggregate es.Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Load")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", aggregate.GetID()))

    stream, err := a.db.ReadStream(ctx, aggregate.GetID(), esdb.ReadStreamOptions{}, count)
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "db.ReadStream")
    }
    defer stream.Close()

    for {
        event, err := stream.Recv()
        if errors.Is(err, esdb.ErrStreamNotFound) {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "stream.Recv")
        }
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "stream.Recv")
        }

        esEvent := es.NewEventFromRecorded(event.Event)
        if err := aggregate.RaiseEvent(esEvent); err != nil {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "RaiseEvent")
        }
        a.log.Debugf("(Load) esEvent: {%s}", esEvent.String())
    }

    a.log.Debugf("(Load) aggregate: {%s}", aggregate.String())
    return nil
}

func (a *aggregateStore) Save(ctx context.Context, aggregate es.Aggregate) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "aggregateStore.Save")
    defer span.Finish()
    span.LogFields(log.String("aggregate", aggregate.String()))

    if len(aggregate.GetUncommittedEvents()) == 0 {
        a.log.Debugf("(Save) [no uncommittedEvents] len: {%d}", len(aggregate.GetUncommittedEvents()))
        return nil
    }

    eventsData := make([]esdb.EventData, 0, len(aggregate.GetUncommittedEvents()))
    for _, event := range aggregate.GetUncommittedEvents() {
        eventsData = append(eventsData, event.ToEventData())
    }

    var expectedRevision esdb.ExpectedRevision
    if len(aggregate.GetAppliedEvents()) == 0 {
        expectedRevision = esdb.NoStream{}
        a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)

        appendStream, err := a.db.AppendToStream(
            ctx,
            aggregate.GetID(),
            esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
            eventsData...,
        )
        if err != nil {
            tracing.TraceErr(span, err)
            return errors.Wrap(err, "db.AppendToStream")
        }

        a.log.Debugf("(Save) stream: {%+v}", appendStream)
        return nil
    }

    readOps := esdb.ReadStreamOptions{Direction: esdb.Backwards, From: esdb.End{}}
    stream, err := a.db.ReadStream(context.Background(), aggregate.GetID(), readOps, 1)
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "db.ReadStream")
    }
    defer stream.Close()

    lastEvent, err := stream.Recv()
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "stream.Recv")
    }

    expectedRevision = esdb.Revision(lastEvent.OriginalEvent().EventNumber)
    a.log.Debugf("(Save) expectedRevision: {%T}", expectedRevision)

    appendStream, err := a.db.AppendToStream(
        ctx,
        aggregate.GetID(),
        esdb.AppendToStreamOptions{ExpectedRevision: expectedRevision},
        eventsData...,
    )
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "db.AppendToStream")
    }

    a.log.Debugf("(Save) stream: {%+v}", appendStream)
    aggregate.ClearUncommittedEvents()
    return nil
}
Enter fullscreen mode Exit fullscreen mode

For the next step let's create an order aggregate, we have to add es.AggregateBase struct and implement When interface:
:

type OrderAggregate struct {
    *es.AggregateBase
    Order *models.Order
}

func (a *OrderAggregate) When(evt es.Event) error {

    switch evt.GetEventType() {

    case v1.OrderCreated:
        return a.onOrderCreated(evt)
    case v1.OrderPaid:
        return a.onOrderPaid(evt)
    case v1.OrderSubmitted:
        return a.onOrderSubmitted(evt)
    case v1.OrderCompleted:
        return a.onOrderCompleted(evt)
    case v1.OrderCanceled:
        return a.onOrderCanceled(evt)
    case v1.ShoppingCartUpdated:
        return a.onShoppingCartUpdated(evt)
    case v1.DeliveryAddressChanged:
        return a.onChangeDeliveryAddress(evt)
    default:
        return es.ErrInvalidEventType
    }
}


func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
    var eventData v1.OrderCreatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.AccountEmail = eventData.AccountEmail
    a.Order.ShopItems = eventData.ShopItems
    a.Order.Created = true
    a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
    a.Order.DeliveryAddress = eventData.DeliveryAddress
    return nil
}

func (a *OrderAggregate) onOrderPaid(evt es.Event) error {
    var payment models.Payment
    if err := evt.GetJsonData(&payment); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.Paid = true
    a.Order.Payment = payment
    return nil
}

func (a *OrderAggregate) onOrderSubmitted(evt es.Event) error {
    a.Order.Submitted = true
    return nil
}

func (a *OrderAggregate) onOrderCompleted(evt es.Event) error {
    var eventData v1.OrderCompletedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.Completed = true
    a.Order.DeliveredTime = eventData.DeliveryTimestamp
    a.Order.Canceled = false
    return nil
}

func (a *OrderAggregate) onOrderCanceled(evt es.Event) error {
    var eventData v1.OrderCanceledEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.Canceled = true
    a.Order.Completed = false
    a.Order.CancelReason = eventData.CancelReason
    return nil
}

func (a *OrderAggregate) onShoppingCartUpdated(evt es.Event) error {
    var eventData v1.ShoppingCartUpdatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.ShopItems = eventData.ShopItems
    a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
    return nil
}

func (a *OrderAggregate) onChangeDeliveryAddress(evt es.Event) error {
    var eventData v1.OrderDeliveryAddressChangedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.DeliveryAddress = eventData.DeliveryAddress
    return nil
}
Enter fullscreen mode Exit fullscreen mode

For example, let's look at, create order case, onCreateOrderCommand handle command, validate order state, serialize data and create CreateOrderEvent:

func (a *OrderAggregate) CreateOrder(ctx context.Context, shopItems []*models.ShopItem, accountEmail, deliveryAddress string) error {
    span, _ := opentracing.StartSpanFromContext(ctx, "OrderAggregate.CreateOrder")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", a.GetID()))

    if a.Order.Created {
        return ErrAlreadyCreated
    }
    if shopItems == nil {
        return ErrOrderShopItemsIsRequired
    }
    if deliveryAddress == "" {
        return ErrInvalidDeliveryAddress
    }

    event, err := eventsV1.NewOrderCreatedEvent(a, shopItems, accountEmail, deliveryAddress)
    if err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "NewOrderCreatedEvent")
    }

    if err := event.SetMetadata(tracing.ExtractTextMapCarrier(span.Context())); err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "SetMetadata")
    }

    return a.Apply(event)
}
Enter fullscreen mode Exit fullscreen mode

Then aggregate handle event using onOrderCreated method which only applies changes to the state:

func (a *OrderAggregate) onOrderCreated(evt es.Event) error {
    var eventData v1.OrderCreatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        return errors.Wrap(err, "GetJsonData")
    }

    a.Order.AccountEmail = eventData.AccountEmail
    a.Order.ShopItems = eventData.ShopItems
    a.Order.Created = true
    a.Order.TotalPrice = GetShopItemsTotalPrice(eventData.ShopItems)
    a.Order.DeliveryAddress = eventData.DeliveryAddress
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Our microservice accept http or gRPC requests:
swagger
For swagger used swag and let's look at create order handler code:

// CreateOrder
// @Tags Orders
// @Summary Create order
// @Description Create new order
// @Param order body dto.CreateOrderReqDto true "create order"
// @Accept json
// @Produce json
// @Success 201 {string} id ""
// @Router /orders [post]
func (h *orderHandlers) CreateOrder() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "orderHandlers.CreateOrder")
        defer span.Finish()
        h.metrics.CreateOrderHttpRequests.Inc()

        var reqDto dto.CreateOrderReqDto
        if err := c.Bind(&reqDto); err != nil {
            h.log.Errorf("(Bind) err: {%v}", err)
            tracing.TraceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        if err := h.v.StructCtx(ctx, reqDto); err != nil {
            h.log.Errorf("(validate) err: {%v}", err)
            tracing.TraceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        id := uuid.NewV4().String()
        command := v1.NewCreateOrderCommand(id, reqDto.ShopItems, reqDto.AccountEmail, reqDto.DeliveryAddress)
        err := h.os.Commands.CreateOrder.Handle(ctx, command)
        if err != nil {
            h.log.Errorf("(CreateOrder.Handle) id: {%s}, err: {%v}", id, err)
            tracing.TraceErr(span, err)
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("(order created) id: {%s}", id)
        return c.JSON(http.StatusCreated, id)
    }
}
Enter fullscreen mode Exit fullscreen mode

and gRPC CreateOrder handler does the same as http handler, validate request and call command.
For validation used validator because of implements value validations for structs and individual fields based on tags.
bloomrpc

func (s *orderGrpcService) CreateOrder(ctx context.Context, req *orderService.CreateOrderReq) (*orderService.CreateOrderRes, error) {
    ctx, span := tracing.StartGrpcServerTracerSpan(ctx, "orderGrpcService.CreateOrder")
    defer span.Finish()
    span.LogFields(log.String("req", req.String()))
    s.metrics.CreateOrderGrpcRequests.Inc()

    aggregateID := uuid.NewV4().String()
    command := v1.NewCreateOrderCommand(aggregateID, models.ShopItemsFromProto(req.GetShopItems()), req.GetAccountEmail(), req.GetDeliveryAddress())
    if err := s.v.StructCtx(ctx, command); err != nil {
        s.log.Errorf("(validate) aggregateID: {%s}, err: {%v}", aggregateID, err)
        tracing.TraceErr(span, err)
        return nil, s.errResponse(err)
    }

    if err := s.os.Commands.CreateOrder.Handle(ctx, command); err != nil {
        s.log.Errorf("(CreateOrder.Handle) orderID: {%s}, err: {%v}", aggregateID, err)
        return nil, s.errResponse(err)
    }

    s.log.Infof("(created order): orderID: {%s}", aggregateID)
    return &orderService.CreateOrderRes{AggregateID: aggregateID}, nil
}
Enter fullscreen mode Exit fullscreen mode

http and gRPC handlers do the same, validate the incoming request and call command service with CreateOrder command,
which load OrderAggregate, call HandleCommand method and save it to event store.
The main reason for CQRS gaining popularity is the ability to handle reads and writes separately due to severe differences in optimization techniques for those much more distinct operations.

func (c *createOrderHandler) Handle(ctx context.Context, command *CreateOrderCommand) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "createOrderHandler.Handle")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", command.GetAggregateID()))

    order := aggregate.NewOrderAggregateWithID(command.AggregateID)
    err := c.es.Exists(ctx, order.GetID())
    if err != nil && !errors.Is(err, esdb.ErrStreamNotFound) {
        return err
    }

    if err := order.CreateOrder(ctx, command.ShopItems, command.AccountEmail, command.DeliveryAddress); err != nil {
        return err
    }

    span.LogFields(log.String("order", order.String()))
    return c.es.Save(ctx, order)
}
Enter fullscreen mode Exit fullscreen mode

The process of building a piece of state from events is called a projection.
EventStoreDB has subscriptions, so we can subscribe our projections for order type stream events.
When we execute a command, the aggregate generates a new event that represents the state transitions of the aggregate. Those events are committed to the store, so the store appends them to the end of the aggregate stream. A subscription receives these events and updates its read models.
MongoDB's projection subscribes to events stream using the persistent subscription.
and process events using When method, like aggregate it applies changes depending on the event type:

func (o *mongoProjection) ProcessEvents(ctx context.Context, stream *esdb.PersistentSubscription, workerID int) error {

    for {
        event := stream.Recv()
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        if event.SubscriptionDropped != nil {
            o.log.Errorf("(SubscriptionDropped) err: {%v}", event.SubscriptionDropped.Error)
            return errors.Wrap(event.SubscriptionDropped.Error, "Subscription Dropped")
        }

        if event.EventAppeared != nil {
            o.log.ProjectionEvent(constants.MongoProjection, o.cfg.Subscriptions.MongoProjectionGroupName, event.EventAppeared, workerID)

            err := o.When(ctx, es.NewEventFromRecorded(event.EventAppeared.Event))
            if err != nil {
                o.log.Errorf("(mongoProjection.when) err: {%v}", err)
                if err := stream.Nack(err.Error(), esdb.Nack_Retry, event.EventAppeared); err != nil {
                    o.log.Errorf("(stream.Nack) err: {%v}", err)
                    return errors.Wrap(err, "stream.Nack")
                }
            }
            err = stream.Ack(event.EventAppeared)
            if err != nil {
                o.log.Errorf("(stream.Ack) err: {%v}", err)
                return errors.Wrap(err, "stream.Ack")
            }
            o.log.Infof("(ACK) event commit: {%v}", *event.EventAppeared.Commit)
        }
    }
}

func (o *mongoProjection) When(ctx context.Context, evt es.Event) error {
    ctx, span := tracing.StartProjectionTracerSpan(ctx, "mongoProjection.When", evt)
    defer span.Finish()
    span.LogFields(log.String("AggregateID", evt.GetAggregateID()), log.String("EventType", evt.GetEventType()))

    switch evt.GetEventType() {

    case v1.OrderCreated:
        return o.onOrderCreate(ctx, evt)
    case v1.OrderPaid:
        return o.onOrderPaid(ctx, evt)
    case v1.OrderSubmitted:
        return o.onSubmit(ctx, evt)
    case v1.ShoppingCartUpdated:
        return o.onShoppingCartUpdate(ctx, evt)
    case v1.OrderCanceled:
        return o.onCancel(ctx, evt)
    case v1.OrderCompleted:
        return o.onCompleted(ctx, evt)
    case v1.DeliveryAddressUpdated:
        return o.onDeliveryAddressUpdated(ctx, evt)

    default:
        o.log.Warnf("(mongoProjection) [When unknown EventType] eventType: {%s}", evt.EventType)
        return es.ErrInvalidEventType
    }
}
Enter fullscreen mode Exit fullscreen mode

and onOrderCreate method deserialize data and handle event calling MongoDB repository insert method:

func (o *mongoProjection) onOrderCreate(ctx context.Context, evt es.Event) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoProjection.onOrderCreate")
    defer span.Finish()
    span.LogFields(log.String("AggregateID", evt.GetAggregateID()))

    var eventData v1.OrderCreatedEvent
    if err := evt.GetJsonData(&eventData); err != nil {
        tracing.TraceErr(span, err)
        return errors.Wrap(err, "evt.GetJsonData")
    }
    span.LogFields(log.String("AccountEmail", eventData.AccountEmail))

    op := &models.OrderProjection{
        OrderID:         aggregate.GetOrderAggregateID(evt.AggregateID),
        ShopItems:       eventData.ShopItems,
        Created:         true,
        AccountEmail:    eventData.AccountEmail,
        TotalPrice:      aggregate.GetShopItemsTotalPrice(eventData.ShopItems),
        DeliveryAddress: eventData.DeliveryAddress,
    }

    _, err := o.mongoRepo.Insert(ctx, op)
    if err != nil {
        return err
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

The OrderMongoRepository insert method is simple:

func (m *mongoRepository) Insert(ctx context.Context, order *models.OrderProjection) (string, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "mongoRepository.Insert")
    defer span.Finish()
    span.LogFields(log.String("OrderID", order.OrderID))

    _, err := m.getOrdersCollection().InsertOne(ctx, order, &options.InsertOneOptions{})
    if err != nil {
        tracing.TraceErr(span, err)
        return "", err
    }

    return order.OrderID, nil
}
Enter fullscreen mode Exit fullscreen mode


The same idea for ElasticSearch projection implementation, we can use it for searching orders, so let's look at the repository Search method.
At this project as elastic search client used elastic.

func (e *elasticRepository) Search(ctx context.Context, text string, pq *utils.Pagination) (*dto.OrderSearchResponseDto, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "elasticRepository.Search")
    defer span.Finish()
    span.LogFields(log.String("Search", text))

    shouldMatch := v7.NewBoolQuery().
        Should(v7.NewMatchPhrasePrefixQuery(shopItemTitle, text), v7.NewMatchPhrasePrefixQuery(shopItemDescription, text)).
        MinimumNumberShouldMatch(minimumNumberShouldMatch)

    searchResult, err := e.elasticClient.Search(e.cfg.ElasticIndexes.Orders).
        Query(shouldMatch).
        From(pq.GetOffset()).
        Explain(e.cfg.Elastic.Explain).
        FetchSource(e.cfg.Elastic.FetchSource).
        Version(e.cfg.Elastic.Version).
        Size(pq.GetSize()).
        Pretty(e.cfg.Elastic.Pretty).
        Do(ctx)
    if err != nil {
        tracing.TraceErr(span, err)
        return nil, errors.Wrap(err, "elasticClient.Search")
    }

    orders := make([]*models.OrderProjection, 0, len(searchResult.Hits.Hits))
    for _, hit := range searchResult.Hits.Hits {
        jsonBytes, err := hit.Source.MarshalJSON()
        if err != nil {
            tracing.TraceErr(span, err)
            return nil, errors.Wrap(err, "Source.MarshalJSON")
        }
        var order models.OrderProjection
        if err := json.Unmarshal(jsonBytes, &order); err != nil {
            tracing.TraceErr(span, err)
            return nil, errors.Wrap(err, "json.Unmarshal")
        }
        orders = append(orders, &order)
    }

    return &dto.OrderSearchResponseDto{
        Pagination: dto.Pagination{
            TotalCount: searchResult.TotalHits(),
            TotalPages: int64(pq.GetTotalPages(int(searchResult.TotalHits()))),
            Page:       int64(pq.GetPage()),
            Size:       int64(pq.GetSize()),
            HasMore:    pq.GetHasMore(int(searchResult.TotalHits())),
        },
        Orders: mappers.OrdersFromProjections(orders),
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

More details and source code of the full project you can find here,
of course in real-world applications, we have to implement many more necessary features, like circuit breaker, retries, rate limiters, etc.,
depending on the project it can be implemented in different ways, for example, you can use Kubernetes and Istio for some of them.
I hope this article is usefully and helpfully, and be happy to receive any feedback or questions, feel free to contact me by email or any messengers :)

Top comments (3)

Collapse
 
mirkoperillo profile image
mirko

Thank you for the clear explanation, you have improved my topic understanding

Collapse
 
alex_g profile image
Alex G

What a mess.

Collapse
 
satioo profile image
Vaibhav Satam

This is the best article of 2022. Could you also extend it to the concept of Sagas?