<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Anish Jain</title>
    <description>The latest articles on DEV Community by Anish Jain (@anishjain94).</description>
    <link>https://dev.to/anishjain94</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F549111%2F4960ddea-74d3-4973-8257-39d042f68d3c.jpeg</url>
      <title>DEV Community: Anish Jain</title>
      <link>https://dev.to/anishjain94</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/anishjain94"/>
    <language>en</language>
    <item>
      <title>Building a Scalable SQS Consumer in Go</title>
      <dc:creator>Anish Jain</dc:creator>
      <pubDate>Sun, 24 Nov 2024 19:17:44 +0000</pubDate>
      <link>https://dev.to/anishjain94/building-a-scalable-sqs-consumer-in-go-1leg</link>
      <guid>https://dev.to/anishjain94/building-a-scalable-sqs-consumer-in-go-1leg</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;When building distributed systems, message queues like Amazon SQS play a crucial role in handling asynchronous workloads. In this post, I'll share my experience implementing a robust SQS consumer in Go that handles user registration events for Keycloak. The solution uses the fan-out/fan-in concurrency pattern to process messages efficiently without overwhelming system resources.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Challenge
&lt;/h2&gt;

&lt;p&gt;I faced an interesting problem: process around 50,000 SQS events daily to register users in Keycloak. A naive approach might spawn a new goroutine for each message, but this could quickly lead to resource exhaustion. We needed a more controlled approach to concurrency.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why Fan-out/Fan-in?
&lt;/h2&gt;

&lt;p&gt;The fan-out/fan-in pattern is perfect for this use case because it:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Maintains a fixed pool of worker goroutines&lt;/li&gt;
&lt;li&gt;Distributes work evenly across workers&lt;/li&gt;
&lt;li&gt;Prevents resource exhaustion&lt;/li&gt;
&lt;li&gt;Provides better control over concurrent operations&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Implementation Deep Dive
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. The Consumer Structure
&lt;/h3&gt;

&lt;p&gt;First, let's look at our basic consumer structure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Client&lt;/span&gt;    &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;sqs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Client&lt;/span&gt;
    &lt;span class="n"&gt;QueueName&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  2. Message Processing Pipeline
&lt;/h3&gt;

&lt;p&gt;The implementation consists of three main components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Message Receiver&lt;/strong&gt;: Continuously polls SQS for new messages&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Worker Pool&lt;/strong&gt;: Fixed number of goroutines processing messages&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Message Channel&lt;/strong&gt;: Connects the receiver to workers&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Here's how we start the consumer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;StartPool&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;requestBody&lt;/span&gt; &lt;span class="n"&gt;any&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;
    &lt;span class="n"&gt;serviceFunc&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;requestBody&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

    &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Background&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;params&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;sqs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ReceiveMessageInput&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;MaxNumberOfMessages&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;QueueUrl&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;           &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;QueueName&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;WaitTimeSeconds&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="m"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;VisibilityTimeout&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;  &lt;span class="m"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;MessageAttributeNames&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;QueueAttributeNameAll&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;msgCh&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="nb"&gt;make&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;wg&lt;/span&gt; &lt;span class="n"&gt;sync&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WaitGroup&lt;/span&gt;

    &lt;span class="c"&gt;// Start worker pool first&lt;/span&gt;
    &lt;span class="n"&gt;startPool&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msgCh&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;wg&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;serviceFunc&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c"&gt;// Then start receiving messages&lt;/span&gt;
    &lt;span class="c"&gt;// ... rest of the implementation&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  3. Key Configuration Parameters
&lt;/h3&gt;

&lt;p&gt;Let's examine the crucial SQS configuration parameters:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;MaxNumberOfMessages (10)&lt;/strong&gt;: Batch size for each poll&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;WaitTimeSeconds (20)&lt;/strong&gt;: Long polling duration&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;VisibilityTimeout (30)&lt;/strong&gt;: Grace period for message processing&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  4. Worker Pool Implementation
&lt;/h3&gt;

&lt;p&gt;The worker pool is where the fan-out pattern comes into play:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;startPool&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;requestBody&lt;/span&gt; &lt;span class="n"&gt;any&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;
    &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;msgCh&lt;/span&gt; &lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="n"&gt;types&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;wg&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;sync&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WaitGroup&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;serviceFunc&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dto&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;requestBody&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

    &lt;span class="n"&gt;processingMessages&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;sync&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Map&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;

    &lt;span class="c"&gt;// Start 10 workers&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="n"&gt;worker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msgCh&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;wg&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;processingMessages&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;serviceFunc&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  5. Duplicate Message Handling
&lt;/h3&gt;

&lt;p&gt;We use a &lt;code&gt;sync.Map&lt;/code&gt; to prevent processing duplicate messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;loaded&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;processingMessages&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LoadOrStore&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;loaded&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;wg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Done&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;continue&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Best Practices and Learnings
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Error Handling&lt;/strong&gt;: Always handle errors gracefully and log them appropriately&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Message Cleanup&lt;/strong&gt;: Delete messages only after successful processing&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Graceful Shutdown&lt;/strong&gt;: Implement proper shutdown mechanisms using context&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Monitoring&lt;/strong&gt;: Add logging at key points for observability&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Performance Considerations
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Worker Count&lt;/strong&gt;: Choose based on your workload and available resources&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Batch Size&lt;/strong&gt;: Balance between throughput and processing time&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Visibility Timeout&lt;/strong&gt;: Set according to your average processing time&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Future Improvements
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Dynamic Worker Scaling&lt;/strong&gt;: Adjust worker count based on queue depth&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Circuit Breaker&lt;/strong&gt;: Add circuit breaker for downstream services&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Metrics Collection&lt;/strong&gt;: Add Prometheus metrics for monitoring&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Dead Letter Queue&lt;/strong&gt;: Implement DLQ handling for failed messages&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Retries&lt;/strong&gt;: Add exponential backoff for transient failures&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;The fan-out/fan-in pattern provides an elegant solution for processing high-volume SQS messages in Go. By maintaining a fixed worker pool, we avoid the pitfalls of unbounded goroutine creation while ensuring efficient message processing.&lt;/p&gt;

&lt;p&gt;Remember to always consider your specific use case when implementing such patterns. The configuration values shown here (worker count, timeout values, etc.) should be adjusted based on your requirements and resource constraints.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Source code&lt;/strong&gt;: [Link to your repository if available]&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tags&lt;/strong&gt;: #golang #aws #sqs #concurrency #distributed-systems&lt;/p&gt;

</description>
      <category>go</category>
      <category>goroutines</category>
      <category>sqs</category>
    </item>
  </channel>
</rss>
