This post will guide Magento 2 developers through implementing a message queue retry mechanism using the RabbitMQ dead-letter exchange feature.
To assist us in this endeavor, we will leverage the Message Queue Retry module by run-as-root.
Table of Contents
- What is RabbitMQ?
- What is a dead-letter exchange?
- Examples of transaction loss scenarios
- Practical case scenario
- Crafting an effective retry strategy for queue messages
- Practical case scenario with queue retry
What is RabbitMQ?
RabbitMQ is a widely used open-source message broker that lets applications communicate asynchronously and scale independently. Applications publish and consume messages through it, and RabbitMQ keeps each message safe until it is consumed.
Since version 2.1.0, Magento 2 has offered native RabbitMQ support, enabling effortless queue configuration through XML settings. For further details, you can explore additional information here
What is a dead-letter exchange?
A dead-letter exchange is a safety net within the messaging architecture, intended to handle messages that the consumer fails to process.
It routes these problematic messages to a designated queue, referred to as a dead-letter queue. This lets developers isolate and inspect troublesome messages without interrupting the primary processing flow, so they can analyze them, identify why processing failed, and make the necessary adjustments.
It's particularly recommended for scenarios where preserving message integrity, diagnosing processing errors, and avoiding message loss are paramount.
Examples of transaction loss scenarios:
- Exporting the customers' data to a CRM
  - Possible issue: The CRM credentials were modified, but the changes were not reflected in the Magento integration settings.
  - Consequence: This mismatch in credentials will result in failed API calls due to the usage of incorrect or invalid authentication information.
- Exporting the orders to an ERP
  - Possible issue: During a marketing campaign, the store's throughput surged by 200%, leading to a substantial rise in incoming orders; however, the ERP system could only accommodate its standard order import capacity.
  - Consequence: Only a fraction of the increased order influx will be successfully exported to the ERP system, potentially causing delays and incomplete order processing.
- Scheduled price uplift
  Imagine having the capability to schedule a comprehensive price uplift by 10% for the entire catalog of your store.
  - Possible issue: Deadlock encountered while saving product and price information.
  - Consequence: The product prices may not be accurate.
Practical case scenario
A typical case in e-commerce is exporting orders to an ERP or OMS. We will implement an asynchronous order export architecture where, for each placed order, a message containing the order id is sent to a queue so that a consumer can later read it and export the order to the external system.
To simplify this example, let's use the same name for the topic, exchange, queue, and binding: erp_order_export.
To handle this messaging, we'll utilize RabbitMQ as the broker.
You can find the complete code for the module in this repository.
We have to create four files to declare our queue.
etc/communication.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="erp_order_export" request="CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface"/>
</config>
etc/queue_consumer.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="erp_order_export" queue="erp_order_export" connection="amqp"
              handler="CristianoPacheco\OrderExport\Queue\Consumer\ErpOrderExportConsumer::execute"/>
</config>
etc/queue_publisher.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="erp_order_export">
        <connection name="amqp" exchange="erp_order_export"/>
    </publisher>
</config>
etc/queue_topology.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="erp_order_export" connection="amqp" type="topic">
        <binding id="erp_order_export" topic="erp_order_export" destinationType="queue" destination="erp_order_export"/>
    </exchange>
</config>
Run the command below so that Magento creates the exchange, queue, and bindings in RabbitMQ:
bin/magento setup:upgrade
It will look like this on RabbitMQ admin:
Visually, our configs will generate something like this:
Note: It would make more sense to configure a direct exchange. However, up to the current version of Magento (2.4.6), only topic exchanges are supported.
Now that the queue is set up, we must send the order ID to the erp_order_export queue every time an order is placed successfully.
We can easily do this by listening to the sales_model_service_quote_submit_success event via an observer:
etc/events.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Event/etc/events.xsd">
    <event name="sales_model_service_quote_submit_success">
        <observer name="erp_export_order" instance="CristianoPacheco\OrderExport\Observer\SendOrderToQueueObserver"/>
    </event>
</config>
I won't post the observer code or the entire implementation here. The complete working code is in the repository. If you want to simulate the scenarios, please install the module.
Our consumer will use a service to perform our "fake" order export. To simulate an error, our service will always throw an exception:
<?php
declare(strict_types=1);

namespace CristianoPacheco\OrderExport\Service;

use CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface;
use Exception;

class ExportOrderToErpService
{
    public function execute(OrderExportDataInterface $orderExportData): void
    {
        $orderId = $orderExportData->getOrderId();
        // Always fail on purpose to simulate an export error.
        throw new Exception("The order $orderId id is invalid");
    }
}
With the current configuration, orders will enter the erp_order_export queue and can be "exported" during the execution of our consumer.
To run our consumer and export only one order per execution, run the following command:
bin/magento queue:consumer:start erp_order_export --single-thread --max-messages=1
What will happen at the end of the execution?
The service will throw an exception, and it will be handled here:
vendor/magento/framework-message-queue/Consumer.php:261
As you can see, the message is rejected with the requeue flag set to false. In other words, the message is removed from the queue, and the order is never exported to the ERP.
Well done! We managed to simulate a transaction loss.
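To make the loss easier to see, here is a minimal in-memory sketch of the same behaviour. The InMemoryQueue class and the consumeOne function are hypothetical stand-ins written for this post; they are not the Magento or RabbitMQ API. They only mimic what happens when a message is rejected with requeue set to false.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a broker queue (not a real RabbitMQ client).
final class InMemoryQueue
{
    /** @var list<string> */
    private array $messages = [];

    public function publish(string $body): void
    {
        $this->messages[] = $body;
    }

    public function get(): ?string
    {
        // array_shift returns null when the queue is empty.
        return array_shift($this->messages);
    }

    // Mimics a reject: the message only comes back when $requeue is true.
    public function reject(string $body, bool $requeue): void
    {
        if ($requeue) {
            array_unshift($this->messages, $body);
        }
    }

    public function count(): int
    {
        return count($this->messages);
    }
}

// Hypothetical consumer loop body: on any exception, reject without requeue.
function consumeOne(InMemoryQueue $queue, callable $handler): void
{
    $body = $queue->get();
    if ($body === null) {
        return;
    }
    try {
        $handler($body);
    } catch (Exception $e) {
        $queue->reject($body, false);
    }
}

$queue = new InMemoryQueue();
$queue->publish('{"order_id": 42}');

consumeOne($queue, function (string $body): void {
    throw new Exception('ERP unavailable');
});

echo $queue->count(), PHP_EOL; // 0
```

The queue ends up empty even though the order was never exported, which is exactly the transaction loss we want to avoid.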
In day-to-day scenarios, exceptions are probably (I hope so) handled in the service or at the consumer's entry point, and on error the message is either sent back to the original queue or dealt with manually, for example by sending an alert or persisting the message in a database.
What if there were a way to detect message processing errors automatically, without manual work, so that this process could be optimized and standardized?
Yes, there is. We can delegate this hard work to RabbitMQ via a dead-letter exchange.
Crafting an Effective Retry Strategy for Queue Messages
So, how do we ensure that our messages reach their destination in the face of these intermittent failures? This is where the concept of a retry strategy comes into play. A retry strategy outlines how we handle failed operations by making subsequent attempts at sending the same message. When exporting an order to ERP, adopting a well-defined retry strategy can significantly enhance the reliability of our application.
For this specific case, three attempts are reasonable.
So let's define our requirements. It should be possible to:
- Reprocess the messages
- Set a processing retry limit
- Set an interval in minutes for each retry attempt
- Find out what error was generated during message processing
- See what the message payload is
- Manage message processing manually if the retry limit is reached:
  - Resend the message to the queue
  - Discard the message
Steps to achieve our goal:
1- We will add a second queue (erp_order_export_delay) in RabbitMQ to store the message for a specific lifetime. This queue will not have a consumer. It will only serve to "delay" processing the message in the source queue (erp_order_export).
2- When the time to live is reached, RabbitMQ will automatically send the message to the source queue (erp_order_export).
3- If the retry limit is reached, the message will be stored in a database table for manual management.
- Maybe the ERP server is down, or some order data is incorrect, so we have to reprocess the messages later.
Steps 1 and 2 can be configured in the queue declaration, and RabbitMQ natively supports it.
Step 3 is a capability provided by the run-as-root Message Queue Retry module.
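The three steps above can be sketched as a small in-memory simulation. This is only an illustration under stated assumptions: the runWithRetries function, the retries counter stored on the message, and the array that stands in for the database table are all hypothetical. In the real setup RabbitMQ moves the messages between queues, and the module decides when the limit has been reached.

```php
<?php
declare(strict_types=1);

// Hypothetical model of the retry flow; not the module's real code.
const RETRY_LIMIT = 3;

/**
 * @param array $message  Payload, e.g. ['order_id' => 42]
 * @param callable(array): void $handler  Throws on failure
 * @return array{attempts: int, failed: list<array>}
 */
function runWithRetries(array $message, callable $handler, int $retryLimit = RETRY_LIMIT): array
{
    $mainQueue = [$message + ['retries' => 0]]; // stands in for erp_order_export
    $delayQueue = [];                           // stands in for erp_order_export_delay (no consumer)
    $failedTable = [];                          // stands in for the database table
    $attempts = 0;

    while ($mainQueue !== []) {
        $current = array_shift($mainQueue);
        $attempts++;
        try {
            $handler($current);
            continue; // processed successfully, nothing else to do
        } catch (Exception $e) {
            if ($current['retries'] >= $retryLimit) {
                // Step 3: limit reached, keep the message for manual management.
                $current['error'] = $e->getMessage();
                $failedTable[] = $current;
                continue;
            }
            // Step 1: park the rejected message in the delay queue.
            $current['retries']++;
            $delayQueue[] = $current;
        }
        // Step 2: the TTL "expires" and the message returns to the source queue.
        $mainQueue = array_merge($mainQueue, $delayQueue);
        $delayQueue = [];
    }

    return ['attempts' => $attempts, 'failed' => $failedTable];
}

$result = runWithRetries(['order_id' => 42], function (array $message): void {
    throw new Exception('ERP unavailable');
});

echo $result['attempts'], PHP_EOL;      // 4
echo count($result['failed']), PHP_EOL; // 1
```

With a limit of three, the handler runs four times in this sketch: the first attempt plus three retries. After that the message lands in the "table" together with the last error.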
Practical case scenario with queue retry
First, we need to install the Message Queue Retry module and enable the option that persists messages in the database after the retry limit is reached.
Now we have to configure our dead-letter exchange. The complete implementation of the code below can be found here.
We'll use erp_order_export_delay as the name for the topic, exchange, queue, and binding.
Declaring the topic:
etc/communication.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="erp_order_export" request="CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface"/>
    <topic name="erp_order_export_delay" request="CristianoPacheco\OrderExport\Api\Data\OrderExportDataInterface"/>
</config>
Declaring the retry limit:
etc/queue_retry.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:RunAsRoot:module:RunAsRoot_MessageQueueRetry:/etc/queue_retry.xsd">
    <topic name="erp_order_export" retryLimit="3"/>
</config>
Declaring the dead-letter exchange:
etc/queue_topology.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="erp_order_export" connection="amqp" type="topic">
        <binding id="erp_order_export" topic="erp_order_export" destinationType="queue" destination="erp_order_export">
            <arguments>
                <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export_delay</argument>
                <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export_delay</argument>
            </arguments>
        </binding>
    </exchange>
    <exchange name="erp_order_export_delay" connection="amqp" type="topic">
        <binding id="erp_order_export_delay" topic="erp_order_export_delay" destinationType="queue"
                 destination="erp_order_export_delay">
            <arguments>
                <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export</argument>
                <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export</argument>
                <argument name="x-message-ttl" xsi:type="number">60000</argument><!-- 1 minute in milliseconds -->
            </arguments>
        </binding>
    </exchange>
</config>
Let's take a closer look at each change in the file above.
1- The arguments x-dead-letter-exchange and x-dead-letter-routing-key tell RabbitMQ to automatically send a message to the erp_order_export_delay exchange when it is negatively acknowledged (rejected or nacked with the requeue parameter set to false).
...
<arguments>
    <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export_delay</argument>
    <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export_delay</argument>
</arguments>
...
2- We declared the erp_order_export_delay exchange and bound it to the erp_order_export_delay queue.
...
<exchange name="erp_order_export_delay" connection="amqp" type="topic">
    <binding id="erp_order_export_delay" topic="erp_order_export_delay" destinationType="queue" destination="erp_order_export_delay">
...
3- The arguments x-dead-letter-exchange and x-dead-letter-routing-key define the return path for the message, and x-message-ttl is an integer number of milliseconds after which the message returns to the exchange defined by the previous two arguments.
Remember that this queue does not have a consumer.
...
<arguments>
    <argument name="x-dead-letter-exchange" xsi:type="string">erp_order_export</argument>
    <argument name="x-dead-letter-routing-key" xsi:type="string">erp_order_export</argument>
    <argument name="x-message-ttl" xsi:type="number">60000</argument><!-- 1 minute in milliseconds -->
</arguments>
...
Visually, our new configs will generate something like this:
Note: We always send the messages to an exchange and the exchange will forward the messages to one or more queues. It is the same with dead-letter exchanges.
Now we have to manually delete the erp_order_export exchange and queue in RabbitMQ, because they won't be updated: RabbitMQ does not allow an existing queue to be redeclared with different arguments. You can do it in the RabbitMQ admin panel or via the CLI.
After that, run the command below to apply our changes in RabbitMQ:
bin/magento setup:upgrade
Now if we navigate again in the RabbitMQ panel we should see:
Exchanges:
If we open our exchanges and expand the bindings, we can see the arguments we declared before:
Queues:
Did you notice that queues now have a few more features?
Homework for you: hover the mouse over each feature and wait a few seconds, and a tooltip will explain what it is about. Also open each queue and expand the bindings section to see the values contained there.
Now that we have finished our RabbitMQ configuration, let's place a new order and start the erp_order_export consumer to see what happens.
After placing an order, a new message is in the erp_order_export queue:
Now let's start our consumer:
bin/magento queue:consumer:start erp_order_export --single-thread --max-messages=1
Now if we take a look at the queues we'll see:
During message consumption, our service threw an exception, simulating a problem, and the message was rejected. RabbitMQ sent the message to the erp_order_export_delay queue.
After one minute, the message was automatically sent back to the erp_order_export queue.
Let's inspect the message headers:
The x-death header has interesting information about the retries.
The relevant attributes for us to analyse are:
- count: the retry number
- reason: in our case, it can be rejected or expired
- time: the timestamp of the occurrence
- exchange, queue, routing-keys: where the message was at the time of the occurrence
So, what that image says is (reading from bottom to top):
First occurrence
The message was in the erp_order_export queue and, while being processed, it was rejected on Saturday, August 26, 2023 9:12:33 PM GMT-03:00.
Second occurrence
The message was in the erp_order_export_delay queue, and it expired on Saturday, August 26, 2023 9:12:53 PM GMT-03:00.
There is a 20-second difference because I configured the TTL to 20 seconds :)
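If you want to read the same information in code, the sketch below counts how many times a message was dead-lettered from a given queue. It assumes the x-death header has already been decoded into a plain PHP array with the attributes listed above; how you obtain that array depends on your client library, and the countDeaths function and the sample values are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Returns how many times a message was dead-lettered from $queue for $reason.
 * $xDeath is assumed to be the x-death header decoded into a PHP array.
 */
function countDeaths(array $xDeath, string $queue, string $reason = 'rejected'): int
{
    foreach ($xDeath as $entry) {
        $sameQueue = ($entry['queue'] ?? null) === $queue;
        $sameReason = ($entry['reason'] ?? null) === $reason;
        if ($sameQueue && $sameReason) {
            return (int) ($entry['count'] ?? 0);
        }
    }

    return 0; // no matching entry: the message never died there for that reason
}

// Made-up sample shaped like the header described above.
$xDeath = [
    [
        'count' => 2,
        'reason' => 'expired',
        'queue' => 'erp_order_export_delay',
        'exchange' => 'erp_order_export_delay',
        'routing-keys' => ['erp_order_export_delay'],
    ],
    [
        'count' => 2,
        'reason' => 'rejected',
        'queue' => 'erp_order_export',
        'exchange' => 'erp_order_export',
        'routing-keys' => ['erp_order_export'],
    ],
];

echo countDeaths($xDeath, 'erp_order_export'), PHP_EOL;                  // 2
echo countDeaths($xDeath, 'erp_order_export_delay', 'expired'), PHP_EOL; // 2
echo countDeaths($xDeath, 'some_other_queue'), PHP_EOL;                  // 0
```

A count like this, compared against the configured retryLimit, is one way a consumer could decide whether another retry is still allowed.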
These are the message headers after the second retry:
After the third retry:
What will occur in the next consumer execution?
So, the run-as-root module intercepted the message processing failure, verified that the message has reached the retry limit and then persisted the message in the Magento database.
Now it is possible to manage the message through the admin panel, in the path:
Run as Root > Manage Messages
Now you can decide whether to discard the message or send it back to the erp_order_export queue.
In summary, while multiple approaches exist to tackle this issue, my aim has been to show a straightforward way to implement a retry mechanism.
That's it for today, happy coding!
Top comments (4)
wow … that's a very informative article with a lot of useful details 👏🏻👏🏻👏🏻
Thank you Rico. Coming from you is an honor!
Very useful module, thank you. However, I think names for topic, exchange, queue, binding ID etc. should be unique rather than one universal erp_order_export. Got so confused when I tried to configure my topics, exchanges, queues and consumers.
So if I don't run the CLI command bin/magento queue:consumer:start erp_order_export --single-thread --max-messages=1, then the consumer never starts? Something is missing here.