Sean O'Connor

Spinning up an AWS Fargate service based on messages on a queue - using aws-cdk ☁️👀

Until recently I had an ECS service that listened constantly to an SQS queue, waiting for messages to arrive so it could process them. This was a waste of resources and not cost-effective at all. So I looked into ways to spin up tasks only when messages were pushed to the SQS queue, which would mean my service would not sit idle all day and all night waiting.

Enter aws-cdk. Please bear in mind this is not a beginner's tutorial, but it covers something that caused me a lot of headaches and iterations to get right. Hopefully, this will help at least one person/team and save them from weeks of pain.

Prerequisites:

  • Experience with aws-cdk.
  • Experience with Python.
  • Experience with aws-sdk.

By the end of the tutorial, you will have a Fargate service that spins up tasks when messages are pushed to an SQS queue. Because of some issues in the current library implementation, I will also show an alternative way to get the service to spin these tasks down when there are no messages on the queue.

Quickstart

Install the toolkit with npm install -g aws-cdk

To check it has been installed, run cdk --version

To initiate a project:

mkdir fargate-cdk
cd fargate-cdk
cdk init sample-app --language=python

Imports

To begin with, import every service under the sun that aws-cdk provides.

from aws_cdk import (
    core as cdk,
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_ecs_patterns as ecs_patterns,
    aws_sqs as sqs,
    aws_ecr as ecr,
    aws_iam as iam,
    aws_ssm as ssm,
    aws_kms as kms,
    aws_autoscaling as autoscaling,
    aws_applicationautoscaling as app_autoscaling,
    aws_cloudwatch as cw,
)
from aws_cdk.aws_cloudwatch_actions import ApplicationScalingAction
from aws_cdk.core import Duration

Initialise and fetch variables

Once you have used the cdk command to initialise your project, navigate to the sample-app.py file and you should see a class created by aws-cdk that inherits from cdk.Stack. We shall be placing all our code under the __init__ method for this tutorial.

You will need some basic variables throughout the code, so let's define those first.

class SampleApp(cdk.Stack):
    def __init__(
        self,
        scope: cdk.Construct,
        construct_id: str,
        **kwargs,
    ) -> None:
        super().__init__(scope, construct_id, **kwargs)

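        image_tag = "" # tag of the image within the ecr repository

        cluster_name = "" # name of the existing ecs cluster

        security_group = "" # id of the security group (virtual firewall) used by the cluster

        kms_key = "" # id of the kms key needed for decrypting

        queue_name = "" # name of sqs queue that will be created

        vpc_id = "" # id of the VPC in which to provision the cluster.

        environment = "" # value passed to the container as the ENV variable

        alarm_name = "" # name of the CloudWatch alarm used to scale down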
...

Using variables to fetch objects

We will now use the defined variables to fetch our VPC, cluster and KMS key, and to create our SQS queue.

...
        # fetches vpc using the id
        vpc = ec2.Vpc.from_lookup(self, "VPC", vpc_id=vpc_id)

        # fetches kms_key used for decryption using the arn of the key
        kms_key = kms.Key.from_key_arn(
            self, "EncryptionKey", f"arn:aws:kms:us-west-1:123456789:key/{kms_key}" # arn of kms key
        )

        # fetches cluster using defined objects and variables above
        cluster = ecs.Cluster.from_cluster_attributes(
            self,
            "Cluster",
            vpc=vpc,
            cluster_name=cluster_name,
            security_groups=[
                ec2.SecurityGroup.from_security_group_id(
                    self, "SecurityGroup", security_group_id=security_group
                )
            ],
        )

        # creates a fifo queue
        queue = sqs.Queue(
            self,
            "SampleQueue",
            content_based_deduplication=True,
            fifo=True,
            queue_name=queue_name, # must contain .fifo at end of name
            receive_message_wait_time=Duration.seconds(20),
            retention_period=Duration.seconds(345600),
            visibility_timeout=Duration.seconds(43200),
        )
...
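
The durations passed to the queue above are raw seconds, which are easy to misread. The following is a minimal sketch, not part of the stack, that converts them into readable values and checks them against the SQS limits as I understand them (12 hours for the visibility timeout, 14 days for retention, 20 seconds for long polling). The function name and the queue name "sample-queue.fifo" are hypothetical.

```python
from datetime import timedelta

# SQS limits as documented at the time of writing (assumption: verify
# against the current SQS documentation before relying on them).
MAX_VISIBILITY_TIMEOUT = timedelta(hours=12)
MAX_RETENTION_PERIOD = timedelta(days=14)
MAX_RECEIVE_WAIT_TIME = timedelta(seconds=20)


def check_fifo_queue_settings(queue_name, receive_wait_s, retention_s, visibility_s):
    """Return a list of problems found in the settings (empty if none)."""
    problems = []
    if not queue_name.endswith(".fifo"):
        problems.append("FIFO queue names must end with .fifo")
    if timedelta(seconds=receive_wait_s) > MAX_RECEIVE_WAIT_TIME:
        problems.append("receive_message_wait_time exceeds 20 seconds")
    if timedelta(seconds=retention_s) > MAX_RETENTION_PERIOD:
        problems.append("retention_period exceeds 14 days")
    if timedelta(seconds=visibility_s) > MAX_VISIBILITY_TIMEOUT:
        problems.append("visibility_timeout exceeds 12 hours")
    return problems


# The values used for the queue above.
print(timedelta(seconds=345600))  # 4 days, 0:00:00
print(timedelta(seconds=43200))   # 12:00:00
print(check_fifo_queue_settings("sample-queue.fifo", 20, 345600, 43200))  # []
```

Note that the 12 hour visibility timeout sits exactly at the maximum, so a task that needs longer than that per message would need a different approach.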

Fetching secrets from Systems Manager Parameter Store and defining environment variables

This is a key step.

We must define every parameter that is stored in the Systems Manager Parameter Store and used by your application.

...

        database_name = ssm.StringParameter.from_secure_string_parameter_attributes(
            self,
            "DATABASE_NAME",
            version=1,
            parameter_name="", # name of parameter within ssm
            simple_name=True, # if using name of parameter and not arn or any other identifier
            encryption_key=kms_key, # needed if encryption is turned on within parameter store. Normally needed when using simple_name
        )
...

Then create a dictionary of these secrets. Any variables that your app fetches from the local environment must be defined too.

...
        secrets = {
            "DATABASE_NAME": ecs.Secret.from_ssm_parameter(database_name),
        }

        environments = {
            "ENV": environment,
        }
...
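
On the application side, both dictionaries end up as ordinary environment variables inside the container. As a rough sketch of how the task's own code might read them (the function name, the "dev" fallback and the stand-in values are assumptions for illustration):

```python
import os


def load_settings(env=os.environ):
    """Read the values that the task definition injects into the container."""
    return {
        # Injected from Parameter Store through the `secrets` dictionary.
        "database_name": env["DATABASE_NAME"],
        # Injected as plain text through the `environments` dictionary.
        "env": env.get("ENV", "dev"),  # "dev" is an assumed fallback
    }


# Local usage with stand-in values (hypothetical, no AWS access needed):
print(load_settings({"DATABASE_NAME": "sample_db", "ENV": "staging"}))
```

Reading the secret with env["DATABASE_NAME"] rather than env.get makes the task fail fast at start-up if the parameter was not wired in.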

Creating the Queue Processing Fargate Service

This is the most important part. It creates the Fargate service and uses everything we have defined up until now.
This service will listen to the queue created and spin up tasks when there are messages on the queue.

...
        fargate_service = ecs_patterns.QueueProcessingFargateService(
            self,
            "SampleAppService",
            cpu=4096, # set the amount of cpu you wish your tasks to have
            memory_limit_mib=8192, # set the amount of memory you wish your tasks to have
            image=ecs.ContainerImage.from_ecr_repository(
                repository=ecr.Repository.from_repository_name(
                    self,
                    "SampleAppRepository",
                    repository_name="",
                ), # fetch repo name from ecr
                tag=image_tag,
            ), # fetch image from a certain ecr repo
            min_scaling_capacity=0, # min number of tasks
            max_scaling_capacity=1, # the max number of tasks a service can spin up
            cluster=cluster, # cluster for the service
            queue=queue, # queue the service shall be listening to.
            scaling_steps=[
                {"upper": 0, "change": 0},
                {"lower": 1, "change": +1},
            ], # this defines how the service shall autoscale
            secrets=secrets, # secret parameters from ssm
            environment=environments, # any additional env variables needed
            visibility_timeout=cdk.Duration.hours(12) # how long the task will take the item off the queue for
        )
...

DO NOT use the desired_task_count parameter: it is deprecated and will have no effect on the service.
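
To make the scaling_steps passed to the service easier to reason about, here is a simplified pure-Python model of how a step configuration turns a queue depth into a task count. It is only a sketch: it assumes the service scales on the number of visible messages, it ignores cooldowns and alarm periods, and the function names are made up for illustration.

```python
import math


def step_change(message_count, steps):
    """Return the change in task count for a given number of visible messages."""
    for step in steps:
        # Missing bounds are treated as open-ended. This default is only
        # sensible for the simple two-step configuration shown here.
        lower = step.get("lower", 0)
        upper = step.get("upper", math.inf)
        if lower <= message_count <= upper:
            return step["change"]
    return 0  # no step matches: leave the task count alone


def next_task_count(current, message_count, steps, min_capacity=0, max_capacity=1):
    """Apply the step change and clamp to the min/max scaling capacity."""
    desired = current + step_change(message_count, steps)
    return max(min_capacity, min(max_capacity, desired))


steps = [{"upper": 0, "change": 0}, {"lower": 1, "change": +1}]
print(next_task_count(0, 3, steps))  # 1: messages arrive, a task spins up
print(next_task_count(1, 0, steps))  # 1: queue empty, task is left running

# With -1 on the first step, an empty queue removes the task even though
# it may still be working on the message it just took off the queue.
eager_steps = [{"upper": 0, "change": -1}, {"lower": 1, "change": +1}]
print(next_task_count(1, 0, eager_steps))  # 0
```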

Key points:

  • Make sure you set a visibility_timeout. The default is 30 seconds, so if your task tries to delete a message from the queue more than 30 seconds after receiving it, the delete will fail because the message has timed out.
  • It is important to set the first scaling step to {"upper": 0, "change": 0}. When the task takes the last message off the queue, the queue is empty but the task is still working, so we do not want any change to occur to the service. If we set the change to -1, the service would try to tear down the task whilst it is processing the last message. So it is key that when there are 0 messages on the queue we do not kill our task. We will solve this issue with CPU alarms later. See this link for an ongoing GitHub issue.
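
The visibility timeout matters because of how the task itself consumes the queue: it receives a message, does the work, and only then deletes the message. A minimal sketch of such a loop follows. The function drain_queue and the FakeSqsClient stand-in are hypothetical; the receive_message and delete_message calls mirror the boto3 SQS client, which you would pass in instead of the fake when running in the container.

```python
def drain_queue(sqs_client, queue_url, handler, wait_seconds=20):
    """Process messages until a poll comes back empty; return how many were handled."""
    handled = 0
    while True:
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=wait_seconds,  # long polling
        )
        messages = response.get("Messages", [])
        if not messages:
            return handled
        for message in messages:
            handler(message["Body"])
            # Delete only after the work succeeded. This has to happen
            # before the visibility timeout of the message expires.
            sqs_client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
            )
            handled += 1


class FakeSqsClient:
    """In-memory stand-in so the sketch runs without AWS credentials."""

    def __init__(self, bodies):
        self._pending = [
            {"Body": body, "ReceiptHandle": f"handle-{i}"}
            for i, body in enumerate(bodies)
        ]
        self.deleted = []

    def receive_message(self, **kwargs):
        if not self._pending:
            return {}
        return {"Messages": [self._pending.pop(0)]}

    def delete_message(self, **kwargs):
        self.deleted.append(kwargs["ReceiptHandle"])


client = FakeSqsClient(["job-1", "job-2"])
seen = []
print(drain_queue(client, "https://example.invalid/sample-queue.fifo", seen.append))  # 2
```

In the real task, a long-running process would call drain_queue repeatedly rather than exit, since the service restarts tasks that stop. I believe the pattern exposes the queue name to the container as a QUEUE_NAME environment variable, but treat that as an assumption and check your task definition.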

Adding policies to the service

Now that we have set up our service, we need to add policies to it so our task application can take action on other AWS services.

...
        kms_policy = iam.PolicyStatement(
            actions=[
                "kms:Decrypt",
            ],
            effect=iam.Effect.ALLOW,
            resources=[
                "", # the arn of the resource
            ],
        )

        fargate_service.task_definition.add_to_task_role_policy(kms_policy)
...

Deployment - Stage 1

Right! Now we have a Fargate service that scales up when messages are on the SQS queue. This is stage 1 of deployment: currently, our service can only spin up tasks, not tear them down.

To allow our tasks to tear down, we need to deploy the code we have produced above. I'll explain why in the next section.

To deploy: cdk deploy

Scaling down task

Now we have deployed the first step, how do we go about getting our service to tear down our task when there are no messages?

Earlier I mentioned we could not tear down based on messages on the queue, because that would kill a task mid-processing. So to scale down we will focus on the CPU usage of the task. If our task's CPU usage stays below a threshold for a length of time, we will scale our tasks down to 0. This gets around tasks being killed whilst processing, as CPU usage will be high when the task is in use.

Now add the following below the Fargate service and before the policies:

...
        # initialise the fargate service CPU usage metric to average over 3 minutes
        fargate_service_cpu_metric = fargate_service.service.metric_cpu_utilization(
            period=Duration.minutes(3),
            statistic="Average",
        )

        # add an alarm to our fargate service CPU usage
        # (the Average statistic comes from the metric defined above)
        scale_in_init = fargate_service_cpu_metric.create_alarm(
            self,
            'SampleApp-ZeroCPUAlarm',
            alarm_description="For when sample app is idle, scale service to 0",
            alarm_name=alarm_name,
            evaluation_periods=1,
            threshold=0.01, # set threshold of cpu usage.
            actions_enabled=True,
            # compare our cpu usage with our threshold; we want it
            # less than or equal to the threshold
            comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
            datapoints_to_alarm=1,
        )

        # define our auto scaling target for our fargate service 
        scalable_target = app_autoscaling.ScalableTarget.from_scalable_target_id(
            self,
            'SampleApp-ScalableTarget',
            scalable_target_id=f'service/{fargate_service.cluster.cluster_name}/{fargate_service.service.service_name}|ecs:service:DesiredCount|ecs',
        )

        # define the action taken on our scaling target
        scaling_action = app_autoscaling.StepScalingAction(
            self,
            "scaleToZero",
            scaling_target=scalable_target,
            adjustment_type=app_autoscaling.AdjustmentType.EXACT_CAPACITY,
        )

        # create the adjustment made by our action
        scaling_action.add_adjustment(adjustment=0, upper_bound=0)

        # finally add our alarm action to our fargate alarm
        scale_in_init.add_alarm_action(ApplicationScalingAction(scaling_action))
...

Deployment - Stage 2

Now, we have to redeploy our stack so our alarm is activated on our Fargate service.

Now, when our tasks are inactive and CPU usage is at or below the 0.01 threshold, our Fargate service will spin down our tasks.
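
To illustrate the rule the alarm applies, here is a small pure-Python model of the evaluation, using the same settings as the alarm (threshold 0.01, one evaluation period, one datapoint to alarm, less than or equal comparison). It is a sketch only: each value stands for one 3-minute CPU average, missing data is ignored, and the function names are hypothetical.

```python
def alarm_fires(cpu_averages, threshold=0.01, evaluation_periods=1, datapoints_to_alarm=1):
    """Return True if enough of the most recent datapoints are at or below the threshold."""
    recent = cpu_averages[-evaluation_periods:]
    breaching = sum(1 for value in recent if value <= threshold)
    return len(recent) == evaluation_periods and breaching >= datapoints_to_alarm


def desired_count(current, cpu_averages):
    """Scale to exactly 0 tasks when the idle alarm fires, otherwise change nothing."""
    return 0 if alarm_fires(cpu_averages) else current


print(desired_count(1, [35.2, 12.0, 0.0]))  # 0: the latest period was idle
print(desired_count(1, [0.0, 0.0, 41.7]))   # 1: the task is busy again
```

With a single evaluation period, one idle 3-minute window is enough to scale to zero. If your tasks have quiet phases in the middle of a job, raising evaluation_periods would make the alarm more conservative.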

Conclusion

Hopefully, that will save someone a huge amount of pain. I am definitely a fan of using aws-cdk instead of CloudFormation, as it allows you to work in traditional programming languages instead of pure YAML.

I will definitely be iterating on this code to remove the need for two-stage deployments by taking advantage of pipelines. I will update this tutorial as I learn more.

Top comments (5)

Bahadir Cambel

What a fantastic piece of information!

Some small updates wrt latest CDK version(s).

Note: I needed to make the updates below in CDK Python; the original might still work in CDK JS, but probably not. Just to be aware...

create_alarm does not accept statistic="Average"

The below is what adjustment_type needs: app_autoscaling.AdjustmentType, not the autoscaling one...

scaling_action = app_autoscaling.StepScalingAction(
            self,
            "scaleToZero",
            scaling_target=scalable_target,
            adjustment_type=app_autoscaling.AdjustmentType.EXACT_CAPACITY,
        )
 
Casey Slaught

You have saved me from weeks of pain. Thank you!

Andrew Powell

Your post was really invaluable to me getting my service setup and avoiding some scaling issues, thanks for this.

Sean O'Connor

Thanks @shellscape! That means a lot, glad it helped :)

Ed Samprakos

This is great info. How do the contents of the SQS message get passed into my code using this approach?