How to use System.IO.Pipelines and System.Threading.Channels APIs to speed up processing
This post is a continuation of my previous post series: Evaluating “ReadLine using System.IO.Pipelines” Performance in C#.
When I wrote those posts, I needed to process a huge text file (hundreds of thousands of lines): read, parse, and transform it line by line, and finally save each line as a text file. Yes, that ends up creating a ton of files!
I was able to speed up the reading using the System.IO.Pipelines APIs, as described in the previous post. The result? Processing got 10 minutes faster! 🚀
I could have stopped there. But I didn’t. I recalled the new kids on the block: System.Threading.Channels. See Stephen Toub’s excellent post “An Introduction to System.Threading.Channels” for more information.
Think of reading line by line as the Producer and processing each line as the Consumer. The idea is to produce as fast as possible using System.IO.Pipelines and to consume concurrently: spread the workload asynchronously, without blocking the way BlockingCollection does.
As Stephen Toub puts it: “BlockingCollection involves blocking; there are no task-based APIs to perform work asynchronously. Channels is all about asynchrony; there are no synchronously-blocking APIs.”
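To make that contrast concrete, here is a minimal, self-contained sketch of the basic producer/consumer shape (not taken from the author's repository; `ChannelBasics` and the upper-casing "work" are illustrative). The consumer awaits `WaitToReadAsync` and drains items with `TryRead`, so no thread sits blocked while the channel is empty. System.Threading.Channels ships with .NET Core 3.0 and later, so no extra package should be needed.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative sketch: one producer, one consumer, no blocking calls.
public static class ChannelBasics
{
    public static async Task<List<string>> ProduceAndConsumeAsync(IEnumerable<string> lines)
    {
        Channel<string> channel = Channel.CreateUnbounded<string>();

        // Consumer: WaitToReadAsync completes when data is available, and returns false
        // once the writer has completed and every buffered item has been read.
        Task<List<string>> consumer = Task.Run(async () =>
        {
            var processed = new List<string>();
            while (await channel.Reader.WaitToReadAsync())
            {
                while (channel.Reader.TryRead(out var line))
                {
                    processed.Add(line.ToUpperInvariant()); // stand-in for real line processing
                }
            }
            return processed;
        });

        // Producer: on an unbounded channel TryWrite succeeds until the writer is completed.
        foreach (string line in lines)
        {
            channel.Writer.TryWrite(line);
        }
        channel.Writer.Complete(); // signal that no more items will be written

        return await consumer;
    }
}
```

With a single consumer, items come out in the order they were written, so `ProduceAndConsumeAsync(new[] { "a", "b", "c" })` should yield `A`, `B`, `C`.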
Okay, enough talking. Show me the code!
Base Implementation
Let’s start with the base implementation (i.e., before using the System.Threading.Channels APIs): the code simply adds a Task for each line's processing to a list and then awaits all of them.
Note that the code in this post is slightly adjusted for benchmarking purposes, with unrelated parts removed as much as possible.
[Benchmark(Baseline = true)]
public async Task ProcessTasksAsync()
{
    while (true)
    {
        ReadResult result = await _reader.ReadAsync().ConfigureAwait(false);
        ReadOnlySequence<byte> buffer = result.Buffer;
        AddToTaskList(ref buffer);
        _reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted) break;
    }
    await Task.WhenAll(_tasks).ConfigureAwait(false);
    await _reader.CompleteAsync().ConfigureAwait(false);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AddToTaskList(ref ReadOnlySequence<byte> buffer)
{
    string str = null;
    if (buffer.IsSingleSegment)
    {
        var span = buffer.FirstSpan;
        int consumed;
        while (span.Length > 0)
        {
            var newLine = span.IndexOf(NewLine);
            if (newLine == -1) break;
            var line = span.Slice(0, newLine);
            str = Encoding.UTF8.GetString(line);
            // add to Task list
            _tasks.Add(ProcessLineCoreAsync(new MyData { Content = str, No = ++_counter }));
            consumed = line.Length + NewLine.Length;
            span = span.Slice(consumed);
            buffer = buffer.Slice(consumed);
        }
    }
    else
    {
        var sequenceReader = new SequenceReader<byte>(buffer);
        while (!sequenceReader.End)
        {
            while (sequenceReader.TryReadTo(out ReadOnlySequence<byte> line, NewLine))
            {
                str = Encoding.UTF8.GetString(line);
                // add to Task list
                _tasks.Add(ProcessLineCoreAsync(new MyData { Content = str, No = ++_counter }));
            }
            buffer = buffer.Slice(sequenceReader.Position);
            sequenceReader.Advance(buffer.Length);
        }
    }
}
We can see the task list growing as we process each line.
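Both AddToTaskList and WriteToChannel fall back to `SequenceReader<byte>` when the buffer spans several segments, which is the harder case to picture. The sketch below isolates that path so it runs without a pipe. `BufferSegment` and `LineSplitter` are hypothetical helpers (a PipeReader builds real multi-segment sequences for you), and it assumes .NET 5 or later for the `Encoding.GetString` overload that accepts a `ReadOnlySequence<byte>`. As in the original, a line is only emitted once its newline has been seen; the unconsumed tail is what `AdvanceTo(buffer.Start, buffer.End)` would hand back to the reader.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: chains memory blocks into a multi-segment ReadOnlySequence<byte>.
public sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
    public BufferSegment(ReadOnlyMemory<byte> memory) => Memory = memory;

    public BufferSegment Append(ReadOnlyMemory<byte> memory)
    {
        // RunningIndex is the offset of this segment within the whole sequence.
        var next = new BufferSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

public static class LineSplitter
{
    private static readonly byte[] NewLine = { (byte)'\n' };

    // Returns every complete line; 'remaining' is the trailing partial line (no newline yet).
    public static List<string> ReadLines(ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> remaining)
    {
        var lines = new List<string>();
        var reader = new SequenceReader<byte>(buffer);

        // TryReadTo finds the delimiter even when a line straddles two segments.
        while (reader.TryReadTo(out ReadOnlySequence<byte> line, NewLine))
        {
            lines.Add(Encoding.UTF8.GetString(line));
        }

        remaining = buffer.Slice(reader.Position);
        return lines;
    }
}
```

For example, splitting the segments `"alpha\nbe"` and `"ta\ngam"` yields the lines `alpha` and `beta` (the latter stitched across the segment boundary) and leaves `gam` as the remainder.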

Channel Writer (Producer)
Here is the producer part.
[Benchmark]
public async Task ProcessTasksUsingChannelAsync()
{
    while (true)
    {
        ReadResult result = await _reader.ReadAsync().ConfigureAwait(false);
        ReadOnlySequence<byte> buffer = result.Buffer;
        WriteToChannel(ref buffer);
        _reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted) break;
    }
    // mark the channel as being complete, meaning no more items will be written to it.
    _channel.Writer.TryComplete();
    // await the Task that completes when no more data will ever be available to be read from this channel.
    await _channel.Reader.Completion.ConfigureAwait(false);
    // wait for the channel readers (and their ProcessLineCoreAsync calls) to finish
    await Task.WhenAll(_channelReaderTasks).ConfigureAwait(false);
    await _reader.CompleteAsync().ConfigureAwait(false);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void WriteToChannel(ref ReadOnlySequence<byte> buffer)
{
    string str = null;
    if (buffer.IsSingleSegment)
    {
        var span = buffer.FirstSpan;
        int consumed;
        while (span.Length > 0)
        {
            var newLine = span.IndexOf(NewLine);
            if (newLine == -1) break;
            var line = span.Slice(0, newLine);
            str = Encoding.UTF8.GetString(line);
            // write to the channel; on an unbounded channel WriteAsync completes synchronously,
            // so nothing is left pending when the ValueTask is discarded (TryWrite is equivalent)
            _ = _channel.Writer.WriteAsync(new MyData { Content = str, No = ++_counter });
            consumed = line.Length + NewLine.Length;
            span = span.Slice(consumed);
            buffer = buffer.Slice(consumed);
        }
    }
    else
    {
        var sequenceReader = new SequenceReader<byte>(buffer);
        while (!sequenceReader.End)
        {
            while (sequenceReader.TryReadTo(out ReadOnlySequence<byte> line, NewLine))
            {
                str = Encoding.UTF8.GetString(line);
                // write to the channel (completes synchronously, see above)
                _ = _channel.Writer.WriteAsync(new MyData { Content = str, No = ++_counter });
            }
            buffer = buffer.Slice(sequenceReader.Position);
            sequenceReader.Advance(buffer.Length);
        }
    }
}
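Compared with the base implementation, the changes are small: AddToTaskList becomes WriteToChannel, each parsed line is written to the channel instead of being wrapped in a new Task, and after the read loop the writer is marked complete, then we await both the channel's completion and the reader tasks before completing the PipeReader.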
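The shutdown sequence in ProcessTasksUsingChannelAsync relies on a few properties of unbounded channels. This small sketch (`CompletionDemo` is illustrative, with `int` items standing in for `MyData`) checks them directly: `TryWrite` succeeds until the writer is completed and fails afterwards, and `Reader.Completion` stays incomplete while items are still buffered, completing only once the last one has been read.

```csharp
using System.Threading.Channels;

// Illustrative sketch of the completion semantics the producer depends on.
public static class CompletionDemo
{
    public static (bool firstWrite, bool writeAfterComplete, bool doneBeforeDrain, bool doneAfterDrain) Run()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Unbounded channel: writes never have to wait for space.
        bool firstWrite = channel.Writer.TryWrite(1);

        // Same call as the producer makes after its read loop.
        channel.Writer.TryComplete();
        bool writeAfterComplete = channel.Writer.TryWrite(2); // rejected: writer is completed

        // Completion is still pending because item 1 has not been read yet.
        bool doneBeforeDrain = channel.Reader.Completion.IsCompleted;

        channel.Reader.TryRead(out _); // drain the last buffered item
        bool doneAfterDrain = channel.Reader.Completion.IsCompleted;

        return (firstWrite, writeAfterComplete, doneBeforeDrain, doneAfterDrain);
    }
}
```

`CompletionDemo.Run()` returns `(true, false, false, true)`. So awaiting `Reader.Completion` only guarantees that every item has been *taken* from the channel; awaiting the reader tasks is what guarantees the last items have also finished processing.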
Channel Reader (Consumer)
Finally, the consumer part.
private const int ChannelReaderCount = 3;

public override void IterationSetup()
{
    // ...
    _channel = Channel.CreateUnbounded<MyData>(new UnboundedChannelOptions() { SingleReader = ChannelReaderCount == 1 });
    for (var i = 0; i < ChannelReaderCount; i++)
    {
        _channelReaderTasks.Add(DoProcessLine(async (s) => _ = await ProcessLineCoreAsync(s).ConfigureAwait(false)));
    }
}

private async Task DoProcessLine(Func<MyData, Task<string>> func)
{
    var channelReader = _channel.Reader;
    await foreach (var item in channelReader.ReadAllAsync().ConfigureAwait(false))
    {
        _ = await func(item).ConfigureAwait(false);
    }
}
Here I defined 3 channel readers and set SingleReader to false (by evaluating ChannelReaderCount == 1). This way, we have 3 consumers processing lines concurrently.
This can be observed in the Visual Studio Parallel Stacks window.
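SingleReader has to be false here because several loops read from the same ChannelReader; `true` is only an optimization hint for the case where at most one read happens at a time. The sketch below (hypothetical `MultiReaderDemo`, with `int` items instead of `MyData`) starts `readerCount` consumers over one unbounded channel and records how many items each one handled. Every item is delivered to exactly one reader, so the per-reader counts add up to the total and no item is seen twice; how the items are split between readers is not deterministic.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative sketch: several ReadAllAsync loops competing for items from one channel.
public static class MultiReaderDemo
{
    public static async Task<(int[] perReader, int distinctItems)> RunAsync(int itemCount, int readerCount)
    {
        var channel = Channel.CreateUnbounded<int>(
            new UnboundedChannelOptions { SingleReader = readerCount == 1 });
        var seen = new ConcurrentDictionary<int, byte>();
        var perReader = new int[readerCount];

        var readers = Enumerable.Range(0, readerCount).Select(id => Task.Run(async () =>
        {
            await foreach (int item in channel.Reader.ReadAllAsync())
            {
                seen.TryAdd(item, 0);
                perReader[id]++;    // each slot is only updated by its own reader
                await Task.Yield(); // stand-in for asynchronous per-line work
            }
        })).ToList();

        for (int i = 0; i < itemCount; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.TryComplete();

        await Task.WhenAll(readers); // also makes the per-reader counts safely visible here
        return (perReader, seen.Count);
    }
}
```

Running `RunAsync(1000, 3)` should report per-reader counts summing to 1000 and 1000 distinct items, with the split between the three readers varying from run to run.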

Tune this value and measure until you get the best performance. Start small and increase the value until the results start getting slower; at that point you likely have too many active Tasks and possibly too many context switches.
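A crude way to run that "start small, then increase" loop is sketched below. It assumes a CPU-bound placeholder workload (`SimulateWork` and `ReaderCountSweep` are hypothetical) and plain `Stopwatch` timing, which is noisy and includes JIT cost on the first run. For numbers you act on, a BenchmarkDotNet benchmark with the reader count exposed as a `[Params]` member is more trustworthy, since the author's setup already uses BenchmarkDotNet.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Rough sketch: time the same channel workload for several reader counts.
public static class ReaderCountSweep
{
    // Hypothetical CPU-bound stand-in for ProcessLineCoreAsync.
    private static long SimulateWork(int item)
    {
        long hash = item;
        for (int i = 0; i < 2_000; i++)
        {
            hash = unchecked(hash * 31 + i);
        }
        return hash;
    }

    // One run: readerCount consumers draining itemCount items from an unbounded channel.
    public static async Task<double> TimeAsync(int itemCount, int readerCount)
    {
        var channel = Channel.CreateUnbounded<int>(
            new UnboundedChannelOptions { SingleReader = readerCount == 1 });
        var stopwatch = Stopwatch.StartNew();

        Task<long>[] readers = Enumerable.Range(0, readerCount).Select(_ => Task.Run(async () =>
        {
            long sink = 0; // accumulate a result so the work is not trivially dead code
            await foreach (int item in channel.Reader.ReadAllAsync())
            {
                sink += SimulateWork(item);
            }
            return sink;
        })).ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.TryComplete();

        await Task.WhenAll(readers);
        stopwatch.Stop();
        return stopwatch.Elapsed.TotalMilliseconds;
    }

    // Runs the workload once per candidate reader count, in the order given.
    public static async Task<Dictionary<int, double>> SweepAsync(int itemCount, IEnumerable<int> readerCounts)
    {
        var results = new Dictionary<int, double>();
        foreach (int count in readerCounts)
        {
            results[count] = await TimeAsync(itemCount, count);
        }
        return results;
    }
}
```

Calling `SweepAsync(100_000, new[] { 1, 2, 3, 4, 6 })` returns elapsed milliseconds per reader count; the point where the numbers stop improving (or get worse) is the value to stop at, and a warm-up run before the sweep keeps JIT time out of the first entry.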
Benchmarks Result
Okay, let’s see the benchmark results.
*Wow. Almost 2x faster!* 🚀🚀🚀
Here are the full results:
BenchmarkDotNet=v0.12.1, OS=Windows 10.0.20215
Intel Core i5-9400F CPU 2.90GHz (Coffee Lake), 1 CPU, 6 logical and 6 physical cores
.NET Core SDK=5.0.100-rc.1.20452.10
[Host] : .NET Core 5.0.0 (CoreCLR 5.0.20.45114, CoreFX 5.0.20.45114), X64 RyuJIT
Job-SGYLYY : .NET Core 5.0.0 (CoreCLR 5.0.20.45114, CoreFX 5.0.20.45114), X64 RyuJIT
InvocationCount=1 UnrollFactor=1
Method | LineNumber | LineCharMultiplier | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|---|---|
ProcessTasksAsync | 400000 | 15 | 506.9 ms | 9.60 ms | 8.51 ms | 1.00 | 0.00 | 36000.0000 | 11000.0000 | 3000.0000 | 200.91 MB |
ProcessTasksUsingChannelAsync | 400000 | 15 | 279.1 ms | 10.36 ms | 29.90 ms | 0.55 | 0.06 | 47000.0000 | 21000.0000 | 6000.0000 | 251.62 MB |
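The mean time drops from 506.9 ms to 279.1 ms (a ratio of 0.55), at the cost of roughly 25% more allocated memory (251.62 MB vs. 200.91 MB) and more garbage collections. That’s the benchmark; what about my real case? Well, I saved another 10 minutes, so it’s about 20 minutes faster in total!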
Source Code
You can find the source code in my GitHub repository, branch: pipelines-and-channels.
Conclusion
If you have huge text files containing hundreds of thousands of lines to process, consider using System.IO.Pipelines to read and parse the lines, and combining it with the System.Threading.Channels APIs to spread the workload concurrently and asynchronously.