Parallel.ForEachAsync() and Task.Run() With When.All in C
Parallel Programming
In general, parallel programming involves using multiple threads or processors to execute tasks concurrently. It aims to improve performance and responsiveness by dividing tasks into smaller parts that can be processed simultaneously.
Apart from improving performance and responsiveness, there are additional advantages when using parallel programming. Firstly, by breaking tasks into concurrent subtasks, we can effectively reduce overall execution time. One additional benefit is throughput enhancement as a result of handling multiple tasks simultaneously. Also, running tasks in parallel helps us ensure scalability since it efficiently distributes tasks across processors. This allows performance to scale seamlessly when adding resources.
One more thing we should take into consideration when working with parallel programming is which kind of processes we are trying to parallelize. In this article, we will mention I/O-bound and CPU-bound ones.
I/O bound processes are processes where the computational duration is determined by the time spent awaiting input/output operations, an example of this is a database call. On the other hand, we have CPU-bound processes. In this case, the performance of the CPU determines the task duration, an example is a method that does some heavy numerical calculations.
Now that we have a quick primer about parallel programming and different types of processes, let’s quickly set everything up and see it in action.
Setting up Async Methods
Since we already have a great article going more in-depth on How to Execute Multiple Tasks Asynchronously, here we will only create a baseline for the Task.WhenAll() method which we will modify when comparing the two approaches.
We start with the default web-API project and expand the WeatherForecastController method with an asynchronous method that runs multiple times:
[HttpGet("weather-forecast-when-all", Name = "GetWeatherForecastWhenAll")]
public async Task<IEnumerable<WeatherForecast>> GetWeatherForecastWhenAll()
{
var result1 = await AsyncMethod();
var result2 = await AsyncMethod();
var result3 = await AsyncMethod();
var result = result1.Concat(result2).Concat(result3);
return result;
}
private static async Task<IEnumerable<WeatherForecast>> AsyncMethod()
{
await Task.Delay(250);
return Enumerable.Range(6, 5).Select(index => new WeatherForecast
{
Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
TemperatureC = new Random().Next(-20, 55),
Summary = Summaries[new Random().Next(Summaries.Length)]
})
.ToArray();
}
In the context of types of processes, AsyncMethod() emulates the I/O-bound process and the task delay represents the waiting time of a sub-system response.
After we set everything up, let’s see how to execute these tasks in parallel.
Use Task.WhenAll
First, we need to refactor the GetWeatherForecastWhenAll() method to use the Task.WhenAll() method. It takes an enumerable of tasks and returns a new completed task once all the individual tasks in the collection finish running:
[HttpGet("weather-forecast-when-all", Name = "GetWeatherForecastWhenAll")]
public async Task<IEnumerable<WeatherForecast>> GetWeatherForecastWhenAll()
{
var tasks = new List<Task<IEnumerable<WeatherForecast>>>();
var result1 = AsyncMethod();
var result2 = AsyncMethod();
var result3 = AsyncMethod();
tasks.Add(result1);
tasks.Add(result2);
tasks.Add(result3);
var combinedResults = await Task.WhenAll(tasks);
var result = combinedResults.SelectMany(cr => cr);
return result;
}
We define an empty list of tasks. Next, we call AsyncMethod() three times without the await keyword. This starts executing these tasks one after another without waiting for them to complete. This is exactly what we want since we add those tasks to our tasks list and use Task.WhenAll() to wait for all of them to complete.
Lastly, when all the tasks are completed, we flatten the combinedResults variable that holds the results and return the result to the user.
We need to keep thread usage in mind when we use parallel execution of tasks. Starting too many threads at once increases context-switching overhead and may impact overall application efficiency. Also, we don’t want to block the main thread. So let’s see how we can get a better understanding of how this method works under the hood regarding threads.
Thread Processing
We start by adding logging to the threads:
[HttpGet("weather-forecast-when-all", Name = "GetWeatherForecastWhenAll")]
public async Task<IEnumerable<WeatherForecast>> GetWeatherForecastWhenAll()
{
Console.WriteLine($"GetWeatherForecastWhenAll started on thread: {Environment.CurrentManagedThreadId}");
var tasks = new List<Task<IEnumerable<WeatherForecast>>>();
var result1 = AsyncMethod();
var result2 = AsyncMethod();
var result3 = AsyncMethod();
tasks.Add(result1);
tasks.Add(result2);
tasks.Add(result3);
var combinedResults = await Task.WhenAll(tasks);
var result = combinedResults.SelectMany(cr => cr);
Console.WriteLine($"GetWeatherForecastWhenAll started on thread: {Environment.CurrentManagedThreadId}");
return result;
}
private static async Task<IEnumerable<WeatherForecast>> AsyncMethod()
{
Console.WriteLine($"AsyncMethod started on thread: {Environment.CurrentManagedThreadId}");
await Task.Delay(250);
Console.WriteLine($"AsyncMethod completed on thread: {Environment.CurrentManagedThreadId}");
return Enumerable.Range(6, 5).Select(index => new WeatherForecast
{
Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
TemperatureC = new Random().Next(-20, 55),
Summary = Summaries[new Random().Next(Summaries.Length)]
})
.ToArray();
}
Here, we add a Console.WriteLine() statement at the beginning and end of each method. There, we print on which thread methods start and end by using Environment.CurrentManagedThreadId.
Now, if we execute our request, in the output window we can see how threads behave:
GetWeatherForecastWhenAll started on thread: 16
AsyncMethod started on thread: 16
AsyncMethod started on thread: 16
AsyncMethod started on thread: 16
AsyncMethod completed on thread: 7
AsyncMethod completed on thread: 16
AsyncMethod completed on thread: 15
GetWeatherForecastWhenAll completed on thread: 7
Let’s break this down to understand what happens.
When we send an HTTP request, a thread from the thread pool gets assigned to handle it. In our case, it is thread number 16. Then, when we invoke our async methods and we don’t use the await keyword, tasks will usually start executing on the same thread, i.e., 16.
However, when an asynchronous operation encounters the await keyword, in our case await on Task.WhenAll(), it releases the current thread to the thread pool during the waiting period for the task to be completed. When the awaiting operation completes and we want to return the result, the continuation might not necessarily resume on the original thread. That is why we see some of the tasks finish on different threads than they start on.
Besides creating a task by not using the await keyword we can also use Task.Run() method, so let’s take a look at it.
Use Task.Run With Task.WhenAll
By using the Task.Run() method to execute tasks, we make sure that each new task executes on a separate thread:
[HttpGet("weather-forecast-when-all", Name = "GetWeatherForecastWhenAll")]
public async Task<IEnumerable<WeatherForecast>> GetWeatherForecastWhenAll()
{
var result1 = Task.Run(() => AsyncMethod());
var result2 = Task.Run(() => AsyncMethod());
var result3 = Task.Run(() => AsyncMethod());
var combinedResults = await Task.WhenAll(result1, result2, result3);
var result = combinedResults.SelectMany(cr => cr);
return result;
}
Here, we use the Task.Run() method to execute AsyncMethod() three times in a row. Again, by skipping the await keyword we are not awaiting any method to complete, but we run them in parallel and on Task.WhenAll() await their results.
Now, let’s retake a look at the output logs when executing the request:
GetWeatherForecastWhenAll started on thread: 20
AsyncMethod started on thread: 19
AsyncMethod started on thread: 21
AsyncMethod started on thread: 13
AsyncMethod completed on thread: 21
AsyncMethod completed on thread: 13
AsyncMethod completed on thread: 20
GetWeatherForecastWhenAll completed on thread: 20
This time, we see that each new task starts its execution on a new thread. We expect this behavior when using Task.Run() since its purpose is to offload work from the current thread. Same as in the previous example due to the async/await nature and thread pool assigning threads, tasks finish on different threads than they originally start on.
Using Task.Run() requires caution as it might have some drawbacks. Since it offloads work to a new thread, any time it deals with a large number of tasks it can create a large number of threads, each consuming resources and possibly causing thread pool starvation.
Now that we have seen how we can explicitly offload each task to a new thread, let’s look at how we can use another method to perform these tasks in parallel.
Using Parallel.ForEachAsync
Another way we parallelize this work is to use the Parallel.ForEachAsync() method:
[HttpGet("weather-forecast-parallel", Name = "GetWeatherForecastParallelForEachAsync")]
public async Task<IEnumerable<WeatherForecast>> GetWeatherForecastParallelForEachAsync()
{
Console.WriteLine($"GetWeatherForecastParallelForEachAsync started on thread:
{Environment.CurrentManagedThreadId}");
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 3
};
var resultBag = new ConcurrentBag<IEnumerable<WeatherForecast>>();
await Parallel.ForEachAsync(Enumerable.Range(0, 3), parallelOptions, async (index, _) =>
{
var result = await AsyncMethod();
resultBag.Add(result);
});
Console.WriteLine($"GetWeatherForecastParallelForEachAsync completed on thread:
{Environment.CurrentManagedThreadId}");
return resultBag.SelectMany(cr => cr);
}
First, we set the MaxDegreeOfParallelism value. With this setting, we define how many concurrent operations run. If not set, it uses as many threads as the underlying scheduler provides. To determine this value for a CPU process start with the Environment.ProcessorCount. For I/O-bound processes, this value is harder to determine since it depends on the I/O subsystem, which includes network latency, database responsiveness, etc. So when working with I/O bound processes, we need to do testing with different values to determine the best value for maximum parallelization.
After, we define a ConcurrentBag for our results, which is a thread-safe collection since we use parallel execution of tasks and handle results in a loop. Allowing us to safely modify the collection without worrying about concurrency modification exceptions. Lastly, we set up Parallel.ForEachAsync() method to run three times with set options, and inside the loop, we await each result and add it to the resultBag.
One thing to mention when using the Parallel.ForEachAsync() method is that it has its underlying partitioning. This partitioning divides the input data into manageable batches and assigns each batch to a different thread for parallel processing. The exact size of the batches is determined dynamically by the framework based on factors such as the number of available processors and the characteristics of the input data. So by defining the MaxDegreeOfParallelism, we define the number of batched tasks that execute concurrently.
Regarding thread usage, since we are not explicitly altering thread assignments, threads get assigned as they usually do in the classic async/await process. One difference with the Task.WhenAll() thread usage is that most likely every task starts on its thread since we use the await keyword for each call inside the loop.
Now, let’s take a look at how the Task.Run() method behaves in this case.
Using Task.Run With Parallel.ForEachAsync
Let’s modify our method to use Task.Run() for generating tasks:
[HttpGet("weather-forecast-parallel", Name = "GetWeatherForecastParallelForEachAsync")]
public async Task<IEnumerable<WeatherForecast>> GetWeatherForecastParallelForEachAsync()
{
Console.WriteLine($"GetWeatherForecastParallelForEachAsync started on thread:
{Environment.CurrentManagedThreadId}");
ParallelOptions parallelOptions = new()
{
MaxDegreeOfParallelism = 3
};
var resultBag = new ConcurrentBag<IEnumerable<WeatherForecast>>();
await Parallel.ForEachAsync(Enumerable.Range(0, 3), parallelOptions, async (index, _) =>
{
var result = await Task.Run(() => AsyncMethod());
resultBag.Add(result);
});
Console.WriteLine($"GetWeatherForecastParallelForEachAsync completed on thread:
{Environment.CurrentManagedThreadId}");
return resultBag.SelectMany(cr => cr);
}
However, this may not be the best approach in this case. As we already saw, Parallel.ForEachAsync() has a built-in partitioner that creates batches of tasks and processes them in a single thread. But by using Task.Run() we offload each task into its thread. So using Task.Run() in this case, undermines the benefit of using Parallel.ForEachAsync() for chunking tasks and using fewer threads.
One more thing we may encounter when trying to parallelize the tasks is the usage of the Parallel.ForEach() method.
Pitfalls to Avoid With Parallel.ForEach
The Parallel.ForEach() method, while similar to Parallel.ForEachAsync(), lacks the designed capability to handle asynchronous work. However, we can still encounter some examples of its usage with asynchronous tasks.
So let’s quickly check on why these approaches may not be the best workarounds and see their drawbacks.
One common thing we can see is forcing awaiting the result in synchronous code by using GetAwaiter().GetResult():
Parallel.ForEach(Enumerable.Range(0, 3), parallelOptions, (index, _) =>
{
var result = AsyncMethod().GetAwaiter().GetResult();
resultBag.Add(result);
});
We should avoid this approach since by using GetAwaiter().GetResult() we block the calling thread, which is an anti-pattern of async/await. This may cause issues in deadlocks, decreased performance, and loss of context-switching benefits.
Another approach involves using async void:
Parallel.ForEach(Enumerable.Range(0, 3), parallelOptions, async (index, _) =>
{
var result = await AsyncMethod();
resultBag.Add(result);
});
In this approach, we have another anti-pattern, and that is the usage of async/void. This is a known bad practice with several reasons to avoid it. One such reason is that we cannot catch exceptions in the catch block.
As we can see, both of these approaches involve the use of anti-patterns to make Parallel.ForEach() them compatible with asynchronous methods. Since neither of them is a recommended way to implement parallelization, with the introduction of Parallel.ForEachAsync() in .NET 6 we have a preferable method for working with async tasks in a for-each loop.
Now that we took a look at what not to do, let’s sum up everything we’ve learned so far!
When to Use Which Approach?
As with everything in programming, how we use the knowledge from this article depends on the application’s specific requirements. Nevertheless, when choosing the right method, we should consider several factors.
When talking about CPU-bound tasks that can benefit from parallelization, the use of
Parallel.ForEachAsync()
stands out. Its main benefit is that it efficiently distributes the workload across multiple processor cores. Also, by setting the MaxDegreeOfParallelism we control the concurrency level we want to impose. And as we saw we can easily determine that value.On the other hand, when dealing with I/O-bound tasks, where operations involve waiting for external resources,
Task.WhenAll()
becomes a preferable choice. It allows us to execute multiple asynchronous tasks concurrently, without blocking the calling thread. This makes it an efficient option for scenarios like database queries or network requests. Another benefit is that we don’t need to process results inside the loop, but we can wait on all of them and manipulate the results when they are complete.
However, it’s important to note that Task.WhenAll() lacks a built-in partitioner, and its use in a loop without proper throttling mechanisms may result in the initiation of an infinite number of tasks. So depending on the number of tasks we are executing it may be necessary to create our partition strategy or opt for Parallel.ForEachAsync()
a solution.
One more thing we mentioned is initializing tasks using Task.Run()
. We can use this approach when we want to have explicit control over threading but keep in mind that it can potentially lead to thread pool starvation if too many threads start at once.
Conclusion
In this article, we look at two methods we use to execute repetitive tasks in parallel. We saw how both methods under the hood use threads and partition the given tasks. Also, we saw what are the differences when using the Task.Run() and how it behaves with both options. Lastly, we provide guidance on which approach is most suitable in different scenarios.
https://code-maze.com/csharp-parallel-foreachasync-and-task-run-with-when-all/?ref=dailydev
Top comments (0)