DEV Community

Mario Meyrelles

Serverless Event-Driven Architecture on Azure: A Worked Example - Part 2

Introduction

In the first post of this series, we discussed how to design a serverless, event-driven architecture with components that are fairly easy to understand and integrate. In this second part, we focus on command handling and Aggregate Root design, illustrating how to process commands and generate events inside the business logic. We created a fictional business scenario to help illustrate the concepts.

Some Business Context

Our New Prepaid Card

In this example, we consider a hypothetical prepaid card that stores credits that users load with their own money. These credits can be used to shop in physical stores using a mobile app.

For users, there is a fee of 0.02% when money is loaded onto the card. If the user loads more than $500.00 in a single transaction, there is no fee. Users can freely send credits to other users of the platform through the mobile application. There are no limits or charges for credit transfers. Transactions involving a significant amount of money are subject to a validation process and can take longer to be processed. Users can also apply for a plastic card if they wish to make credit and debit transactions on the networks of the common credit card operators. This card can also be used for ATM withdrawals. The only costs applied to plastic card transactions are the processing and operator costs, which are not under our control.

For merchants, there is only a 0.025% fee on each transaction and absolutely nothing more in the first year. In the following year, the fees will still be low but may be raised slightly. Any merchant can use this platform to accept payments by configuring an account on the platform. The setup process is online, and it's usually possible to start selling in less than two working days. The merchant can see the money corresponding to each sale in their account in less than 24 hours. The merchant can also receive the payment in credits and convert it to money later. If the merchant waits 31 days to make the conversion, the transaction fee is removed for the given sale.

The scope of this sample is to model the process of transferring loaded credits from one user to another using the app. We will ignore the other ways to use credits and all the complexities involved. We also ignore the reload process and fee management.

Exploring the Domain

We have partitioned the problem into a more understandable structure made of three bounded contexts:

Context Map for our example

| Bounded Context | Mission |
|---|---|
| User Management | Focuses on the lifecycle of a user as a customer of our prepaid card service. In this sample, we will assume that all cardholders are already created, are valid, and have the app configured. We will only seed configured users and we won't implement any use case. |
| Balance Management | Controls the cardholder's balance. Any action like a reload, payment or credit transfer is ultimately done via a credit or debit transaction. |
| App Credits Transfers | Controls free credit transfers done inside the platform. Collaborates with Balance Management to securely add or remove credits from the accounts involved. |

As you can imagine, the whole business problem space can be huge. For the sake of simplicity, we are only showing the subdomains of interest in green:

| Subdomain | Mission |
|---|---|
| Account Transactions | Ensures that all requests for debits and credits are fulfilled or rejected correctly, avoiding overdrafts and duplicated transactions. In this implementation, we show the details of working with event-sourced aggregates. |
| Credits Transfers | Is responsible for all the free credits transfers. Transfers can be done immediately or can be scheduled. Values considered too high are subject to human validation. Automatic rejection of transfers can occur when certain conditions are met. This subdomain shows an implementation of a Process Manager that effectively settles the credits transfers that are accepted after the validation process. We also implement the workflow for human validation in this project. |

Now it's time to go into the details of the aggregates, commands, and events we plan to implement in this sample project:

| Aggregate | Commands | Events |
|---|---|---|
| Cardholder | | |
| Prepaid Card Account (Account Transactions) | DecreaseCredits, IncreaseCredits, ReverseCreditsChange | CreditsDecreased, CreditsDecreaseRejected, CreditsDecreaseReversed, CreditsIncreased, CreditsIncreaseRejected, CreditsIncreaseReversed |
| App Credits Transfer | CreateCreditsTransfer | CreditsTransferAccepted, CreditsTransferFailed, CreditsTransferManuallyApproved, CreditsTransferManuallyRejected, CreditsTransferRejected, CreditsTransferScheduled (sprint 2), CreditsTransferSentToManualValidation, CreditsTransferSucceeded |

As the reader can see, the Credits Transfer aggregate controls its state using events that represent most of the possible states of a credits transfer. The events in the Prepaid Card Account aggregate support the credits transfer and may not be exhaustive. The Cardholder aggregate only supports the Prepaid Card Account during validation and won't have any command or event in this partition of the platform design. We won't show in this article how we handle scheduled commands and events. That discussion is left for a future post.
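To make the relationship between commands, events, and aggregate state more concrete, here is a minimal sketch of an event-sourced account. It is not the project's actual aggregate: the class `PrepaidCardAccountSketch`, its members, and the simplified event classes are hypothetical names introduced only for illustration, and the overdraft rule (reject a decrease larger than the balance) is an assumption based on the "avoiding overdrafts" mission described above.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical event classes; the project's real events carry more data.
public abstract class AccountEvent
{
    protected AccountEvent(decimal amount) { Amount = amount; }
    public decimal Amount { get; }
}

public sealed class CreditsIncreased : AccountEvent
{
    public CreditsIncreased(decimal amount) : base(amount) { }
}

public sealed class CreditsDecreased : AccountEvent
{
    public CreditsDecreased(decimal amount) : base(amount) { }
}

public sealed class CreditsDecreaseRejected : AccountEvent
{
    public CreditsDecreaseRejected(decimal amount) : base(amount) { }
}

public sealed class PrepaidCardAccountSketch
{
    public decimal Balance { get; private set; }

    // Rebuilds the current state by replaying the stored event history in order.
    public static PrepaidCardAccountSketch FromHistory(IEnumerable<AccountEvent> history)
    {
        var account = new PrepaidCardAccountSketch();
        foreach (var e in history)
        {
            account.Apply(e);
        }
        return account;
    }

    // Decides which event a DecreaseCredits command produces, then applies it.
    public AccountEvent DecreaseCredits(decimal amount)
    {
        AccountEvent result;
        if (amount > Balance)
        {
            result = new CreditsDecreaseRejected(amount); // assumed overdraft rule
        }
        else
        {
            result = new CreditsDecreased(amount);
        }
        Apply(result);
        return result;
    }

    // State changes happen only here, driven by events.
    private void Apply(AccountEvent e)
    {
        switch (e)
        {
            case CreditsIncreased increased:
                Balance += increased.Amount;
                break;
            case CreditsDecreased decreased:
                Balance -= decreased.Amount;
                break;
            // Rejections are recorded as events but leave the balance untouched.
        }
    }
}
```

In this sketch the command method never mutates the balance directly. It only decides which event occurred, and the same `Apply` method is used both for new events and for replaying history.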

Technical Implementation

Recalling the general architecture of the last post

Before diving into more technical details and code implementation, let's recall the general architecture that we described in the last post:

[Figure: general architecture described in the first post]

HTTP Endpoints and Azure Functions

HTTP endpoints are the most common way to communicate with the external world. Often, HTTP endpoints are also used for communication between microservices. Azure Functions are very useful for building lightweight APIs that can be used as a backend. In more complex architectures, the API layer can be put behind an API gateway such as Azure API Management.

In the context of event-driven architectures, logic inside HTTP endpoints is part of command processing and therefore can and should help the client issue correct commands. This means the endpoints can read from the read models and write to the command store.

Basic API with Azure Functions

A typical HTTP endpoint implementation can look like the following:

// TransfersApi.cs
using FluentValidation;
using FluentValidation.Results;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands;
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace ServerlessStreaming.Api
{
    public static partial class TransfersApi
    {
        [FunctionName("TransfersApi")]
        public static async Task<IActionResult> Run(

            // be sure to add content-type: application/json on your request!
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = "create")] 
            HttpRequest req,

            [CosmosDB(
                databaseName: "CreditsTransfers",
                collectionName: "Commands",
                ConnectionStringSetting = "CosmosDBConnectionString")]
            IAsyncCollector<CreateCreditsTransferCommand> commands,

            ILogger log)
        {
            log.LogInformation("Starting validation of Credit transfer request...");

            using var reader = new StreamReader(req.Body);
            string requestBody = await reader.ReadToEndAsync();
            var payload = JsonConvert.DeserializeObject<CreditsTransferModel>(requestBody);

            if (payload == null)
            {
                return new BadRequestObjectResult("Invalid Credit Transfer Request.");
            }

            var validator = new CreditsTransferValidator();
            var validationResult = validator.Validate(payload);

            if (!validationResult.IsValid)
            {
                return new BadRequestObjectResult(validationResult.Errors.Select(e => new {
                    Field = e.PropertyName,
                    Error = e.ErrorMessage
                }));
            }

            var command = new CreateCreditsTransferCommand(
                    aggregateId: payload.Id,
                    correlationId: Guid.NewGuid().ToString(),
                    createdAtUtc: DateTime.UtcNow,
                    payload: payload);

            try
            {
                await commands.AddAsync(command);
                await commands.FlushAsync();

                log.LogWarning($"Request Accepted. CorrelationId: {command.CorrelationId}, AggregateId: {command.AggregateId}");
                req.HttpContext.Response.Headers.Add("ETag", command.ETag);
                return new AcceptedResult();

            }
            catch (DocumentClientException ex) when (ex.Error.Code == "Conflict")
            {

                // the message template needs a placeholder, otherwise the id is never logged
                log.LogWarning("Returning Not Modified: {Id}", payload.Id);
                req.HttpContext.Response.Headers.Add("ETag", command.ETag);
                return new StatusCodeResult(304);
            }
        }
    }

    public class CreditsTransferValidator : AbstractValidator<CreditsTransferModel>
    {
        // see also: https://www.tomfaltesek.com/azure-functions-input-validation/

        public CreditsTransferValidator()
        {
            RuleFor(x => x.Id).NotEmpty();

            RuleFor(x => x.Sender.Id).NotEmpty();
            RuleFor(x => x.Receiver.Id).NotEmpty();

            RuleFor(x => x.Sender.AccountId).NotEmpty();
            RuleFor(x => x.Receiver.AccountId).NotEmpty();

            RuleFor(x => x.Sender.DocumentNumber).NotEmpty();
            RuleFor(x => x.Receiver.DocumentNumber).NotEmpty();

            // todo: validate if sender and receiver are different
            // todo: validate if account and users are valid

            RuleFor(x => x.Amount).GreaterThan(0m);
        }

        protected override bool PreValidate(ValidationContext<CreditsTransferModel> context, ValidationResult result)
        {

            if (context.InstanceToValidate == null)
            {
                result.Errors.Add(new ValidationFailure("", "Please ensure a model was supplied."));
                return false;
            }
            return true;
        }
    }
}


The code above demonstrates how an Azure Function can react to HTTP requests and produce commands. The function is annotated with bindings and triggers. We can see a trigger carrying the payload of an HTTP request and an output binding that sends the commands to CosmosDB, our command and event store in this project. We can also see an ILogger parameter in the function signature. The logger is used extensively to send data to Application Insights and to debug locally. The extensive use of triggers and bindings makes Azure Functions very pleasant to use and, in many cases, removes the need to manage instances of HTTP clients or CosmosDB clients in the function code.

The function performs basic validation using the FluentValidation library for .NET. Given a valid payload, we create a new command and send it to CosmosDB through the output binding, so no explicit CosmosDB client is needed. We call FlushAsync inside the try block to force the write to happen there. This way, any exception raised during the insertion, such as a conflict, can be caught, and we can tell the caller that the request has already been processed. When the request is not valid, we use the validation messages to inform the caller about wrong or missing parameters. We expect the request body to conform to a model that contains all the information needed to process the command.
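The duplicate handling described above can be illustrated without any Azure dependency. The following is only a sketch: `InMemoryCommandStore` and `Submit` are hypothetical names, the dictionary stands in for the Commands container, and it assumes that each command carries a stable id and that the store rejects a second document with the same id.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the Commands container, used only to show the flow.
public sealed class InMemoryCommandStore
{
    private readonly Dictionary<string, string> documentsById = new Dictionary<string, string>();

    // Returns 202 (Accepted) for a new command id and 304 (Not Modified)
    // when a command with the same id was already stored, mirroring the
    // status codes returned by the function above.
    public int Submit(string commandId, string commandJson)
    {
        if (documentsById.ContainsKey(commandId))
        {
            return 304;
        }

        documentsById.Add(commandId, commandJson);
        return 202;
    }
}
```

Submitting the same id twice returns 202 the first time and 304 the second time, and the stored document is never overwritten, so a client can safely retry a request.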

The API above is used to create a new credit transfer. It does not depend on external data to perform validation. In this example, we have another API that is exposed publicly, the Transactions API. We have a more complex situation, where data from the read model needs to be used to perform the validation process:

A more complex API

// TransactionsApi.cs
using FluentValidation;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate.Commands;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace ServerlessStreaming.Api
{

    public class TransactionsApi
    {
        // as a client to the application, the api can use the read model to validate commands.
        private readonly ICosmosDbService<PrepaidCardAccount> prepaidAccountReadModel; 

        public TransactionsApi(ICosmosDbService<PrepaidCardAccount> prepaidAccountReadModel)
        {
            this.prepaidAccountReadModel = prepaidAccountReadModel;
        }


        [FunctionName("TransactionsApi")]
        public async Task<IActionResult> Run(

            // be sure to add content-type: application/json on your request! 
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = "create")]
                HttpRequest req, 

            [CosmosDB(
                databaseName: "CreditsTransfers",
                collectionName: "Commands",
                ConnectionStringSetting = "CosmosDBConnectionString")]
                IAsyncCollector<IncreaseCreditsCommand> increaseCreditsCommands,

            [CosmosDB(
                databaseName: "CreditsTransfers",
                collectionName: "Commands",
                ConnectionStringSetting = "CosmosDBConnectionString")]
                IAsyncCollector<DecreaseCreditsCommand> decreaseCreditsCommands,

            ILogger log)
        {
            log.LogInformation("Starting validation of Credit Change request...");


            using var reader = new StreamReader(req.Body);
            string requestBody = await reader.ReadToEndAsync();
            var payload = JsonConvert.DeserializeObject<AccountTransactionModel>(requestBody);


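            // reject an empty or unparsable body before touching the read model
            if (payload == null)
            {
                return new BadRequestObjectResult("Invalid Credit Change request.");
            }

            // some validation rules here: https://docs.fluentvalidation.net/en/latest/advanced.html
            var existingAccount = await prepaidAccountReadModel.GetItemAsync(payload.PrepaidCardAccountId, payload.PrepaidCardAccountId);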
            var validator = new TransactionValidator();
            var validationContext = new ValidationContext<AccountTransactionModel>(payload);
            validationContext.RootContextData["account"] = existingAccount; // using the account stored on read model to perform validation of business data.
            var validationResult = validator.Validate(validationContext);

            if (!validationResult.IsValid)
            {
                return new BadRequestObjectResult(validationResult.Errors.Select(e => new {
                    Field = e.PropertyName,
                    Error = e.ErrorMessage
                }));
            }


            switch (payload.Operation)
            {
                case AccountTransactionModel.IncreaseOperation:
                {
                    return await PerformIncreaseOperation(req, payload, increaseCreditsCommands, log);
                }

                case AccountTransactionModel.DecreaseOperation:
                {
                    return await PerformDecreaseOperation(req, payload, decreaseCreditsCommands, log);
                }
                default:
                {
                    return new BadRequestObjectResult("Invalid Request: please inform operation.");
                }
            }
        }

        private static async Task<IActionResult> PerformDecreaseOperation(HttpRequest req, AccountTransactionModel creditsChangeRequest, IAsyncCollector<DecreaseCreditsCommand> decreaseCreditsCommands, ILogger log)
        {
            var command = new DecreaseCreditsCommand(aggregateId: creditsChangeRequest.PrepaidCardAccountId,
                correlationId: creditsChangeRequest.CorrelationId ?? Guid.NewGuid().ToString(),
                createdAtUtc: DateTime.UtcNow,
                payload: creditsChangeRequest);
            try
            {
                await decreaseCreditsCommands.AddAsync(command);
                await decreaseCreditsCommands.FlushAsync();

                log.LogWarning($"Decrease Credits Command generated. CorrelationId: {command.CorrelationId}, AggregateId: {command.AggregateId}");
                req.HttpContext.Response.Headers.Add("ETag", command.ETag);
                return new AcceptedResult();
            }
            catch (DocumentClientException ex) when (ex.Error.Code == "Conflict")
            {

                // the message template needs a placeholder, otherwise the id is never logged
                log.LogWarning("Returning Not Modified: {Id}", creditsChangeRequest.Id);
                req.HttpContext.Response.Headers.Add("ETag", command.ETag);
                return new StatusCodeResult(304);
            }
        }

        private static async Task<IActionResult> PerformIncreaseOperation(HttpRequest req, AccountTransactionModel creditsChangeRequest, IAsyncCollector<IncreaseCreditsCommand> increaseCreditsCommands, ILogger log)
        {
            var command = new IncreaseCreditsCommand(aggregateId: creditsChangeRequest.PrepaidCardAccountId,
                correlationId: creditsChangeRequest.CorrelationId ?? Guid.NewGuid().ToString(),
                createdAtUtc: DateTime.UtcNow,
                payload: creditsChangeRequest);

            try
            {
                await increaseCreditsCommands.AddAsync(command);
                await increaseCreditsCommands.FlushAsync();

                req.HttpContext.Response.Headers.Add("ETag", command.ETag);
                log.LogWarning($"Increase Credits Command generated. CorrelationId: {command.CorrelationId}, AggregateId: {command.AggregateId}");
                return new AcceptedResult();
            }
            catch (DocumentClientException ex) when (ex.Error.Code == "Conflict")
            {
                // the message template needs a placeholder, otherwise the id is never logged
                log.LogWarning("Returning Not Modified: {Id}", creditsChangeRequest.Id);
                req.HttpContext.Response.Headers.Add("ETag", command.ETag);
                return new StatusCodeResult(304);
            }
        }
    }
}

In this second example, we demonstrate an API to increase or decrease credits for a given account. These transactions can be called from other services like the Transfers API. This function uses dependency injection to receive a service that can query Cosmos DB. This mechanism is very similar to the ASP.NET Core dependency injection infrastructure you are probably used to working with.

The validation logic is more complex. We first query CosmosDB for the account. During validation, a custom validator checks whether the cardholder given in the request is the owner of that account:

using FluentValidation;
using FluentValidation.Results;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate;

namespace ServerlessStreaming.Api
{
    public class TransactionValidator : AbstractValidator<AccountTransactionModel>
    {
        // see also: https://www.tomfaltesek.com/azure-functions-input-validation/

        public TransactionValidator()
        {
            RuleFor(x => x).NotNull();
            RuleFor(x => x.Id).NotEmpty();
            RuleFor(x => x.PrepaidCardAccountId).NotEmpty();
            RuleFor(x => x.CardholderId).NotEmpty();
            RuleFor(x => x.Amount).GreaterThan(0m);

            RuleFor(x => x.CardholderId).Custom((x, context) => {

                var account = context.ParentContext.RootContextData["account"] as PrepaidCardAccount;
                // a missing account (null) is treated the same as an owner mismatch
                if (account == null || x != account.CardHolderId)
                {
                    context.AddFailure("The informed account does not have a valid owner.");
                }
            });
        }

        protected override bool PreValidate(ValidationContext<AccountTransactionModel> context, ValidationResult result)
        {

            if (context.InstanceToValidate == null)
            {
                result.Errors.Add(new ValidationFailure("", "Please ensure a model was supplied."));
                return false;
            }
            return true;
        }
    }
}

If there is a mismatch, we add an error to the list of validation errors and return them to the client. Another difference is that we are using two output bindings, one for each kind of transaction. The rest of the function is generally the same: we transform a request into a valid command that is likely to succeed and produce a change in the global state.

Commands and Aggregates in the world of CQRS and Event Sourcing

CQRS and Event Sourcing, as said in many other articles, work very well in combination. We will show here a complete example and we will describe how we can build our event sourced aggregates to properly organize our business logic.

Physical Organization

In our solution, we have a project to organize the domain and maintain the business rules together. In serverless architectures, it's often tempting to write business logic inside the functions. We should try to avoid this. Our domain logic should try to be host-agnostic.

[Figure: physical structure of the example project]

The image above shows the physical structure of our example project. It's easy to see the aggregates, commands, and events. A new developer can quickly understand the main commands and events produced by the system. This structure of commands and events helps make the code more intention-revealing. The business rules are usually found in command handlers.

Command Structure

A command contains all the data necessary to be processed without any other dependencies. Commands are very simple structures and fairly easy to understand.

First, let's see our command implementation details:

using Newtonsoft.Json;
using System;

namespace ServerlessStreaming.Common
{

    public interface ICommand
    {
        string Id { get; set; }
        string CorrelationId { get; }
        string AggregateId { get; }
        DateTime CreatedAtUtc { get; }
        string CommandName { get; set; }
    }


    public abstract class Command<TPayload> : ICommand where TPayload : IPayload
    {
        private string commandName;

        protected Command(string aggregateId, string correlationId, DateTime createdAtUtc, TPayload payload)
        {
            CorrelationId = correlationId;
            CreatedAtUtc = createdAtUtc;
            AggregateId = aggregateId;
            Payload = payload;
            commandName = GetType().FullName;
        }


        [JsonProperty(PropertyName = "id")]
        public string Id { get; set; }

        public string CommandName
        {
            get
            {
                return commandName;
            }
            set
            {
                if (value == GetType().FullName)
                {
                    commandName = value;
                }
                else
                {
                    throw new TypeInitializationException(GetType().FullName, new Exception("The command name must match the C# full type name in order to prevent json deserialization mismatches."));
                }
            }
        }

        public string CorrelationId { get; }

        public DateTime CreatedAtUtc { get; }

        public string AggregateId { get; }

        public TPayload Payload { get; }

        public string ETag
        {
            get
            {
                //todo: needs to be opaque
                return CommandHash;
            }
        }

        public abstract string CommandHash { get; }

        public override string ToString()
        {
            try
            {
                var json = JsonConvert.SerializeObject(this);
                return json;
            }
            catch (Exception)
            {
                return $"AggregateId: {AggregateId}, CorrelationId: {CorrelationId}, Id: {Id}, CommandName: {CommandName}";
            }
        }
    }
}

The structure is not complicated. We have a Command class with a type parameter, TPayload, that represents the model of the command. Another extremely important field is CorrelationId. The correlation identifier ties together all the commands and events produced by a given request. It is essential for troubleshooting the multiple parts of the system and can help in auditing. With this unique identifier, we can query all the commands, events, and log messages on either CosmosDB or Application Insights. This is a big win for event-driven architectures. We can investigate and understand how the state was reached and we can see the motivation behind each state change. A command encapsulates a motivation to change the global state. This is not true for common CRUD systems, where we only see the final state of an aggregate.
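As an illustration of why the correlation identifier is so useful, the sketch below reconstructs the timeline of one request from a mixed list of stored messages. The `StoredMessage` and `CorrelationTrace` types are hypothetical and exist only for this example. In the real system the same filter would be expressed as a query against CosmosDB or Application Insights.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flattened view of a stored command or event.
public sealed class StoredMessage
{
    public StoredMessage(string name, string correlationId, DateTime createdAtUtc)
    {
        Name = name;
        CorrelationId = correlationId;
        CreatedAtUtc = createdAtUtc;
    }

    public string Name { get; }
    public string CorrelationId { get; }
    public DateTime CreatedAtUtc { get; }
}

public static class CorrelationTrace
{
    // Returns every message produced by one request, oldest first.
    public static IReadOnlyList<StoredMessage> For(IEnumerable<StoredMessage> messages, string correlationId)
    {
        return messages
            .Where(m => m.CorrelationId == correlationId)
            .OrderBy(m => m.CreatedAtUtc)
            .ToList();
    }
}
```

Reading the resulting list from top to bottom shows the command that started the request followed by each event it caused, which is the history a CRUD system would not retain.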

We also introduce the concept of Aggregate Id, represented by another required field of the command. The aggregate id is the concrete subject of the command: a command must refer to one identified aggregate, and it changes the state of that specific aggregate only. The aggregate id for an account can be, for example, 1236-5649-9998-7721.

All commands are required to have a unique name, and in our example we automate this by using the C# full type name as the command name. We use the command name to partition the Commands container in CosmosDB. Usually, we share the same container among multiple commands to reduce the overall financial cost of the solution. Since the container holds multiple commands with different JSON structures, deserialization requires an identifier on each document, so we also use the command name to find the correct type to deserialize into. We use ETag to help the caller with client-side caching.
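The idea of using the command name as a type discriminator can be sketched as follows. This is not the project's deserialization code: `CommandDeserializer`, `IncreaseCreditsSketch`, and `DecreaseCreditsSketch` are hypothetical, and the sketch uses System.Text.Json only to stay dependency-free, whereas the project itself uses Newtonsoft.Json.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical, heavily simplified command shapes.
public sealed class IncreaseCreditsSketch
{
    public string CommandName { get; set; }
    public decimal Amount { get; set; }
}

public sealed class DecreaseCreditsSketch
{
    public string CommandName { get; set; }
    public decimal Amount { get; set; }
}

public static class CommandDeserializer
{
    // Maps the stored command name (the C# full type name) to its type.
    private static readonly Dictionary<string, Type> KnownCommands = new Dictionary<string, Type>
    {
        [typeof(IncreaseCreditsSketch).FullName] = typeof(IncreaseCreditsSketch),
        [typeof(DecreaseCreditsSketch).FullName] = typeof(DecreaseCreditsSketch),
    };

    // Returns null when the document is not a known command, so the caller can skip it.
    public static object TryDeserialize(string json)
    {
        using (var document = JsonDocument.Parse(json))
        {
            if (!document.RootElement.TryGetProperty("CommandName", out var nameElement)
                || nameElement.ValueKind != JsonValueKind.String)
            {
                return null;
            }

            var name = nameElement.GetString();
            if (name == null || !KnownCommands.TryGetValue(name, out var commandType))
            {
                return null;
            }

            return JsonSerializer.Deserialize(json, commandType);
        }
    }
}
```

Because every document in the shared container carries its own name, a reader can pick the right type before deserializing and can ignore documents it does not know about.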

A concrete command should be something like this:

using ServerlessStreaming.Common;
using System;

namespace ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands
{
    public class CreateCreditsTransferCommand : Command<CreditsTransferModel>
    {
        public CreateCreditsTransferCommand(
            string aggregateId,
            string correlationId,
            DateTime createdAtUtc,
            CreditsTransferModel payload)
            : base(aggregateId, correlationId, createdAtUtc, payload) {  }


        public override string CommandHash => AggregateId + "_" + Payload.Id;
    }
}

We don't expose an empty constructor and we use CreditsTransferModel as our command payload. The type parameter of the base class specifies which model type the command accepts. The code for CreditsTransferModel is:

using Newtonsoft.Json;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel.CardholderAggregate;
using System;

namespace ServerlessStreaming.DomainModel.CreditsTransferAggregate
{
    public class CreditsTransferModel : IPayload
    {
        [JsonProperty(PropertyName = "id")]
        public string Id { get; set; }
        public CardholderInfo Sender { get; set; }
        public CardholderInfo Receiver { get; set; }
        public decimal Amount { get; set; }
        public string TransferPurpose { get; set; }
        public DateTime? DateScheduledUtc { get; set; }
        public DateTime DateCreatedUtc { get; set; }
        public DateTime? DateProcessedUtc { get; set; }
    }
}

As we can see here, the model contains the fields needed to create a valid transfer operation. We force all the models to have an Id property. The IPayload interface enforces this.

Command Handling Process: Azure Functions as a Host

As we can recall, in our architecture commands are usually generated by an API and then saved to a CosmosDB collection. The main reason is to be able to react to each command using the mechanisms the cloud makes available: the arrival of a valid command in CosmosDB triggers another Azure Function. In general, every command has a corresponding command handler. The physical structure in the code is trivial:

Organization of the command handlers

The command handler for CreateCreditsTransferCommand is here:

using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Polly;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using static ServerlessStreaming.Common.UtilityFunctions;

namespace CreditsTransferProcessor
{
    public class CreateCreditsTransferCommandHandler
    {
        private readonly CreditsTransferCommandHandlers commandHandlers;

        public CreateCreditsTransferCommandHandler(CreditsTransferCommandHandlers commandHandlers)
        {
            this.commandHandlers = commandHandlers;
        }

        [FunctionName("CommandHandler_CreateCreditsTransfer")]
        public async Task Run(

            [CosmosDBTrigger(
            databaseName: "CreditsTransfers",
            collectionName: "Commands",
            ConnectionStringSetting = "CosmosDBConnectionString",
            LeaseCollectionName = "leases",
            LeaseCollectionPrefix = "CH_CCT")] // abbr of this command handler
            IReadOnlyList<Document> commandsReceived,


            [CosmosDB(
                databaseName: "CreditsTransfers",
                collectionName: "Events",
                ConnectionStringSetting = "CosmosDBConnectionString")]
            IAsyncCollector<Event> outputEvents,


            [CosmosDB(
                databaseName: "CreditsTransfers",
                collectionName: "Commands",
                ConnectionStringSetting = "CosmosDBConnectionString")]
            IAsyncCollector<CommandHandlerFailed> commandsFailed,

             [CosmosDB(
                databaseName: "CreditsTransfers",
                collectionName: "Commands",
                ConnectionStringSetting = "CosmosDBConnectionString")]
            IAsyncCollector<CommandHandlerSucceeded> commandsSucceeded,


            [Queue("command-handler-failed")]
            IAsyncCollector<string> deadLetterQueue,

            ILogger log)
        {

            if (commandsReceived == null)
            {
                log.LogWarning("No commands available to process!");
                return;
            }

            log.LogInformation("Number of commands to process: " + commandsReceived.Count);


            foreach (var c in commandsReceived)
            {
                CreateCreditsTransferCommand command = default;

                try
                {
                    log.LogDebug("Processing document id: {Id}...", c.Id);

                    var input = CanDesserializeAs<CreateCreditsTransferCommand>(c);
                    if (input.IsNullOrEmpty())
                    {
                        continue;
                    }

                    command = JsonConvert.DeserializeObject<CreateCreditsTransferCommand>(input);

                    var context = new Context().WithLogger(log);
                    context.Add("command", command);
                    await RetryPolicies.GetPolicy().ExecuteAsync(async (context) =>
                    {
                        if (command != null && command.CommandName == typeof(CreateCreditsTransferCommand).FullName)
                        {
                            // invoke domain model logic to decide if the command should be accepted.
                            await commandHandlers.HandleAsync(command, log, outputEvents);

                            // send command confirmation
                            var commandSucceeded = new CommandHandlerSucceeded(command.Id, command.CorrelationId, command.AggregateId, command.CommandHash);
                            await commandsSucceeded.AddAsync(commandSucceeded);

                            log.LogWarning("Command Handler succeeded: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}",
                                                                                            command.CommandName, command.CorrelationId, command.AggregateId);
                        }
                    }, context);
                }
                catch (JsonSerializationException ex) when (ex.InnerException is TypeInitializationException)
                {
                    continue;
                }
                catch (Exception ex)
                {
                    await DeadLettering.HandleFailure(commandsFailed, deadLetterQueue, log, command, ex);
                }

                log.LogDebug("Processed document id: {Id}...", c.Id);
            }
        }
    }
}


The constructor receives a concrete instance of a command handler defined in our domain model. As we can see, there is a new trigger here: CosmosDBTrigger. The Azure Functions runtime uses the CosmosDB Change Feed mechanism to invoke this function when a new item is added to the Commands container. Some configuration and infrastructure are needed to make this work but, in general, we need an auxiliary collection to store some locks (the lease container) and we must make sure that we don't have a duplicated lease collection prefix.

We can also see that there are output bindings to the Events collection and other bindings to store commands that succeeded and failed. We also have a queue binding that can store the payload of failed commands.

The function can receive more than one command and we must not lose any command. For each command received, we check if the message is useful for us. We will receive all the commands in all command handlers and we must ignore commands that we don't handle so other functions can handle them. After successful deserialization, we start a retry context and try to perform our business operation, delegating this work to the command handler that was injected in the constructor. This function works as a gateway to our domain logic. After a successful operation, the CommandHandlerSucceeded message is saved on CosmosDB.

When something fails, we do a retry. If it fails again, we first try to send a CommandHandlerFailed message to CosmosDB. If sending that message also fails, we store the command payload on a storage queue. These are very exceptional situations and must be monitored by the operations team. The error handling logic is as follows:

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate;
using System;
using System.Threading.Tasks;

namespace ServerlessStreaming.Common
{
    public static class DeadLettering
    {
        public static async Task HandleFailure<T>(IAsyncCollector<CommandHandlerFailed> commandsFailed, IAsyncCollector<string> commandHandlerDlq, ILogger log, Command<T> cmd, Exception ex)
            where T : IPayload
        {
            log.LogDebug("Starting command error handling using Cosmos: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", cmd.CommandName, cmd.CorrelationId, cmd.AggregateId);

            var commandHandlerFailed = new CommandHandlerFailed(cmd.Id, cmd.CorrelationId, cmd.AggregateId, cmd.CommandHash, ex);
            try
            {
                await commandsFailed.AddAsync(commandHandlerFailed);
                log.LogError("Command Handler failed: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}, Exception: {Exception}",
                                                    cmd.CommandName, cmd.CorrelationId, cmd.AggregateId, ex);
            }
            catch (Exception ex2)
            {
                log.LogDebug("Starting command error handling using Queue Storage after cosmos failure: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}",
                                                   cmd.CommandName, cmd.CorrelationId, cmd.AggregateId);

                try
                {
                    await commandHandlerDlq.AddAsync(cmd.ToString());
                    await commandHandlerDlq.FlushAsync();
                    log.LogDebug("Command sent to dead letter:  Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", cmd.CommandName, cmd.CorrelationId, cmd.AggregateId);
                }
                catch (Exception ex3)
                {
                    log.LogCritical("Error sending failed command to dead letter queue. Exception: {Exception}:", ex3);
                    throw;
                }


                log.LogCritical("Command Handler failed and command was dead-lettered: Command {CommandName}, CorrelationId: {CorrelationId}, AggregateId: {AggregateId}, Exception: {Exception}", cmd.CommandName, cmd.CorrelationId, cmd.AggregateId, ex2);

            }
        }
    }
}


As we can see above, we try our best to notify the external world of the failure. If we can use CosmosDB to communicate the command handling failure, we can react to these messages and perhaps notify the customer using some sort of automatic notification. The storage queue is our last resort. We could have used other kinds of outputs here. The idea is to use a persistence mechanism different from the one that has just failed.
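The fallback order described above can be sketched independently of Azure. This is only an illustration, not the project's code: FailureReporting, ReportAsync, and the two delegates are hypothetical names standing in for the CosmosDB collector and the storage queue.

```csharp
using System;
using System.Threading.Tasks;

public static class FailureReporting
{
    // Tries the primary channel first; only if it throws do we use the last resort.
    // Returns the name of the channel that accepted the report.
    public static async Task<string> ReportAsync(Func<Task> primary, Func<Task> lastResort)
    {
        try
        {
            await primary();
            return "primary";
        }
        catch (Exception)
        {
            // If the last resort also throws, the exception propagates to the caller,
            // so the host can log it and the operations team can be alerted.
            await lastResort();
            return "last-resort";
        }
    }
}
```

A caller would pass something like `() => commandsFailed.AddAsync(message)` as the primary delegate and a queue write as the last resort, so the second channel does not share the failure mode of the first.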

Please note again that we are using a single CosmosDB container to store all the commands and we do the same to store all the events. This is a cost optimization strategy. It would be possible to create a container for each type of command or event, which would be analogous to a relational modeling strategy. As each new container can add cost to the solution, we generally try to keep the number of containers small. The side effect of this organization is that all the functions are called on each insert and each function must ignore the messages that it does not handle. This kind of organization, of course, can be modified later if needed. You need to continuously monitor the performance of your solution to find bottlenecks or cost pressure.
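Because every function sees every command, each one needs a cheap way to decide whether a document is meant for it. The sketch below is one possible way to do that check, under two assumptions: the stored document has a top-level string property called CommandName, and we use System.Text.Json here only to keep the example dependency-free (the article's code uses Newtonsoft.Json). CommandFilter and IsHandledBy are hypothetical names.

```csharp
using System;
using System.Text.Json;

public static class CommandFilter
{
    // Returns true only when the document is valid JSON and its CommandName
    // matches the command this function is responsible for.
    public static bool IsHandledBy(string documentJson, string expectedCommandName)
    {
        if (string.IsNullOrWhiteSpace(documentJson))
        {
            return false;
        }

        try
        {
            using (var doc = JsonDocument.Parse(documentJson))
            {
                var root = doc.RootElement;
                return root.ValueKind == JsonValueKind.Object
                    && root.TryGetProperty("CommandName", out var name)
                    && name.ValueKind == JsonValueKind.String
                    && string.Equals(name.GetString(), expectedCommandName, StringComparison.Ordinal);
            }
        }
        catch (JsonException)
        {
            // Not valid JSON: ignore it here; it is not a command we can handle.
            return false;
        }
    }
}
```

Documents that fail this check are simply skipped, which leaves them for the function that does handle that command type.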

Command Handling Process: The Command Handler

The command handler works as a small orchestrator. It receives input from the host, in our case, an Azure Function. The command handler is part of the domain logic. With a command in hand, it calls the Aggregate Root to get the command handled. It depends on an EventSourcedRepository to persist the results of command handling: Events! In an event-sourced application using CQRS and DDD concepts, we ask the domain to process a command and produce events.

using Microsoft.Extensions.Logging;
using ServerlessStreaming.DomainModel.CreditsTransferAggregate.Commands;
using System;
using ServerlessStreaming.Common;
using ServerlessStreaming.Infrastructure;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using System.Linq;

namespace ServerlessStreaming.DomainModel.CreditsTransferAggregate
{

    public class CreditsTransferCommandHandlers 
    {
        private readonly EventSourcedRepository eventSourcedRepository;

        public CreditsTransferCommandHandlers(EventSourcedRepository eventSourcedRepository)
        {
            this.eventSourcedRepository = eventSourcedRepository;
        }


        public async Task HandleAsync(CreateCreditsTransferCommand command, ILogger hostLogger, IAsyncCollector<Event> azureOutputBinding = null)
        {
            if (azureOutputBinding != null)
            {
                eventSourcedRepository.SetWritePersistence(azureOutputBinding);
            }

            var eventStream = await eventSourcedRepository.LoadAggregateStreamAsync(command.AggregateId);

            // this is applicable only when we are creating a new aggregate. We must have no prior events.
            if (eventStream.Any())
            {
                throw new Exception("Aggregate has already been created"); // todo: better exceptions;
            }

            var creditsTransfer = new CreditsTransferAggregate(command, hostLogger); 
            creditsTransfer.DesignateCreditsTransferHandlingStrategy(command);

            await eventSourcedRepository.SaveToAggregateStreamAsync(creditsTransfer.Changes);

        }


        public async Task HandleAsync(ManuallyApproveCreditsTransferCommand command, ILogger hostLogger, IAsyncCollector<Event> azureOutputBinding = null)
        {
            if (azureOutputBinding != null)
            {
                eventSourcedRepository.SetWritePersistence(azureOutputBinding);
            }

            var eventStream = await eventSourcedRepository.LoadAggregateStreamAsync(command.AggregateId);
            var creditsTransfer = new CreditsTransferAggregate(eventStream, hostLogger);
            creditsTransfer.PerformManualApproval(command);

            await eventSourcedRepository.SaveToAggregateStreamAsync(creditsTransfer.Changes);
        }



        public async Task HandleAsync(ManuallyRejectCreditsTransferCommand command, ILogger azureFunctionsLogger, IAsyncCollector<Event> azureOutputBinding = null)
        {
            if (azureOutputBinding != null)
            {
                eventSourcedRepository.SetWritePersistence(azureOutputBinding);
            }

            var eventStream = await eventSourcedRepository.LoadAggregateStreamAsync(command.AggregateId);
            var creditsTransfer = new CreditsTransferAggregate(eventStream, azureFunctionsLogger);
            creditsTransfer.PerformManualRejection(command);

            await eventSourcedRepository.SaveToAggregateStreamAsync(creditsTransfer.Changes);
        }
    }
}

As we can see in this code, the command handler knows how to call the aggregate. It calls a public method on the aggregate root and captures the Changes property to persist the events to the event-sourced repository. Since we are using the output binding mechanism of the host, Azure Functions, we need to slightly change the command handler to receive this infrastructure dependency and configure the repository to understand IAsyncCollector<T>. We are also using the logging infrastructure of Azure in the form of a common ILogger.

Understanding the Event Stream

Before going into the demonstration of how an aggregate root is implemented, it's important to visualize a concrete example of what an event sourced aggregate means. It's far easier to explain using an example.

Consider that a new account is created. Then some credits are added. The account number created is 1234-4321-5678-0987. The aggregate name is Account.

Event Name | Aggregate Version | Event Body | Balance (current state of this aggregate)
Account Created | 0 | {AggregateId: 1234-4321-5678-0987} | $ 0.00
Credits Increased | 1 | {AggregateId: 1234-4321-5678-0987, Amount: 1000.00} | $ 1000.00
Credits Decreased | 2 | {AggregateId: 1234-4321-5678-0987, Amount: 10.00} | $ 990.00

As we can see, we only save these events to the database. We never perform an update. A database that only appends events is called an event store in the industry. We are using CosmosDB as our event store. As you can imagine, the current state of the Account aggregate is derived from the event stream. We must read the entire event stream to calculate the current state of the aggregate. This is a major mindset change. The repository code and aggregate code now need to understand how to work with an event stream. We admit that this is not trivial and requires some study time, but you should consider this strategy when creating new systems that are not small. Event sourcing has an advantage over traditional development based on database state because all the changes in the global state are documented in the event stream. The global state is the entire event stream. We can easily understand how a given aggregate evolved and we gain the ability to do time-travel debugging to identify and solve issues.
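To make the idea of deriving state from the stream concrete, here is a minimal sketch that folds the three events from the table above into a balance. AccountEvent and BalanceProjection are hypothetical, simplified types used only for illustration; they are not the event classes of this project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified event: the real project uses richer Event types.
public sealed class AccountEvent
{
    public AccountEvent(string eventName, int aggregateVersion, decimal amount)
    {
        EventName = eventName;
        AggregateVersion = aggregateVersion;
        Amount = amount;
    }

    public string EventName { get; }
    public int AggregateVersion { get; }
    public decimal Amount { get; }
}

public static class BalanceProjection
{
    // Replays the stream in version order; the balance is never stored, only derived.
    public static decimal Replay(IEnumerable<AccountEvent> stream)
    {
        decimal balance = 0m;
        foreach (var e in stream.OrderBy(x => x.AggregateVersion))
        {
            switch (e.EventName)
            {
                case "CreditsIncreased":
                    balance += e.Amount;
                    break;
                case "CreditsDecreased":
                    balance -= e.Amount;
                    break;
                // "AccountCreated" and any other event leave the balance untouched.
            }
        }
        return balance;
    }
}
```

Replaying AccountCreated (version 0), CreditsIncreased of 1000.00 (version 1), and CreditsDecreased of 10.00 (version 2) yields 990.00, the same value shown in the last row of the table.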

Aggregate Versions and Concurrency

Suppose there are two instances of the DecreaseCreditsCommandHandler that intend to decrease $500.00 at almost the same time (a race condition). If we just added events to the stream without checking anything, both would succeed and the user would end up with a negative balance. The situation would be like this:

Event Name | Aggregate Version | Event Body | Balance (current state of this aggregate)
Account Created | 0 | {AggregateId: 1234-4321-5678-0987} | $ 0.00
Credits Increased | 1 | {AggregateId: 1234-4321-5678-0987, Amount: 1000.00} | $ 1000.00
Credits Decreased | 2 | {AggregateId: 1234-4321-5678-0987, Amount: 10.00} | $ 990.00
Credits Decreased | 3 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00} | $ 490.00
Credits Decreased | 3 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00} | $ -10.00

We can see that we are breaking a domain invariant that forbids accounts with a negative amount of credits. To avoid this problem we should reject the command that would lead to an invalid state. A solution to this problem is to query the event stream and detect the current aggregate version. We then try to publish a new event with the next version. If there is already an event with the same version, we retry a little bit later.

Command Name | Event Name | Aggregate Version | Event Body | Balance (current state of this aggregate)
CreateNewAccount | Account Created | 0 | {AggregateId: 1234-4321-5678-0987} | $ 0.00
IncreaseCredits | Credits Increased | 1 | {AggregateId: 1234-4321-5678-0987, Amount: 1000.00} | $ 1000.00
DecreaseCredits | Credits Decreased | 2 | {AggregateId: 1234-4321-5678-0987, Amount: 10.00} | $ 990.00
DecreaseCredits thread#1 | Credits Decreased | 3 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00} | $ 490.00
DecreaseCredits thread#2 | - | still 3 because there is another event with version 3 | no event published | $ 490.00
DecreaseCredits thread#2 (retry) | Credits Decrease Rejected | 4 | {AggregateId: 1234-4321-5678-0987, Amount: 500.00, Reason: "Insufficient Funds"} | $ 490.00

Thread #2 tries to decrease the credits and fails because version 3 is already taken. It tries again and this time the command is handled, producing an event explaining that there were insufficient funds.

We have added a uniqueness constraint on the Events collection so we can get an exception upon the insertion of a duplicated version for an aggregate.
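The interplay between the uniqueness constraint and the retry can be sketched with an in-memory stand-in for the Events collection. Everything below is hypothetical and simplified (StoredEvent, InMemoryEventStore, DecreaseCreditsSketch): in the real system the duplicate would surface as an exception from CosmosDB rather than a false return value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class StoredEvent
{
    public StoredEvent(string aggregateId, int aggregateVersion, string eventName, decimal amount)
    {
        AggregateId = aggregateId;
        AggregateVersion = aggregateVersion;
        EventName = eventName;
        Amount = amount;
    }

    public string AggregateId { get; }
    public int AggregateVersion { get; }
    public string EventName { get; }
    public decimal Amount { get; }
}

public sealed class InMemoryEventStore
{
    private readonly List<StoredEvent> events = new List<StoredEvent>();
    private readonly HashSet<(string, int)> uniqueKeys = new HashSet<(string, int)>();

    // Mimics the uniqueness constraint: a second event with the same
    // (AggregateId, AggregateVersion) pair is refused.
    public bool TryAppend(StoredEvent e)
    {
        if (!uniqueKeys.Add((e.AggregateId, e.AggregateVersion)))
        {
            return false;
        }
        events.Add(e);
        return true;
    }

    public List<StoredEvent> Load(string aggregateId) =>
        events.Where(e => e.AggregateId == aggregateId)
              .OrderBy(e => e.AggregateVersion)
              .ToList();
}

public static class DecreaseCreditsSketch
{
    // Reloads the stream on every attempt, so the decision is always made
    // against the latest version and balance. Returns the appended event name.
    public static string Handle(InMemoryEventStore store, string aggregateId, decimal amount, int maxAttempts = 3)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var stream = store.Load(aggregateId);
            var version = stream.Count == 0 ? -1 : stream[stream.Count - 1].AggregateVersion;
            var balance = stream.Sum(e =>
                e.EventName == "CreditsIncreased" ? e.Amount :
                e.EventName == "CreditsDecreased" ? -e.Amount : 0m);

            var eventName = amount <= balance ? "CreditsDecreased" : "CreditsDecreaseRejected";
            if (store.TryAppend(new StoredEvent(aggregateId, version + 1, eventName, amount)))
            {
                return eventName;
            }
            // Version conflict: another writer got there first, so loop and decide again.
        }
        throw new InvalidOperationException("Could not append the event after several attempts.");
    }
}
```

In the scenario of the table, a stale writer that still believes the next version is 3 gets a refusal from TryAppend; when it goes through Handle again, the reloaded balance is 490.00 and the outcome is a CreditsDecreaseRejected event at version 4.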

Command Handlers and Read Models

As we saw in the previous post, the read model is eventually consistent. It means that a published event will eventually be reflected in the read model. Usually, the read model update delay is very short and is imperceptible to the users. Some developers can be tempted to access the read model to perform some additional validations and run business logic. This is not correct. There is always a risk of looking at stale data. The command handler should only look at the event stream of the aggregate it intends to affect. Deviations from this rule should be carefully discussed. Access to read models affected by other aggregates can be considered, but usually, the UI of the application should feed the command with data from diverse read models. The API that accepts requests from the UI can also do this kind of preparation work to issue correct commands, possibly obtaining data from multiple read models.

An Event Sourced Repository

Our event-sourced repository is a little bit unconventional:

using Microsoft.Azure.WebJobs;
using Newtonsoft.Json;
using ServerlessStreaming.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;


namespace ServerlessStreaming.Infrastructure
{
    public class EventSourcedRepository 
    {
        private readonly ICosmosDBEventStore<Event> eventStore;
        private IAsyncCollector<Event> azureOutputBinding;

        public EventSourcedRepository(ICosmosDBEventStore<Event> eventStore)
        {
            this.eventStore = eventStore;
        }


        public async Task SaveToAggregateStreamAsync(IEnumerable<Event> events)
        {
            if (azureOutputBinding == null)
            {
                await SaveToAggregateStreamUsingEventStore(events);
            }
            else
            {
                await SaveToAggregateStreamUsingAzureOutputBinding(events, azureOutputBinding);
            }
        }

        private async Task SaveToAggregateStreamUsingAzureOutputBinding(IEnumerable<Event> events, IAsyncCollector<Event> azureOutputBinding)
        {

            foreach (var item in events)
            {
                await azureOutputBinding.AddAsync(item);
            }
            await azureOutputBinding.FlushAsync();
        }

        private async Task SaveToAggregateStreamUsingEventStore(IEnumerable<Event> events)
        {
            // todo: optimization & cache - not performant.
            foreach (var e in events)
            {
                var concreteEventType = AppDomain.CurrentDomain.GetAssemblies().Select(x => x.GetType(e.EventName)).Where(x => x != null).First();
                var concreteEvent = JsonConvert.DeserializeObject(e.ToString(), concreteEventType);
                await eventStore.AddItemRawAsync(concreteEvent, e.EventName);
            }
        }

        public async Task<IEnumerable<Event>> LoadAggregateStreamAsync(string aggregateId)
        {

            var eventStoreQuery = $@" SELECT c.id, c.EventName, c.EventDateTimeUtc, c.AggregateId, c.AggregateVersion, c.CorrelationId, c.Payload 
                                      FROM c
                                      WHERE c.AggregateId = '{aggregateId}' 
                                      ORDER BY c.AggregateVersion";

            // we get all the events still as Cosmos DB documents. 
            var events = await eventStore.GetItemsRawAsync(eventStoreQuery);

            var concreteEvents = new List<Event>();

            foreach (var ev in events)
            {
                var concreteEventTypeName = ev.GetPropertyValue<string>("EventName");
                var concreteEventType = AppDomain.CurrentDomain.GetAssemblies()
                                                               .Select(x => x.GetType(concreteEventTypeName))
                                                               .Where(x => x != null).First();

                if (JsonConvert.DeserializeObject(ev.ToString(), concreteEventType) is Event concreteEvent)
                {
                    concreteEvents.Add(concreteEvent);
                }
                else
                {
                    throw new InvalidOperationException("Could not process event or the event is unknown.");
                }
            }

            return concreteEvents;
        }

        public EventSourcedRepository SetWritePersistence(IAsyncCollector<Event> outputBinding)
        {
            this.azureOutputBinding = outputBinding;
            return this;
        }
    }
}


This repository works by adding events to an aggregate's stream and loading the stream back when needed. The repository depends on a low-level service to interact with CosmosDB. It can write items either through our CosmosDB service or through an IAsyncCollector<Event>.

We must be aware that most of the domain logic is placed on the write side of the application. The changes in the global state are controlled by the aggregates, command handlers, and specialized repositories that work with the event store. We should not be tempted to query the event store directly to get data, for example, to satisfy UI requests. Since the read and write sides of the application are physically separated, as CQRS recommends, we use the read side to query the read models and then build the UI. This repository is used only to manage the events in the event store.

Understanding the Aggregate Root

Let's go back to the command handling logic. What we do, briefly, is:

  • Receive a command in an Azure Function that is triggered by the insertion of a new command in CosmosDB
  • Call the Command Handler
  • The Command Handler calls the Aggregate
  • The Aggregate works with the command payload and produces events
  • The Command Handler persists those events using the event-sourced repository.

We now need to understand the details of an Aggregate Root and how it effectively handles business logic. Let's start with the implementation of the base AggregateRoot class:

using Microsoft.Extensions.Logging;
using ServerlessStreaming.Common;
using System;
using System.Collections.Generic;
using System.Linq;

namespace ServerlessStreaming.DomainModel
{
    public abstract class AggregateRoot
    {
        protected AggregateRoot(IEnumerable<Event> eventStream, ILogger log)
        {
            this.Log = log;
            if (eventStream.Any() == false)
            {
                throw new Exception("Could not load aggregate from the event stream because it is empty.");
            }

            this.EventStream.AddRange(eventStream);
            LoadFromHistory(this.EventStream);
            LoadAggregateIdFromStream();

            Log.LogDebug("Aggregate State Restored.");
        }

        protected AggregateRoot(ILogger log) {
            this.Log = log;
        }

        private string aggregateId;

        public string AggregateId
        {
            get => aggregateId;

            protected set
            {
                if (value.IsNullOrEmpty())
                {
                    throw new ArgumentException("The Aggregate Id must not be empty.");
                }
                aggregateId = value;
            }
        }
        protected List<Event> EventStream { get; } = new List<Event>();

        public List<Event> Changes { get; } = new List<Event>();

        protected ILogger Log { get; }

        public abstract void Apply(Event evt);

        protected abstract void LoadFromHistory(IEnumerable<Event> eventStream);

        protected virtual void LoadAggregateIdFromStream()
        {
            this.AggregateId = EventStream.First().AggregateId;
        }
    }
}

The Aggregate Root is responsible for managing a group of related objects so they can work together as a unit to the external world. When we want to reconstruct the aggregate, we pass an event stream to the constructor so we can process each event ordered by the aggregate version. Sometimes we need to create a new aggregate, like a new user account, without any event stream. So we have two constructors available. Both require a logger, as logging is mandatory in our project.

During reconstruction, we process each event and update the state of the aggregate. This is done when LoadFromHistory is called:

 protected override void LoadFromHistory(IEnumerable<Event> eventStream)
 {
     foreach (var evt in eventStream)
     {
         _ = evt switch
         {
             CreditsDecreasedEvent e => Apply(e, false),
             CreditsIncreasedEvent e => Apply(e, false),

             CreditsDecreaseRejectedEvent e => Apply(e, false),
             CreditsIncreaseRejectedEvent e => Apply(e, false),

             CreditsDecreaseReversedEvent e => Apply(e, false),
             CreditsIncreaseReversedEvent e => Apply(e, false),
         };

         currentAggregateVersion = evt.AggregateVersion;
     }
 }


We have to Apply each event received. Like this:

 private int Apply(CreditsDecreasedEvent e, bool isNew)
 {
     state.CurrentBalance -= (e.Payload as AccountTransactionModel).Amount;
     state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;

     if (isNew)
     {
         Changes.Add(e);
     }

     return e.AggregateVersion;
 }

We are applying each preexisting event and updating the local state of the Aggregate. With an updated state, the aggregate is capable of making correct decisions and generating correct events. Please note that the current aggregate version is updated inside LoadFromHistory as each event is processed. The aggregate id is obtained in the LoadAggregateIdFromStream call; any event on the stream carries it, and we use the first one. Please note that each event that changes the state must have a corresponding Apply method.

Please note that this reconstruction of the state is something that only occurs in the memory of the host running the code of the Aggregate Root. The calculated state is usually not saved anywhere unless we do a process called "snapshot", which is a picture of the state at a given moment that is also saved in the stream. A snapshot can be a performance optimization when the event stream is very long. Please also note that we can process the raw events in any way we want: we can do different projections of the events in the event store for different purposes. This is pure gold. You can read the stream and use it for different use cases, some of them potentially still unknown.
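Snapshots are not implemented in this article, so the following is only a sketch of the idea under stated assumptions: a snapshot records the balance at a given aggregate version, and restoring the state means starting from the snapshot and applying only the later events. BalanceChanged, BalanceSnapshot, and SnapshotReplay are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical event carrying a signed change to the balance.
public sealed class BalanceChanged
{
    public BalanceChanged(int aggregateVersion, decimal delta)
    {
        AggregateVersion = aggregateVersion;
        Delta = delta;
    }

    public int AggregateVersion { get; }
    public decimal Delta { get; }
}

// Hypothetical snapshot: the state as it was right after a given version.
public sealed class BalanceSnapshot
{
    public BalanceSnapshot(int aggregateVersion, decimal balance)
    {
        AggregateVersion = aggregateVersion;
        Balance = balance;
    }

    public int AggregateVersion { get; }
    public decimal Balance { get; }
}

public static class SnapshotReplay
{
    // Starts from the snapshot (if any) and applies only events newer than it.
    public static decimal Restore(BalanceSnapshot snapshot, IEnumerable<BalanceChanged> stream)
    {
        var balance = snapshot?.Balance ?? 0m;
        var fromVersion = snapshot?.AggregateVersion ?? -1;

        foreach (var e in stream.Where(x => x.AggregateVersion > fromVersion)
                                .OrderBy(x => x.AggregateVersion))
        {
            balance += e.Delta;
        }
        return balance;
    }
}
```

Restoring with or without a snapshot must give the same result; the snapshot only shortens the replay.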

When the reconstruction of the aggregate is completed, the command handler is ready to dispatch commands to the aggregate:

 public void DecreaseCredits(DecreaseCreditsCommand command)
 {
     var currentBalance = state.CurrentBalance;

     // check if we have enough credits to accept this command:
     var amountToDecrease = command.Payload.Amount;
     if (amountToDecrease <= state.CurrentBalance)
     {
         // if so, we can do the debit.
         ProceedWithCreditsDecrease(command, currentBalance, currentAggregateVersion);
     }
     else
     {
         // if not, we have to communicate the rejection using an Event
         RejectCreditsDecrease(command, currentBalance, currentAggregateVersion);
     }
 }


 private void ProceedWithCreditsDecrease(DecreaseCreditsCommand command, decimal currentBalance, int currentAggregateVersion)
 {
     var ev = new CreditsDecreasedEvent(
         command.CorrelationId,
         DateTime.UtcNow,
         command.AggregateId,
         currentAggregateVersion + 1,
         command.Payload)
     {
         PreviousBalance = currentBalance,
         CurrentBalance = currentBalance - command.Payload.Amount

     };

     Log.LogWarning("Credits Decrease will be accepted. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);

     Apply(ev);
 }


The aggregate simply exposes the DecreaseCredits method and decides whether the command should be accepted or rejected. When the credits decrease is accepted, we create a CreditsDecreasedEvent, incrementing the aggregate version and filling the necessary fields. We also log this to increase the visibility of the operations, even during development time. The event can also contain custom fields. We record the previous and current balance in the event. This is useful to troubleshoot the balance variation easily, and it probably makes it more difficult to tamper with the balance.

We have to communicate the existence of this new event to the world. We have to put all the new events on the Changes list. The Apply method again is called:

 public override void Apply(Event evt)
 {
     var aggregateVersion = evt switch
     {
         CreditsDecreasedEvent e => Apply(e, true),
         CreditsIncreasedEvent e => Apply(e, true),

         CreditsDecreaseRejectedEvent e => Apply(e, true),
         CreditsIncreaseRejectedEvent e => Apply(e, true),

         CreditsDecreaseReversedEvent e => Apply(e, true),
         CreditsIncreaseReversedEvent e => Apply(e, true),
     };

     currentAggregateVersion = aggregateVersion;
 }

 private int Apply(CreditsDecreasedEvent e, bool isNew)
 {
     state.CurrentBalance -= (e.Payload as AccountTransactionModel).Amount;
     state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;

     if (isNew)
     {
         Changes.Add(e);
     }

     return e.AggregateVersion;

 }


We apply the new event and update again the local state, as other calls to this aggregate can occur. The Changes collection is used by the command handler logic to send the events to the repository:

   await eventSourcedRepository.SaveToAggregateStreamAsync(prepaidCardAccountAggregate.Changes);
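The role of the isNew flag is easier to see in a stripped-down aggregate. The sketch below is not the project's PrepaidCardAccountAggregate; BalanceDelta and MiniAccount are hypothetical types that keep only the parts needed to show that replayed events rebuild the state while only newly produced events end up in Changes.

```csharp
using System.Collections.Generic;

// Hypothetical event carrying a signed change to the balance.
public sealed class BalanceDelta
{
    public BalanceDelta(int aggregateVersion, decimal amount)
    {
        AggregateVersion = aggregateVersion;
        Amount = amount;
    }

    public int AggregateVersion { get; }
    public decimal Amount { get; }
}

public sealed class MiniAccount
{
    private int currentVersion = -1;

    public decimal Balance { get; private set; }

    // Only events produced while handling commands are collected here.
    public List<BalanceDelta> Changes { get; } = new List<BalanceDelta>();

    public MiniAccount(IEnumerable<BalanceDelta> history)
    {
        // Replay: update the state, but do not mark anything as a change.
        foreach (var e in history)
        {
            Apply(e, isNew: false);
        }
    }

    public void Increase(decimal amount)
    {
        // New event: gets the next version and is recorded in Changes.
        Apply(new BalanceDelta(currentVersion + 1, amount), isNew: true);
    }

    private void Apply(BalanceDelta e, bool isNew)
    {
        Balance += e.Amount;
        currentVersion = e.AggregateVersion;

        if (isNew)
        {
            Changes.Add(e);
        }
    }
}
```

After rebuilding a MiniAccount from history, Changes is empty; it grows only when a command method such as Increase is called, and that list is what a command handler would hand to the repository.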

The state of the aggregate can be any object. The PrepaidCardAccountAggregate uses the entity PrepaidCardAccount to store the current state of the aggregate.

 private readonly PrepaidCardAccount state = new PrepaidCardAccount();
 private int currentAggregateVersion = -1;

 public PrepaidCardAccountAggregate(IEnumerable<Event> eventStream, ILogger log)
  : base(eventStream, log) { }

// rest of the PrepaidCardAccountAggregate omitted

An entity is just a simple class or a class that can collaborate with other entities and value objects. The code for PrepaidCardAccount entity is as follows:


using Newtonsoft.Json;
using System;

namespace ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate
{
    public class PrepaidCardAccount
    {
        [JsonProperty(PropertyName = "id")]
        public string Id { get; set; } 
        public string CardHolderId { get; set; }
        public decimal CurrentBalance { get; set; }
        public DateTime AccountCreatedAtUtc { get; set; }
        public DateTime BalanceUpdatedAtUtc { get; set; }
        public int AggregateVersion { get; set; }
    }
}

Now we can see the whole code for PrepaidCardAccountAggregate. Please forgive us, but we felt that for C# and .NET there are only a few complete examples of how to implement a realistic aggregate root.

using Microsoft.Extensions.Logging;
using ServerlessStreaming.Common;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate.Commands;
using ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ServerlessStreaming.DomainModel.PrepaidCardAccountAggregate
{
    public class PrepaidCardAccountAggregate : AggregateRoot
    {

        private readonly PrepaidCardAccount state = new PrepaidCardAccount();
        private int currentAggregateVersion = -1;


        public PrepaidCardAccountAggregate(IEnumerable<Event> eventStream, ILogger log)
         : base(eventStream, log) { }


        public void DecreaseCredits(DecreaseCreditsCommand command)
        {
            var currentBalance = state.CurrentBalance;

            // check if we have enough credits to accept this command:
            var amountToDecrease = command.Payload.Amount;
            if (amountToDecrease <= state.CurrentBalance)
            {
                // if so, we can do the debit.
                ProceedWithCreditsDecrease(command, currentBalance, currentAggregateVersion);
            }
            else
            {
                // if not, we have to communicate the rejection using an Event
                RejectCreditsDecrease(command, currentBalance, currentAggregateVersion);
            }
        }


        public void IncreaseCredits(IncreaseCreditsCommand command)
        {

            var currentBalance = state.CurrentBalance;

            var ev = new CreditsIncreasedEvent(command.CorrelationId, DateTime.UtcNow, command.AggregateId, currentAggregateVersion + 1, command.Payload)
            {
                PreviousBalance = currentBalance,
                CurrentBalance = currentBalance + command.Payload.Amount
            };

            Log.LogWarning("Credits Increase will be accepted. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);

            Apply(ev);

        }

        public void ReverseCreditsChange(ReverseCreditsChangeCommand command)
        {
            var eventToRevert = EventStream.Single(x => (x.Payload as IPayload).Id == command.Payload.PayloadId);

            if (eventToRevert.EventName.Contains(nameof(CreditsIncreasedEvent)))
            {
                ReverseCreditsIncrease(command, eventToRevert as CreditsIncreasedEvent);
            }
            else
            {
                ReverseCreditsDecrease(command, eventToRevert as CreditsDecreasedEvent);
            }

        }

        private void ReverseCreditsIncrease(ReverseCreditsChangeCommand command, CreditsIncreasedEvent eventToRevert)
        {
            var currentBalance = state.CurrentBalance;
            var amountToRevert = (eventToRevert.Payload as AccountTransactionModel).Amount;

            var evt = new CreditsIncreaseReversedEvent(command.CorrelationId,
                                                       DateTime.UtcNow,
                                                       command.AggregateId,
                                                       currentAggregateVersion + 1,
                                                       command.Payload)
            {
                PreviousBalance = currentBalance,
                AmountToRevert = amountToRevert,
                CurrentBalance = currentBalance - amountToRevert
            };

            Log.LogWarning("Credits Increase will be reversed. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);

            Apply(evt);
        }


        private void ReverseCreditsDecrease(ReverseCreditsChangeCommand command, CreditsDecreasedEvent eventToRevert)
        {
            var currentBalance = state.CurrentBalance;
            var amountToRevert = (eventToRevert.Payload as AccountTransactionModel).Amount;

            var evt = new CreditsDecreaseReversedEvent(command.CorrelationId,
                                                       DateTime.UtcNow,
                                                       command.AggregateId,
                                                       currentAggregateVersion + 1,
                                                       command.Payload)
            {
                PreviousBalance = currentBalance,
                AmountToRevert = amountToRevert,
                CurrentBalance = currentBalance + amountToRevert
            };

            Log.LogWarning("Credits Decrease will be reversed. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);

            Apply(evt);
        }

        private void ProceedWithCreditsDecrease(DecreaseCreditsCommand command, decimal currentBalance, int currentAggregateVersion)
        {
            var ev = new CreditsDecreasedEvent(
                command.CorrelationId,
                DateTime.UtcNow,
                command.AggregateId,
                currentAggregateVersion + 1,
                command.Payload)
            {
                PreviousBalance = currentBalance,
                CurrentBalance = currentBalance - command.Payload.Amount

            };

            Log.LogWarning("Credits Decrease will be accepted. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);


            Apply(ev);
        }

        private void RejectCreditsDecrease(DecreaseCreditsCommand command, decimal currentBalance, int currentAggregateVersion)
        {
            var ev = new CreditsDecreaseRejectedEvent(
                command.CorrelationId,
                DateTime.UtcNow,
                command.AggregateId,
                currentAggregateVersion + 1,
                command.Payload)
            {
                PreviousBalance = currentBalance,
                AmountRequested = command.Payload.Amount,
                Reason = "Insufficient Credits. Requested amount: " + command.Payload.Amount
            };

            Log.LogWarning("Credits Decrease will be rejected due to insufficient funds. CorrelationId: {CorrelationId}, AggregateId: {AggregateId}", command.CorrelationId, AggregateId);

            Apply(ev);
        }


        public override void Apply(Event evt)
        {
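            var aggregateVersion = evt switch
            {
                CreditsDecreasedEvent e => Apply(e, true),
                CreditsIncreasedEvent e => Apply(e, true),

                CreditsDecreaseRejectedEvent e => Apply(e, true),
                CreditsIncreaseRejectedEvent e => Apply(e, true),

                CreditsDecreaseReversedEvent e => Apply(e, true),
                CreditsIncreaseReversedEvent e => Apply(e, true),

                // fail loudly on event types this aggregate does not know how to apply
                _ => throw new InvalidOperationException($"Unsupported event type: {evt?.GetType().Name}"),
            };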

            currentAggregateVersion = aggregateVersion;
        }

        protected override void LoadFromHistory(IEnumerable<Event> eventStream)
        {
            foreach (var evt in eventStream)
            {
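                _ = evt switch
                {
                    CreditsDecreasedEvent e => Apply(e, false),
                    CreditsIncreasedEvent e => Apply(e, false),

                    CreditsDecreaseRejectedEvent e => Apply(e, false),
                    CreditsIncreaseRejectedEvent e => Apply(e, false),

                    CreditsDecreaseReversedEvent e => Apply(e, false),
                    CreditsIncreaseReversedEvent e => Apply(e, false),

                    // fail loudly on stored event types this aggregate does not know how to replay
                    _ => throw new InvalidOperationException($"Unsupported event type: {evt?.GetType().Name}"),
                };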

                currentAggregateVersion = evt.AggregateVersion;
            }
        }



        private int Apply(CreditsDecreasedEvent e, bool isNew)
        {
            state.CurrentBalance -= (e.Payload as AccountTransactionModel).Amount;
            state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;

            if (isNew)
            {
                Changes.Add(e);
            }

            return e.AggregateVersion;

        }

        private int Apply(CreditsIncreasedEvent e, bool isNew)
        {
            state.CurrentBalance += (e.Payload as AccountTransactionModel).Amount;
            state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;

            if (isNew)
            {
                Changes.Add(e);
            }

            return e.AggregateVersion;
        }

        private int Apply(CreditsDecreaseRejectedEvent e, bool isNew)
        {
            // no state changes right now. But this can change in the future.

            if (isNew)
            {
                Changes.Add(e);
            }

            return e.AggregateVersion;
        }

        private int Apply(CreditsIncreaseRejectedEvent e, bool isNew)
        {
            // no state changes right now. But this can change in the future.
            if (isNew)
            {
                Changes.Add(e);
            }

            return e.AggregateVersion;
        }

        private int Apply(CreditsDecreaseReversedEvent e, bool isNew) 
        {

            state.CurrentBalance += e.AmountToRevert;
            state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;


            if (isNew)
            {
                Changes.Add(e);
            }

            return e.AggregateVersion;
        }

        private int Apply(CreditsIncreaseReversedEvent e, bool isNew)
        {
            state.CurrentBalance -= e.AmountToRevert;
            state.BalanceUpdatedAtUtc = e.EventDateTimeUtc;

            if (isNew)
            {
                Changes.Add(e);
            }

            return e.AggregateVersion;
        }
    }
}


The advantages of this design, despite the required change in mindset, are a more testable aggregate root and clarity about what the system can do given certain inputs. The business logic is not coupled to the host: whether we use Azure Functions or another .NET host is irrelevant when the business rules are centralized in a cohesive domain model. It is possible to simulate many situations and validate that the domain invariants are still preserved.
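
To make the testability point concrete, here is a minimal sketch of a host-free test. It does not use the article's actual classes: `SketchEvent`, `Increased`, `Decreased`, `DecreaseRejected`, `SketchAccount` and `SketchAccountTests` are hypothetical, simplified stand-ins that only mirror the shape of the aggregate above (state rebuilt from history, commands producing new events). It assumes C# 9 or later for records.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins for the article's event types.
public abstract record SketchEvent(int AggregateVersion);
public record Increased(int AggregateVersion, decimal Amount) : SketchEvent(AggregateVersion);
public record Decreased(int AggregateVersion, decimal Amount) : SketchEvent(AggregateVersion);
public record DecreaseRejected(int AggregateVersion, decimal AmountRequested) : SketchEvent(AggregateVersion);

// Hypothetical miniature aggregate: state is rebuilt from history,
// and commands only produce new events.
public class SketchAccount
{
    private int version = -1;

    public decimal Balance { get; private set; }
    public List<SketchEvent> Changes { get; } = new List<SketchEvent>();

    public SketchAccount(IEnumerable<SketchEvent> history)
    {
        foreach (var e in history) Apply(e, isNew: false);
    }

    public void Decrease(decimal amount)
    {
        if (amount <= Balance)
            Apply(new Decreased(version + 1, amount), isNew: true);
        else
            Apply(new DecreaseRejected(version + 1, amount), isNew: true);
    }

    private void Apply(SketchEvent e, bool isNew)
    {
        switch (e)
        {
            case Increased i: Balance += i.Amount; break;
            case Decreased d: Balance -= d.Amount; break;
            case DecreaseRejected: break; // a rejection does not change state
            default: throw new InvalidOperationException($"Unknown event {e.GetType().Name}");
        }

        version = e.AggregateVersion;
        if (isNew) Changes.Add(e);
    }
}

public static class SketchAccountTests
{
    // Given a history holding 100 credits, a 150-credit debit must be rejected
    // and the balance must stay untouched.
    public static bool OverdraftIsRejected()
    {
        var account = new SketchAccount(new SketchEvent[] { new Increased(0, 100m) });

        account.Decrease(150m);

        return account.Balance == 100m
            && account.Changes.Single() is DecreaseRejected { AggregateVersion: 1 };
    }
}
```

Nothing here needs Azure Functions, CosmosDB, or any other host: the test builds a history in memory, sends a command, and inspects the resulting events. The same given/when/then pattern should carry over to the real PrepaidCardAccountAggregate by passing it a prepared event stream and a logger.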

Conclusion of Part 2

Wow! We were able to describe how commands are processed by a simple cloud structure built on serverless components: Azure Functions and CosmosDB. This article is an effort to explain concretely many concepts that are scattered across blogs and documentation on the web, giving some direction to developers who plan to create applications on Azure in a more structured way while keeping the joy of serverless technologies in terms of flexibility and costs. The use of DDD and CQRS/ES concepts is not restricted to monoliths or services running on-premises or on Kubernetes.

With the right mindset, it is viable to apply CQRS, DDD, and Event Sourcing in a reactive and serverless environment, and Azure is a great place to do so.

In part 3 we will describe how events are effectively published and handled.

Top comments (8)

Alex Pliutau

Great write-up. We also recently published an article on how to bridge Backend and Data Engineering teams using Event Driven Architecture - packagemain.tech/p/bridging-backen...

Vinícius Mamoré

Amazing Mario, thanks for the article, really good stuff! Is there a github repo to see the project? Thanks!

woodpk

Mario,
I'm not sure if I am reading your article correctly. It looks like all of your command handlers would be triggered for every change feed event even when they did not have processing logic for that given command. You write about having logic to basically ignore those commands so that they are correctly processed by the correct command handler.

Is there a more efficient way to do this? Is there some way to filter the change feed so that the function is not triggered unless the correct set of subscribed commands came through? Or, instead of using cosmos DB, could the API function send the command directly to Event Grid ?

Colin McCullough • Edited

There isn't a current way to filter documents with the Change Feed trigger for Azure Functions. I'll try to find the GitHub issue, but this looks like the tracking request is here:
feedback.azure.com/forums/263030-a...

I've done this manually by implementing an interface (e.g. ICommandProcessor) with 2 methods: Handle & CanHandle. The command handler function, that's processing all commands, injects all handlers (IEnumerable<ICommandProcessor>) and filters processing of each command based off the CanHandle method. I've implemented CanHandle both with reflection and direct lists of types. Reflection strategy grabs all parameters of methods named HandleAsync which implement ICommand to check against. Though, I don't like the idea of using reflection in Azure functions due to cold starts.

EDIT: Was on mobile previously, adding context
Here's an example (incomplete and probably has issues) of the command handler strategy based on Mario's post:

    public interface ICommandProcessor
    {
        Task HandleAsync(ICommand command, IAsyncCollector<Event> azureOutputBinding = null);
        bool CanHandle(ICommand command);
    }
   public class ExampleCommandProcessor : ICommandProcessor
   {
      private readonly IEventStoreLogic _eventStore;

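      // BindingFlags.Instance alone matches nothing; NonPublic is needed to see the
      // private HandleCommand overloads. ToList() runs the reflection only once.
      private static readonly IEnumerable<Type> HandledCommands = typeof(ExampleCommandProcessor)
         .GetMethods(BindingFlags.Instance | BindingFlags.NonPublic)
         .SelectMany(x => x.GetParameters()
            .Where(p => typeof(ICommand).IsAssignableFrom(p.ParameterType))
            .Select(t => t.ParameterType))
         .ToList();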

      public ExampleCommandProcessor(IEventStoreLogic eventStore)
      {
         _eventStore = eventStore;
      }

      public bool CanHandle(ICommand command)
      {
         return HandledCommands.Contains(command.GetType());
      }

      public Task HandleAsync(ICommand command, IAsyncCollector<Event> azureOutputBinding = null)
      {
         return HandleCommand((dynamic)command, azureOutputBinding);
      }

      private async Task HandleCommand(CreateItem command,
         IAsyncCollector<Event> azureOutputBinding = null)
      {
         if (azureOutputBinding != null) _eventStore.SetWritePersistence(azureOutputBinding);

         var itemStream = await _eventStore.LoadAggregateStreamAsync(command.AggregateId);

         if (itemStream.Any()) throw new SomeRandomException();

         var newItem = Item.AddOrSomething(command); // static in this example

         await UpdateStream(newItem);
      }
... continue multiple handlers with concrete commands
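
The snippet above covers a single processor; the dispatching side described earlier (inject all handlers, filter by CanHandle) is not shown. A minimal sketch of that part might look like the following. All names here (`ISketchCommand`, `ISketchProcessor`, `CommandDispatcher`, `CountingProcessor`, `CreateItem`, `DeleteItem`) are hypothetical, and `HandleAsync` is simplified by dropping the output binding parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for ICommand and ICommandProcessor.
public interface ISketchCommand { }

public interface ISketchProcessor
{
    bool CanHandle(ISketchCommand command);
    Task HandleAsync(ISketchCommand command);
}

// Routes each command of a change feed batch to the processors that accept it.
public class CommandDispatcher
{
    private readonly IReadOnlyList<ISketchProcessor> processors;

    public CommandDispatcher(IEnumerable<ISketchProcessor> processors)
    {
        this.processors = processors.ToList();
    }

    // Returns how many commands found at least one processor.
    public async Task<int> DispatchAsync(IEnumerable<ISketchCommand> batch)
    {
        var handled = 0;
        foreach (var command in batch)
        {
            var matching = processors.Where(p => p.CanHandle(command)).ToList();
            foreach (var processor in matching)
            {
                await processor.HandleAsync(command);
            }

            if (matching.Count > 0) handled++;
        }

        return handled;
    }
}

// Hypothetical processor that accepts exactly one command type and counts its calls.
public class CountingProcessor<TCommand> : ISketchProcessor where TCommand : ISketchCommand
{
    public int Handled { get; private set; }

    public bool CanHandle(ISketchCommand command) => command is TCommand;

    public Task HandleAsync(ISketchCommand command)
    {
        Handled++;
        return Task.CompletedTask;
    }
}

public class CreateItem : ISketchCommand { }
public class DeleteItem : ISketchCommand { }
```

With only a `CountingProcessor<CreateItem>` registered, a batch containing one `CreateItem` and one `DeleteItem` reaches the processor once and the `DeleteItem` is skipped. The function is still triggered for every change feed batch; the filtering only decides which processor does the work.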

If you need more guidance on it, I'd look at in-process messaging platform strategies, like Mediatr or MassTransit's Mediator. They perform a lot of black box magic behind the scenes, that have a similar concept.

magazin81

That is really great, Mario. Waiting for the part 3!

smdsaleem

Agreed. Eagerly looking forward to Part-3, Mario.

Cuan Jooste

Hi Mario, this has been a great read and definitely helpful. Any updates on Part 3, or do you maybe have a repo with all code that you can share? Thanks again

woodpk

Any word on a GitHub repo? Very interested in reviewing!