Create Serverless Data Pipeline Using AWS CDK (Python)

Anna Pastushko

Architecture diagram

Context

The data science team is about to start a research study and has requested a solution on the AWS cloud. Here is what I have to offer them:

Main process (Data Processing):

  • A device uploads a .mat file with ECG data to the Raw S3 bucket.
  • The upload triggers an event, which is sent to an SQS queue.
  • Lambda polls the SQS queue (event source mapping invocation) and starts processing the event. The Lambda runtime is a Python Docker container, because the libraries' size exceeds the 250 MB limit for Lambda layers. If any error occurs during processing, you get a notification in Slack.
  • Once processing is complete, the data is saved in Parquet format to the Processed S3 bucket.
  • To enable data scientists to query the data, a Glue Crawler creates a schema in the Data Catalog. Athena is then used to query the Processed bucket.

Secondary process (CI/CD):

When a developer wants to change the processing job logic, they only need to prepare the changes and commit them to the CodeCommit repository. Everything else is automated and handled by the CI/CD process.

CodeBuild converts the CDK code into a CloudFormation template and deploys it to your account. In other words, it creates all infrastructure components automatically. Once the deployment is complete, the resulting group of resources (the stack) is visible in the CloudFormation console. To simplify these two steps and let the CI/CD process update itself as well, the CodePipeline abstraction is used. You also get Slack notifications about the progress.

Preparation

To prepare your local environment for this project, you should follow the steps described below:

  1. Install the AWS CLI and set up credentials.
  2. Install Node.js to be able to use the CDK.
  3. Install the CDK with sudo npm install -g aws-cdk.
  4. Create a new directory for your project and change your current working directory to it.
  5. Run cdk init --language python to initialize the CDK project.
  6. Run cdk bootstrap to bootstrap your AWS account with CDK resources.
  7. Install Docker to be able to build and run the container image for Lambda.

Project Structure

This is the final layout of the project. I will walk through it step by step so that you understand each component in it.



DataPipeline
├── assets
│   └── lambda
│       ├── dockerfile
│       └── processing.py
├── cdk.out
│   └── ...
├── stacks
│   ├── __init__.py
│   ├── data_pipeline_stack.py
│   ├── cicd_stack.py
│   └── data_pipeline_stage.py
├── app.py
├── cdk.json
└── requirements.txt



Our starting point is the stacks directory. It contains the mandatory empty file __init__.py that defines a Python package, along with three other files:

  • data_pipeline_stack.py
  • cicd_stack.py
  • data_pipeline_stage.py

First, we open data_pipeline_stack.py and import all the libraries and constructs needed for further development. We also define a class that inherits from cdk.Stack.



import aws_cdk as cdk
import aws_cdk.aws_s3 as _s3
import aws_cdk.aws_sqs as _sqs
import aws_cdk.aws_iam as _iam
import aws_cdk.aws_ecs as _ecs
import aws_cdk.aws_ecr as _ecr
import aws_cdk.aws_lambda as _lambda
import aws_cdk.aws_s3_notifications as _s3n
from aws_cdk.aws_ecr_assets import DockerImageAsset
from aws_cdk.aws_lambda_event_sources import SqsEventSource

class DataPipelineStack(cdk.Stack):

    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)



After that, we use the SQS Queue construct to connect the S3 bucket and Lambda. The arguments are pretty simple: the stack element id (‘raw_data_queue’), the queue name (‘data_pipeline_queue’), and the time a message stays invisible in the queue after Lambda picks it up for processing (cdk.Duration.seconds(200)). Note that the visibility timeout depends on your processing time: if processing takes 30 seconds, it is better to set it to 60 seconds. In this case I set it to 200 seconds because processing takes about 100 seconds.



data_queue = _sqs.Queue(self, 'raw_data_queue',
                        queue_name='data_pipeline_queue',
                        visibility_timeout=cdk.Duration.seconds(200))



Next, we create S3 buckets for raw and processed data using the Bucket construct. Since raw data is usually accessed only within the first few days after upload, we add lifecycle_rules to transition objects from S3 Standard to S3 Glacier after 7 days to reduce storage costs.



raw_bucket = _s3.Bucket(self, 'raw_bucket',
                        bucket_name='raw-ecg-data',
                        auto_delete_objects=True,
                        removal_policy=cdk.RemovalPolicy.DESTROY,
                        lifecycle_rules=[_s3.LifecycleRule(
                          transitions=[_s3.Transition(
                          storage_class=_s3.StorageClass.GLACIER,
                          transition_after=cdk.Duration.days(7))])])
raw_bucket.add_event_notification(_s3.EventType.OBJECT_CREATED,
                                  _s3n.SqsDestination(data_queue))
_s3.Bucket(self, 'processed_bucket',
           bucket_name='processed-ecg-data',
           auto_delete_objects=True,
           removal_policy=cdk.RemovalPolicy.DESTROY)



We also need to connect the raw bucket and the SQS queue to define the destination for events generated by the bucket. For that, we use the add_event_notification method with two arguments: the event we want the queue to be notified about (_s3.EventType.OBJECT_CREATED) and the destination queue (_s3n.SqsDestination(data_queue)).

⚠️ After the stack is destroyed, the bucket and all the data inside it will be deleted. You can change this behaviour by removing the removal_policy and auto_delete_objects arguments (i.e. keeping their defaults).
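As a minimal sketch of the default behaviour (same bucket name as above): without those two arguments, the bucket and its objects are retained when the stack is destroyed.



_s3.Bucket(self, 'raw_bucket',
           bucket_name='raw-ecg-data')  # default RemovalPolicy.RETAIN: bucket survives cdk destroy
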

The next step is to create the Lambda function using the DockerImageFunction construct. Please refer to the code below to see which arguments I define; they are fairly self-explanatory and follow the same pattern as the previous examples. If you run into trouble, refer to the documentation.

⚠️ The only thing I should highlight is the timeout parameter of the Lambda function: it should always be less than the visibility_timeout of the queue (180 vs 200 seconds here).



processing_lambda = _lambda.DockerImageFunction(self, 'processing_lambda',
                        function_name='data_processing',
                        description='Starts Docker container in Lambda to process data',
                        code=_lambda.DockerImageCode.from_image_asset(
                            'assets/lambda/', file='dockerfile'),
                        architecture=_lambda.Architecture.X86_64,
                        events=[SqsEventSource(data_queue)],
                        timeout=cdk.Duration.seconds(180),
                        retry_attempts=0,
                        environment={'QueueUrl': data_queue.queue_url})

processing_lambda.role.attach_inline_policy(_iam.Policy(self, 'access_for_lambda',
                        statements=[_iam.PolicyStatement(
                            effect=_iam.Effect.ALLOW,
                            actions=['s3:*'],
                            resources=['*'])]))



Then we attach a policy to the automatically created Lambda role using the attach_inline_policy method, so the function can read and write files in S3. You can tune the actions and resources parameters to grant Lambda more granular access to S3, as in the sketch below.
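For example, here is a hedged sketch of a tighter inline policy using the bucket names from this post (the construct id granular_s3_access is made up for illustration): read from the raw bucket and write to the processed bucket only.



processing_lambda.role.attach_inline_policy(_iam.Policy(self, 'granular_s3_access',
                        statements=[
                            # Read objects from the raw bucket
                            _iam.PolicyStatement(
                                effect=_iam.Effect.ALLOW,
                                actions=['s3:GetObject', 's3:ListBucket'],
                                resources=['arn:aws:s3:::raw-ecg-data',
                                           'arn:aws:s3:::raw-ecg-data/*']),
                            # Write processed Parquet files to the processed bucket
                            _iam.PolicyStatement(
                                effect=_iam.Effect.ALLOW,
                                actions=['s3:PutObject'],
                                resources=['arn:aws:s3:::processed-ecg-data/*'])]))
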

Now we move to the assets directory.

There we need to create dockerfile and processing.py with the data transformation logic, which is pretty simple. First, we parse the event from SQS to get information about the file and the bucket, then we parse the .mat file with ECG data, clean it and save it in Parquet format to the Processed bucket. The script also includes logging and Slack error messages. At the end, we delete the message from the queue so the file is not processed again.

For your own pipeline, you can change the processing logic and replace _url with your own Slack webhook.



import io
import json
import os
import boto3
import urllib3
import logging
import scipy.io
import pandas as pd
import pathlib as pl
import awswrangler as wr
from urllib.parse import unquote_plus

s3 = boto3.client('s3')
sqs = boto3.client('sqs')
logging.getLogger().setLevel(logging.INFO)  # make INFO-level logs visible in CloudWatch

def handler(event, context):
    # Parse event and get file name
    message_rh = event['Records'][0]['receiptHandle']
    key = json.loads(event['Records'][0]['body'])['Records'][0]['s3']['object']['key']
    key = unquote_plus(key)
    raw_bucket = json.loads(event['Records'][0]['body'])['Records'][0]['s3']['bucket']['name']
    processed_bucket = 'processed-ecg-data'

    try:
        # Read mat file
        _obj = s3.get_object(Bucket=raw_bucket, Key=key)['Body'].read()
        raw_data = scipy.io.loadmat(io.BytesIO(_obj))

        # Clean data
        _df = pd.DataFrame(raw_data['val'][0], columns=['ECG_data'])
        _df.fillna(method='pad', inplace=True)
        _df['ECG_data'] = (_df['ECG_data'] - _df['ECG_data'].min()) / (_df['ECG_data'].max() - _df['ECG_data'].min())

        # Save parquet file to verified bucket
        file_name = pl.Path(key).stem
        wr.s3.to_parquet(df=_df, path=f's3://{processed_bucket}/{file_name}.parquet')
        logging.info(f'File {file_name} was successfully processed.')

    except Exception:
        # Send notification about failure to Slack channel
        _url = 'https://hooks.slack.com/YOUR_HOOK'
        _msg = {'text': f'Processing of {key} was unsuccessful.'}
        http = urllib3.PoolManager()
        resp = http.request(method='POST', url=_url, body=json.dumps(_msg).encode('utf-8'))
        # Write message to logs
        logging.error(f'Processing of {key} was unsuccessful.', exc_info=True)

    # Delete processed message from SQS
    sqs.delete_message(
        QueueUrl=os.environ.get('QueueUrl'),
        ReceiptHandle=message_rh)



Let’s quickly go through the logic of the Dockerfile: first we pull the dedicated Lambda base image from the public AWS ECR repository, then install all Python libraries, copy our processing.py script into the container and set the command that launches the handler function from the script.



FROM public.ecr.aws/lambda/python:3.8

RUN pip3 install pandas && \
   pip3 install numpy && \
   pip3 install boto3 && \
   pip3 install scipy && \
   pip3 install awswrangler

WORKDIR /

COPY processing.py ${LAMBDA_TASK_ROOT}

CMD [ "processing.handler" ]



⚠️ Do not forget to add the libraries you use in processing.py to the dockerfile.

At this stage we have finished the Data pipeline stack and can move on to the CI/CD stack.

CI/CD stack

For the CI/CD process we will use CodePipeline, which makes the deployment process easier. Each time we change the Data pipeline or CI/CD stack and push to CodeCommit, CodePipeline automatically re-deploys both stacks. In short, the application stack is added to a CodePipeline stage, and that stage is added to the CodePipeline. The app is then synthesised for the CI/CD stack, not the application stack. You can find a more detailed description of how the files connect and the logic behind the CodePipeline construct below.

Steps for CodePipeline stack creation

First, we open cicd_stack.py and import all the libraries and constructs we will use. Later we will create the CodeCommit repository manually; for now we only need a reference to it so we can add it as the source for CodePipeline.



import aws_cdk as cdk
import aws_cdk.aws_iam as _iam
import aws_cdk.aws_chatbot as _chatbot
import aws_cdk.aws_codecommit as _ccommit
from stacks.data_pipeline_stage import *
import aws_cdk.aws_codestarnotifications as _notifications
from aws_cdk.pipelines import CodePipeline, CodePipelineSource, ShellStep

class CICDStack(cdk.Stack):

    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        pipeline_repo = _ccommit.Repository.from_repository_name(
                        self, 'data_pipeline_repository',
                        repository_name='data_pipeline_repository')



We use the CodePipeline construct to create the CI/CD process. The self_mutation parameter allows the pipeline to update itself; it is True by default. The docker_enabled_for_synth parameter should be set to True because we use Docker in our application stack. After that, we add the stage that deploys the application and call build_pipeline() to construct the pipeline. The latter step is necessary to set up Slack notifications afterwards.



cicd_pipeline = CodePipeline(self, 'cicd_pipeline',
                    pipeline_name='cicd_pipeline',
                    docker_enabled_for_synth=True,
                    self_mutation=True,
                    synth=ShellStep('Synth',
                        input=CodePipelineSource.code_commit(pipeline_repo, 'master'),
                        commands=['npm install -g aws-cdk',
                                  'python -m pip install -r requirements.txt',
                                  'cdk synth']))
cicd_pipeline.add_stage(DataPipelineStage(self, 'DataPipelineDeploy'))
cicd_pipeline.build_pipeline()



The next step is to configure Slack notifications for CodePipeline so developers can monitor deployments. For that we use the SlackChannelConfiguration construct. You can get the slack_channel_id by right-clicking the channel name and copying the last 9 characters of the URL. To get the slack_workspace_id, follow the AWS Chatbot guide. To define which notifications we want to receive, we use the NotificationRule construct. If you want to define the events more granularly, see Events for notification rules.



slack = _chatbot.SlackChannelConfiguration(self, 'MySlackChannel',                                           
                 slack_channel_configuration_name='#cicd_events',                                           
                 slack_workspace_id='YOUR_ID',                                          
                 slack_channel_id='YOUR_ID')
slack.role.attach_inline_policy(_iam.Policy(self, 'slack_policy',
      statements=[_iam.PolicyStatement(effect=_iam.Effect.ALLOW,  
                                       actions=['chatbot:*'],
                                       resources=['*'])]))
rule = _notifications.NotificationRule(self, 'NotificationRule',
       source=cicd_pipeline.pipeline,
       detail_type=_notifications.DetailType.BASIC,
       events=['codepipeline-pipeline-pipeline-execution-started',
               'codepipeline-pipeline-pipeline-execution-succeeded',
               'codepipeline-pipeline-pipeline-execution-failed'],
       targets=[slack])



ℹ️ The .pipeline property refers to the underlying CodePipeline pipeline that deploys the CDK app. It is only available after the pipeline has been constructed with the build_pipeline() method. For the source argument we pass the pipeline object, not the construct.

After defining the pipeline we add the stage for the Data pipeline deployment. To keep the project cleaner, we define this stage in a separate file (data_pipeline_stage.py) using cdk.Stage as the parent class.



import aws_cdk as cdk
from constructs import Construct
from stacks.data_pipeline_stack import *

class DataPipelineStage(cdk.Stage):
    def __init__(self, scope, construct_id, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        DataPipelineStack(self, 'DataPipelineStack')



For those of you who use CDK v1, an additional step is to modify the cdk.json configuration file: add the following expression to the context.



"context": {"@aws-cdk/core:newStyleStackSynthesis": true}



At this point we have created all constructs and files for the Data pipeline stack. The only thing left is to create app.py with the final steps: we import the constructs we created from cicd_stack.py, create the CI/CD stack and add tags to all stack resources.



from stacks.cicd_stack import *

app = cdk.App()
CICDStack(app, 'CodePipelineStack',
          env=cdk.Environment(account='REPLACE_WITH_YOUR_ACCOUNT',
                              region='YOUR_REGION'))
cdk.Tags.of(app).add('env', 'prod')
cdk.Tags.of(app).add('creator', 'anna-pastushko')
cdk.Tags.of(app).add('owner', 'ml-team')
app.synth()



Congratulations, we have finished creating our stacks. Now we can create a CodeCommit repository called data_pipeline_repository and push our files to it.

CodeCommit repository creation

We can manually add the same tags as we defined in the stack, so that all resources created for this task are grouped together in cost reports. A sketch of doing the same from Python is shown below.
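If you prefer not to click through the console, a minimal boto3 sketch (the repository description string is made up for illustration) creates the repository with the same tags as in app.py:



import boto3

codecommit = boto3.client('codecommit')

# Create the repository and tag it like the rest of the pipeline resources
codecommit.create_repository(
    repositoryName='data_pipeline_repository',
    repositoryDescription='Source for the serverless data pipeline',
    tags={'env': 'prod', 'creator': 'anna-pastushko', 'owner': 'ml-team'})
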

⚠️ Check the CodeBuild limits in Service Quotas before deployment.

Congratulations, now we can finally deploy our stack to AWS with cdk deploy and watch all resources get set up automatically.

Athena queries

Let’s start with creating the Glue Crawler. Go to the Data Catalog section of the Glue console and click Crawlers, then click Add crawler and go through all the steps. I added the same tags as for the other Data pipeline resources, so I can track them together.

Glue Crawler creation

Don’t change the crawler source type, add an S3 data store and specify the path to your bucket in Include path. After that, create a new role or use an existing one, and specify how often you want the crawler to run. Then create a database; in my case I created the ecg_data database. Once all steps are completed and the crawler is created, run it. If you prefer to keep this part in code as well, see the sketch after this paragraph.
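As an alternative to the console, here is a hedged sketch of the same crawler defined in the Data pipeline stack with the L1 Glue constructs. The construct ids, crawler name and role policy are illustrative; the bucket and database names are the ones used in this post.



import aws_cdk.aws_glue as _glue

# Role the crawler assumes to read the processed bucket and update the Data Catalog
crawler_role = _iam.Role(self, 'crawler_role',
                   assumed_by=_iam.ServicePrincipal('glue.amazonaws.com'),
                   managed_policies=[_iam.ManagedPolicy.from_aws_managed_policy_name(
                       'service-role/AWSGlueServiceRole')])
crawler_role.add_to_policy(_iam.PolicyStatement(
                   actions=['s3:GetObject', 's3:ListBucket'],
                   resources=['arn:aws:s3:::processed-ecg-data',
                              'arn:aws:s3:::processed-ecg-data/*']))

# Database and crawler that build the schema for the processed Parquet files
_glue.CfnDatabase(self, 'ecg_database',
                  catalog_id=self.account,
                  database_input=_glue.CfnDatabase.DatabaseInputProperty(name='ecg_data'))
_glue.CfnCrawler(self, 'ecg_crawler',
                 name='ecg_data_crawler',
                 role=crawler_role.role_arn,
                 database_name='ecg_data',
                 targets=_glue.CfnCrawler.TargetsProperty(
                     s3_targets=[_glue.CfnCrawler.S3TargetProperty(
                         path='s3://processed-ecg-data/')]))
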

That is all we need to query the processed_ecg_data table with Athena. An example of a simple query can be found below.

Athena query
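If you prefer to run the same kind of query from Python, awswrangler (already used in the processing Lambda) can execute it through Athena; the database and table names are the ones created above.



import awswrangler as wr

# Query the processed data registered in the Glue Data Catalog via Athena
df = wr.athena.read_sql_query(
    sql='SELECT * FROM processed_ecg_data LIMIT 10',
    database='ecg_data')
print(df.head())
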

Account cleanup

In case you want to delete all resources created in your account during development, you should perform the following steps:

  1. Run the following command to delete all stack resources: cdk destroy CodePipelineStack/DataPipelineDeploy/DataPipelineStack CodePipelineStack
  2. Delete the CodeCommit repository.
  3. Clean up the ECR repository and the S3 buckets created for Athena and CDK, because they can incur costs.
  4. Delete the Glue Crawler and the database with its tables.

ℹ️ The cdk destroy command only destroys the CodePipeline (CI/CD) stack and stacks that depend on it. Since the application stacks don't depend on the CodePipeline stack, they won't be destroyed, so we need to destroy the Data pipeline stack separately; there is a discussion on how to delete them both.

It is not very convenient to delete some resources manually, and there are several ongoing discussions with AWS developers about fixing this.

Conclusion

CDK provides you with a toolkit for developing applications based on AWS services. It can be challenging at first, but your efforts will pay off in the end: you will be able to manage and deploy your application with a single command.

CDK resources and the full code can be found in the GitHub repository.


Thank you for reading to the end. I hope it was helpful; please let me know in the comments if you spot any mistakes.

Top comments

Wesley Cheek: Nice article, both for the data processing and the CI/CD!

Sri: Very well written. May I ask which tool you have used for the architecture diagrams?

Sri: Thank you