
Chris Gillum

Scheduling tons of orchestrator functions concurrently in C#

I had a call with an engineer on another team who wanted advice on how to schedule a large number of concurrent orchestration functions in Durable Functions. I shared some sample code with him showing how we do it in our own internal performance tests, and decided it might be useful to share publicly too.

First, here is the orchestration that we're running for the test. It's just a basic sequential orchestrator that calls a SayHello activity function 5 times.

[FunctionName(nameof(HelloSequence))]
public static async Task<List<string>> HelloSequence(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var outputs = new List<string>
    {
        await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
        await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"),
        await context.CallActivityAsync<string>(nameof(SayHello), "London"),
        await context.CallActivityAsync<string>(nameof(SayHello), "Amsterdam"),
        await context.CallActivityAsync<string>(nameof(SayHello), "Mumbai")
    };

    return outputs;
}

[FunctionName(nameof(SayHello))]
public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";

And here is the HTTP trigger function we use to trigger a performance run. It takes a count parameter from the query string as the number of concurrent "HelloSequence" orchestrations to run. In our tests, we'll often run more than 100K orchestrations concurrently.

[FunctionName(nameof(StartManySequences))]
public static async Task<IActionResult> StartManySequences(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
    [DurableClient] IDurableClient starter,
    ILogger log)
{
    if (!int.TryParse(req.Query["count"], out int count) || count < 1)
    {
        return new BadRequestObjectResult("A 'count' query string parameter is required and it must contain a positive number.");
    }

    string prefix = await ScheduleManyInstances(starter, nameof(HelloSequence), count, log);
    return new OkObjectResult($"Scheduled {count} orchestrations prefixed with '{prefix}'.");
}

This method calls into a ScheduleManyInstances helper method, which does the actual scheduling. Before I get into our implementation, however, I think it would be useful to describe what not to do.

Naïve implementation #1: Sequential

The most common naïve implementation is to use a for-loop with an await in each iteration.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    // "HH" is the 24-hour clock; "hh" would reuse the same prefix 12 hours later
    string prefix = utcNow.ToString("yyyyMMdd-HHmmss");

    for (int i = 0; i < count; i++)
    {
        // Start each instance one-at-a-time
        string instanceId = $"{prefix}-{i:X16}";
        await client.StartNewAsync(orchestrationName, instanceId);
    }

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

This is really slow because you enqueue only one orchestration start message at a time. If you're scheduling a large number of orchestrations, the client that invoked this HTTP function will probably time out before they are all scheduled. Ideally we'd schedule orchestrations in parallel, which leads us to the next bad practice.

Naïve implementation #2: Too much parallelism

Next we try using Task.WhenAll to schedule all the orchestrations in parallel. This is better than scheduling orchestrations sequentially because it allows us to queue up new work much more quickly. However, it has a major scalability problem, which I'll describe below.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-HHmmss");

    // Run all StartNewAsync tasks concurrently
    var startTasks = new Task[count];
    for (int i = 0; i < count; i++)
    {
        string instanceId = $"{prefix}-{i:X16}";
        startTasks[i] = client.StartNewAsync(orchestrationName, instanceId);
    }

    await Task.WhenAll(startTasks);

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

The problem is that if count is a large number (like 100K), you will very quickly exhaust both threads and outbound TCP connections on your VM. This is because the .NET thread scheduler will try to aggressively allocate a huge number of threads to satisfy your concurrency demands. Each of those threads will also try to open a connection to Azure Storage concurrently, requiring a new TCP connection. The result is often that the function will fail, making this implementation highly unreliable.

Naïve implementation #3: Throttled parallelism with Parallel.For

This next approach uses Parallel.For to cap the number of orchestrations being scheduled at the same time.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-HHmmss");

    // Use up to 200 threads to schedule orchestrations concurrently
    var maxConcurrencyOptions = new ParallelOptions { MaxDegreeOfParallelism = 200 };
    Parallel.For(0, count, maxConcurrencyOptions, i =>
    {
        string instanceId = $"{prefix}-{i:X16}";

        // Use GetAwaiter().GetResult() to block since Parallel.For() doesn't support async
        client.StartNewAsync(orchestrationName, instanceId).GetAwaiter().GetResult();
    });

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

This works much better than the first two solutions. It fixes the TCP connection exhaustion issue by giving the system enough time to reuse existing TCP connections. It also addresses the thread starvation issue by ensuring we don't use more than 200 threads at the same time.

However, the Parallel.For solution is still inefficient because each degree of parallelism is occupying a dedicated thread, and those threads will get blocked waiting for the StartNewAsync call to complete. Threads are expensive in terms of CPU and memory and take time to allocate. Ideally we'd do this work in a non-blocking way that allows us to aggressively reuse threads.

Final implementation: Async throttled parallelism

The solution we use is a variation of the above that provides much better thread reuse by using a fully async implementation. I defined a ParallelForEachAsync helper extension method to achieve this.

public static async Task<string> ScheduleManyInstances(
    IDurableOrchestrationClient client,
    string orchestrationName,
    int count,
    ILogger log)
{
    log.LogWarning($"Scheduling {count} orchestration(s)...");
    DateTime utcNow = DateTime.UtcNow;
    string prefix = utcNow.ToString("yyyyMMdd-HHmmss");

    await Enumerable.Range(0, count).ParallelForEachAsync(200, i =>
    {
        string instanceId = $"{prefix}-{i:X16}";
        return client.StartNewAsync(orchestrationName, instanceId);
    });

    log.LogWarning($"All {count} orchestrations were scheduled successfully!");
    return prefix;
}

Here is the ParallelForEachAsync extension method, which implements the async throttling behavior. Like any extension method, it must be declared inside a static class.

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
{
    List<Task> tasks;
    if (items is ICollection<T> itemCollection)
    {
        // optimization to reduce the number of memory allocations
        tasks = new List<Task>(itemCollection.Count);
    }
    else
    {
        tasks = new List<Task>();
    }

    using var semaphore = new SemaphoreSlim(maxConcurrency);
    foreach (T item in items)
    {
        tasks.Add(InvokeThrottledAction(item, action, semaphore));
    }

    await Task.WhenAll(tasks);
}

static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore)
{
    await semaphore.WaitAsync();
    try
    {
        await action(item);
    }
    finally
    {
        semaphore.Release();
    }
}

As you can see, we use a SemaphoreSlim to ensure that no more than maxConcurrency actions run at the same time. Because the code path is fully async, we can now run many operations in parallel with a much smaller number of threads. Right now we have maxConcurrency set to 200, but it's very possible that a larger number would work as well.
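
To see the cap in action without a Durable Functions app, here is a minimal console sketch. It repeats the helper so that it compiles on its own, swaps the StartNewAsync call for a Task.Delay, and records the highest number of operations in flight at once. The ThrottleDemo class and the MeasurePeakConcurrencyAsync method are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Same shape as the ParallelForEachAsync helper, repeated so this sketch is self-contained
    public static async Task ParallelForEachAsync<T>(
        this IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
    {
        using var semaphore = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();
        foreach (T item in items)
        {
            tasks.Add(InvokeThrottledAction(item, action, semaphore));
        }

        await Task.WhenAll(tasks);
    }

    static async Task InvokeThrottledAction<T>(T item, Func<T, Task> action, SemaphoreSlim semaphore)
    {
        await semaphore.WaitAsync();
        try
        {
            await action(item);
        }
        finally
        {
            semaphore.Release();
        }
    }

    // Hypothetical helper: runs `count` fake operations and returns the
    // largest number that were in flight at the same time
    public static async Task<int> MeasurePeakConcurrencyAsync(int count, int maxConcurrency)
    {
        var gate = new object();
        int inFlight = 0;
        int peak = 0;

        await Enumerable.Range(0, count).ParallelForEachAsync(maxConcurrency, async _ =>
        {
            lock (gate)
            {
                inFlight++;
                if (inFlight > peak) peak = inFlight;
            }

            // Stand-in for an I/O-bound call such as StartNewAsync
            await Task.Delay(10);

            lock (gate)
            {
                inFlight--;
            }
        });

        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakConcurrencyAsync(count: 50, maxConcurrency: 5);
        Console.WriteLine($"Peak concurrency: {peak} (cap was 5)");
    }
}
```

With a cap of 5, the reported peak should never exceed 5, no matter how many items are queued.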

Note that this ParallelForEachAsync extension method is generic and can be used for any .NET code that needs to execute asynchronous tasks with a cap on concurrency. In fact, we use it inside parts of the Durable Task Framework to reduce the number of concurrent TCP connections we open when making calls to Azure Storage.

Anyways, I hope this is helpful for anyone doing performance work with Durable Functions. If there is a simple way to do this in .NET and I just didn't know about it, I'm interested in learning about that too!
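
One built-in option worth knowing about on newer runtimes: .NET 6 added Parallel.ForEachAsync, which accepts an async body and honors ParallelOptions.MaxDegreeOfParallelism. The sketch below assumes a .NET 6 or later target and uses a hypothetical FakeStartNewAsync method in place of client.StartNewAsync so that it runs without a Durable Functions app. It is an illustration of the API, not the code used in our tests.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BuiltInThrottleDemo
{
    // Hypothetical stand-in for client.StartNewAsync(orchestrationName, instanceId)
    static async Task<string> FakeStartNewAsync(string instanceId)
    {
        await Task.Delay(5);
        return instanceId;
    }

    // Schedules `count` fake instances with at most `maxConcurrency` in flight
    // and returns how many were scheduled
    public static async Task<int> ScheduleAsync(int count, int maxConcurrency)
    {
        int scheduled = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        await Parallel.ForEachAsync(Enumerable.Range(0, count), options, async (i, cancellationToken) =>
        {
            await FakeStartNewAsync($"demo-{i:X16}");
            Interlocked.Increment(ref scheduled);
        });

        return scheduled;
    }

    public static async Task Main()
    {
        int scheduled = await ScheduleAsync(count: 1000, maxConcurrency: 200);
        Console.WriteLine($"Scheduled {scheduled} fake instances.");
    }
}
```

On older targets that don't have this API, the SemaphoreSlim-based helper remains the way to go.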

Top comments (5)

rahuldj

You can also use ActionBlock (TPL Dataflow) to orchestrate a throttled consumer. It provides MaxDegreeOfParallelism, which works similarly to maxConcurrency. It would be more concise and has additional features that are useful in more complex scenarios (e.g. chaining ActionBlocks to create a workflow).
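
A minimal sketch of the ActionBlock approach described in this comment, for comparison with the SemaphoreSlim helper in the post. It assumes the System.Threading.Tasks.Dataflow library is available (depending on your target framework you may need to reference the NuGet package of the same name). The ActionBlockDemo class is a hypothetical name, and Task.Delay stands in for the StartNewAsync call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Posts `count` items to an ActionBlock that processes at most
    // `maxConcurrency` of them at a time, then returns how many completed
    public static async Task<int> ScheduleAsync(int count, int maxConcurrency)
    {
        int scheduled = 0;

        var block = new ActionBlock<int>(
            async i =>
            {
                // Stand-in for client.StartNewAsync(orchestrationName, instanceId)
                await Task.Delay(5);
                Interlocked.Increment(ref scheduled);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrency });

        for (int i = 0; i < count; i++)
        {
            // The block's input queue is unbounded by default, so Post accepts every item
            block.Post(i);
        }

        // Signal that no more items are coming, then wait for the queue to drain
        block.Complete();
        await block.Completion;

        return scheduled;
    }

    public static async Task Main()
    {
        int scheduled = await ScheduleAsync(count: 1000, maxConcurrency: 200);
        Console.WriteLine($"Scheduled {scheduled} fake instances.");
    }
}
```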

Paul Brown

Could you explain how to accomplish something similar from within an orchestration function to start tons of sub-orchestrations or activity functions? There does not seem to be connection pooling for starting sub-orchestrations or activity functions, so my Consumption plan Azure Functions instance quickly runs out of connections.

Chris Gillum

Check to see if you're using the latest NuGet package version. We added logic similar to what's in this post to the Durable Task Framework code that schedules activities and sub-orchestrations. If you're still running out of connections, then there might be something we missed.

However, be careful about scheduling too many actions from a single orchestration instance. That could cause the history to become large and unwieldy. It's better to break up large fan-outs across multiple orchestrations. Perhaps the subject of another blog post. :)

Allan Camilo

This is such an advanced C# topic. Thanks!

Tingwei

As @rahuldj mentioned, I also wrote a post discussing the use of ActionBlock in async/await code.