Reading messages from a queue is usually simple at first.
You create a worker, connect it to the queue, receive some messages, process them, and delete them after successful processing.
That works well for small scenarios.
But as soon as message volume increases, or the processing step depends on slower resources such as databases, external APIs, file storage, or third-party services, the problem changes.
At that point, reading more messages does not always mean processing them better.
Sometimes, it only means moving the bottleneck somewhere else.
In this article, I want to show a practical example using:
- .NET Worker Service
- Amazon SQS
- LocalStack
- Docker
- System.Threading.Channels
We will start with a simple worker that consumes messages directly from SQS. Then, we will refactor it using Channel<T> to separate message reading from message processing.
The goal is not to replace SQS with Channel.
SQS remains the external durable queue. Channel is used inside the application as an in-memory queue to help control the internal processing flow.
The Architecture
The final flow will look like this:
SQS / LocalStack
↓
SqsMessageProducer
↓
Channel<Message>
↓
SqsMessageConsumer
↓
MessageProcessingService
↓
DeleteMessageAsync
The idea is simple:
- The producer reads messages from SQS.
- The producer writes messages into a bounded Channel.
- The consumer reads messages from the Channel.
- The consumer processes the message.
- If the processing succeeds, the message is deleted from SQS.
Running SQS Locally with LocalStack
To avoid using a real AWS account for this demo, we can run SQS locally using LocalStack.
Here is the docker-compose.yml file:
services:
localstack:
image: localstack/localstack:4.2.0
container_name: channel-sqs-localstack
ports:
- "4566:4566"
environment:
- SERVICES=sqs
- AWS_DEFAULT_REGION=us-east-1
- DEFAULT_REGION=us-east-1
I am using a fixed image version instead of latest to make the example more reproducible.
Start the container:
docker compose up -d
Create the queue:
docker exec -it channel-sqs-localstack awslocal sqs create-queue --queue-name messages-queue
List the queues:
docker exec -it channel-sqs-localstack awslocal sqs list-queues
Expected output:
{
"QueueUrls": [
"http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/messages-queue"
]
}
Sending Test Messages
Create a script called send-messages.sh:
#!/bin/bash
QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/messages-queue"
for i in {1..10}
do
docker exec -i channel-sqs-localstack awslocal sqs send-message \
--queue-url "$QUEUE_URL" \
--message-body "{\"id\": $i, \"content\": \"Test message $i\"}"
echo "Message $i sent"
done
Make it executable:
chmod +x send-messages.sh
Run it:
./send-messages.sh
Now we have some messages available in the local SQS queue.
Creating the .NET Worker
Create a new Worker Service project:
dotnet new worker -n ChannelSqs.Worker
Add the AWS SQS SDK package:
dotnet add ChannelSqs.Worker/ChannelSqs.Worker.csproj package AWSSDK.SQS
First Version: A Simple Worker
Before introducing Channel, let's start with a direct implementation.
The worker will:
- receive messages from SQS;
- process each message;
- delete each message after successful processing.
Program.cs:
using Amazon.SQS;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<IAmazonSQS>(_ =>
{
var config = new AmazonSQSConfig
{
ServiceURL = "http://localhost:4566",
AuthenticationRegion = "us-east-1"
};
return new AmazonSQSClient("test", "test", config);
});
builder.Services.AddHostedService<Worker>();
var host = builder.Build();
host.Run();
Worker.cs:
using Amazon.SQS;
using Amazon.SQS.Model;
namespace ChannelSqs.Worker;
public class Worker : BackgroundService
{
private const string QueueUrl = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/messages-queue";
private readonly IAmazonSQS _sqs;
private readonly ILogger<Worker> _logger;
public Worker(
IAmazonSQS sqs,
ILogger<Worker> logger)
{
_sqs = sqs;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Simple worker started.");
while (!stoppingToken.IsCancellationRequested)
{
var response = await _sqs.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = QueueUrl,
MaxNumberOfMessages = 10,
WaitTimeSeconds = 5,
VisibilityTimeout = 30
}, stoppingToken);
var messages = response.Messages ?? new List<Message>();
if (messages.Count == 0)
{
_logger.LogInformation("No messages found.");
continue;
}
foreach (var message in messages)
{
_logger.LogInformation(
"Message received. MessageId: {MessageId} | Body: {Body}",
message.MessageId,
message.Body);
await ProcessMessageAsync(message, stoppingToken);
await _sqs.DeleteMessageAsync(
QueueUrl,
message.ReceiptHandle,
stoppingToken);
_logger.LogInformation(
"Message deleted from SQS. MessageId: {MessageId}",
message.MessageId);
}
}
}
private static async Task ProcessMessageAsync(
Message message,
CancellationToken cancellationToken)
{
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken);
}
}
Run the worker:
dotnet run --project ChannelSqs.Worker
The output should look similar to this:
Simple worker started.
Message received. MessageId: ... | Body: ...
Message deleted from SQS. MessageId: ...
Message received. MessageId: ... | Body: ...
Message deleted from SQS. MessageId: ...
This implementation works.
But it also mixes several responsibilities in the same loop:
- reading from SQS;
- processing the business logic;
- deleting the message;
- controlling the application flow.
For simple scenarios, that is fine.
But when message processing becomes slow, the whole flow becomes tied to that processing time.
This is where Channel<T> becomes useful.
Introducing Channel
Channel<T>, from the System.Threading.Channels namespace, is a thread-safe in-memory queue designed for producer/consumer scenarios.
In this example, we will split the worker into two background services:
- SqsMessageProducer: reads messages from SQS and writes them to the Channel.
- SqsMessageConsumer: reads messages from the Channel and processes them.
The SQS queue remains the external durable queue.
The Channel is only an internal in-memory queue used by the application.
Creating the Shared Channel
Create a class called SqsMessageChannel:
using System.Threading.Channels;
using Amazon.SQS.Model;
namespace ChannelSqs.Worker;
public sealed class SqsMessageChannel
{
private readonly Channel<Message> _channel;
private int _count;
public SqsMessageChannel()
{
_channel = Channel.CreateBounded<Message>(
new BoundedChannelOptions(capacity: 5)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = true,
SingleReader = false
});
}
public ChannelWriter<Message> Writer => _channel.Writer;
public ChannelReader<Message> Reader => _channel.Reader;
public int Count => Volatile.Read(ref _count);
public void Increment() => Interlocked.Increment(ref _count);
public void Decrement() => Interlocked.Decrement(ref _count);
}
Here I am using a bounded Channel with a capacity of 5.
That means the internal queue can hold up to five messages.
When the Channel is full, WriteAsync waits until there is space available again.
That behavior helps us apply backpressure.
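This behavior is easy to observe in isolation. Here is a minimal standalone sketch, independent of SQS, using a capacity of 2 and a one-second delay per read (both arbitrary values chosen for the demo): the writer blocks whenever the channel is full and only makes progress as the reader drains items.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel that holds at most 2 items; writers wait when it is full.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// The writer tries to enqueue five items as fast as it can.
var writer = Task.Run(async () =>
{
    for (var i = 1; i <= 5; i++)
    {
        await channel.Writer.WriteAsync(i); // waits here while the channel is full
        Console.WriteLine($"Wrote {i}");
    }
    channel.Writer.Complete();
});

// The reader drains one item per second, forcing the writer to match that pace.
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"Read {item}");
    await Task.Delay(TimeSpan.FromSeconds(1));
}

await writer;
```

The first two writes complete immediately; from the third one on, each write only completes after the reader frees a slot. That is the same backpressure the SQS producer in this article experiences when the consumer is slow.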
Simulating a Bottleneck
To make the behavior easier to observe, let's create a service that simulates slow processing.
using Amazon.SQS.Model;
namespace ChannelSqs.Worker;
public sealed class MessageProcessingService
{
private readonly ILogger<MessageProcessingService> _logger;
public MessageProcessingService(ILogger<MessageProcessingService> logger)
{
_logger = logger;
}
public async Task ProcessAsync(Message message, CancellationToken cancellationToken)
{
_logger.LogInformation(
"Simulated bottleneck: starting message processing. MessageId: {MessageId}",
message.MessageId);
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
_logger.LogInformation(
"Simulated bottleneck: message processing finished. MessageId: {MessageId}",
message.MessageId);
}
}
In a real application, this bottleneck could be:
- a database operation;
- a third-party API call;
- a file upload;
- another queue or integration;
- any resource with limited throughput.
Creating the Producer
The producer reads messages from SQS and writes them to the Channel.
using Amazon.SQS;
using Amazon.SQS.Model;
namespace ChannelSqs.Worker;
public sealed class SqsMessageProducer : BackgroundService
{
private const string QueueUrl = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/messages-queue";
private readonly IAmazonSQS _sqs;
private readonly SqsMessageChannel _messageChannel;
private readonly ILogger<SqsMessageProducer> _logger;
public SqsMessageProducer(
IAmazonSQS sqs,
SqsMessageChannel messageChannel,
ILogger<SqsMessageProducer> logger)
{
_sqs = sqs;
_messageChannel = messageChannel;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Producer started.");
while (!stoppingToken.IsCancellationRequested)
{
var response = await _sqs.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = QueueUrl,
MaxNumberOfMessages = 10,
WaitTimeSeconds = 5,
VisibilityTimeout = 30
}, stoppingToken);
var messages = response.Messages ?? new List<Message>();
if (messages.Count == 0)
{
_logger.LogInformation("Producer: no messages found.");
continue;
}
foreach (var message in messages)
{
_logger.LogInformation(
"Producer: trying to add message to Channel. MessageId: {MessageId} | CurrentItems: {Count}",
message.MessageId,
_messageChannel.Count);
await _messageChannel.Writer.WriteAsync(message, stoppingToken);
_messageChannel.Increment();
_logger.LogInformation(
"Producer: message added to Channel. MessageId: {MessageId} | CurrentItems: {Count}",
message.MessageId,
_messageChannel.Count);
}
}
}
}
The most important line is this one:
await _messageChannel.Writer.WriteAsync(message, stoppingToken);
If the Channel has space, the message is written immediately.
If the Channel is full, the producer waits.
That is the backpressure point.
Creating the Consumer
The consumer reads messages from the Channel, processes them, and deletes them from SQS after success.
using Amazon.SQS;
using Amazon.SQS.Model;
namespace ChannelSqs.Worker;
public sealed class SqsMessageConsumer : BackgroundService
{
private const string QueueUrl = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/messages-queue";
private readonly IAmazonSQS _sqs;
private readonly SqsMessageChannel _messageChannel;
private readonly MessageProcessingService _processingService;
private readonly ILogger<SqsMessageConsumer> _logger;
public SqsMessageConsumer(
IAmazonSQS sqs,
SqsMessageChannel messageChannel,
MessageProcessingService processingService,
ILogger<SqsMessageConsumer> logger)
{
_sqs = sqs;
_messageChannel = messageChannel;
_processingService = processingService;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Consumer started.");
await foreach (var message in _messageChannel.Reader.ReadAllAsync(stoppingToken))
{
_messageChannel.Decrement();
try
{
_logger.LogInformation(
"Consumer: message removed from Channel. MessageId: {MessageId} | CurrentItems: {Count}",
message.MessageId,
_messageChannel.Count);
await _processingService.ProcessAsync(message, stoppingToken);
await _sqs.DeleteMessageAsync(
QueueUrl,
message.ReceiptHandle,
stoppingToken);
_logger.LogInformation(
"Consumer: message processed and deleted from SQS. MessageId: {MessageId}",
message.MessageId);
}
catch (Exception ex)
{
_logger.LogError(
ex,
"Consumer: error processing message. MessageId: {MessageId}. The message will not be deleted from SQS.",
message.MessageId);
}
}
}
}
The message is deleted only after successful processing.
If processing fails, the message is not deleted. After the visibility timeout expires, it can become visible again in SQS.
Updating Program.cs
Now we no longer register the original Worker class.
Instead, we register the Channel, the processing service, the producer, and the consumer.
using Amazon.SQS;
using ChannelSqs.Worker;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<IAmazonSQS>(_ =>
{
var config = new AmazonSQSConfig
{
ServiceURL = "http://localhost:4566",
AuthenticationRegion = "us-east-1"
};
return new AmazonSQSClient("test", "test", config);
});
builder.Services.AddSingleton<SqsMessageChannel>();
builder.Services.AddSingleton<MessageProcessingService>();
builder.Services.AddHostedService<SqsMessageProducer>();
builder.Services.AddHostedService<SqsMessageConsumer>();
var host = builder.Build();
host.Run();
Both hosted services run in the same process and share the same Channel instance because it is registered as a singleton.
Running the Demo
Send some messages again:
./send-messages.sh
Run the worker:
dotnet run --project ChannelSqs.Worker
You should see logs similar to this:
Producer started.
Consumer started.
Producer: trying to add message to Channel. MessageId: A | CurrentItems: 0
Producer: message added to Channel. MessageId: A | CurrentItems: 1
Producer: trying to add message to Channel. MessageId: B | CurrentItems: 1
Producer: message added to Channel. MessageId: B | CurrentItems: 2
Producer: trying to add message to Channel. MessageId: C | CurrentItems: 2
Producer: message added to Channel. MessageId: C | CurrentItems: 3
Producer: trying to add message to Channel. MessageId: D | CurrentItems: 3
Producer: message added to Channel. MessageId: D | CurrentItems: 4
Producer: trying to add message to Channel. MessageId: E | CurrentItems: 4
Producer: message added to Channel. MessageId: E | CurrentItems: 5
Consumer: message removed from Channel. MessageId: A | CurrentItems: 4
Simulated bottleneck: starting message processing. MessageId: A
Because the Channel capacity is only five and the processing service waits five seconds, the Channel quickly reaches its limit.
When that happens, the producer cannot keep writing indefinitely. It has to wait for the consumer to remove messages from the Channel.
That is the main behavior we wanted to demonstrate.
Channel does not magically make processing faster.
It helps control the flow.
What Changed?
Before Channel:
Worker
├── Read from SQS
├── Process message
└── Delete from SQS
After Channel:
Producer
└── Read from SQS and write to Channel
Channel
└── Hold messages temporarily in memory with bounded capacity
Consumer
├── Read from Channel
├── Process message
└── Delete from SQS
The main improvement is separation of responsibilities.
The producer does not care about business processing.
The consumer does not care about polling SQS.
The Channel sits in the middle and controls how many messages can wait inside the application.
Important Considerations
This approach is useful, but there are important things to keep in mind.
Channel does not replace SQS
Channel is in-memory.
If the application stops, anything that exists only inside the Channel is lost.
SQS is still the durable queue and the source of truth.
Do not delete before processing
A message should be deleted from SQS only after it has been processed successfully.
Deleting first means the message is gone from SQS even if processing later fails, which can cause data loss.
Configure Visibility Timeout carefully
If processing takes longer than the visibility timeout, the message may become visible again before the current processing attempt finishes, and another consumer may pick it up while the first attempt is still running.
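For genuinely long-running work, one option is to extend the timeout while processing is still in progress. A minimal sketch, assuming the IAmazonSQS client used earlier; the helper name and the 60-second value are illustrative, not part of the article's code:

```csharp
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helper: extend the visibility timeout of a message while a
// long-running processing step is still in progress.
public static class VisibilityExtensions
{
    public static Task ExtendVisibilityAsync(
        IAmazonSQS sqs,
        string queueUrl,
        Message message,
        CancellationToken cancellationToken)
    {
        // Give the current attempt another 60 seconds before SQS makes the
        // message visible to other consumers again.
        return sqs.ChangeMessageVisibilityAsync(
            queueUrl,
            message.ReceiptHandle,
            60,
            cancellationToken);
    }
}
```

In practice this would be called periodically (a "heartbeat") from the consumer while ProcessAsync is running, always keeping the extension shorter than the remaining work is expected to take.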
Design for idempotency
SQS can deliver a message more than once.
Your processing logic should be prepared for duplicate deliveries.
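A common pattern is to record already-processed message IDs and skip duplicates. Here is a minimal in-memory sketch; IdempotencyGuard is a hypothetical name, and using the SQS MessageId as the deduplication key is only one possible choice (a business key is often better):

```csharp
using System.Collections.Concurrent;

// Hypothetical in-memory deduplication guard. In production this state would
// live in a shared store (database, cache) so it survives restarts and is
// visible to all consumer instances.
public sealed class IdempotencyGuard
{
    private readonly ConcurrentDictionary<string, bool> _processed = new();

    // Returns true the first time a given id is seen, false for duplicates.
    public bool TryMarkProcessed(string messageId) =>
        _processed.TryAdd(messageId, true);
}
```

The consumer would call TryMarkProcessed before invoking the processing service and, on false, skip straight to deleting the duplicate delivery.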
Use a Dead Letter Queue in real scenarios
For production systems, messages that fail repeatedly should be moved to a DLQ after a configured number of attempts.
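With LocalStack this can be tried out directly. The commands below are a sketch: the DLQ name messages-dlq and the maxReceiveCount of 5 are example values, and the ARN follows LocalStack's default account ID.

```shell
# Create the dead letter queue.
docker exec -it channel-sqs-localstack awslocal sqs create-queue --queue-name messages-dlq

# Attach it to the main queue: after 5 failed receives, SQS moves the message to the DLQ.
docker exec -it channel-sqs-localstack awslocal sqs set-queue-attributes \
  --queue-url "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/messages-queue" \
  --attributes '{"RedrivePolicy": "{\"deadLetterTargetArn\": \"arn:aws:sqs:us-east-1:000000000000:messages-dlq\", \"maxReceiveCount\": \"5\"}"}'
```

Messages that land in the DLQ can then be inspected and reprocessed separately instead of looping forever in the main queue.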
Conclusion
In this example, we used LocalStack to run SQS locally and built two versions of a .NET Worker.
The first version consumed messages directly from SQS in a single loop.
The second version introduced Channel<T> to separate message reading from message processing.
The main benefit is not automatic performance improvement.
The main benefit is control.
With a bounded Channel, the application can limit how many messages are waiting in memory and apply backpressure when processing is slower than message intake.
SQS remains the external durable queue.
Channel becomes an internal coordination mechanism.
In distributed systems, the goal is not always to consume faster.
Sometimes, the goal is to consume at the right pace.