Guide to Testing SQS-Based Microservices with Signadot Sandboxes

Read this tutorial on Signadot.

Introduction

Welcome to the SQS + Signadot Sandboxes tutorial! In this guide, you'll learn how to quickly test new versions of SQS-based microservices—either in a pull request or during local development—by leveraging an Amazon SQS integration with services running in a Minikube cluster, with a brief look at the SNS-to-SQS fanout pattern for broader message distribution. This setup enables you to:

  • Rapidly iterate on consumer-based message processing without redeploying the entire stack
  • Use sandboxes to isolate and test changes alongside a stable baseline
  • Observe message flow and processing in real time
  • Test SQS integration patterns safely in development

What you'll learn:

  • Deploy SQS-based microservices and Signadot in Kubernetes
  • Run both baseline and sandboxed versions of the producer and consumer services
  • See how AWS SQS message distribution works, along with SNS + SQS fanout integration
  • Deploy a sandboxed consumer and route messages to it
  • Understand how message routing and selective processing work

Prerequisites

Before you begin, ensure you have the following prerequisites set up:

1. Minikube with Docker

  • Install Docker, Minikube & Helm on your local machine
  • Start Minikube: minikube start --driver=docker
  • To use Minikube’s Docker daemon instead of the local one, run: eval $(minikube docker-env)
  • Verify cluster is ready: kubectl cluster-info

2. Active AWS account

  • Create an AWS account and activate it with card and mobile verification
  • Alternatively, you can use LocalStack (which offers a 14-day trial) to emulate AWS services
  • Create an IAM user and grant it the permissions required to access AWS SNS and SQS from outside the AWS console

3. Signadot Account and Operator

  • Sign up for a Signadot account and install the Signadot Operator in your cluster (refer to the Signadot docs for installation steps)

Project Setup

  • Clone the Project Repository
$ git clone https://github.com/your-org/SQS-Based-Microservices-with-Signadot
$ cd SQS-Based-Microservices-with-Signadot

Here is a more detailed look at the project's folder structure, explaining the purpose of each directory and file and how they contribute to the overall application.

├── apps/                 # Houses the distinct microservices of the application.
│   ├── consumer/         # Contains the SQS message consumer service.
│   │   └── app.py
│   ├── frontend/         # Holds the user-facing web server and UI assets.
│   │   ├── app.py
│   │   └── public/       # Stores static files like HTML, CSS, and images.
│   └── producer/         # Contains the service that publishes messages.
│       └── app.py
├── modules/              # Contains shared, reusable code modules for all services.
│   ├── DataTransferObjects/ # Defines Pydantic models for API request/response data.
│   ├── events/           # Manages event logging and retrieval using Redis.
│   ├── logger/           # Provides a standardized logging configuration.
│   ├── otel/             # Includes helpers for OpenTelemetry instrumentation.
│   ├── pull_router/      # Client for communicating with the Signadot routing service.
│   ├── sns/              # A client module for interacting with AWS SNS.
│   └── sqs/              # A client module for interacting with AWS SQS.
├── sandbox/              # Contains configuration files for Signadot sandboxing.
│   └── sns-sqs-router-grp.yaml
├── Dockerfile            # Defines instructions to build the application's Docker image.
├── main.py               # The main script to launch the different microservices locally.
├── README.md             # Contains project documentation, architecture, and setup guide.
└── requirements.txt      # Lists the Python package dependencies for the project.
  • AWS Cloud Setup for SQS and SNS
    • Please go to this URL and complete the required steps to create your AWS account.
    • Create a new IAM user and grant the necessary permissions.
    • Create an access key and store it as a Kubernetes Secret so the services can access AWS SNS and SQS
    • Copy and paste the Access Key ID and Secret Access Key into the k8s/secrets.yaml file in the project folder. (Both values must be base64-encoded; see the one-liner after the YAML below.)
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
  name: aws-credentials
  namespace: aws-sqs-app
type: Opaque
data:
  AWS_ACCESS_KEY_ID: <Access Key Id>
  AWS_SECRET_ACCESS_KEY: <Secret Access Key>
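To produce the base64-encoded values, a quick shell one-liner works (substitute your actual keys for the placeholders):

$ echo -n '<Access Key Id>' | base64
$ echo -n '<Secret Access Key>' | base64

The -n flag matters: a stray trailing newline inside the encoded value will break authentication.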

As you can see, the image below shows the sqs-sns-user IAM user information and the permission policies AmazonSNSFullAccess and AmazonSQSFullAccess, which allow access to the AWS SNS & AWS SQS services.

Hooray! 🎉 We’ve finished all the setup needed to get our project up and running — now it’s time to fire it up and see it in action!

Build the demo app

First, you need to build the Docker image. The image we’re creating, sqs-signadot, is a simple demo application that we’ll use to showcase the Shared SQS & SNS + SQS fan-out pattern.

To build the image, run:

$ docker build -t sqs-signadot:latest .
$ docker image ls

Verify that the image exists in the Docker repository inside Minikube.

Whoo! 🎉 The build is successful, and the sqs-signadot image now exists in the Docker image repository.

Deploy the demo app

These simple steps deploy the demo app, which consists of the Frontend, Producer, and Consumer services, to set up the baseline AWS SQS & SNS flow with the Signadot integration.

$ export NAME_SPACE=aws-sqs-app
$ kubectl create ns $NAME_SPACE
namespace/aws-sqs-app created

$ kubectl apply -f k8s/
configmap/app-config created
deployment.apps/consumer-deployment created
service/frontend-service created
deployment.apps/frontend-deployment created
service/producer-service created
deployment.apps/producer-deployment created
service/redis created
statefulset.apps/redis created
secret/aws-credentials created

The following services have been deployed:

  • Frontend - Exposes the GUI and forwards incoming messages (from the GUI) to a Python FastAPI app (the producer) via HTTP.
  • Producer - Publishes messages to the AWS SQS queue.
  • Consumer - Implements an SQS subscription to selectively consume messages between baseline and sandbox.
  • Redis Server - Stores and retrieves event logs to show how messages have been distributed.

Let's check that everything works as expected:

$ kubectl -n $NAME_SPACE get po
NAME                                            READY    STATUS        RESTARTS       AGE
consumer-deployment-7444f9b7f8-96vzm        2/2      Running     2 (4m39s ago)      27m
frontend-deployment-6c5f85dc7-rjzx5         1/1      Running     1 (4m39s ago)      27m
producer-deployment-85c6f7d747-b9lpv        2/2      Running     2 (4m39s ago)      27m
redis-0                                         1/1      Running     1 (4m39s ago)      27m

Next, Signadot establishes a tunnel so that all in-cluster services are made available locally. To create config.yaml, refer to https://www.signadot.com/docs/getting-started/installation/signadot-cli#local-configuration

Use the output of the following command as the value for kubeContext.

$ kubectl config current-context

Note: The config.yaml file path should be ~/.signadot/config.yaml

$ signadot local connect --config ~/.signadot/config.yaml

signadot local connect needs root privileges for:
        - updating /etc/hosts with cluster service names
        - configuring networking to direct local traffic to the cluster

signadot local connect has been started ✓
* runtime config: cluster crop-staging-1, running with root-daemon
✓ Local connection healthy!
    * operator version 0.19.2
    * port-forward listening at ":39911"
    * localnet has been configured
    * 19 hosts accessible via /etc/hosts
    * sandboxes watcher is running
* Connected Sandboxes:
    - No active sandbox

As you can see, signadot local connect exposes your in-cluster services to your host machine. The frontend-service is exposed at http://frontend-service.aws-sqs-app:8080, where you can interact with the web GUI.
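As a quick sanity check, you can curl the frontend through the tunnel (this assumes the /etc/hosts entries added by local connect are in place):

$ curl -I http://frontend-service.aws-sqs-app:8080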

Initializing the AWS SQS queue and AWS SNS topic

In the cloud-native world, automating infrastructure provisioning is a powerful capability. The code below applies that idea: the SQS queue and SNS topic are created by the code itself when the producer or consumer service starts up.

if args.producer or args.consumer:
        # Lazy load the AWS modules only when the producer or consumer are
        # actually run. This prevents services that do not need them (like
        # the frontend) from crashing if AWS credentials are not configured
        # in their environment.
        from modules.sqs.sqs_client import create_queue, get_queue_arn
        from modules.sns.sns_client import create_topic, subscribe_sqs_to_sns

        logger.info("Initializing SQS queue...")
        queue_url = create_queue()
        if not queue_url:
            logger.error("Failed to create or get SQS queue. Exiting.")
            return

        logger.info("Initializing SNS topic and subscription...")
        topic_arn = create_topic()
        if not topic_arn:
            logger.error("Failed to create or get SNS topic. Exiting.")
            return

        queue_arn = get_queue_arn(queue_url)
        if not queue_arn:
            logger.error("Failed to get SQS queue ARN. Exiting.")
            return

        subscription_arn = subscribe_sqs_to_sns(topic_arn, queue_arn, queue_url)
        if not subscription_arn:
            logger.error("Failed to subscribe SQS queue to SNS topic. Exiting.")
            return
        logger.info(f"Successfully subscribed queue to topic. Subscription ARN: {subscription_arn}")
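For reference, here is a minimal sketch of what create_queue and get_queue_arn could look like using boto3 (an illustration of the pattern, not necessarily the repo's exact code; the queue name is an assumption):

# modules/sqs/sqs_client.py -- illustrative sketch, not the repo's exact code
import logging
from typing import Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
sqs = boto3.client("sqs")  # region and credentials come from the environment

def create_queue(queue_name: str = "signadot-demo-queue") -> Optional[str]:
    # create_queue is idempotent: if the queue already exists with the same
    # attributes, AWS simply returns its URL.
    try:
        return sqs.create_queue(QueueName=queue_name)["QueueUrl"]
    except ClientError as exc:
        logger.error("Failed to create or get SQS queue: %s", exc)
        return None

def get_queue_arn(queue_url: str) -> Optional[str]:
    # The ARN is needed to build the SQS access policy for the SNS subscription.
    try:
        attrs = sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )
        return attrs["Attributes"]["QueueArn"]
    except ClientError as exc:
        logger.error("Failed to get SQS queue ARN: %s", exc)
        return None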

This image shows the SQS queue being created and how it appears in the AWS console. Additionally, an access policy is created so the queue can work with the SNS-to-SQS fanout pattern.

The image below shows an AWS SNS topic with an SQS queue subscribed to it. We’ll be using this setup to demonstrate how the SNS + SQS fan-out pattern works in practice.

Test Baseline Flow without using Signadot’s Sandbox

The diagram below shows the architectural flow of baseline message processing.

The code below shows how the producer does its job of publishing a message to the AWS SQS queue.

# Send message to SQS queue
logger.info(f"Sending message to SQS queue: {SQS_QUEUE_URL}")
response = sqs_client.send_message(
    QueueUrl=SQS_QUEUE_URL,
    MessageBody=json.dumps(msg_dict),
    MessageAttributes=message_attributes,
)
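For illustration, message_attributes can carry the routing context in SQS's message-attribute format (a sketch; the repo may instead rely on botocore auto-instrumentation to inject this, and the sd-routing-key entry name reflects Signadot's baggage convention):

# Sketch: routing context attached as a `baggage` message attribute.
# "<routing-key>" is a placeholder for the sandbox's actual routing key.
message_attributes = {
    "baggage": {
        "DataType": "String",
        "StringValue": "sd-routing-key=<routing-key>",
    }
}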

Head over to http://localhost:8080 in your browser to use the AWS SQS demo frontend and send a message. With no sandboxes set up yet, the baseline consumer will pick it up, and you can watch it appear in the frontend interface, just like in the screenshot below.

Producer’s header context propagation

Our next step is to explore consumer sandbox testing. Having demonstrated the basic message flow between producer and consumer services in our demo application, we now need to address a critical development challenge: how to rapidly test new versions of producer and/or consumer code without disrupting shared testing environments.

Our goal is to create an isolated testing environment where developers can validate changes before merging. We'll accomplish this by:

  1. Using OpenTelemetry auto-instrumentation to propagate request headers - ensuring context flows seamlessly from producers through the messaging system to consumers
  2. Implementing selective routing based on header values - directing traffic to sandboxed versions of services when specific headers are present
  3. Deploying new service versions using Signadot sandboxes - creating isolated environments for testing code changes from dev branches or local workstations

This approach will enable you to test new consumer logic, producer modifications, or both simultaneously in a controlled sandbox environment. Let's walk through each step:

How OTel auto-instrumentation propagates headers without modifying application code

When you build the container image using docker build, the Dockerfile will install the required packages to implement OTel auto-instrumentation.

# Install OpenTelemetry SDK + instrumentations
RUN pip install --no-cache-dir \
    opentelemetry-distro \
    opentelemetry-exporter-otlp \
    opentelemetry-instrumentation-asgi \
    opentelemetry-instrumentation-fastapi \
    opentelemetry-instrumentation-requests \
    opentelemetry-instrumentation-botocore

# Install OpenTelemetry bootstrap separately
RUN opentelemetry-bootstrap -a install

After that, the FastAPI services (both frontend and producer) are powered up with OTel automatic header context propagation through the commands below.

def run_frontend():
    # Launches the FastAPI frontend app using Uvicorn as a subprocess with OTel auto-instrumentation
    # and handles graceful shutdown on KeyboardInterrupt (Ctrl+C).
    command = ["opentelemetry-instrument", "uvicorn", "apps.frontend.app:app", "--host", "0.0.0.0", "--port", "8000"]
    logger.info(f"Starting frontend server with command: {' '.join(command)}")
    # Pass the modified environment to the subprocess
    process = subprocess.Popen(command)

def run_producer(queue_url: str, topic_arn: str):
    # Launches the FastAPI producer app using Uvicorn as a subprocess with OTel auto-instrumentation
    # and handles graceful shutdown on KeyboardInterrupt (Ctrl+C).
    # Set the queue URL as an environment variable for the producer subprocess
    env = os.environ.copy()
    env["SQS_QUEUE_URL"] = queue_url
    env["SNS_TOPIC_ARN"] = topic_arn
    command = ["opentelemetry-instrument", "uvicorn", "apps.producer.app:app", "--host", "0.0.0.0", "--port", "8000"]
    logger.info(f"Starting producer server with command: {' '.join(command)}")
    # Pass the modified environment to the subprocess
    process = subprocess.Popen(command, env=env)

Test the sandbox flow using Signadot’s sandboxes

A Signadot sandbox is an isolated, short-lived environment that lets you safely test code changes without impacting test environment traffic.

What the consumer does in the sandbox:

Instantiate Dedicated Subscribers for Each Sandbox - so their consumption offsets remain isolated.

Filter Out Irrelevant Messages - based on routing key evaluation via the Routes API.

Preserve Context by Propagating the Routing Key Downstream - by including the routing key when the subscriber communicates with other services or message flows.

Let's take a look at how the Routes API client periodically fetches routing keys in the consumer sandbox.

# --- Start asyncio background task in a separate thread ---
routes_client = RoutesAPIClient(sandbox_name=SANDBOX_NAME)
cache_updater_coro = routes_client._periodic_cache_updater()
asyncio_thread = threading.Thread(
    target=start_async_loop,
    args=(cache_updater_coro,),
    daemon=True,
)
asyncio_thread.start()
logger.info("Started background cache updater.")
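start_async_loop is a small helper referenced above; a minimal implementation might look like this (an assumption about the repo's helper, but the standard pattern for running a coroutine on a dedicated thread):

import asyncio

def start_async_loop(coro):
    # Create a fresh event loop for this thread and run the coroutine on it,
    # since the default loop belongs to the main thread.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(coro)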

This thread runs in the background to keep the Routes API client alive, while the _periodic_cache_updater routine fetches the routing keys on a 5-second cadence.

How to extract the OTEL baggage header inside the consumer code

from typing import Optional

from opentelemetry import baggage
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagators.textmap import Getter

# ROUTING_KEY and sqs_getter are defined elsewhere in the consumer module.
def extract_routing_key_from_baggage(message_attributes: dict, getter: Optional[Getter] = sqs_getter) -> Optional[str]:
    ctx = W3CBaggagePropagator().extract(
        carrier=message_attributes,
        getter=getter
    )
    return baggage.get_baggage(ROUTING_KEY, ctx)
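As a hypothetical usage example, assuming ROUTING_KEY is the sd-routing-key baggage entry and sqs_getter unwraps SQS's {DataType, StringValue} attribute format (both assumptions about the repo's internals):

# Hypothetical SQS message attributes carrying a baggage entry.
attrs = {
    "baggage": {"DataType": "String", "StringValue": "sd-routing-key=abc123"}
}
print(extract_routing_key_from_baggage(attrs))  # -> "abc123"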

And here’s where selective consumption comes into play. At this stage, the consumer checks the routing key of each message against its own sandbox routing key using the Routes API. If it’s not a match, the message is skipped and immediately released back into the queue, reducing its visibility in this sandbox and making it instantly available for the correct sandbox’s consumer. This guarantees isolation while keeping message delivery fast and efficient.

if not router_api.should_process(routing_key):
      # This message is not for this consumer instance. Make it immediately visible again for other consumers.
      logger.info(f"Skipping message with routing_key: '{routing_key}'. Releasing back to queue.")
      sqs_client.change_message_visibility(
            QueueUrl=sqs_queue_url,
            ReceiptHandle=message["ReceiptHandle"],
            VisibilityTimeout=0,
      )
      continue
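Conceptually, should_process implements the selective-consumption rule. A sketch of that rule, under the assumption that the client caches this sandbox's routing keys plus the full set of sandbox-claimed keys (the repo's actual implementation may differ):

from typing import Optional

class RouterAPI:
    # Conceptual sketch only; the repo's client will differ.
    def __init__(self, sandbox_name, cached_keys, all_sandbox_keys):
        self.sandbox_name = sandbox_name           # None for the baseline consumer
        self._cached_keys = cached_keys            # keys routed to this sandbox
        self._all_sandbox_keys = all_sandbox_keys  # keys claimed by any sandbox

    def should_process(self, routing_key: Optional[str]) -> bool:
        if self.sandbox_name:
            # Sandbox consumer: only handle messages carrying one of its own keys.
            return routing_key in self._cached_keys
        # Baseline consumer: handle unkeyed messages and keys no sandbox claimed.
        return routing_key is None or routing_key not in self._all_sandbox_keys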

Create the sandbox — Signadot’s Signature feature

To create the consumer sandbox, let's build a sandbox configuration file.

# sqs_sandbox.yaml
apiVersion: signadot.com/v1
kind: Sandbox
name: sqs-counsumer-sandbox
spec:
  labels:
    app: "sns-sqs-fanout-sandbox"
  description: Isolated sandbox environments to enable sqs message routing
  cluster: "@{cluster}"
  forks:
  - forkOf:
      kind: Deployment
      name: consumer-deployment
      namespace: aws-sqs-app

This YAML tells Signadot:

Create a sandbox called sqs-counsumer-sandbox in my cluster, fork the consumer-deployment from the aws-sqs-app namespace, and set an environment variable to indicate the sandbox name.
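The base spec above doesn't show that last part; surfacing the sandbox name to the forked consumer can be done with an env customization like the one below (a sketch: SANDBOX_NAME matches what the consumer code reads, but the repo's actual spec may differ):

  forks:
  - forkOf:
      kind: Deployment
      name: consumer-deployment
      namespace: aws-sqs-app
    customizations:
      env:
        - name: SANDBOX_NAME
          value: "sqs-counsumer-sandbox"  # assumption: consumed by RoutesAPIClient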

Let's apply the sandbox configuration:

Note: To run the following command, you must have the Signadot CLI installed on your local machine.

$ signadot sandbox apply -f ./sandbox/sqs_sandbox.yaml --set cluster="crop-staging-1"

# To list the pods being created
$ kubectl -n $NAME_SPACE get po
NAME                                                              READY   STATUS    RESTARTS        AGE
consumer-deployment-7444f9b7f8-lrcld                              2/2     Running   2 (5m58s ago)   23h
frontend-deployment-6c5f85dc7-8mzfk                               1/1     Running   1 (5m58s ago)   23h
producer-deployment-85c6f7d747-vt56s                              2/2     Running   2 (5m58s ago)   23h
redis-0                                                           1/1     Running   1 (5m58s ago)   23h
sqs-counsumer-sandbox-dep-consumer-deployment-7ca2ec39-cc4dfw2p   2/2     Running   0               61s

As you can see, the sandboxed pod sqs-counsumer-sandbox-dep-consumer-deployment-7ca2ec39-cc4dfw2p has been created, and its age shows as 61 seconds.

Test Sandbox behavior with routing key

The diagram below illustrates how the routing key works.

Select the sqs-consumer-sandbox after enabling Signadot’s browser extension.

In the next section, building upon the shared SQS pattern, the focus shifts to the SNS-to-SQS Fan-out pattern. For simplicity, instead of creating ephemeral queues, the behavior of this pattern will be demonstrated using an existing queue, with multiple consumers sharing that queue and coordinating message handling in the same way as with plain SQS.

Another option with SNS/SQS is to give each consumer its own queue, where every consumer receives a full copy of the messages. In this case, queues may be created dynamically, and consumers apply selective logic in their code to drop messages not intended for them.
This AWS hands-on guide, https://aws.amazon.com/getting-started/hands-on/send-fanout-event-notifications/, elaborates on implementing the SNS + SQS fanout pattern with ephemeral SQS queues.

Creating a producer sandbox to implement the SNS-to-SQS fanout pattern

# sns_sandbox.yaml
apiVersion: signadot.com/v1
kind: Sandbox
name: sns-sqs-fanout-sandbox
spec:
  labels:
    app: "sns-sqs-fanout-sandbox"
  description: Isolated sandbox environments to enable sns to sqs fanout routing
  cluster: "@{cluster}"
  forks:
  - forkOf:
      kind: Deployment
      name: producer-deployment
      namespace: aws-sqs-app
    customizations:
      env:
        - name: SNS_FANOUT_PUBLISH
          value: "true"

To avoid repeating the consumer sandbox walkthrough, the sandbox boilerplate won’t be revisited here; however, a few key points are worth highlighting to prevent any confusion.

  • Forked workload — The producer deployment is sandboxed for SNS integration
  • ENV variable — Introduces a new environment variable called SNS_FANOUT_PUBLISH to conditionally switch between SQS message publishing and SNS publishing. The related code is shown below.
event_description = (
    'Sending produce request to SNS topic'
    if SNS_FANOUT_PUBLISH
    else 'Sending produce request to SQS queue'
)

if SNS_FANOUT_PUBLISH:
    # Publish message to SNS topic
    logger.info(f"Publishing message to SNS topic: {SNS_TOPIC_ARN}")
    response = sns_client.publish(
         TopicArn=SNS_TOPIC_ARN,
         Message=json.dumps(msg_dict),
         MessageAttributes=message_attributes,
    )
else:
    # Send message to SQS queue
    logger.info(f"Sending message to SQS queue: {SQS_QUEUE_URL}")
    response = sqs_client.send_message(
          QueueUrl=SQS_QUEUE_URL,
          MessageBody=json.dumps(msg_dict),
          MessageAttributes=message_attributes,
    )
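One thing worth noting: the environment variable arrives as a string, so it has to be coerced into a boolean somewhere. A typical way to do that (the repo's exact parsing may differ):

import os

# Treat anything other than the string "true" (case-insensitive) as disabled.
SNS_FANOUT_PUBLISH = os.getenv("SNS_FANOUT_PUBLISH", "false").lower() == "true"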

Let's provision the sandbox:

$ signadot sandbox apply -f ./sandbox/sns_sandbox.yaml --set cluster="crop-staging-1"

Create a Signadot route group to control how traffic is routed into sandboxes.

# sns-sqs-router-grp.yaml
name: sns-sqs-router-grp
spec:
  cluster: "@{cluster}"
  description: "route group for testing multiple sandboxes together"
  match:
    any:
    - label:
        key: app
        value: sns-sqs-fanout-sandbox

The route group lets you route network traffic from one or more sandboxes to one or more endpoints based on label selectors. It acts like a traffic router or load balancer within your Signadot sandboxes and Kubernetes clusters.

Let's provision the router group:

$ signadot routegroup apply -f ./sandbox/sns-sqs-router-grp.yaml --set cluster="crop-staging-1"
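With the route group in place, requests can also target it without the browser extension by passing the routing key explicitly (the routing key value is visible in the Signadot dashboard; the header follows Signadot's baggage-based propagation):

$ curl http://frontend-service.aws-sqs-app:8080 \
    -H 'baggage: sd-routing-key=<routing-key>'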

Scenario 1 - SNS to SQS baseline consumer

The diagram illustrates how the message will flow in scenario 1.

As you can see, the request has been sent through the producer sandbox, which published the message to the AWS SNS topic; it was then consumed by the baseline consumer.

Scenario 2 - SNS to SQS sandbox consumer

Another diagram that illustrates the message flowing behavior in scenario 2.

As you can see, the request has been sent through the producer sandbox, which published the message to the AWS SNS topic; it was then consumed by the sandbox consumers.

Summary

In this tutorial, you learned how to use a shared Amazon SQS queue with Signadot Sandboxes to quickly test new message processing logic in an isolated Minikube environment. We demonstrated deploying baseline services, safely routing messages through the shared queue, and using sandboxes to validate changes without disrupting the main processing flow. While the focus was on the shared-queue pattern, we also touched on how the SNS-to-SQS fanout pattern can broadcast messages to multiple queues for broader testing scenarios.

This approach enables faster iteration and more reliable integration testing for event-driven microservices architectures, offering:

  • Realistic message flow simulation with shared SQS queues.
  • Safe isolation for experimental consumers in sandboxes.
  • Compatibility with fanout-based testing via SNS-to-SQS.
  • Reduced risk when validating new logic alongside live-like traffic.
