Microsoft Azure

Ordered queue processing in Azure Functions with Sessions

Jeff Hollan ・6 min read

Let's chat about ordering. It's one of my favorite topics, and something I've blogged about extensively before. Previously, ordered processing in Azure Functions was only possible with event streams like Azure Event Hubs, but today I want to show how you can preserve order for Service Bus queues and topics as well.

On the surface it seems pretty straightforward: I want to be able to process messages from a queue in the exact order that I received them. For a simple service running on a machine, it's pretty easy to achieve. However, how do I preserve the ordering of queue messages when I want to process at scale? With something like Azure Functions I may be processing messages across dozens of active instances. How can I preserve ordering then?

Let's use a simple example of a messaging system that deals with patients at a hospital. Imagine I have a few events for each patient:

  1. Patient arrives
  2. Patient assigned a room
  3. Patient receives treatment
  4. Patient is discharged

I want to make sure I never process a message out of order and potentially discharge a patient before I've processed their treatment!

Let's run some quick experiments to see what happens. For this I'm going to simulate 1000 patients each sending these 4 messages (in order) and processing them (ideally in order as well).


Default and out of order

Let's try this with a simple Azure Function that just triggers on a queue. I'm not going to do anything special, just trigger on the queue and push the operation it's processing to a list on Redis Cache.

// Triggered once per message on "queue"; nothing here enforces ordering.
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString")]Message message,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    // Append the message body to this patient's list in Redis Cache (_client is the Redis helper).
    await _client.PushData((string)message.UserProperties["patientId"], Encoding.UTF8.GetString(message.Body));
}

After sending 1000 patients' worth of data (4 messages each) to this queue, what does the Redis Cache look like after processing? Well, some of the patients look great. When I look up Patient #4 I see:

>lrange Patient-$4 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"

Great! All 4 events were sent for Patient 4, and got processed in order. But if I look at patient 2:

>lrange Patient-$2 0 -1
1) "Message-1"
2) "Message-2"
3) "Message-0"
4) "Message-3"

In this case it didn't finish processing the "patient arrives" message until after 2 other messages had already been processed. So what happened here? Azure Service Bus does guarantee ordering, so why are my messages out of order?

Well, by default, the queue trigger will do a few things. First, for every instance that spins up, it will process a set of messages concurrently. By default an instance concurrently processes 32 messages. That means it may be processing all 4 messages for a patient at the same time, and they may finish in a different order than they were sent. Well, that seems easy enough to fix: let's just limit the concurrency to 1.
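To make the effect concrete, here is a minimal sketch that doesn't touch Service Bus at all. It assumes each message takes a different amount of time to process; `ConcurrencyDemo`, its method names, and the delay values are all hypothetical and only illustrate why concurrent handlers can finish out of order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    // Starts every message at once (like a trigger with concurrency > 1)
    // and records the order in which processing finishes.
    public static async Task<List<string>> ProcessConcurrentlyAsync(
        IReadOnlyList<(string Body, int WorkMs)> messages)
    {
        var completed = new ConcurrentQueue<string>();
        var tasks = messages.Select(async m =>
        {
            await Task.Delay(m.WorkMs);   // simulated processing time (assumption)
            completed.Enqueue(m.Body);
        });
        await Task.WhenAll(tasks);
        return completed.ToList();
    }

    // Processes one message at a time, so completion order equals send order.
    public static async Task<List<string>> ProcessSequentiallyAsync(
        IReadOnlyList<(string Body, int WorkMs)> messages)
    {
        var completed = new List<string>();
        foreach (var m in messages)
        {
            await Task.Delay(m.WorkMs);
            completed.Add(m.Body);
        }
        return completed;
    }

    public static async Task Main()
    {
        // Hypothetical processing times for one patient's four events.
        var patient = new List<(string Body, int WorkMs)>
        {
            ("Message-0", 300),
            ("Message-1", 100),
            ("Message-2", 200),
            ("Message-3", 50),
        };

        Console.WriteLine("Concurrent: " + string.Join(", ", await ProcessConcurrentlyAsync(patient)));
        Console.WriteLine("Sequential: " + string.Join(", ", await ProcessSequentiallyAsync(patient)));
    }
}
```

With these delays the concurrent run will usually report a completion order that differs from the send order, while the sequential run always matches it.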

Anti-pattern: limit scale out and concurrency

Here's maybe the most common solution to the above problem that I see. Let's limit the concurrency to only process 1 message at a time instead of 32. For that I modify my host.json file and set the maxConcurrentCalls to 1. Now each instance will only process 1 message at a time. I run the same test again.
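For reference, a minimal host.json fragment for this setting might look like the following. It assumes a Functions v2+ host with the 3.x Service Bus extension, where the value sits under `messageHandlerOptions`; other extension versions may use a different layout, so check the schema for the version you have installed.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }
  }
}
```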

First off, it's super slow. It takes me a long time to chew through the 4000 queue messages because each instance only processes 1 at a time. And worse yet? When I check the results afterwards, some of the patients are still out of order! What's going on here? Even though I limited the instance concurrency to 1, Azure Functions has scaled me out to multiple instances. So if I have 20 function app instances that have scaled, I have 20 messages being processed concurrently (1 per instance). That means I still get into a spot where messages from the same patient could be processed at the same time - just on different instances. I'm still not guaranteed ordered processing.

The fix here? Many people want to limit the scale out of Azure Functions. While it's technically possible, it would hurt my throughput even more. Now only one message globally could be processed at a time, meaning during high traffic I'm going to get a large backlog of patient events that my function may not be able to keep up with.

Sessions to the rescue

Wouldn't this be such a sad blog post if I ended it here? There is a better way! Previously I would have said your best bet here may be to use Event Hubs, where partitions and batches let you guarantee ordering. The challenge here though is that sometimes a queue is the right message broker for the job given its transactional qualities like retries and deadlettering. And now you can use queues and get ordering with Service Bus sessions 🎉.

So what are sessions? Sessions enable you to set an identifier for a group of messages. In order to process messages from a session, you first have to "lock" the session. You can then start to process each message from the session individually (using the same lock / complete semantics of a regular queue). The benefit of sessions is that they let you preserve order even when processing at high scale across multiple instances. Think of before, where we had something like 20 Azure Function app instances all competing for the same queue. Rather than giving up that scale, each of the 20 instances now "locks" its own available session and only processes events from that session. Sessions also ensure that messages from a session are processed in order.

Sessions can be dynamically created at any time. An instance of Azure Functions spins up and first asks "are there any messages that have a session ID that hasn't been locked?" If so, it locks the session and starts processing in order. When a session no longer has any available messages, Azure Functions will release the lock and move on to the next available session. No message will be processed without first having to lock the session the message belongs to.

For our example above, I'm going to send the same 4000 messages (4 patient events for 1000 patients). In this case, I'm going to set the patient ID as the session ID. Each Azure Functions instance will acquire a lock on a session (patient), process any messages that are available, and then move on to another patient that has messages available.
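The lock-then-drain behavior described above can be sketched with a small in-memory simulation. This is not how Service Bus or the Functions trigger is implemented; `SessionQueueSimulator` and its members are hypothetical names, and the "workers" stand in for scaled-out function instances. The point is only that when each worker holds an exclusive lock on one session at a time, per-session order survives any amount of parallelism.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a session-enabled queue.
public sealed class SessionQueueSimulator
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, Queue<string>> _pending = new Dictionary<string, Queue<string>>();
    private readonly HashSet<string> _locked = new HashSet<string>();

    public void Send(string sessionId, string body)
    {
        lock (_gate)
        {
            if (!_pending.TryGetValue(sessionId, out var queue))
                _pending[sessionId] = queue = new Queue<string>();
            queue.Enqueue(body);
        }
    }

    // Locks any session that has messages and is not already locked; null if none.
    private string TryLockSession()
    {
        lock (_gate)
        {
            foreach (var pair in _pending)
            {
                if (pair.Value.Count > 0 && !_locked.Contains(pair.Key))
                {
                    _locked.Add(pair.Key);
                    return pair.Key;
                }
            }
            return null;
        }
    }

    // Returns the next message of a locked session, or releases the lock when it is drained.
    private bool TryReceive(string sessionId, out string body)
    {
        lock (_gate)
        {
            var queue = _pending[sessionId];
            if (queue.Count > 0)
            {
                body = queue.Dequeue();
                return true;
            }
            _locked.Remove(sessionId);
            body = null;
            return false;
        }
    }

    // Runs workerCount concurrent workers and returns the processing order per session.
    public async Task<Dictionary<string, List<string>>> DrainAsync(int workerCount)
    {
        var results = new Dictionary<string, List<string>>();

        async Task Worker()
        {
            string sessionId;
            while ((sessionId = TryLockSession()) != null)
            {
                while (TryReceive(sessionId, out var body))
                {
                    await Task.Yield(); // simulated work; lets other workers interleave
                    lock (_gate)
                    {
                        if (!results.TryGetValue(sessionId, out var list))
                            results[sessionId] = list = new List<string>();
                        list.Add(body);
                    }
                }
            }
        }

        await Task.WhenAll(Enumerable.Range(0, workerCount).Select(_ => Worker()));
        return results;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var queue = new SessionQueueSimulator();
        // Interleave patients so messages of one session are not adjacent in send order.
        for (var step = 0; step < 4; step++)
            for (var patient = 0; patient < 1000; patient++)
                queue.Send($"Patient-{patient}", $"Message-{step}");

        var results = await queue.DrainAsync(workerCount: 20);
        var expected = new[] { "Message-0", "Message-1", "Message-2", "Message-3" };
        var allInOrder = results.Values.All(list => list.SequenceEqual(expected));
        Console.WriteLine($"Sessions processed: {results.Count}, all in order: {allInOrder}");
    }
}
```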

Using sessions in Azure Functions

Sessions are currently available in the Microsoft.Azure.WebJobs.Extensions.ServiceBus extension using version >= 3.1.0, and at the time of writing this is in preview. So first I'll pull in the extension.

Install-Package Microsoft.Azure.WebJobs.Extensions.ServiceBus -Pre

And then make the tiniest code change to my function code to enable sessions (IsSessionsEnabled = true):

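// IsSessionsEnabled = true makes the trigger lock a session before receiving its messages.
public async Task Run(
    [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)]Message message,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
    // The session ID is the patient ID, so it also serves as the key for the Redis list.
    await _client.PushData(message.SessionId, Encoding.UTF8.GetString(message.Body));
}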

I also need to make sure I'm using a session-enabled queue or topic.

(Screenshot: session-enabled queue)

And when I push the messages to the queue, I'll set the right sessionId for each patient message I send.
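On the sending side, that amounts to setting `SessionId` on each message before it is sent. Here is a minimal sketch, assuming the `Microsoft.Azure.ServiceBus` client library (the same `Message` type the function above receives). `PatientSender`, its method names, and the `Message-{i}` bodies are placeholders for illustration, not the code from the sample repo.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

public static class PatientSender
{
    // Builds the four events for one patient, all tagged with the same SessionId.
    public static List<Message> BuildPatientMessages(string patientId)
    {
        var messages = new List<Message>();
        for (var i = 0; i < 4; i++)
        {
            messages.Add(new Message(Encoding.UTF8.GetBytes($"Message-{i}"))
            {
                SessionId = patientId // groups this patient's events into one session
            });
        }
        return messages;
    }

    // Sends one patient's events in order to a session-enabled queue.
    // connectionString and queueName are supplied by the caller (assumptions).
    public static async Task SendPatientAsync(string connectionString, string queueName, string patientId)
    {
        var client = new QueueClient(connectionString, queueName);
        try
        {
            // Await each send so the events are enqueued in the intended order.
            foreach (var message in BuildPatientMessages(patientId))
                await client.SendAsync(message);
        }
        finally
        {
            await client.CloseAsync();
        }
    }
}
```

Calling `await PatientSender.SendPatientAsync(connectionString, "queue", "10")` would then place four messages in session "10".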

After publishing the function I push the 4000 messages. The queue gets drained pretty quickly, because I'm able to process multiple sessions concurrently across scaled-out instances. After running the test I check Redis Cache. As expected, I see all messages were processed, and for every single patient I see they were processed in order:

>lrange Patient-$10 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"

>lrange Patient-$872 0 -1
1) "Message-0"
2) "Message-1"
3) "Message-2"
4) "Message-3"

So with the new Azure Functions support for sessions, I can process messages from a Service Bus queue or topic in order without having to sacrifice overall throughput. I can dynamically add messages to a new or existing session, and have confidence that messages in a session will be processed in the order they are received by Service Bus.

You can see the full sample I used for testing and loading messages in my GitHub repo. The master branch will be all in order, and the out-of-order branch is the default and out of order experiment.

Discussion

Bob Langley

This looks compelling when processing is successful, but what happens if a message in the session fails processing, say message #2? Will they still receive treatment and be discharged even though room assignment fails?

Jeff Hollan Author

It will retry the message according to whatever retry policies you've set on the queue. So there's definitely a world where you could see:

>lrange Patient-$10 0 -1
1) "Message-0"
2) "Message-0"
3) "Message-1"
4) "Message-2"
5) "Message-3"

Eventually I expect the message would "deadletter" and processing on the session would continue. Here's the key line in the sessions doc:

A new message can only be obtained [for that session] when the prior message has been completed or dead-lettered. Abandoning a message causes the same message to be served again with the next receive operation.

Devakumar

We wrote custom logic to handle this. We moved the message to a SQL table after the retries. If a new message arrives for a session that already has an entry in the SQL table, that message is moved to the SQL table without processing. There was a separate timer function to notify / process / clean up the data in the SQL table.

Bob Langley

If that is the case, to ensure proper business logic as described in this scenario, I would not rely on message ordering.

Using only Azure Functions it would make more sense to me to use Durable Function(s) to orchestrate the business logic, probably using the external events and/or monitoring patterns.

Jeff Hollan Author

Even with durable you’re going to have at-least-once guarantees. You have more control over some of the retries for sure, but a scenario of “message 1 is poisoned. It won’t ever successfully process” is going to be a scenario your app will need to come to terms with regardless if functions or not. Do you not ever process 2-4? If you tried to resubmit message 1 it would go to back of queue anyway, so how does your app get to it? You’re not wrong that durable may provide a bit more control here but some of the problems if you want your app to be “once and always once” successful delivery just aren’t very feasible in a distributed cloud world.

Simon Timms

This is a pretty interesting approach. It puts me very much in mind of an Actor system. If this were to be coupled with durable functions to hold the state of the actor then I think you'd end up pretty close.

Todd Menier

Great article. This solution fits my needs perfectly...almost. In my scenario it would be ideal if I could grab a whole batch of messages for a given session in a single Function invocation. Do you think we'll ever have a function trigger that supports this or is Event Hubs the way to go here? Hub triggers have the advantage (for me) of batching, but the disadvantage that partition keys are more "static". My scenario is closer to yours with Patient ID, where it would be nice if this partitioning were completely dynamic like how Sessions do it. So I'm a little torn on which messaging service to use.

Aaron53

Is there a way to toggle receive modes from within the Azure Function? From my own experimentation, it seems that ReceiveAndDelete is faster than PeekLock. For my use case, I am okay with removing the message from the queue. I know there is an autocomplete option, but I'd like to experiment with receive modes if possible.

CharlieDigital

Jeff,

It seems that there have been some lingering issues with regards to how the ServiceBusTrigger connects to Service Bus and generating excessive requests (and thus costs).

github.com/Azure/Azure-Functions/i...
github.com/Azure/azure-webjobs-sdk...

Wondering if you have any observations with regards to the issues which were identified late last year.

Dennis Trusty

Hi Jeff, We implemented this and are using the latest pre-release version of the trigger. We're still getting overlapping invocations for the same session ids. In our logs we write the session id and the lock period, but we get invocations for the same session concurrently. Any ideas?

Dennis Trusty

We figured it out. Our function wasn't returning a Task.

smeile

Hi Jeff, thanks for the article. I want to activate sessions for a topic and its subscription. Is it enough to just set the flag requiresSession on the subscription or do I have to set something on the topic as well? Unlike for the queue, I can't find a setting "Enable sessions"

gautam singh

Hi Jeff, just signed up on dev.to to say thanks. I have been struggling to find an approach for supporting multiple queues (orders landing at various stores need to be in sequence on a per-store basis). I was thinking of one queue per store and one function per queue, which would have made for an unwieldy solution!

Really appreciate your post on using session with queue.

Cheers !

Ramesh Janjyam

Great article. Thanks a lot for writing this.
Could you please share your thoughts on in-order processing for the Cosmos DB change feed trigger? From the docs I read, it's not clear to me how it works. We are planning to use Cosmos DB to achieve atomicity between making an entry in the database and raising an event. There are cases where I need to maintain order when processing the change feed. When Azure Functions scales out to multiple instances, do they process the change feed of a specific partition key in parallel? Or does even a single instance process feed messages in parallel?

Sean Feldman

Is there a limit to the number of sessions processed concurrently?
I.e. Functions will scale out, but will Service Bus broker keep up?

Jeff Hollan Author

I suppose it depends on the SKU of Service Bus. You can modify the SessionHandlerOptions in the host.json, so you can concurrently process multiple sessions on a single instance.

Jason Wisener

Hi Jeff, thanks for the great article and example. One question I had. If you are using topics and subscriptions, is the session lock at the subscription level?

For example, say you had two session-enabled subscriptions on the topic, with ordering enforced. I assume both subscriptions would get copies of the messages (in order). If you had two Azure Functions, each listening to its own subscription, could those functions trigger in any order relative to each other?

I think the order of the messages themselves would be in-order delivered to each function. But the functions could possibly execute at the same or different times?

Asad Raheem

Previously, I had to use Durable Functions with Monitors, Table Storage and Redis to achieve this. This makes it simple, easier and more efficient.

DimaNike

Hi Jeff,

Do sessions work with Azure Functions on the Consumption plan? Thanks!

Regards,
Dmitrii

Jeff Hollan Author

Yes - will work in all environments

appalaraju

Hi Jeff,

I have put my comments in the link below.
github.com/jeffhollan/functions-cs...

Please look into it

Justin King

Great article, going to implement this :)

ElTavo

Hi Jeff, excellent post. What happens if we have more than 32 messages for a specific session?

Jeff Hollan Author

When a function places a lock on a session, it may make multiple requests to get many messages. You could have thousands of messages for the same session; the function app will process one at a time and then go back and ask Service Bus for the next one. With sessions you could even send a message to a session 1 year later, and a function app will wake up, lock the session, and process that message. So there's really no limit to how many or how frequently you can push messages to a session.

In the case of sessions, because each message must be processed in order, you really get the concurrency from having multiple sessions processed at once on an instance, but to preserve ordering only one message from a session will be pulled at a time (if that makes sense).

docs.microsoft.com/en-us/azure/ser...