DEV Community

Cover image for How to combine SQS and SNS to implement multiple Consumers (Part 2)
Kevin Lactio Kemta
Kevin Lactio Kemta

Posted on

How to combine SQS and SNS to implement multiple Consumers (Part 2)

Day 020 - 100DaysAWSIaCDevopsChallenge

In my previous article, Why you Shouldn't Use Amazon SQS for multiple Consumers—Choose Amazon SNS Instead! (Part 1), I discussed the differences between Amazon SQS and Amazon SNS from my perspective. The key takeaway was that Amazon SQS is better suited for scenarios where messages need to be processed by a single consumer, whereas Amazon SNS excels at broadcasting notifications to multiple systems simultaneously.

Today, I'll demonstrate how to combine SNS and SQS when you have a message producer that needs to communicate with multiple consumers/subscribers.

Scenario

Let's get straight to the point. The goal of this article is to demonstrate how to create a simple and effective Pub/Sub system. The concept revolves around a basic user registration system. Her's an overview:

  1. A SpringBoot Application exposes an API that allows users to be created.
  2. Once a user is created, a message containing the user's information is sent to an SNS Topic.
  3. Two other applications consume the messages published to this topic throught SQS Queue:
    • the first indexes the user (for future search functionality, e.g: with OpenSearch)
    • the second generates a user code (similar to a PIN) that con be used by the user to access certain services or applications.

Below is the complete scenario illustrated.

+--------------------------+         +------------------------+
|  User Management App     |         |  Amazon SNS Topic      |
|  (Producer)              |-------->|  "UserCreatedTopic"    |
|  Spring Boot             |         |  (Broadcast messages)  |
+--------------------------+         +------------------------+
                                        /           \
                                       /             \
                                      /               \
                  +-----------------------+    +----------------------+
                  |   Amazon SQS Queue 1  |    |  Amazon SQS Queue 2  |
                  |   "SearchQueue"       |    |  "CodeGenQueue"      |
                  +-----------------------+    +----------------------+
                             |                                   |
                             v                                   v
              +--------------------------+      +--------------------------+
              |  Search Indexing App     |      |  Code Generation App     |
              |  (Consumer)              |      |  (Consumer)              |
              |  Spring Boot             |      |  Spring Boot             |
              +--------------------------+      +--------------------------+

Producer & Consumers

To keep things simple and avoid complicating my life, I will use Spring Cloud AWS Docs[↗]

Spring Cloud AWS simplifies using AWS managed services in a Spring Framework and Spring Boot applications. It offers a convenient way to interact with AWS provided services using well-known Spring idioms and APIs.

To configure the SNS and SQS services, this is the beans configuration class for every application:

@Configuration
public class ApplicationConfiguration {
    @Bean
    public AwsRegionProvider customRegionProvider() {
        return new InstanceProfileRegionProvider();
    }
    @Bean
    public AwsCredentialsProvider customInstanceCredProvider() {
        return  InstanceProfileCredentialsProvider.builder()
                .build();
    }
}
Enter fullscreen mode Exit fullscreen mode
User Management App (producer)
package com.nivekaa.msproducer.infra.messaging.producers;
import com.nivekaa.msproducer.core.domain.User;
import com.nivekaa.msproducer.core.interactors.IndexingUserInteractor;
import io.awspring.cloud.sns.core.SnsNotification;
import io.awspring.cloud.sns.core.SnsOperations;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class IndexingUserProducer implements IndexingUserInteractor {
  private final SnsOperations snsOperations;
  private @Value("${sns.topic.users}") String userTopic;
  @Override
  public void pushUserAsMessage(User payload) {
    SnsNotification<User> notification = SnsNotification.builder(payload)
        .build();
    snsOperations.sendNotification(userTopic, notification);
  }
}
Enter fullscreen mode Exit fullscreen mode
## resources/application.properties
sns.topic.users=UserCreatedTopic
Enter fullscreen mode Exit fullscreen mode

The code above is the crucial part, as it handles the publication of the message after a user is created. Given that the application is fairly large and spans multiple files, I’ve decided to provide a link to the full project on GitHub for a more comprehensive view and better context. GitHub repo: User Management[↗]

Search Indexing App (consumer)
package com.nivekaa.sqsconsumer.infra.messaging.consumers;
import java.time.LocalDateTime;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ExampleSQSConsumer {
    @SqsListener(queueNames = { "${sqs.queue.index-user}" })
    public void listen(String payload) {
        log.info("""
        *******************  SQS Payload ***************
        * User Info: {}
        * Received At: {}
        ************************************************
        """, payload, LocalDateTime.now());
    }
}
Enter fullscreen mode Exit fullscreen mode
## resources/application.yml
sqs:
  queue:
    index-user: SearchQueue-main
Enter fullscreen mode Exit fullscreen mode

Follow the full project GitHub repo[↗]

