
Cover image for Setting up a Queue Ingestion system for S3 to Redshift Transfer
Sreekesh Iyer for AWS Community Builders


Setting up a Queue Ingestion system for S3 to Redshift Transfer

I've always been curious about data pipelines and how data engineering sits in the middle of everything at my company.

Literally one object moves in 500 different formats through 500 different systems for perhaps 500 different purposes.

The base object is a transaction that has all the raw L0 data stored in Object Storage. However, we can't hammer object storage with reads all day for analytics, so this data needs to be moved into a separate database. Let me correct myself - moved into multiple databases in different forms, for different reasons - Confirmations, Regulatory Reporting, etc.

I wanted to see if I could develop a PoC of sorts that simulates this pipeline with the help of our friends at AWS.

Here's what we have -

  • Amazon S3: Object Storage
  • Amazon SQS: Message Queue
  • Amazon Redshift: SQL-ish data warehouse for storing transformed data for further analysis.
  • AWS Lambda: Compute for performing the business logic that enriches the base data, pushes it to SQS, and then pulls it from SQS again to send it out to Redshift.

--

Architecture

Let me bring up the architecture diagram from the cover image again.
We're storing the initial piece of information in S3. This is where our first Lambda function gets triggered.

We take the baseline object and perform some analysis and transformation on it. Consider this as your business logic that you'd perform at L1 before the data is sent further downstream.

The transformed object is then sent out via the SQS queue. This message gets picked up by the 2nd Lambda, which polls the queue. Finally, the message is transformed into a SQL query and the data is inserted into a database in Redshift Serverless.

Now that we have a better picture, let's hop into action.

Setting up the Queue

FIFO Queue in Amazon SQS

We can set up a simple FIFO queue in Amazon SQS to ensure ordered delivery of messages. I'll walk you through the code separately, where we also avoid message duplication.
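
For reference, here's roughly what creating that queue looks like with boto3 - a minimal sketch, where the queue name is just a placeholder:

```python
import boto3

sqs = boto3.client("sqs")

response = sqs.create_queue(
    QueueName="transactions-queue.fifo",  # FIFO queue names must end in .fifo
    Attributes={
        "FifoQueue": "true",
        # Let SQS deduplicate on a SHA-256 of the message body, so producers
        # don't strictly have to pass MessageDeduplicationId themselves.
        "ContentBasedDeduplication": "true",
    },
)
print(response["QueueUrl"])
```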

The 1st Lambda Function

We're writing a function that waits for an object to land in S3 so that it can be processed.

Lambda Visuals

I didn't find an exact way to set the SQS queue as a Lambda destination in the console. I'm not sure if it's a permission issue - if it were, I wouldn't have been able to reach the queue via boto3 either, but I was. So here, S3 is our trigger and SQS is our destination, handled programmatically.

The code is available on GitHub.

Here's where the business logic for our transactions goes in. Given that this is serverless and fully decoupled, it can be as complex as it needs to get. I've attached a simplified version below -

Business Logic for Transactions
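
If you'd rather not squint at the screenshot, here's a minimal sketch of the first function. It assumes the queue URL arrives via a QUEUE_URL environment variable, and the "enrichment" fields are made up for illustration - the real business logic lives in the GitHub repo:

```python
# First Lambda: triggered by S3, enriches the transaction, forwards it to SQS.
import json
import os
import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
QUEUE_URL = os.environ["QUEUE_URL"]  # URL of the .fifo queue created earlier


def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]

        # Read the raw L0 object from S3
        obj = s3.get_object(Bucket=bucket, Key=key)
        transaction = json.loads(obj["Body"].read())

        # "Business logic": enrich / transform the baseline object
        transaction["processed"] = True
        transaction["source_key"] = key

        # Push the enriched record to the FIFO queue
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps(transaction),
            MessageGroupId="transactions",   # required for FIFO queues
            MessageDeduplicationId=key,      # dedupe on the object key
        )

    return {"statusCode": 200}
```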

Object Storage via S3

S3 Bucket Visuals

It can be a standard bucket in the same region; just make sure you add an event notification in the bucket's Properties to invoke the Lambda we just created.

Event Notification in S3 Properties
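
The console handles this under Properties → Event notifications, but the same wiring can be done programmatically. A sketch, assuming placeholder bucket and function ARNs (the console also adds the invoke permission for you):

```python
import boto3

s3 = boto3.client("s3")
lambda_client = boto3.client("lambda")

BUCKET = "my-transactions-bucket"
FUNCTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:s3-to-sqs"

# Allow S3 to invoke the function
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId="s3-invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{BUCKET}",
)

# Fire the Lambda whenever a .json object is created in the bucket
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": FUNCTION_ARN,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": ".json"}]}
                },
            }
        ]
    },
)
```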

The 2nd Lambda

We're now inside the analytics side of things where it's all about ETL and further processing in Redshift. For starters, we need a function that can be triggered by an SQS message and subsequently pass it on to Amazon Redshift.

SQS Response to Redshift Lambda Function

Here's what this function looks like. It waits for a message on the SQS queue, performs a validation, and pushes it to Redshift. Since we are using a FIFO queue, we get access to a deduplication ID from the SQS message, which we can use to filter out duplicate messages.

Deduplication Logic
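
Here's a minimal sketch of that handler. It keeps dedup IDs in an in-memory set (which only survives for a warm container - something like DynamoDB would be the durable option), and the workgroup name and column list are assumptions for illustration:

```python
# Second Lambda: triggered by the SQS FIFO queue, deduplicates,
# and inserts into Redshift Serverless via the Data API.
import json
import boto3

redshift = boto3.client("redshift-data")

WORKGROUP = "default-workgroup"   # Redshift Serverless default workgroup
DATABASE = "dev"
SEEN = set()  # naive in-memory dedup, reset on every cold start


def lambda_handler(event, context):
    for record in event["Records"]:
        # FIFO queues expose the deduplication ID on each record
        dedup_id = record["attributes"].get("MessageDeduplicationId")
        if dedup_id in SEEN:
            continue  # skip duplicates
        SEEN.add(dedup_id)

        transaction = json.loads(record["body"])
        # (a type-validation step like the one in the next sketch goes here)

        # Parameterised insert into the "transactions" table
        redshift.execute_statement(
            WorkgroupName=WORKGROUP,
            Database=DATABASE,
            Sql="INSERT INTO transactions (transaction_id, amount, currency) "
                "VALUES (:id, :amount, :currency)",
            Parameters=[
                {"name": "id", "value": str(transaction["transaction_id"])},
                {"name": "amount", "value": str(transaction["amount"])},
                {"name": "currency", "value": transaction["currency"]},
            ],
        )

    return {"statusCode": 200}
```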

We can also perform type validations easily (thanks, Python!).

Validations
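
Something along these lines - the expected fields and types here are just examples:

```python
# A minimal validation helper; adjust the schema to your own payload.
EXPECTED_TYPES = {
    "transaction_id": str,
    "amount": (int, float),
    "currency": str,
}


def is_valid(transaction: dict) -> bool:
    """Return True only if every expected field is present with the right type."""
    return all(
        field in transaction and isinstance(transaction[field], expected)
        for field, expected in EXPECTED_TYPES.items()
    )
```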

Redshift - the last checkpoint

Redshift Query Editor

You can make use of the default workgroup in Redshift Serverless and the base "dev" database, where you can create a new table, "transactions", to insert all of this data into.
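
I created the table in the Query Editor, but the same thing can be done through the Data API. The column set below is just an example matching the sketches above:

```python
import boto3

redshift = boto3.client("redshift-data")

redshift.execute_statement(
    WorkgroupName="default-workgroup",   # Redshift Serverless default workgroup
    Database="dev",
    Sql="""
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id VARCHAR(64),
            amount         DOUBLE PRECISION,
            currency       VARCHAR(8),
            source_key     VARCHAR(256)
        );
    """,
)
```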

Object Sample

After that, all you need to do is upload a JSON file like this to the S3 bucket and watch the magic happen :)
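
For a quick test, you can drop in a hypothetical sample (matching the fields from the sketches above, not the exact object in the screenshot) with boto3:

```python
import json
import boto3

s3 = boto3.client("s3")

# Example payload; replace the fields with your own schema
sample = {
    "transaction_id": "txn-0001",
    "amount": 249.99,
    "currency": "USD",
}

s3.put_object(
    Bucket="my-transactions-bucket",      # the bucket that triggers the Lambda
    Key="incoming/txn-0001.json",
    Body=json.dumps(sample),
    ContentType="application/json",
)
```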

I hope you found this post informative. Cheers, Happy Holidays!


Top comments (4)

Ike Nefcy •

Just use s3 copy

Sreekesh Iyer •

Hey, thanks! I could've done that if the object was to be sent as-is. But there is a compute layer in between (that can actually become quite heavy depending on the business logic) so we need a Lambda in the middle.

gaurav_chandratre •

You can use a Firehose stream to send the message to Redshift (from the first Lambda) and use Redshift's built-in zero-ETL integration with Firehose.

Sreekesh Iyer •

Thank you! I've never used Firehose before - I'll give that a shot.
