
Thilanka

Choreography Saga Pattern Implementation

The Saga pattern can be applied to perform distributed transactions that span multiple microservices. Instead of relying on database locks, it coordinates a sequence of local transactions at the application level and undoes already completed steps with compensating transactions when a later step fails.

In a traditional monolith, a single database handles all application-wide transactions, so it can use table- or row-level locking to enforce ACID compliance. Microservices-based systems usually have a database per service, so ACID compliance across services cannot be enforced at the database layer.

Saga pattern can be implemented in two ways:

1. Choreography/Event based Saga

Much like ballet dancers performing a set of precise steps in coordination with each other to the sound of music, each microservice participating in the transaction emits events that other microservices can capture and act on. For example:

  • A user submits an order through a website
  • A NEW_ORDER event is created.
  • The Accounts microservice deducts the order amount from the customer account and emits a PAYMENT_RECEIVED event.
  • The Inventory microservice acts on that event and adjusts the inventory.
  • When INVENTORY_RESERVED is emitted, the Delivery microservice schedules the delivery.
  • Finally, the website that created the order displays the delivery details to the user.

But what happens when the Inventory microservice rejects or fails the request to reserve an item, simply because it is not available? Since the Accounts microservice has already deducted the amount from the user's account and it is too late for a database-level rollback, a compensating transaction (repayment of the deducted amount) has to be performed in the Accounts microservice. Such error and failure scenarios are communicated to the other services through the same event-emitting pattern.

Choreography fits best with event-driven communication.
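The flow above, including the compensating transaction, can be sketched without any broker at all. The following is a minimal, illustrative simulation in plain Java, not the implementation used later in this post: the in-memory `EventBus`, the `Order` class, and the event names `PAYMENT_REJECTED`, `PAYMENT_REFUNDED` and `DELIVERY_SCHEDULED` are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ChoreographySketch {

    // Hypothetical order payload: amount to pay and quantity to reserve.
    static final class Order {
        final int amount;
        final int qty;
        Order(int amount, int qty) { this.amount = amount; this.qty = qty; }
    }

    // Synchronous in-memory bus standing in for a real message broker.
    static final class EventBus {
        private final Map<String, List<Consumer<Order>>> subscribers = new HashMap<>();
        final List<String> log = new ArrayList<>();

        void subscribe(String event, Consumer<Order> handler) {
            subscribers.computeIfAbsent(event, e -> new ArrayList<>()).add(handler);
        }

        void publish(String event, Order order) {
            log.add(event); // record every event in emission order
            List<Consumer<Order>> handlers = subscribers.getOrDefault(event, Collections.emptyList());
            for (Consumer<Order> handler : handlers) {
                handler.accept(order);
            }
        }
    }

    // Runs one saga and returns the emitted events followed by the final balance.
    static List<String> run(int startBalance, int startStock, Order order) {
        EventBus bus = new EventBus();
        int[] balance = {startBalance};
        int[] stock = {startStock};

        // Accounts service: debit on NEW_ORDER, refund (compensate) on INVENTORY_REJECTED.
        bus.subscribe("NEW_ORDER", o -> {
            if (balance[0] >= o.amount) {
                balance[0] -= o.amount;
                bus.publish("PAYMENT_RECEIVED", o);
            } else {
                bus.publish("PAYMENT_REJECTED", o);
            }
        });
        bus.subscribe("INVENTORY_REJECTED", o -> {
            balance[0] += o.amount;
            bus.publish("PAYMENT_REFUNDED", o);
        });

        // Inventory service: reserve stock once the payment has been received.
        bus.subscribe("PAYMENT_RECEIVED", o -> {
            if (stock[0] >= o.qty) {
                stock[0] -= o.qty;
                bus.publish("INVENTORY_RESERVED", o);
            } else {
                bus.publish("INVENTORY_REJECTED", o);
            }
        });

        // Delivery service: schedule delivery after the reservation.
        bus.subscribe("INVENTORY_RESERVED", o -> bus.publish("DELIVERY_SCHEDULED", o));

        bus.publish("NEW_ORDER", order);
        bus.log.add("BALANCE=" + balance[0]);
        return bus.log;
    }

    public static void main(String[] args) {
        System.out.println(run(100, 5, new Order(30, 3))); // happy path
        System.out.println(run(100, 5, new Order(60, 6))); // inventory rejects, account is refunded
    }
}
```

Note that no service calls another one directly: each one only reacts to events, and the refund is triggered by the same mechanism as the forward steps. In the second run the balance ends at its starting value because the debit is compensated.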

2. Orchestrator based Saga

In a musical orchestra, the conductor is a music professional who directs the performance with hand movements. The conductor sets the tempo and unifies the orchestra by giving instructions to the musicians.

The Orchestrator service in the Saga pattern acts as the conductor. The musicians who act on the instructions/commands received from the conductor are the microservices that participate in the distributed transaction. Unlike the choreography pattern, where microservices act upon other microservices' events, Orchestrator-based Sagas require a separate module/service, known as the Saga Orchestrator, to control the flow of the transaction.

Once a request to initiate a distributed transaction has been posted to the Saga Orchestrator, it commands each microservice, in a predefined sequence, to do some task related to the transaction. Microservices perform their tasks and publish the results, typically into a single reply channel. Based on the events gathered in the reply channel, the orchestrator decides whether to continue the transaction or to execute compensating transactions on the corresponding microservices.

Let's translate the ordering system above to the Orchestrator pattern:

  • A user submits an order through a website, which sends a request/command to initiate the Order Saga Orchestrator (OSO).
  • The OSO commands the Accounts microservice to deduct the order amount from the customer account.
  • If the account has enough funds, the Accounts microservice emits a PAYMENT_RECEIVED event into the reply channel.
  • The OSO commands the Inventory microservice to reserve inventory.
  • If the inventory is available, the Inventory microservice emits an INVENTORY_RESERVED event into the reply channel.
  • If, however, the OSO receives an INVENTORY_REJECTED event, it commands the Accounts microservice to execute a compensating transaction (repayment of the deducted amount).

Orchestration fits best with command-driven communication.
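The orchestrated flow can be sketched in a similar way. This is a simplified illustration under stated assumptions, not a production orchestrator: commands are plain method calls and the "reply" is a boolean return value rather than an event on a reply channel, and the `Step` interface and the step names `DEBIT_ACCOUNT` and `RESERVE_INVENTORY` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

class OrchestratorSketch {

    // One saga step: a command that reports success or failure, plus its compensation.
    interface Step {
        String name();
        boolean execute();
        void compensate();
    }

    static Step step(String name, BooleanSupplier action, Runnable compensation) {
        return new Step() {
            public String name() { return name; }
            public boolean execute() { return action.getAsBoolean(); }
            public void compensate() { compensation.run(); }
        };
    }

    // The orchestrator: runs steps in order; on failure, compensates completed steps in reverse.
    static List<String> runSaga(List<Step> steps) {
        List<String> log = new ArrayList<>();
        Deque<Step> completed = new ArrayDeque<>();
        for (Step current : steps) {
            if (current.execute()) {
                log.add(current.name() + ":OK");
                completed.push(current);
            } else {
                log.add(current.name() + ":FAILED");
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    done.compensate();
                    log.add(done.name() + ":COMPENSATED");
                }
                return log;
            }
        }
        log.add("SAGA:COMPLETED");
        return log;
    }

    // Order saga from the list above: debit the account, then reserve inventory.
    static List<String> placeOrder(int startBalance, int startStock, int amount, int qty) {
        int[] balance = {startBalance};
        int[] stock = {startStock};
        List<Step> steps = Arrays.asList(
            step("DEBIT_ACCOUNT",
                () -> { if (balance[0] < amount) return false; balance[0] -= amount; return true; },
                () -> balance[0] += amount),
            step("RESERVE_INVENTORY",
                () -> { if (stock[0] < qty) return false; stock[0] -= qty; return true; },
                () -> stock[0] += qty));
        List<String> log = runSaga(steps);
        log.add("BALANCE=" + balance[0]);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(placeOrder(100, 5, 30, 3)); // both steps succeed
        System.out.println(placeOrder(100, 5, 60, 6)); // inventory fails, debit is compensated
    }
}
```

The difference from choreography is visible in `runSaga`: the sequence and the decision to compensate live in one place, while the participating services only execute the commands they are given.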

Implementing Choreography Saga

We will implement the scenario discussed above with Spring Boot. A RabbitMQ Topic Exchange will route messages to multiple queues based on a routing key.

A quick refresher on RabbitMQ

RabbitMQ is an open-source message broker that stores messages in queues. We will use three queues and one exchange. Producers send messages to an exchange, and the exchange routes them to queues, from which consumers read them.

How does the exchange know the destination queue? Each message is published with a routing key. When a queue is created, it is bound to the exchange with a binding key (a pattern, in the case of a Topic Exchange), and the exchange delivers a message to every queue whose binding matches the message's routing key.

A picture is worth a thousand words:

(Diagram: Choreography Saga)

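A Topic Exchange routes each message to every queue whose binding pattern matches the message's routing key. Routing keys are dot-separated words; in a binding pattern, * matches exactly one word and # matches zero or more words. These bindings are depicted in the highlighted yellow boxes in the diagram.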
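To make the matching rule concrete, here is a small stand-alone sketch of topic matching in plain Java. It is only an illustration of the rule (RabbitMQ performs this matching inside the broker); the `TopicRoutingSketch` class is hypothetical, while the binding patterns and queue names are the ones used later in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicRoutingSketch {

    // Binding pattern -> queue name, as declared in the RabbitMQ configuration of this post.
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("orders.create.*", "single-order-queue");
        BINDINGS.put("orders.update", "order-update");
        BINDINGS.put("orders.pay", "payment");
    }

    // True if the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key exhausted but the pattern still expects a word
        }
        // '*' matches exactly one word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    // Names of all queues that would receive a message with the given routing key.
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (matches(binding.getKey(), routingKey)) {
                queues.add(binding.getValue());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(route("orders.create.new"));
        System.out.println(route("orders.create.failed"));
        System.out.println(route("orders.pay"));
    }
}
```

Both `orders.create.new` and `orders.create.failed` match the `orders.create.*` binding, which is why new orders and compensation requests arrive on the same queue in this project.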

Project setup overview

We will create three Spring Boot projects, one for each microservice:

  • Order Service
    • Creates a new order
    • Listens to status updates of the order
  • Accounts Service
    • Checks if the account has enough funds
    • Debits account on success
    • Credits account on compensating transaction request
    • Publishes order status failure
  • Inventory Service
    • Checks if inventory item requested is available
    • Adjusts inventory on success
    • Requests a compensating transaction from the Accounts service
    • Publishes status failure

Dependencies



<!-- All microservices' pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

...

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>



Configuring RabbitMQ

RabbitMQ Beans definition

All microservices declare the queues and bindings, so the queues and the exchange are available regardless of the order in which the microservices start. Check the code comments to get a better understanding.



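import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration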
public class RabbitBeanConfig {
    public static final String SINGLE_ORDER_TOPIC_EXCH_NAME = "single-orders-exchange";
    public static final String SINGLE_ORDER_QUEUE_NAME = "single-order-queue";
    public static final String SINGLE_ORDER_ROUTING_KEY = "orders.create.*";

    @Bean
    Queue singleOrderqueue() {
        return new Queue(SINGLE_ORDER_QUEUE_NAME, false); // New orders received at our Restful endpoint will be placed on this queue to be consumed by Accounts microservice
    }

    @Bean
    Queue orderUpdateQueue() {
        return new Queue("order-update", false); // Stores the final order outcome - Order Manager microservice will consume this
    }

    @Bean
    Queue paymentQueue() {
        return new Queue("payment", false); // Successful order payments end up here to be consumed by Inventory service
    }

    @Bean
    TopicExchange singleOrderExchange() {
        return new TopicExchange(SINGLE_ORDER_TOPIC_EXCH_NAME); //Single Topic Exchange to route messages to queues using a Routing key
    }

    @Bean
    Binding singleOrderBinding(Queue singleOrderqueue, TopicExchange singleOrderExchange) {
        return BindingBuilder.bind(singleOrderqueue).to(singleOrderExchange).with(SINGLE_ORDER_ROUTING_KEY); // Bind each queue to the Exchange
    }

    @Bean
    Binding orderUpdateBinding(Queue orderUpdateQueue, TopicExchange singleOrderExchange) {
        return BindingBuilder.bind(orderUpdateQueue).to(singleOrderExchange).with("orders.update");
    }

    @Bean
    Binding inventoryBinding(Queue paymentQueue, TopicExchange singleOrderExchange) {
        return BindingBuilder.bind(paymentQueue).to(singleOrderExchange).with("orders.pay");
    }

    @Bean
    MessageConverter messageConverter() { // Encode/Decode Java objects to/from JSON 
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory, final MessageConverter messageConverter) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setExchange(SINGLE_ORDER_TOPIC_EXCH_NAME); // Exchange used by convertAndSend(routingKey, message); without this, RabbitMQ's default (nameless) exchange is used
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter); // Set the message converter to the template
        return rabbitTemplate;
    }

}



Queue listeners

The Order service (Order Manager) needs to listen to the order-update queue, which contains the final status of the transaction.



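import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component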
public class OrderUpdateListener {

    @Autowired
    Queue orderUpdateQueue;

    @RabbitListener(queues = "#{orderUpdateQueue.getName()}")
    public void onOrderUpdate(SingleOrder order) {
        System.out.println("ORDER_SERVICE|"+order.id+"|ORDER STATUS UPDATED: "+order);
    }
}



SingleOrder is a simple DTO representing the Saga transaction details.



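import java.io.Serializable;

public class SingleOrder implements Serializable {

    public int id;
    public String item;
    public int qty;
    public int cost;
    private int costFactor = 10; // price per unit
    public boolean isFailed;
    public String failureReason;

    public SingleOrder() { } // no-arg constructor used when the JSON message is deserialized

    public SingleOrder(int id, String item, int qty) {
        this.id = id;
        this.item = item;
        this.qty = qty;
        this.cost = qty * costFactor;
    }

    @Override
    public String toString() { // keeps the log lines printed by the services readable
        return "SingleOrder{id=" + id + ", item=" + item + ", qty=" + qty + ", cost=" + cost
                + ", isFailed=" + isFailed + ", failureReason=" + failureReason + "}";
    }
}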



The Accounts service and the Inventory service have listeners configured as well. When a new order is created, the Accounts service should check whether there are enough funds for the transaction. It should also support compensating transactions when the Inventory service rejects the transaction.



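import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class OrderCreatedListener {

    // The field name must match the singleOrderqueue bean from RabbitBeanConfig,
    // because three Queue beans exist and Spring falls back to the name to pick one.
    @Autowired
    Queue singleOrderqueue;

    @Autowired
    RabbitTemplate rabbit;

    @RabbitListener(queues = "#{singleOrderqueue.getName()}")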
    public void onOrderCreated(SingleOrder order, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
        System.out.println("ACCOUNT_SERVICE|"+order.id+"|RESPONDING TO ORDER CREATED  key: "+key);
        String[] tokens = key.split("\\.");
        if (tokens.length > 1 && tokens[tokens.length - 1].equals("failed")) {
            //Compensating transaction
            System.out.println("ACCOUNT_SERVICE|"+order.id+"|COMPENSATING AMOUNT = +"+order.cost);
        } else {
            if (order.cost > 100) {
                System.out.println("ACCOUNT_SERVICE|"+order.id+"|NOT ENOUGH FUNDS");
                order.isFailed = true;
                order.failureReason = "NOT ENOUGH FUNDS: " + order.cost;
                rabbit.convertAndSend("orders.update", order);
            } else {
                System.out.println("ACCOUNT_SERVICE|"+order.id+"|DEBITED AMOUNT = -"+order.cost);
                rabbit.convertAndSend("orders.pay", order);
            }
        }
    }
}



Similarly, the Inventory service listens to the payment queue, which holds the verified payments, to accept or reject the transaction. If the item is not available, it sends a message to the order-update queue so that the Order Manager can show the rejected order details to the customer. It also notifies the Accounts service to run a compensating transaction.



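import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component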
public class PaymentReceivedListener {

    @Autowired
    Queue paymentQueue;

    @Autowired
    RabbitTemplate template;

    @RabbitListener(queues = "#{paymentQueue.getName()}")
    public void reserveInventory(SingleOrder order) {
        System.out.println("INVENTORY_SERVICE|"+order.id+"|VERIFIED PAYMENT");
        if (order.qty > 5) {
            System.out.println("INVENTORY_SERVICE|"+order.id+"|STOCKS NOT AVAILABLE");
            order.isFailed = true;
            order.failureReason = "STOCKS NOT AVAILABLE: " + order.qty;
            template.convertAndSend("orders.create.failed", order);
            template.convertAndSend("orders.update", order);
        } else {
            System.out.println("INVENTORY_SERVICE|"+order.id+"|STOCKS RESERVED: QTY = " + order.qty);
            template.convertAndSend("orders.update", order);
        }
    }
}



We are almost done. Our simple order creation RESTful web endpoint looks as follows.



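import java.util.concurrent.ThreadLocalRandom;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController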
@RequestMapping("/single-order")
public class SingleOrderResource {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping()
    public SingleOrder createSingleOrder(@RequestParam String item, @RequestParam int qty) {
        int id  = ThreadLocalRandom.current().nextInt(100, 500 + 1);
        SingleOrder order = new SingleOrder(id, item, qty);
        System.out.println("ORDER_SERVICE|NEW ORDER:"+order);
        rabbitTemplate.convertAndSend("orders.create.new", order);
        return order;
    }
}
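Because the funds and stock checks in the listeners are hard-coded (cost is qty * 10, the Accounts service rejects a cost above 100, and the Inventory service rejects a quantity above 5), the qty request parameter alone decides which path a request to /single-order takes. The sketch below only restates those rules in one place to show which quantities exercise which path; the `OrderOutcomeSketch` class and its constant names are hypothetical and not part of the services.

```java
class OrderOutcomeSketch {

    static final int COST_FACTOR = 10;  // same value as costFactor in SingleOrder
    static final int FUNDS_LIMIT = 100; // Accounts service rejects order.cost > 100
    static final int STOCK_LIMIT = 5;   // Inventory service rejects order.qty > 5

    // Predicts the final order status for a given quantity.
    static String outcome(int qty) {
        int cost = qty * COST_FACTOR;
        if (cost > FUNDS_LIMIT) {
            return "NOT ENOUGH FUNDS";     // rejected by Accounts, nothing to compensate
        }
        if (qty > STOCK_LIMIT) {
            return "STOCKS NOT AVAILABLE"; // rejected by Inventory, Accounts compensates
        }
        return "STOCKS RESERVED";          // happy path
    }

    public static void main(String[] args) {
        for (int qty : new int[] {3, 6, 11}) {
            System.out.println("qty=" + qty + " -> " + outcome(qty));
        }
    }
}
```

In other words, a quantity from 6 to 10 is the interesting case: the payment succeeds, the reservation fails, and the compensating transaction in the Accounts service is triggered through the orders.create.failed routing key.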



That's pretty much how you would implement a choreography-based Saga :) If you have any questions, please let me know in the comments section.
