Let's chat about ordering. It's one of my favorite topics, and something I've blogged about extensively before. Previously, ordered processing in Azure Functions was only possible with event streams like Azure Event Hubs, but today I want to show how you can preserve order for Service Bus queues and topics as well.
On the surface it seems pretty straightforward: I want to process messages from a queue in the exact order that I received them. For a simple service running on a single machine, that's pretty easy to achieve. But how do I preserve the ordering of queue messages when I want to process at scale? With something like Azure Functions, I may be processing messages across dozens of active instances. How can I preserve ordering then?
Let's use a simple example of a messaging system that deals with patients at a hospital. Imagine I have a few events for each patient:
- Patient arrives
- Patient assigned a room
- Patient receives treatment
- Patient is discharged
I want to make sure I never process a message out of order and potentially discharge a patient before I've processed their treatment!
Let's run some quick experiments to see what happens. For this I'm going to simulate 1000 patients each sending these 4 messages (in order) and processing them (ideally in order as well).
Default and out of order
Let's try this with a simple Azure Function that just triggers on a queue. I'm not going to do anything special, just trigger on the queue and push the operation it's processing to a list on Redis Cache.
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")] Message message,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    // Append this event to the patient's list in Redis, keyed by the patient ID.
    await _client.PushData((string)message.UserProperties["patientId"], Encoding.UTF8.GetString(message.Body));
}
After sending 1000 patients' worth of data (4 messages each) to this queue, what does the Redis Cache look like after processing? Well, some of the patients look great. When I look up Patient #4 I see:
>lrange Patient-$4 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"
Great! All 4 events were sent for Patient 4, and got processed in order. But if I look at patient 2:
>lrange Patient-$2 0 -1
1) "Message-1"
2) "Message-2"
3) "Message-0"
4) "Message-3"
In this case it didn't finish processing the "patient arrives" message until after 2 other messages had already been processed. So what happened here? Azure Service Bus does guarantee ordering, so why are my messages out of order?
Well, by default, the queue trigger will do a few things. First, for every instance that spins up, it will process a set of messages concurrently. By default an instance concurrently processes 32 messages. That means it may be processing all 4 messages for a patient at the same time, and they may finish in a different order than they were sent. That seems easy enough to fix: let's just limit the concurrency to 1.
Anti-pattern: limit scale out and concurrency
Here's maybe the most common solution to the above problem that I see. Let's limit the concurrency to only process 1 message at a time instead of 32. For that I modify my host.json file and set the maxConcurrentCalls to 1. Now each instance will only process 1 message at a time. I run the same test again.
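For reference, here's roughly what that host.json change looks like. This is a sketch that assumes the 3.x version of the Service Bus extension used in this post, where the setting lives under messageHandlerOptions. Newer major versions of the extension lay out host.json differently, so check the reference for the version you have installed.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }
  }
}
```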
First off, it's super slow. It takes me a long time to chew through the 4000 queue messages because each instance only processes 1 at a time. And worse yet? When I check the results afterwards, some of the patients are still out of order! What's going on here? Even though I limited the instance concurrency to 1, Azure Functions has scaled me out to multiple instances. So if I have 20 function app instances that have scaled, I have 20 messages being processed concurrently (1 per instance). That means I still get into a spot where messages from the same patient could be processed at the same time - just on different instances. I'm still not guaranteed ordered processing.
The fix here? Many people want to limit the scale out of Azure Functions. While it's technically possible, it would hurt my throughput even more. Now only one message globally could be processed at a time, meaning during high traffic I'm going to get a large backlog of patient events that my function may not be able to keep up with.
Sessions to the rescue
Wouldn't this be such a sad blog post if I ended it here? There is a better way! Previously I would have said your best bet here may be to use Event Hubs, where partitions and batches let you guarantee ordering. The challenge here though is that sometimes a queue is the right message broker for the job given its transactional qualities like retries and dead-lettering. And now you can use queues and get ordering with Service Bus sessions 🎉.
So what are sessions? Sessions enable you to set an identifier for a group of messages. In order to process messages from a session, you first have to "lock" the session. You can then start to process each message from the session individually (using the same lock / complete semantics of a regular queue). The benefit of sessions is that they let you preserve order even when processing at high scale across multiple instances. Think of before, where we had something like 20 Azure Function app instances all competing for the same queue. Rather than giving up on scaling out to 20, each of the 20 instances now "locks" its own available session and only processes events from that session. Sessions also ensure that messages from a session are processed in order.
Sessions can be dynamically created at any time. An instance of Azure Functions spins up and first asks "are there any messages that have a session ID that hasn't been locked?" If so, it locks the session and starts processing in order. When a session no longer has any available messages, Azure Functions will release the lock and move on to the next available session. No message will be processed without first having to lock the session the message belongs to.
For our example above, I'm going to send the same 4000 messages (4 patient events for 1000 patients). In this case, I'm going to set the patient ID as the session ID. Each Azure Functions instance will acquire a lock on a session (patient), process any messages that are available, and then move on to another patient that has messages available.
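To make the locking behavior concrete, here's a toy in-memory simulation. It's only a sketch: nothing here talks to Service Bus, and the SessionSimulation class, its Run method, and the "Patient-N" session IDs are all names I made up for illustration. Each worker stands in for a function instance, and taking a session ID off the shared queue stands in for acquiring the session lock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SessionSimulation
{
    // In-memory stand-in for a session-enabled queue. No Service Bus calls are made.
    public static Dictionary<string, List<string>> Run(int patients, int workers)
    {
        // One FIFO queue of messages per session (patient), in send order.
        var sessions = new Dictionary<string, Queue<string>>();
        for (int p = 0; p < patients; p++)
        {
            var messages = new Queue<string>();
            for (int m = 0; m < 4; m++) messages.Enqueue($"Message-{m}");
            sessions[$"Patient-{p}"] = messages;
        }

        // Dequeuing a session ID plays the role of the session lock:
        // a given session is only ever handed to one worker.
        var unlocked = new ConcurrentQueue<string>(sessions.Keys);
        var processed = new ConcurrentDictionary<string, List<string>>();

        var tasks = Enumerable.Range(0, workers).Select(_ => Task.Run(async () =>
        {
            while (unlocked.TryDequeue(out var sessionId))
            {
                var handled = new List<string>();
                var queue = sessions[sessionId];
                while (queue.Count > 0)
                {
                    await Task.Delay(1); // simulated work per message
                    handled.Add(queue.Dequeue());
                }
                processed[sessionId] = handled;
            }
        })).ToArray();

        Task.WaitAll(tasks);
        return processed.ToDictionary(kv => kv.Key, kv => kv.Value);
    }

    public static void Main()
    {
        var result = Run(patients: 100, workers: 20);
        var expected = new[] { "Message-0", "Message-1", "Message-2", "Message-3" };
        bool allInOrder = result.Values.All(handled => handled.SequenceEqual(expected));
        Console.WriteLine($"Sessions processed: {result.Count}, all in order: {allInOrder}");
    }
}
```

Twenty workers run at the same time, so many patients are in flight at once, but any single patient's messages are only ever touched by the one worker holding that session. That's the property that keeps per-patient order intact without giving up parallelism.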
Using sessions in Azure Functions
Sessions are currently available in the Microsoft.Azure.WebJobs.Extensions.ServiceBus extension using version >= 3.1.0, and at the time of writing this is in preview. So first I'll pull in the extension.
Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre
And then make the tiniest code change to my function code to enable sessions (IsSessionsEnabled = true):
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)] Message message,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    // The session ID (the patient ID) is now the Redis key.
    await _client.PushData(message.SessionId, Encoding.UTF8.GetString(message.Body));
}
I also need to make sure I'm using a session-enabled queue or topic.
And when I push the messages to the queue, I'll set the right sessionId for each patient message I send.
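Here's a minimal sketch of what the sending side could look like. It assumes the Microsoft.Azure.ServiceBus NuGet package (the library the Message type in the function comes from). The PatientSender class, its method names, and the choice to use the raw patient ID as the session ID are illustrative assumptions, not the exact code from my sample repo.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;

public static class PatientSender
{
    // Builds the four events for one patient, in order.
    // The "Message-N" bodies mirror the Redis output shown in this post.
    public static List<Message> BuildPatientMessages(string patientId)
    {
        var messages = new List<Message>();
        for (int i = 0; i < 4; i++)
        {
            messages.Add(new Message(Encoding.UTF8.GetBytes($"Message-{i}"))
            {
                // Every message for the same patient shares one session.
                SessionId = patientId
            });
        }
        return messages;
    }

    // connectionString and queueName are supplied by the caller (hypothetical values).
    public static async Task SendAsync(string connectionString, string queueName, int patientCount)
    {
        // Sessions must be enabled when the queue is created;
        // the flag can't be switched on for an existing queue.
        var management = new ManagementClient(connectionString);
        if (!await management.QueueExistsAsync(queueName))
        {
            await management.CreateQueueAsync(new QueueDescription(queueName) { RequiresSession = true });
        }
        await management.CloseAsync();

        var client = new QueueClient(connectionString, queueName);
        try
        {
            for (int p = 0; p < patientCount; p++)
            {
                // Send one at a time so each patient's events are enqueued in order.
                foreach (var message in BuildPatientMessages(p.ToString()))
                {
                    await client.SendAsync(message);
                }
            }
        }
        finally
        {
            await client.CloseAsync();
        }
    }
}
```

Sending sequentially is the slow-but-simple choice here. The point is only that every message carries a SessionId and that a given patient's events reach the queue in the order they happened.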
After publishing the function I push the 4000 messages. The queue gets drained pretty quickly, because I'm able to process multiple sessions concurrently across scaled-out instances. After running the test I check Redis Cache. As expected, I see all messages were processed, and for every single patient I see they were processed in order:
>lrange Patient-$10 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"
>lrange Patient-$872 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"
So with the new Azure Functions support for sessions, I can process messages from a Service Bus queue or topic in order without sacrificing overall throughput. I can dynamically add messages to a new or existing session, and have confidence that messages in a session will be processed in the order they are received by Service Bus.
You can see the full sample I used for testing and loading messages in my GitHub repo. The master branch will be all in order, and the out-of-order branch is the default and out of order experiment.
Oldest comments (29)
This looks compelling when processing is successful, but what happens if a message in the session fails processing, say message #2? Will they still receive treatment and be discharged even though room assignment fails?
It will retry the message according to whatever retry policies you've set on the queue. So there's definitely a world you could see:
Eventually I expect the message would "deadletter" and processing on the session would continue. Here's the key line in the sessions doc:
If that is the case, to ensure proper business logic as described in this scenario, I would not rely on message ordering.
Using only Azure Functions it would make more sense to me to use Durable Function(s) to orchestrate the business logic, probably using the external events and/or monitoring patterns.
Even with durable you’re going to have at-least-once guarantees. You have more control over some of the retries for sure, but a scenario of “message 1 is poisoned. It won’t ever successfully process” is going to be a scenario your app will need to come to terms with regardless if functions or not. Do you not ever process 2-4? If you tried to resubmit message 1 it would go to back of queue anyway, so how does your app get to it? You’re not wrong that durable may provide a bit more control here but some of the problems if you want your app to be “once and always once” successful delivery just aren’t very feasible in a distributed cloud world.
We have done custom logic to handle this. We moved the message to a SQL table after the retries. If a new message arrives with same session available in SQL table, then the message will be moved to SQL table without processing. There was separate timer function to notify / process / cleanup the data in SQL table.
Is there a limit to the number of sessions processed concurrently?
I.e. Functions will scale out, but will Service Bus broker keep up?
I suppose it depends on the SKU of Service Bus. You can modify the SessionHandlerOptions in the host.json, so you can concurrently process multiple sessions on a single instance.
Hi Jeff, excellent post. What happens if we have more than 32 messages for a specific session?
When a function places a lock on a session, it may make multiple requests to get many messages. You could have thousands of messages for the same session, the function app will process one at a time and then go back and ask service bus for the next one. With sessions you could even send a message to a session 1 year later, a function app will wake up, lock the session, and process that message. So really no limit to how many or how frequently you can push messages to a session.
In the case of sessions, because each message must be processed in order, you really get the concurrency from having multiple sessions processed at once on an instance, but to preserve ordering only one message from a session will be pulled at a time (if that makes sense)
docs.microsoft.com/en-us/azure/ser...
Great article, going to implement this :)
This is a pretty interesting approach. It puts me very much in mind of an Actor system. If this were to be coupled with durable functions to hold the state of the actor then I think you'd end up pretty close.
Hi Jeff, We implemented this and are using the latest pre-release version of the trigger. We're still getting overlapping invocations for the same session ids. In our logs we write the session id and the lock period, but we get invocations for the same session concurrently. Any ideas?
We figured it out. Our function wasn't returning a Task.
Is there a way to toggle receive modes from within the Azure Function? From my own experimentation, it seems that ReceiveAndDelete is faster than PeekLock. For my use case, I am okay with removing the message from the queue. I know there is an autocomplete option, but I'd like to experiment with receive modes if possible.
Hi Jeff,
Do the sessions work with Azure function with consumption plan? Thanks!
Regards,
Dmitrii
Yes - will work in all environments
Hi Jeff, thanks for the great article and example. One question I had. If you are using topics and subscriptions, is the session lock at the subscription level?
For example, if you had two subscriptions (session enabled) with enforced ordering on the topic, I assume the 2 subscriptions would both get copies of the messages (in order). If you had two Azure Functions each listening to a respective subscription, the order of those triggering functions may be in any order?
I think the order of the messages themselves would be in-order delivered to each function. But the functions could possibly execute at the same or different times?
Jeff,
It seems that there have been some lingering issues with regards to how the ServiceBusTrigger connects to Service Bus and generating excessive requests (and thus costs).
github.com/Azure/Azure-Functions/i...
github.com/Azure/azure-webjobs-sdk...
Wondering if you have any observations with regards to the issues which were identified late last year.
Hi Jeff,
I have put my comments in the link below.
github.com/jeffhollan/functions-cs...
Please look into it
Hi Jeff, just signed up on dev.to to say thanks. I have been struggling to find a solution approach for supporting multiple queues (orders landing at various stores need to be in sequence on a per-store basis). I was thinking of one queue per store and one function per queue, which would have made for an unwieldy solution!
Really appreciate your post on using session with queue.
Cheers !
Hi Jeff, thanks for the article. I want to activate sessions for a topic and its subscription. Is it enough to just set the flag requiresSession on the subscription or do I have to set something on the topic as well? Unlike for the queue, I can't find a setting "Enable sessions"
Previously, I had to use Durable Functions with Monitors, Table Storage and Redis to achieve this. This makes it simple, easier and more efficient.
great article. thanks a lot for writing this.
could you please share your thoughts for processing in-order for cosmosdb change feed trigger? From the docs I read, it's not clear to me how it works. We are planning to use cosmos for achieving atomicity between making an entry in db and raising an event. There are cases where I need to maintain order when processing the change feed. when azure functions are scaled out to multiple instances do they process change feed of a specific partition key in parallel? or even a single instance of azure function process feed messages in parallel?