Introduction
When building distributed systems, message queues like Amazon SQS play a crucial role in handling asynchronous workloads. In this post, I'll share my experience implementing a robust SQS consumer in Go that handles user registration events for Keycloak. The solution uses the fan-out/fan-in concurrency pattern to process messages efficiently without overwhelming system resources.
The Challenge
I faced an interesting problem: process around 50,000 SQS events daily to register users in Keycloak. A naive approach might spawn a new goroutine for each message, but with no upper bound on concurrency, a burst of messages could exhaust memory and connections. I needed a more controlled approach to concurrency.
Why Fan-out/Fan-in?
The fan-out/fan-in pattern is perfect for this use case because it:
- Maintains a fixed pool of worker goroutines
- Spreads work across workers, with each worker pulling the next message as soon as it is free
- Prevents resource exhaustion
- Provides better control over concurrent operations
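To make the pattern concrete before looking at the SQS code, here is a minimal, generic sketch using only the standard library. The function name `fanOutFanIn` and the integer jobs are illustrative assumptions, not part of the consumer described in this post.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutFanIn runs fn over jobs using a fixed number of workers (fan-out)
// and collects every result into a single slice (fan-in).
// Result order is not guaranteed.
func fanOutFanIn(jobs []int, workers int, fn func(int) int) []int {
	jobCh := make(chan int)
	resultCh := make(chan int)

	// Fan-out: a fixed pool of workers reads from the shared job channel.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobCh {
				resultCh <- fn(j)
			}
		}()
	}

	// Feed jobs, then close the channel so workers leave their range loops.
	go func() {
		for _, j := range jobs {
			jobCh <- j
		}
		close(jobCh)
	}()

	// Close resultCh once all workers finish so the collector loop ends.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Fan-in: gather all results on a single goroutine.
	results := make([]int, 0, len(jobs))
	for r := range resultCh {
		results = append(results, r)
	}
	return results
}

func main() {
	squares := fanOutFanIn([]int{1, 2, 3, 4, 5}, 3, func(n int) int { return n * n })
	fmt.Println(len(squares), "results") // 5 results
}
```

The number of goroutines doing work never exceeds `workers`, no matter how many jobs arrive.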
Implementation Deep Dive
1. The Consumer Structure
First, let's look at our basic consumer structure:
type Consumer struct {
	Client    *sqs.Client
	QueueName string
}
2. Message Processing Pipeline
The implementation consists of three main components:
- Message Receiver: Continuously polls SQS for new messages
- Worker Pool: Fixed number of goroutines processing messages
- Message Channel: Connects the receiver to workers
Here's how we start the consumer:
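func StartPool[requestBody any](
	serviceFunc func(c context.Context, dto *requestBody) error,
	consumer *Consumer) {
	ctx := context.Background()
	params := &sqs.ReceiveMessageInput{
		MaxNumberOfMessages: 10, // batch size per poll
		// QueueUrl expects the full queue URL, so QueueName must hold the URL.
		QueueUrl:          aws.String(consumer.QueueName),
		WaitTimeSeconds:   20, // long polling, in seconds
		VisibilityTimeout: 30, // in seconds
		// "All" requests every message attribute (same value as
		// types.QueueAttributeNameAll, which is meant for queue attributes).
		MessageAttributeNames: []string{"All"},
	}
	// Unbuffered channel connecting the receiver to the workers.
	msgCh := make(chan types.Message)
	var wg sync.WaitGroup
	// Start worker pool first
	startPool(ctx, msgCh, &wg, consumer, serviceFunc)
	// Then start receiving messages
	// ... rest of the implementation
}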
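The receive loop is elided above. As a rough sketch of what such a loop might look like, the example below polls a receiver until the context is cancelled and pushes each message onto a channel. The `batchReceiver` interface and `fakeQueue` type are hypothetical stand-ins for the SQS client so the example runs without AWS; in the real consumer the poll would presumably be a `ReceiveMessage` call with the `params` shown above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// batchReceiver is a hypothetical stand-in for the SQS client: each call
// returns one batch of message bodies, like a single long poll.
type batchReceiver interface {
	Receive(ctx context.Context) ([]string, error)
}

// receiveLoop polls r until ctx is cancelled or r returns an error.
// It closes msgCh on exit so that workers ranging over it can stop.
func receiveLoop(ctx context.Context, r batchReceiver, msgCh chan<- string) error {
	defer close(msgCh)
	for {
		batch, err := r.Receive(ctx)
		if err != nil {
			return err
		}
		for _, m := range batch {
			select {
			case msgCh <- m: // blocks until a worker is free (backpressure)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

var errDrained = errors.New("fake queue drained")

// fakeQueue serves preset batches, then reports errDrained.
type fakeQueue struct{ batches [][]string }

func (q *fakeQueue) Receive(ctx context.Context) ([]string, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if len(q.batches) == 0 {
		return nil, errDrained
	}
	b := q.batches[0]
	q.batches = q.batches[1:]
	return b, nil
}

// drainFake runs receiveLoop against a fake queue and counts what arrives.
func drainFake(batches [][]string) int {
	msgCh := make(chan string)
	go receiveLoop(context.Background(), &fakeQueue{batches: batches}, msgCh)
	n := 0
	for range msgCh {
		n++
	}
	return n
}

func main() {
	fmt.Println(drainFake([][]string{{"a", "b"}, {"c"}})) // 3
}
```

Because the channel is unbuffered, the loop only polls for the next batch once workers have taken the previous one, which keeps the number of in-flight messages bounded. A production loop would likely log and retry on transient receive errors rather than return.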
3. Key Configuration Parameters
Let's examine the crucial SQS configuration parameters:
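- MaxNumberOfMessages (10): Batch size for each poll; 10 is the most SQS returns per request
- WaitTimeSeconds (20): Long polling duration in seconds; the call waits up to this long for messages before returning empty, and 20 is the maximum
- VisibilityTimeout (30): Seconds a received message stays hidden from other consumers; if it is not deleted within this window, SQS makes it visible again for redelivery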
4. Worker Pool Implementation
The worker pool is where the fan-out pattern comes into play:
func startPool[requestBody any](
	ctx context.Context,
	msgCh chan types.Message,
	wg *sync.WaitGroup,
	consumer *Consumer,
	serviceFunc func(c context.Context, dto *requestBody) error) {
	// Shared across workers to track message IDs currently being processed.
	processingMessages := &sync.Map{}
	// Start 10 workers
	for i := 0; i < 10; i++ {
		go worker(ctx, msgCh, wg, consumer, processingMessages, serviceFunc)
	}
}
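The `worker` function itself is not shown in this post. The sketch below is one plausible shape for it, under several assumptions: the `message` type is a simplified stand-in for `types.Message`, `deleteFn` is a hypothetical callback standing in for the SQS delete call, bodies are JSON, and the `WaitGroup` here counts workers (the snippet in the next section suggests the original counts messages instead).

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"sync"
)

// message is a simplified, hypothetical stand-in for types.Message.
type message struct {
	ID   string
	Body string
}

// worker drains msgCh, decodes each body into T, calls serviceFunc, and
// calls deleteFn only when processing succeeded.
func worker[T any](
	ctx context.Context,
	msgCh <-chan message,
	wg *sync.WaitGroup,
	serviceFunc func(ctx context.Context, dto *T) error,
	deleteFn func(ctx context.Context, id string) error,
) {
	defer wg.Done()
	for msg := range msgCh {
		var dto T
		if err := json.Unmarshal([]byte(msg.Body), &dto); err != nil {
			log.Printf("message %s: bad payload: %v", msg.ID, err)
			continue
		}
		if err := serviceFunc(ctx, &dto); err != nil {
			// Not deleted: the message reappears after the visibility timeout.
			log.Printf("message %s: processing failed: %v", msg.ID, err)
			continue
		}
		if err := deleteFn(ctx, msg.ID); err != nil {
			log.Printf("message %s: delete failed: %v", msg.ID, err)
		}
	}
}

// registration is an illustrative payload type.
type registration struct {
	Email string `json:"email"`
}

// runWorkers feeds bodies to n workers and returns how many were deleted.
func runWorkers(bodies []string, n int) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		deleted int
	)
	service := func(ctx context.Context, dto *registration) error {
		if dto.Email == "" {
			return errors.New("missing email")
		}
		return nil
	}
	del := func(ctx context.Context, id string) error {
		mu.Lock()
		defer mu.Unlock()
		deleted++
		return nil
	}
	msgCh := make(chan message)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker[registration](context.Background(), msgCh, &wg, service, del)
	}
	for i, body := range bodies {
		msgCh <- message{ID: fmt.Sprintf("m%d", i), Body: body}
	}
	close(msgCh)
	wg.Wait()
	return deleted
}

func main() {
	bodies := []string{`{"email":"a@example.com"}`, `{"email":""}`, `not json`}
	fmt.Println(runWorkers(bodies, 2)) // 1: only the valid message is deleted
}
```

Deleting only after `serviceFunc` succeeds means a failed message is retried by SQS once its visibility timeout expires.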
5. Duplicate Message Handling
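SQS standard queues deliver messages at least once, so the same message can arrive more than once. We use a sync.Map keyed by message ID to skip messages that are already being processed: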
if _, loaded := processingMessages.LoadOrStore(*msg.MessageId, true); loaded {
	wg.Done()
	continue
}
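One detail the snippet does not show is what happens to an ID after processing ends. A small sketch of the claim-and-release idea follows; the `inFlight` type and its method names are assumptions for illustration. It releases the ID when work finishes so the map does not grow without bound. Note that a map like this only deduplicates within a single process.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight tracks message IDs that are currently being processed.
type inFlight struct{ ids sync.Map }

// tryClaim reports whether the caller is the first to claim id.
func (f *inFlight) tryClaim(id string) bool {
	_, loaded := f.ids.LoadOrStore(id, struct{}{})
	return !loaded
}

// release forgets id so the map stays small and a later redelivery
// (for example after a failed attempt) can be processed again.
func (f *inFlight) release(id string) { f.ids.Delete(id) }

func main() {
	var f inFlight
	fmt.Println(f.tryClaim("msg-1")) // true: first claim
	fmt.Println(f.tryClaim("msg-1")) // false: already in flight
	f.release("msg-1")
	fmt.Println(f.tryClaim("msg-1")) // true: claimable again after release
}
```

In a worker, `release` would typically be deferred right after a successful `tryClaim`.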
Best Practices and Learnings
- Error Handling: Always handle errors gracefully and log them appropriately
- Message Cleanup: Delete messages only after successful processing
- Graceful Shutdown: Implement proper shutdown mechanisms using context
- Monitoring: Add logging at key points for observability
Performance Considerations
- Worker Count: Choose based on your workload and available resources
- Batch Size: Balance between throughput and processing time
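- Visibility Timeout: Set it comfortably above your slowest expected processing time, not just the average; otherwise messages become visible again and are redelivered while still being processed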
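As a back-of-the-envelope aid for choosing the worker count, the sketch below estimates how many workers are needed to keep up with a daily volume. The 50,000 messages per day comes from this post; the 2-second average processing time and the peak factor of 5 are made-up inputs for illustration, so measure your own before relying on the result.

```go
package main

import (
	"fmt"
	"math"
)

// requiredWorkers estimates the number of concurrent workers needed.
// The average arrival rate (msgsPerDay / 86400) times the average processing
// time gives the average number of messages in flight; peakFactor scales
// that up for bursts above the daily average.
func requiredWorkers(msgsPerDay int, avgSeconds, peakFactor float64) int {
	ratePerSecond := float64(msgsPerDay) / 86400.0
	return int(math.Ceil(ratePerSecond * avgSeconds * peakFactor))
}

func main() {
	// 50,000/day is about 0.58 messages per second on average.
	fmt.Println(requiredWorkers(50000, 2.0, 5.0)) // 6
}
```

Under these assumed numbers, the 10 workers used in this post leave some headroom.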
Future Improvements
- Dynamic Worker Scaling: Adjust worker count based on queue depth
- Circuit Breaker: Add circuit breaker for downstream services
- Metrics Collection: Add Prometheus metrics for monitoring
- Dead Letter Queue: Implement DLQ handling for failed messages
- Retries: Add exponential backoff for transient failures
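For the retry item, a minimal sketch of exponential backoff might look like the following. The function names, attempt limits, and delays are illustrative assumptions, and jitter is left out to keep the example deterministic; a production version would usually add random jitter and respect context cancellation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry number attempt (0-based),
// doubling from base and capping at max.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

// retry calls op up to maxAttempts times, sleeping between failed attempts.
func retry(maxAttempts int, base, max time.Duration, op func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if attempt < maxAttempts-1 {
			time.Sleep(backoffDelay(attempt, base, max))
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// errTransient simulates a temporary downstream failure.
var errTransient = errors.New("transient failure")

func main() {
	calls := 0
	err := retry(4, 10*time.Millisecond, 100*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

Keep the total retry time well below the visibility timeout, or the message may be redelivered while retries are still running.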
Conclusion
The fan-out/fan-in pattern provides an elegant solution for processing high-volume SQS messages in Go. By maintaining a fixed worker pool, we avoid the pitfalls of unbounded goroutine creation while ensuring efficient message processing.
Remember to always consider your specific use case when implementing such patterns. The configuration values shown here (worker count, timeout values, etc.) should be adjusted based on your requirements and resource constraints.
Tags: #golang #aws #sqs #concurrency #distributed-systems