Let's chat about ordering. It's one of my favorite topics, and something I've blogged about extensively before. Previously, ordered processing in Azure Functions was only possible with event streams like Azure Event Hubs, but today I want to show how you can preserve order for Service Bus queues and topics as well.
On the surface it seems pretty straightforward: I want to be able to process messages from a queue in the exact order that I received them. For a simple service running on a machine, it's pretty easy to achieve. However, how do I preserve the ordering of queue messages when I want to process at scale? With something like Azure Functions I may be processing messages across dozens of active instances, so how can I preserve ordering?
Let's use a simple example of a messaging system that deals with patients at a hospital. Imagine I have a few events for each patient:
- Patient arrives
- Patient assigned a room
- Patient receives treatment
- Patient is discharged
I want to make sure I never process a message out of order and potentially discharge a patient before I've processed their treatment!
Let's run some quick experiments to see what happens. For this I'm going to simulate 1000 patients each sending these 4 messages (in order) and processing them (ideally in order as well).
Default and out of order
Let's try this with a simple Azure Function that just triggers on a queue. I'm not going to do anything special, just trigger on the queue and push the operation it's processing to a list on Redis Cache.
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")]Message message,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    await _client.PushData((string)message.UserProperties["patientId"], Encoding.UTF8.GetString(message.Body));
}
After sending 1000 patients' worth of data (4 messages each) to this queue, what does the Redis Cache look like after processing? Well, some of the patients look great. When I look up Patient #4 I see:
>lrange Patient-$4 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"
Great! All 4 events were sent for Patient 4, and got processed in order. But if I look at patient 2:
>lrange Patient-$2 0 -1
1) "Message-1"
2) "Message-2"
3) "Message-0"
4) "Message-3"
In this case it didn't finish processing the "patient arrives" message until after 2 other messages had already been processed. So what happened here? Azure Service Bus does guarantee ordering, so why are my messages out of order?
Well, by default, the queue trigger will do a few things. First, for every instance that spins up, it will process a set of messages concurrently. By default an instance concurrently processes 32 messages. That means it may be processing all 4 messages for a patient at the same time, and they can finish in a different order than they were sent. Well, that seems easy enough to fix: let's just limit the concurrency to 1.
Anti-pattern: limit scale out and concurrency
Here's maybe the most common solution to the above problem that I see. Let's limit the concurrency to only process 1 message at a time instead of 32. For that I modify my host.json file and set the maxConcurrentCalls to 1. Now each instance will only process 1 message at a time. I run the same test again.
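For reference, here is a minimal sketch of that host.json change. It assumes the Functions 2.x-or-later host.json schema with a 3.x/4.x Service Bus extension, where the setting lives under messageHandlerOptions; other runtime or extension versions place maxConcurrentCalls elsewhere, so check the schema for the version you are on.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }
  }
}
```

This caps concurrency per instance only. It says nothing about how many instances the platform scales out to.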
First off, it's super slow. It takes me a long time to chew through the 4000 queue messages because each instance only processes 1 at a time. And worse yet? When I check the results afterwards, some of the patients are still out of order! What's going on here? Even though I limited the instance concurrency to 1, Azure Functions has scaled me out to multiple instances. So if I have 20 function app instances that have scaled, I have 20 messages being processed concurrently (1 per instance). That means I still get into a spot where messages from the same patient could be processed at the same time - just on different instances. I'm still not guaranteed ordered processing.
The fix here? Many people want to limit the scale out of Azure Functions. While it's technically possible, it would hurt my throughput even more. Now only one message globally could be processed at a time, meaning during high traffic I'm going to get a large backlog of patient events that my function may not be able to keep up with.
Sessions to the rescue
Wouldn't this be such a sad blog post if I ended it here? There is a better way! Previously I would have said your best bet here may be to use Event Hubs, where partitions and batches let you guarantee ordering. The challenge here though is that sometimes a queue is the right message broker for the job given its transactional qualities like retries and dead-lettering. And now you can use queues and get ordering with Service Bus sessions 🎉.
So what are sessions? Sessions enable you to set an identifier for a group of messages. In order to process messages from a session, you first have to "lock" the session. You can then start to process each message from the session individually (using the same lock / complete semantics of a regular queue). The benefit of sessions is that they let you preserve order even when processing at high scale across multiple instances. Think back to the earlier case where something like 20 Azure Function app instances were all competing for the same queue. Instead of giving up on scaling to 20, each of the 20 instances now "locks" its own available session and only processes events from that session. Sessions also ensure that messages within a session are processed in order.
Sessions can be dynamically created at any time. An instance of Azure Functions spins up and first asks "are there any messages that have a session ID that hasn't been locked?" If so, it locks the session and starts processing in order. When a session no longer has any available messages, Azure Functions will release the lock and move on to the next available session. No message will be processed without first having to lock the session the message belongs to.
For our example above, I'm going to send the same 4000 messages (4 patient events for 1000 patients). In this case, I'm going to set the patient ID as the session ID. Each Azure Functions instance will acquire a lock on a session (patient), process any messages that are available, and then move on to another patient that has messages available.
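To make the locking idea concrete, here is a small in-memory sketch. It does not use Service Bus at all: a ConcurrentQueue stands in for the pool of unlocked sessions, TryDequeue stands in for acquiring a session lock, and the three workers stand in for scaled-out function instances. All names and counts in it (SessionSimulation, five patients, three workers) are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SessionSimulation
{
    public static async Task Main()
    {
        // Five hypothetical patients (sessions), each holding 4 messages in send order.
        var sessions = new ConcurrentQueue<(string SessionId, Queue<string> Messages)>(
            Enumerable.Range(0, 5).Select(p => (
                $"Patient-{p}",
                new Queue<string>(Enumerable.Range(0, 4).Select(m => $"Message-{m}")))));

        var processed = new ConcurrentDictionary<string, List<string>>();

        // Three workers play the role of scaled-out function instances.
        var workers = Enumerable.Range(0, 3).Select(async workerId =>
        {
            var jitter = new Random(workerId);

            // TryDequeue stands in for "lock the next available session":
            // once a worker takes a session, no other worker can touch it.
            while (sessions.TryDequeue(out var session))
            {
                var log = processed.GetOrAdd(session.SessionId, _ => new List<string>());

                // Drain the session one message at a time, in order.
                while (session.Messages.Count > 0)
                {
                    await Task.Delay(jitter.Next(1, 10)); // simulated processing time
                    log.Add(session.Messages.Dequeue());
                }
            }
        });

        await Task.WhenAll(workers);

        foreach (var (id, messages) in processed.OrderBy(kv => kv.Key))
        {
            Console.WriteLine($"{id}: {string.Join(", ", messages)}");
        }
    }
}
```

Different patients are handled in parallel and in no particular order relative to each other, but because only one worker ever holds a given session, each patient's own list comes out as Message-0 through Message-3.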
Using sessions in Azure Functions
Sessions are currently available in the Microsoft.Azure.WebJobs.Extensions.ServiceBus extension using version >= 3.1.0, and at the time of writing this is in preview. So first I'll pull in the extension.
Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre
And then make the tiniest code change to my function code to enable sessions (IsSessionsEnabled = true):
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    await _client.PushData(message.SessionId, Encoding.UTF8.GetString(message.Body));
}
I also need to make sure I'm using a session-enabled queue or topic.
And when I push the messages to the queue, I'll set the right SessionId for each patient message I send.
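As a rough sketch of those two steps, the snippet below creates a session-enabled queue if it does not exist yet and then sends each patient's events with the patient as the session ID. It assumes the Microsoft.Azure.ServiceBus package (the same one that provides the Message type used in the function above). The class name, method name, and the "Patient-{n}" ID format are placeholders of mine, not taken from the sample repo.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;

public static class PatientEventSender
{
    // Hypothetical helper: sends 4 ordered events for each patient.
    public static async Task SendPatientEventsAsync(
        string connectionString, string queueName, int patientCount)
    {
        // Sessions have to be enabled when the queue is created.
        var management = new ManagementClient(connectionString);
        if (!await management.QueueExistsAsync(queueName))
        {
            await management.CreateQueueAsync(
                new QueueDescription(queueName) { RequiresSession = true });
        }
        await management.CloseAsync();

        var client = new QueueClient(connectionString, queueName);
        for (var patient = 0; patient < patientCount; patient++)
        {
            for (var step = 0; step < 4; step++)
            {
                var message = new Message(Encoding.UTF8.GetBytes($"Message-{step}"))
                {
                    // All events for one patient share a session, so they
                    // are handed to a single consumer in the order sent.
                    SessionId = $"Patient-{patient}" // assumed ID format
                };

                // Awaiting each send keeps a patient's events in order on the queue.
                await client.SendAsync(message);
            }
        }
        await client.CloseAsync();
    }
}
```

Sending one message at a time is slow for 4000 messages; it is done here only to keep the ordering on the send side obvious.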
After publishing the function I push the 4000 messages. The queue gets drained pretty quickly, because I'm able to process multiple sessions concurrently across scaled-out instances. After running the test I check Redis Cache. As expected, I see all messages were processed, and for every single patient I see they were processed in order:
>lrange Patient-$10 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"
>lrange Patient-$872 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"
So with the new Azure Functions support for sessions, I can process messages from a Service Bus queue or topic in order without having to sacrifice on overall throughput. I can dynamically add messages to a new or existing session, and have confidence that messages in a session will be processed in the order they are received by Service Bus.
You can see the full sample I used for testing and loading messages in my GitHub repo. The master branch will be all in order, and the out-of-order branch is the default and out of order experiment.
Top comments (29)
This looks compelling when processing is successful, but what happens if a message in the session fails processing, say message #2? Will they still receive treatment and be discharged even though room assignment fails?
It will retry the message according to whatever retry policies you've set on the queue. So there's definitely a world you could see:
Eventually I expect the message would "deadletter" and processing on the session would continue. Here's the key line in the sessions doc:
We wrote custom logic to handle this. We moved the message to a SQL table after the retries. If a new message arrives for a session that already has a message in the SQL table, that message is moved to the SQL table without processing. There was a separate timer function to notify / process / clean up the data in the SQL table.
If that is the case, to ensure proper business logic as described in this scenario, I would not rely on message ordering.
Using only Azure Functions it would make more sense to me to use Durable Function(s) to orchestrate the business logic, probably using the external events and/or monitoring patterns.
Even with durable you’re going to have at-least-once guarantees. You have more control over some of the retries for sure, but a scenario of “message 1 is poisoned. It won’t ever successfully process” is going to be a scenario your app will need to come to terms with regardless if functions or not. Do you not ever process 2-4? If you tried to resubmit message 1 it would go to back of queue anyway, so how does your app get to it? You’re not wrong that durable may provide a bit more control here but some of the problems if you want your app to be “once and always once” successful delivery just aren’t very feasible in a distributed cloud world.
This is a pretty interesting approach. It puts me very much in mind of an Actor system. If this were to be coupled with durable functions to hold the state of the actor then I think you'd end up pretty close.
Great article. This solution fits my needs perfectly...almost. In my scenario it would be ideal if I could grab a whole batch of messages for a given session in a single Function invocation. Do you think we'll ever have a function trigger that supports this or is Event Hubs the way to go here? Hub triggers have the advantage (for me) of batching, but the disadvantage that partition keys are more "static". My scenario is closer to yours with Patient ID, where it would be nice if this partitioning were completely dynamic like how Sessions do it. So I'm a little torn on which messaging service to use.
Hi Jeff, We implemented this and are using the latest pre-release version of the trigger. We're still getting overlapping invocations for the same session ids. In our logs we write the session id and the lock period, but we get invocations for the same session concurrently. Any ideas?
We figured it out. Our function wasn't returning a Task.
Hi Jeff, just signed up on dev.to to say thanks. I have been struggling to find a solution approach for supporting multiple queues (orders landing at various stores that need to be in sequence on a per-store basis). I was thinking of one queue per store and one function per queue, which would have made for an unwieldy solution!
Really appreciate your post on using sessions with queues.
Cheers!
Is there a way to toggle receive modes from within the Azure Function? From my own experimentation, it seems that ReceiveAndDelete is faster than PeekLock. For my use case, I am okay with removing the message from the queue. I know there is an autocomplete option, but I'd like to experiment with receive modes if possible.
Thanks, this article is really helpful. In this implementation, if I want to mark a message as complete to take it out of the queue, or defer it for cases where I need to run extra business logic validation first, how can I do this, since I do not have a QueueClient?
Hi Jeff, thanks for the article. I want to activate sessions for a topic and its subscription. Is it enough to just set the flag requiresSession on the subscription or do I have to set something on the topic as well? Unlike for the queue, I can't find a setting "Enable sessions"
Jeff,
It seems that there have been some lingering issues with regards to how the ServiceBusTrigger connects to Service Bus and generating excessive requests (and thus costs).
github.com/Azure/Azure-Functions/i...
github.com/Azure/azure-webjobs-sdk...
Wondering if you have any observations with regards to the issues which were identified late last year.
Hi Jeff, great article. Thanks a lot for writing this.
When sessions are enabled, will each concurrent process (when maxConcurrentRequests > 1) in a single Azure Function instance acquire a lock for a different session?