Code Generation App (consumer)
@Slf4j
@Component
@RequiredArgsConstructor
public class UserRegisterSQSConsumer {
    private final LocalStorageInteractor storageInteractor;
    @SqsListener(queueNames = { "${sqs.queue.gen-code}" })
    public void listen(String payload) {
        log.info("""
        *******************  SQS Payload ***************"
        * Message Content: {}
        * Received At: {}
        ************************************************
        """, payload, Date.from(Instant.now()));
    // Unwanted code👇🏻👇🏻...
}
Enter fullscreen mode Exit fullscreen mode
## resources/application.yml
sqs:
  queue:
    gen-code: CodeGenQueue-main
Enter fullscreen mode Exit fullscreen mode

Follow the full project GitHub repo[↗]

Build infrastructure using CDK with java as language


// MayStack.java
public class MyStack extends Stack {
  public MyStack(final Construct scope, final String id) {
    this(scope, id, null);
  }
  public MyStack(final Construct scope, final String id, final StackProps props) {
    super(scope, id, props);
    // Create a VPC (Virtual Private Cloud) using a custom NetworkConstruct class.
    IVpc vpc = new NetworkContruct(this, "NetworkResource", props).getVpc();
    // Define the first SQS queue for user code generation, using a custom QueueConstruct.
    IQueue genUserCodeQueue =
        new QueueConstruct(
                this, "GenerateUserCodeQueueResource", new CustomQueueProps("CodeGenQueue"), props)
            .getQueue();
    // Define the second SQS queue for user indexing, also using a custom QueueConstruct.
    IQueue indexingUserQueue =
        new QueueConstruct(
                this, "IndexingUserQueueResource", new CustomQueueProps("SearchQueue", true), props)
            .getQueue();
    // Create an SNS topic named "UserCreatedTopic" using a custom TopicConstruct class.
    TopicConstruct topic =
        new TopicConstruct(this, "TopicResource", new CustomTopicProps("UserCreatedTopic"), props);

    // Subscribe both SQS queues to the SNS topic, so they will receive messages when published.
    topic.subscribeQueue(genUserCodeQueue);
    topic.subscribeQueue(indexingUserQueue);

    // Define an EC2 instance (Compute) for the code generation service.
    ComputerConstruct genCodeService =
        new ComputerConstruct(
            this,
            "GenCodeServerResource",
            ComputerProps.builder()
                .allowSSHConnection(true)
                .vpc(vpc)
                .instanceName("GenCodeServer")
                .volumeSize(8)
                .enableKeyPair(true)
                .hostedAppPort(8081)
                .bootstrapScript("./gen-code-webserver-startup.sh")
                .build(),
            props);
    // Grant the EC2 instance permissions to consume messages from the SQS queue for code generation.
    genCodeService.addPolicyToComputer(
        PolicyStatement.Builder.create()
            .sid("AllowConsumingSQSMessage")
            .actions(
                List.of(
                    "sqs:DeleteMessage",
                    "sqs:ReceiveMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:GetQueueUrl"))
            .resources(List.of(genUserCodeQueue.getQueueArn()))
            .effect(Effect.ALLOW)
            .build());

    // Define an EC2 instance for the user management service (message producer).
    ComputerConstruct userManagementService =
        new ComputerConstruct(
            this,
            "ProducerComputerResource",
            ComputerProps.builder()
                .volumeSize(10)
                .enableKeyPair(true)
                .allowSSHConnection(true)
                .hostedAppPort(8080)
                .vpc(vpc)
                .instanceName("MSProducer")
                .bootstrapScript("./producer-webserver-startup.sh")
                .build(),
            props);

    // Grant the user management EC2 instance permissions to publish messages to the SNS topic.
    userManagementService.addPolicyToComputer(
        new PolicyStatement(
            PolicyStatementProps.builder()
                .effect(Effect.ALLOW)
                .actions(List.of("sns:Publish", "sns:ListTopics", "sns:CreateTopic"))
                .resources(List.of(topic.getTopic().getTopicArn()))
                .build()));

    // Define an EC2 instance for the indexing service (message consumer).
    ComputerConstruct indexingComputer =
        new ComputerConstruct(
            this,
            "IndexingComputerResource",
            ComputerProps.builder()
                .bootstrapScript("./indexing-webserver-startup.sh")
                .vpc(vpc)
                .instanceName("indexing-webserver")
                .enableKeyPair(true)
                .allowSSHConnection(true)
                .volumeSize(8)
                .build(),
            props);

    // Grant the indexing EC2 instance permissions to consume messages from the SQS queue for indexing.
    indexingComputer.addPolicyToComputer(
        PolicyStatement.Builder.create()
            .effect(Effect.ALLOW)
            .sid("AllowInstanceToReceiveSQSMessage")
            .actions(
                List.of(
                    "sqs:DeleteMessage",
                    "sqs:ReceiveMessage",
                    "sqs:GetQueueAttributes",
                    "sqs:GetQueueUrl"))
            .resources(List.of(indexingUserQueue.getQueueArn()))
            .build());
  }
}


// MyApp.java
public class Day020App {
  public static void main(final String[] args) {
    App app = new App();
    new MyStack(
        app, "Day020Stack",
        StackProps.builder()
            .env(Environment.builder()
                    .account(System.getenv("CDK_DEFAULT_ACCOUNT"))
                    .region(System.getenv("CDK_DEFAULT_REGION"))
                    .build())
            .build());
    app.synth();
  }
}
Enter fullscreen mode Exit fullscreen mode

🎯NetworkContruct.java - [↗]
🎯ComputerConstruct.java - [↗]
🎯QueueConstruct.java - [↗]
🎯TopicConstruct.java - [↗]

Deployment

⚠️⚠️ Before run the deployment command ensure that you have java installed on your host machine. I used Java 21 under MacOs to build this insfrastructure.

Open the terminal anywhere and run the following commande:

git clone https://github.com/nivekalara237/100DaysTerraformAWSDevops.git
cd 100DaysTerraformAWSDevops/day_020
cdk bootstrap --profile cdk-user
cdk deploy --profile cdk-user Day020Stack
Enter fullscreen mode Exit fullscreen mode

You can find the full project in my GitHub repo[↗]

Top comments (0)