DEV Community

Dansh

In short. Concurrent collections.

Virtually all applications today are multi-threaded. Thankfully, in many cases the complexity of managing data flows between threads is hidden, and you do not need to worry about it. But sometimes you venture into tasks where you need to know how to manage data that may be created, updated, or accessed by multiple threads. This article is meant to provide a concise overview of the collections that can help with such tasks, and of the cases where other tools are a better fit.

Table of contents:

  1. Focus on the task
    1. Simple data-parallel operation
    2. Producer-consumer
      1. ConcurrentQueue, ConcurrentStack, ConcurrentBag
      2. BlockingCollection and Channel
      3. Task pipelines
    3. Cache (for multiple threads on the same machine)
    4. Shared state
  2. Diagrams
  3. Table of collection comparison

Focus on the task

Depending on the task you are trying to solve the instruments can be different. The typical tasks that programmers try to resolve with concurrent collections can be listed as follows:

  1. Scatter-gather
  2. Producer-consumer
    • Normal case
    • Task pipeline
  3. Cache (for multiple threads on the same machine)
  4. Shared state

🔵
This list is by no means exhaustive, feel free to add something in the comments.

With so many instruments available, a lot of developers just take the most popular one and go along with it. This is not advisable, since it can both degrade performance and lead you to re-implement a solution that has existed for a long time.


Simple data-parallel operation

A simplified description of this type of problem is the following. You have a data collection ready. In an embarrassingly parallel problem, each item can be processed independently, and the results of the processing need to be collected back. In more difficult cases, there may be contention over some resources. (The opposite case, where processing an element depends on the results of previous calculations, is called an inherently serial problem.)

In most cases you can solve this by using existing instruments, in some cases even avoiding using concurrent collections. For example, PLINQ provides a lot of solutions out of the box:

  1. If the order needs to be preserved - .AsOrdered method

    public static void ContrastTransformation(IEnumerable<Frame> Movie)
    {
        var ProcessedMovie =
        Movie
        .AsParallel()
        .AsOrdered()
        .Select(frame => AdjustContrast(frame));
    
        foreach (var frame in ProcessedMovie)
        {
            // Movie frames will be evaluated lazily
        }
    }
    
  2. Collection is not materialized, or is too long to materialize - just use PLINQ on it directly (e.g. as it is done in the example above)

  3. Collection items can take very different amounts of time to process. If the work items are divided between threads statically, some threads will finish much earlier while others are still busy. In that case you can specify a chunk partitioner, or even create your own custom partitioner

    // Arrays and collections that implement IList
    // by default use static partitioning
    // which can divide work very unevenly between threads 
    // if the processing time varies widely
    var nums = source.ToArray();
    // true = create a load-balancing partitioner. 
    // false = use static partitioning.
    Partitioner<int> partitioner = Partitioner.Create(nums, true);
    // The partitioner is the query's data source.
    var result = partitioner.AsParallel().Select(n=>SomeOperation(n)).ToArray();
    
    
  4. Zip several collections - .Zip method

    var first = a.AsParallel()
                            .AsOrdered()
                            .Select(element => ComputeA(element));
    var second = b.AsParallel()
                            .AsOrdered()
                            .Select(element => ComputeB(element));
    
    return first.Zip(second, (e1, e2) => Combine(e1, e2));                      
    
    
  5. Built-in cancellation token support
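
The cancellation support from the last item can be sketched as follows. The PlinqCancellationSketch class and the TrySquare method are hypothetical names for this example; WithCancellation is the PLINQ operator that attaches the token to the query.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class PlinqCancellationSketch
{
    // Squares every input value in parallel.
    // Returns false (and an empty result) if the token was cancelled.
    public static bool TrySquare(int[] input, CancellationToken token, out long[] result)
    {
        try
        {
            result = input
                .AsParallel()
                .AsOrdered()
                .WithCancellation(token) // PLINQ checks the token while the query runs
                .Select(n => (long)n * n)
                .ToArray();
            return true;
        }
        catch (OperationCanceledException)
        {
            result = Array.Empty<long>();
            return false;
        }
    }
}
```

PLINQ checks the token between items, so a long-running delegate may want to check it on its own as well.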

Parallel provides some very useful perks too:

  1. The enumeration of the collection should stop once a condition is reached - ParallelLoopState.Break / ParallelLoopState.Stop (Break still lets iterations with a lower index finish, Stop ends the loop as soon as possible)

    var loopResult = Parallel.ForEach(source,
    (curValue, loopState, curIndex) =>
    {
            if (curValue.Equals(match))
            {
                    loopState.Break();
            }
    
            // other logic
    });
    
  2. A resource is needed for each iteration, but is expensive to initialize / garbage collect each time (e.g. WebClient / HttpClient etc) - localInit / localFinally delegates provide an option to initialize it once per task (roughly, once per worker thread)

    Parallel.ForEach(urls,
        () => new WebClient(), // localInit: WebClient is created once per task
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, tempFiles[index]);
            return webclient;
        },
        (webclient) => webclient.Dispose()); // localFinally: release the client when the task ends
    
  3. If the aggregation produces a single result and does not depend on order, but the logic is too long / complicated for PLINQ - you can collect the result separately for each task and merge the per-task results in localFinally (don’t forget the synchronization, since several localFinally delegates can run at once). This limits the need for synchronization to the final step of each task.

    Parallel.ForEach(source,
    () => new List<R>(),
    (element, loopstate, localAggregate) =>
    {
        // some complex aggregation logic here
        return localAggregate;
    },
    (localAggregate) =>
    {
        // localFinally is executed once per task, but it still may create some contention
        // it is important to remember to synchronize access to shared resources here
        lock(myLock)
        {
            // aggregate results ({localAggregate}s) into finalAggregate
        }
    });
    
  4. For collections with uneven processing time Parallel also allows changing the partitioner

    var nums = source.ToArray();
    // true = load-balancing partitioner; the loop body still receives one element at a time
    Partitioner<int> partitioner = Partitioner.Create(nums, true);
    Parallel.ForEach(partitioner, (num, loopState) =>
    {
        // Some operation on num
    });
    
  5. In case items require asynchronous processing, Parallel has .ForEachAsync / .ForAsync method groups

    await Parallel.ForEachAsync(userHandlers, options, async (uri, cancellationToken) =>
    {
        var user = await client.GetFromJsonAsync<User>(uri, cancellationToken);
        await Process(user);
    });
    
  6. Built-in cancellation token support
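
For Parallel, the token is passed through ParallelOptions. A minimal sketch, assuming a simple sum as the workload (the ParallelCancellationSketch class and SumOrNull method are made-up names):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelCancellationSketch
{
    // Sums the items in parallel; returns null if the loop was cancelled.
    public static long? SumOrNull(int[] items, CancellationToken token)
    {
        var options = new ParallelOptions
        {
            CancellationToken = token,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        long total = 0;
        try
        {
            // Interlocked.Add keeps the shared counter consistent across threads.
            Parallel.ForEach(items, options, item => Interlocked.Add(ref total, item));
            return total;
        }
        catch (OperationCanceledException)
        {
            // Thrown by Parallel.ForEach once it observes the cancelled token.
            return null;
        }
    }
}
```

Iterations that are already running are not interrupted; the loop just stops scheduling new ones, so long iterations should observe the token themselves.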


Producer-consumer

Several threads produce the data, and several threads consume it at the same time. This pattern is very common, so .NET has a lot of tools to handle it; in most cases you just need to choose the right one.

To store the items produced before they are consumed, you need some sort of collection. At this point it is worth considering whether the nature of your task may let this collection grow to an unacceptable size. That is entirely possible if producers outrun consumers for a long time.

There are three possible answers to this question:

  1. Yes, it is possible - consider using BlockingCollection or a bounded Channel (Channel.CreateBounded) with the limits set up.
  2. Not right now, but in the near future it is possible - BlockingCollection or a bounded / unbounded Channel are good options. The type of the field for the collection can remain unchanged in such a case, which saves you a lot of code modification. (Channel.CreateBounded and Channel.CreateUnbounded both return Channel<T>; BlockingCollection doesn’t change its type at all, only the constructor arguments.)
  3. No, and not in the near future - you can use simpler collections, like ConcurrentQueue, ConcurrentStack, ConcurrentBag. Or BlockingCollection/Channel if you prefer their API. (You can refer to the table at the bottom of the article for a direct comparison).

If standard instruments are not enough, you can also look into AsyncEx library made by Stephen Cleary, it includes several useful collections, e.g. AsyncCollection - an async alternative to BlockingCollection.

(Strictly speaking, you can also solve it with TPL Dataflow or Rx, but Dataflow is a tool for creating complex task pipelines and Rx is a tool for processing streams of events.)


ConcurrentQueue, ConcurrentStack, ConcurrentBag

ConcurrentQueue and ConcurrentStack (as can be seen from the names) are thread-safe analogues of Queue and Stack. However, despite a common misconception, ConcurrentBag is not an analogue of HashSet/List/etc. In fact, it is designed with a very specific case in mind.

ConcurrentBag stores items in a separate bucket for each thread, which makes it extremely fast when most threads both produce and consume at different stages of their lives. If each thread is a pure producer or a pure consumer, it is slower than ConcurrentQueue or ConcurrentStack [1].
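
To make the "every thread both produces and consumes" case concrete, here is a minimal sketch of an object pool built on ConcurrentBag. The SimplePool type and its member names are made up for this illustration; it is not a production-ready pool, just one scenario where the per-thread buckets tend to pay off.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical minimal pool: every thread both takes items from the bag
// and puts them back, which is the access pattern ConcurrentBag favors.
public sealed class SimplePool<T>
{
    private readonly ConcurrentBag<T> _items = new();
    private readonly Func<T> _factory;

    public SimplePool(Func<T> factory) => _factory = factory;

    // Reuse a pooled item if one is available, otherwise create a new one.
    public T Rent() => _items.TryTake(out var item) ? item : _factory();

    // Hand the item back so that this (or another) thread can reuse it.
    public void Return(T item) => _items.Add(item);

    public int Count => _items.Count;
}
```

A thread that returns an item and rents again shortly after will usually get the item back from its own bucket, without contending with other threads.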

Another important consideration is the performance benefit of concurrent collections compared to regular collections with an external lock. In the case of ConcurrentStack and ConcurrentQueue, Microsoft basically suggests testing it, because performance depends on the processing duration of items, the number of threads producing/consuming, and the access model - whether a thread can be both producer and consumer at different times [1]. If you are not sure how a thread can be both producer and consumer at different times - here’s an example from Microsoft.


BlockingCollection and Channel

BlockingCollection and Channel are designed specifically for the Producer-Consumer pattern. Their most important feature is the ability to limit the size of the collection and pause threads that attempt to add items to a full collection or read from an empty one. These features are extremely important when the amount of data is unbounded and producers / consumers may differ in processing speed.

How to choose between BlockingCollection and Channel? Let’s see the unique capabilities of each of them.

BlockingCollection:

  1. Can change underlying collection. It means that:

    1. The result order can be changed from ‘vaguely FIFO’ (ConcurrentQueue) to ‘vaguely LIFO’ (ConcurrentStack) to ‘priority-based’ (using your custom collection implementing IProducerConsumerCollection).

      var backingCollection = new ConcurrentStack<string>();
      // Using ConcurrentStack as backing collection instead of default ConcurrentQueue
      // Limiting the max number of elements to 1000
      var blockingCollection = new BlockingCollection<string>(backingCollection, 1000);
      
    2. You can create a background collection backed by an actual queue (like Kafka) by creating your custom implementation of IProducerConsumerCollection.

    3. In the specific case when most threads are expected to both produce and consume results at different stages of their lifecycle, you can use ConcurrentBag as a backing collection, since it is optimized specifically for this case and minimizes the probability of contention.

  2. You can look through a snapshot of the data items without consuming them (BlockingCollection implements IEnumerable for that). This works for the default backing collections - ConcurrentQueue, ConcurrentStack, ConcurrentBag. Whether it works for your custom backing collection depends on your implementation.

    var bc = new BlockingCollection<int>();
    // some logic here
    //...
    
    // here we created a snapshot and are enumerating it 
    // it will not reflect the changes that may have happened after a start of the enumeration
    foreach (var i in bc)
    {
        Console.WriteLine(i);
    }
    
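  3. BlockingCollection provides a very useful set of static methods that work on an array of collections - TakeFromAny / AddToAny (and their Try variants). A basic producer and two styles of consumer for a single collection look like this: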

    
    public static void ProducerMethod()
    {
        // ...
        Collection.Add(idA);
        // ...
        if (!Collection.TryAdd(idB))
        {
            // ...
        }

        // ...
        if (flag)
        {
            Collection.CompleteAdding();
        }
    }

    public static void ConsumerMethodFirstType()
    {
        foreach (var id in Collection.GetConsumingEnumerable())
        {
            Console.WriteLine(id);
        }
    }

    public static void ConsumerMethodSecondType()
    {
        while (!Collection.IsCompleted)
        {
            if (!Collection.TryTake(out int id))
            {
                //...
            }

            //...
        }
    }
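
The AddToAny / TakeFromAny family mentioned in the last item is not shown above, so here is a small sketch of it. The class and method names (TakeFromAnySketch, AddSomewhere, DrainAll) are made up for this example; the BlockingCollection calls themselves are the static members of the real type.

```csharp
using System.Collections.Concurrent;

public static class TakeFromAnySketch
{
    // AddToAny stores the item in one of the collections that can accept it
    // and returns the index of the collection that received it.
    public static int AddSomewhere(BlockingCollection<int>[] targets, int item) =>
        BlockingCollection<int>.AddToAny(targets, item);

    // Drains whatever is currently available in all collections with one loop.
    // TryTakeFromAny returns the index of the collection the item came from,
    // or -1 when no item could be taken right now.
    public static int DrainAll(BlockingCollection<int>[] sources)
    {
        int sum = 0;
        while (BlockingCollection<int>.TryTakeFromAny(sources, out int item) >= 0)
        {
            sum += item;
        }
        return sum;
    }
}
```

The non-Try variants (AddToAny / TakeFromAny) block until one of the collections can serve the request, which fits a consumer that listens to several queues at once.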
    

Channel:

  1. Supports asynchronous waiting (instead of blocking the thread) when trying to write to a full channel / read from an empty channel.

    // First option
    while (await idChannelReader.WaitToReadAsync())
    {
        while (idChannelReader.TryRead(out int id))
        {
            // process the id
        }
    }
    
    //Second option, with IAsyncEnumerable
    await foreach (int id in idChannelReader.ReadAllAsync())
    {
        // process the id
    }
    
  2. Has optimizations for single producer / single consumer cases. (As of .NET 8 there are optimizations only for the case of Unbounded Channel + Single reader)

    var unboundedChannel = Channel.CreateUnbounded<int>(
                new UnboundedChannelOptions()
                {
                    SingleReader = true,
                    SingleWriter = false
                });
    
  3. Probably the most powerful feature of channels - it has options for the case of ‘overflow’ of items. You can even specify a delegate that will be executed for the items that are discarded.

    var itemDroppedAction = new Action<int>(_ => { });
    var channel = Channel.CreateBounded<int>(
        new BoundedChannelOptions(10)
        {
            // Wait (default), DropOldest, DropNewest, DropWrite
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = false,
            SingleWriter = false
        },
        itemDroppedAction);
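
The snippets above only show the reading side, so here is a minimal end-to-end sketch with a writer as well. The ChannelSketch class and the ProduceAndConsumeAsync method are names invented for this example, and the capacity of 10 is arbitrary.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Writes the numbers 1..itemCount into a bounded channel and sums them on the reader side.
    public static async Task<int> ProduceAndConsumeAsync(int itemCount)
    {
        // Capacity 10 with the default FullMode (Wait):
        // WriteAsync waits asynchronously while the channel is full.
        var channel = Channel.CreateBounded<int>(10);

        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= itemCount; i++)
            {
                await channel.Writer.WriteAsync(i);
            }
            // Tells the reader that no more items will arrive,
            // so ReadAllAsync can finish once the channel is drained.
            channel.Writer.Complete();
        });

        int sum = 0;
        await foreach (int id in channel.Reader.ReadAllAsync())
        {
            sum += id;
        }

        await producer;
        return sum;
    }
}
```

Forgetting to call Complete is a common way to leave the consumer loop waiting forever, the same way a BlockingCollection consumer waits for CompleteAdding.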
    

Task pipelines

This article is about concurrent collections, so we will not spend a lot of time on this point. Let me just say: if you want to use concurrent collections to implement a task pipeline, you had better be sure that the wheel you are coding is better than TPL Dataflow or Rx.


Cache (for multiple threads on the same machine)

There are different cases for a cache in your application. Our first assumption is that calculating / retrieving items is fairly expensive (otherwise, would we really need a cache for them?). The second is that items do not expire, since it is hard to develop a strategy for that case without knowing the specifics.

  1. The number of items to cache is limited, (at least some) items are rarely needed:

    In this case you can get by with just using ConcurrentDictionary from several threads.

  2. The number of items to cache is limited, (practically all) items are often needed:

    Consider initializing them once at startup and putting them in a FrozenDictionary. ConcurrentDictionary still has overhead over a normal Dictionary. FrozenDictionary has even faster reads than Dictionary, and (as items do not change/expire by our assumption) its slower construction only affects the application once.

  3. The number of items to cache is not limited or extremely large:

    Since the amount of memory is not unlimited, some sort of cache eviction policy would be needed.

    1. Some items are much more popular than others:

      An LFU cache policy is probably the best option.

    2. All items have practically the same probability of being used, but once an item is used it has a high probability of being used again soon:

      LRU / FIFO cache policies are recommended depending on the case.

    3. All items have practically the same probability of being used, and an item used several times is just a fluke:

      A random replacement strategy can be an option, but more likely, more study of the data is needed. Consider experimenting with different strategies.

    It is also worth saying that you can find implementations of local collections with such policies. However, it is worth considering whether you need an actual cache server, like Redis or Memcached.
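
For the first case (a limited set of items, computed on demand), here is a minimal sketch of a ConcurrentDictionary-based cache. The ComputeOnceCache type is hypothetical; wrapping values in Lazy is one common way to deal with the fact that GetOrAdd may run its value factory more than once when several threads miss the same key at the same time.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical cache wrapper: items never expire, matching the assumption above.
public sealed class ComputeOnceCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, Lazy<TValue>> _entries = new();
    private readonly Func<TKey, TValue> _compute;

    public ComputeOnceCache(Func<TKey, TValue> compute) => _compute = compute;

    // GetOrAdd may create several Lazy wrappers under contention, but only one
    // of them is stored and returned to all callers. Lazy<T> is thread-safe by
    // default, so the expensive computation runs once per key.
    public TValue Get(TKey key) =>
        _entries.GetOrAdd(key, k => new Lazy<TValue>(() => _compute(k))).Value;
}
```

If computing an item twice is harmless and cheap enough, a plain GetOrAdd with the value itself is simpler and avoids the extra allocation.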


Shared state

While the shared state case may look similar to the cache case, it is better to view it as a superset of the cache. Shared state, for example, may include settings shared by multiple threads.

Let’s look at the shared settings case in a bit more detail. At first glance it may look like you can store them in a ConcurrentDictionary, the same as a cache. But it is not so. Settings, unlike cache entries, may be changed together - one action may change two or more settings at a time. ConcurrentDictionary does not provide instruments for doing so atomically, so you create a possibility of a race condition.
A much better solution is to keep the settings in an immutable record type stored in a static field and replace the whole instance on every change. This way your modifications are atomic.

using System;
using System.Threading;

// Placeholder enum: only Greedy appears in the article, Conservative is hypothetical
public enum NetworkStrategy { Greedy, Conservative }

// Immutable record class (positional properties are init-only)
public record Settings(NetworkStrategy Network, int ThreadLimit);

public static class Program
{
    // Static field to hold the current settings
    private static Settings _currentSettings = new Settings(NetworkStrategy.Greedy, 10);

    // Method to safely update the settings
    public static void UpdateSettings(Func<Settings, Settings> updateFunc)
    {
        Settings original, updated;

        do
        {
            // Capture the current settings
            original = Volatile.Read(ref _currentSettings);

            // Generate the updated settings based on the original
            updated = updateFunc(original);

            // Try to update _currentSettings using Interlocked.CompareExchange
            // If _currentSettings hasn't changed, it gets updated to `updated`;
            // otherwise another thread won the race and the loop retries.
            // ReferenceEquals is used on purpose: records overload == / != with
            // value equality, which could hide a swap to an equal-valued instance.
        } while (!ReferenceEquals(
            Interlocked.CompareExchange(ref _currentSettings, updated, original), original));
    }

    public static void ResetDefault()
    {
        // ...
        UpdateSettings(current => current with { Network = NetworkStrategy.Greedy, ThreadLimit = 10 });
    }

    public static void CheckResourceLimitations()
    {
        // get available resources and calculate network strategy and thread limit
        // (placeholder values; the real calculation is not shown here)
        var networkStrategy = NetworkStrategy.Greedy;
        var threadLimit = Environment.ProcessorCount;
        UpdateSettings(current => current with { Network = networkStrategy, ThreadLimit = threadLimit });
    }

    public static void TemporaryBoost(NetworkStrategy desiredStrategy)
    {
        //...
    }

    //...
}

Pay attention to the data type you are using for the static settings field. Do not use structs for static fields that can be accessed from multiple threads: writes to a struct larger than a machine word are not guaranteed to be atomic, so readers can observe torn writes. In general, beware of structs as class fields, they can bring a lot of surprising behaviors.


Diagrams

For readability, the diagram was split into 3 parts, each corresponding to the type of task being solved.

The diagrams are color-coded: green corresponds to the answer 'yes', red to the answer 'no'.

Simple data-parallel operations

Producer-consumer diagram

Shared state diagram

Table for collection comparison

My fight for the format of the table was unsuccessful. And there is no integration with google doc. So let me share a link to the table.
