AWS CDK 100 Drill Exercises #007: SQS-Lambda-Firehose —— Building Event-Driven Data Pipelines

Level 300

Introduction

This is the 8th installment of "AWS CDK 100 Drill Exercises". For the series overview, see here.

In this exercise, we'll build an event-driven data pipeline combining SQS, Lambda, and Firehose.

Why SQS-Lambda-Firehose?

| Feature | Benefit |
| --- | --- |
| Event-driven | Loosely coupled and scalable |
| Reliability | Robust error handling with a DLQ and batch failure reporting |
| Cost efficiency | Serverless, pay-per-use pricing |
| Reduced ops | Managed services minimize infrastructure management |

What You'll Learn

  • SQS + Dead Letter Queue design
  • Lambda ReportBatchItemFailures for batch processing
  • Firehose → S3 streaming delivery
  • Production monitoring with CloudWatch Alarms

📁 Code Repository: GitHub


Architecture Overview

Data Flow

Producer → SQS Queue → Lambda → Firehose → S3
                ↓ (on failure)
           Dead Letter Queue

Key Components and Design Points

| Component | Design Points |
| --- | --- |
| SQS queue | Long polling (20 s), visibility timeout (30 s), SSL enforcement |
| Dead letter queue | Receives messages after 3 failed processing attempts, 14-day retention |
| Lambda | Batch size 5, ReportBatchItemFailures enabled, X-Ray enabled |
| SQS queue (for failure Lambda) | Long polling (20 s), visibility timeout (30 s), SSL enforcement |
| Lambda (failure) | Deliberately lacks Firehose permissions, used to validate DLQ behavior |
| Firehose | 1 min / 1 MB buffering, partitioned prefix |
| S3 | Lifecycle rules (60 d → IA, 90 d → Glacier, delete at 365 d) |
| CloudWatch | 8 alarms + SNS notifications |

Implementation Highlights

1. SQS + Dead Letter Queue

DLQ configuration is critical: it isolates failed messages so they can be investigated and reprocessed, instead of being retried indefinitely or lost when the retention period expires.

See: stacks/sqs-lambda-firehose-stack.ts#L35

// Dead Letter Queue
const deadLetterQueue = new sqs.Queue(this, 'DeadLetterQueue', {
  retentionPeriod: cdk.Duration.days(14),
  enforceSSL: true,
});

// Main Queue with DLQ
const queue = new sqs.Queue(this, 'MainQueue', {
  visibilityTimeout: cdk.Duration.seconds(30),
  receiveMessageWaitTime: cdk.Duration.seconds(20), // Long Polling
  deadLetterQueue: {
    maxReceiveCount: 3, // Move to DLQ after 3 failures
    queue: deadLetterQueue,
  },
  enforceSSL: true,
});

Best Practice: Set the queue's visibility timeout to at least 6 times the Lambda function timeout, so in-flight messages don't reappear while a batch is still being processed (see documentation).
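
That relationship can be encoded in the CDK code itself instead of being kept in two unrelated literals. A minimal sketch, assuming the 5-second function timeout used later in this stack (the constant name is illustrative, not from the repository):

// Derive the visibility timeout from the function timeout so the
// 6x rule can't silently drift when one of the values changes later.
const functionTimeoutSeconds = 5; // keep in sync with the Lambda timeout

const queue = new sqs.Queue(this, 'MainQueue', {
  visibilityTimeout: cdk.Duration.seconds(functionTimeoutSeconds * 6),
  receiveMessageWaitTime: cdk.Duration.seconds(20),
  deadLetterQueue: { maxReceiveCount: 3, queue: deadLetterQueue },
  enforceSSL: true,
});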

2. Lambda - ReportBatchItemFailures

When only some messages in a batch fail, Lambda reports just those message IDs back to SQS, so only the failed messages become visible again for reprocessing instead of the entire batch.

lambdaFunction.addEventSource(
  new lambdaEventSources.SqsEventSource(queue, {
    batchSize: 5,
    reportBatchItemFailures: true, // Support partial failures
  })
);

Without Powertools vs With Powertools

❌ Without Powertools (Manual Implementation)

def lambda_handler(event, context):
    records = event.get("Records", [])
    batch_item_failures = []

    for record in records:
        message_id = record.get("messageId")
        message_body = record.get("body", "")

        try:
            process_message(message_body)
        except Exception:
            # Manually record the failed message ID so SQS retries only this record
            batch_item_failures.append({"itemIdentifier": message_id})

    # Manually construct response format
    return {"batchItemFailures": batch_item_failures}

Problems:

  • Need to manually add itemIdentifier to batch_item_failures
  • Must construct response format accurately
  • Lots of error handling boilerplate
  • Testing becomes complex

✅ With Powertools (Recommended)

from aws_lambda_powertools.utilities.batch import (
    BatchProcessor, EventType, process_partial_response
)

processor = BatchProcessor(event_type=EventType.SQS)

def record_handler(record):
    """Process each record - just raise exception on failure"""
    payload = record.json_body
    send_to_firehose(payload)

def lambda_handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context
    )

Benefits:

  • itemIdentifier extraction and setting are automated
  • No need to worry about response format
  • record_handler can focus solely on processing logic
  • Just raise an exception on failure

| Item | Manual Implementation | Powertools |
| --- | --- | --- |
| Code volume | Large | Small |
| Bug risk | High (itemIdentifier mistakes, etc.) | Low |
| Ease of testing | Complex | Simple |
| Metrics | Added manually | Auto-collection available |

📝 Complete Lambda Function Code (Python)

See: sqs-firehose-powertools/index.py

import json
import os
import boto3
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor, EventType, process_partial_response
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

logger = Logger()
tracer = Tracer()
metrics = Metrics()

processor = BatchProcessor(event_type=EventType.SQS)
firehose_client = boto3.client('firehose')
delivery_stream_name = os.environ['FIREHOSE_DELIVERY_STREAM_NAME']

@tracer.capture_method
def record_handler(record: SQSRecord):
    """Process individual record"""
    payload = record.json_body
    logger.info("Processing message", extra={"message_id": record.message_id})

    response = firehose_client.put_record(
        DeliveryStreamName=delivery_stream_name,
        Record={'Data': json.dumps(payload) + '\n'}
    )

    logger.info("Sent to Firehose", extra={"record_id": response['RecordId']})
    metrics.add_metric(name="ProcessedMessages", unit="Count", value=1)

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context
    )
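
One wiring detail: Powertools reads its service name and metrics namespace from environment variables, and Metrics() cannot flush without a namespace. A sketch of how those could be added to the CDK function definition (the values are placeholders, not taken from the repository):

const lambdaFunction = new pythonLambda.PythonFunction(this, 'ProcessorFunction', {
  // ...runtime, entry, handler, and timeout as in the full stack below...
  environment: {
    FIREHOSE_DELIVERY_STREAM_NAME: deliveryStream.deliveryStreamName,
    POWERTOOLS_SERVICE_NAME: 'sqs-firehose-processor', // picked up by Logger and Tracer
    POWERTOOLS_METRICS_NAMESPACE: 'SqsLambdaFirehose', // required before Metrics can flush
  },
});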

3. Firehose - Partitioned Delivery

Partitioning the delivery prefix by date improves S3 query performance and analysis efficiency with Athena and similar tools, since queries can scan only the relevant date prefixes instead of the whole bucket.

See: stacks/sqs-lambda-firehose-stack.ts#L168

See: dev-params.ts#L69

const deliveryStream = new firehose.DeliveryStream(this, 'DeliveryStream', {
  destination: new firehose.S3Bucket(bucket, {
    dataOutputPrefix: '!{timestamp:yyyy/MM/dd}/',      // Partition by date
    errorOutputPrefix: '!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/',
    bufferingInterval: cdk.Duration.minutes(1),
    bufferingSize: cdk.Size.mebibytes(1),
  }),
});
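
With this configuration, a record delivered on 2025-11-30 lands under a key such as 2025/11/30/<stream-name>-1-2025-11-30-<timestamp>-<random-suffix> (Firehose appends the stream name, version, timestamp, and a random string after the prefix), while failed records are grouped by error type, e.g. processing-failed/2025/11/30/. The exact object names vary; only the prefix layout is controlled here.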

4. S3 Lifecycle Management

See: stacks/sqs-lambda-firehose-stack.ts#L88

bucket.addLifecycleRule({
  transitions: [
    { storageClass: s3.StorageClass.INFREQUENT_ACCESS, transitionAfter: cdk.Duration.days(60) },
    { storageClass: s3.StorageClass.GLACIER, transitionAfter: cdk.Duration.days(90) },
  ],
  expiration: cdk.Duration.days(365),
});

📝 Complete Stack Implementation Code

import * as cdk from 'aws-cdk-lib/core';
import { Construct } from 'constructs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as pythonLambda from '@aws-cdk/aws-lambda-python-alpha';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as s3 from 'aws-cdk-lib/aws-s3';

export class SqsLambdaFirehoseStack extends cdk.Stack {
  // SqsLambdaFirehoseStackProps is defined alongside the stack in the repository
  constructor(scope: Construct, id: string, props: SqsLambdaFirehoseStackProps) {
    super(scope, id, props);

    // 1. Dead Letter Queue
    const deadLetterQueue = new sqs.Queue(this, 'DeadLetterQueue', {
      retentionPeriod: cdk.Duration.days(14),
      enforceSSL: true,
      encryption: sqs.QueueEncryption.SQS_MANAGED,
    });

    // 2. Main Queue
    const queue = new sqs.Queue(this, 'MainQueue', {
      visibilityTimeout: cdk.Duration.seconds(30),
      retentionPeriod: cdk.Duration.days(4),
      receiveMessageWaitTime: cdk.Duration.seconds(20),
      deadLetterQueue: { maxReceiveCount: 3, queue: deadLetterQueue },
      enforceSSL: true,
    });

    // 3. S3 Bucket
    const bucket = new s3.Bucket(this, 'DataBucket', {
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
      enforceSSL: true,
      versioned: true,
      encryption: s3.BucketEncryption.S3_MANAGED,
    });

    bucket.addLifecycleRule({
      transitions: [
        { storageClass: s3.StorageClass.INFREQUENT_ACCESS, transitionAfter: cdk.Duration.days(60) },
        { storageClass: s3.StorageClass.GLACIER, transitionAfter: cdk.Duration.days(90) },
      ],
      expiration: cdk.Duration.days(365),
    });

    // 4. Firehose
    const deliveryStream = new firehose.DeliveryStream(this, 'DeliveryStream', {
      destination: new firehose.S3Bucket(bucket, {
        dataOutputPrefix: '!{timestamp:yyyy/MM/dd}/',
        errorOutputPrefix: '!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/',
        bufferingInterval: cdk.Duration.minutes(1),
        bufferingSize: cdk.Size.mebibytes(1),
      }),
    });

    // 5. Lambda Function
    const lambdaFunction = new pythonLambda.PythonFunction(this, 'ProcessorFunction', {
      runtime: lambda.Runtime.PYTHON_3_14,
      handler: 'lambda_handler',
      entry: '../../common/src/python-lambda/sqs-firehose',
      timeout: cdk.Duration.seconds(5),
      memorySize: 256,
      environment: {
        FIREHOSE_DELIVERY_STREAM_NAME: deliveryStream.deliveryStreamName,
      },
      tracing: lambda.Tracing.ACTIVE,
      loggingFormat: lambda.LoggingFormat.JSON,
    });

    // 6. Permissions & Event Source
    queue.grantConsumeMessages(lambdaFunction);
    deliveryStream.grantPutRecords(lambdaFunction);

    lambdaFunction.addEventSource(
      new lambdaEventSources.SqsEventSource(queue, {
        batchSize: 5,
        reportBatchItemFailures: true,
      })
    );
  }
}

CloudWatch Monitoring

In production, we configure 8 alarms with SNS notifications.

Alarm List

| Category | Alarm | Threshold | Purpose |
| --- | --- | --- | --- |
| SQS | ApproximateAgeOfOldestMessage | 300 s | Detect message backlog |
| SQS | NumberOfEmptyReceives | 100 | Detect long-polling issues |
| DLQ | ApproximateNumberOfMessagesVisible | 1 | Immediate failure detection |
| Firehose | DeliveryToS3.DataFreshness | 900 s | Detect delivery delays |
| Firehose | ThrottledRecords | 1 | Detect throttling |
| Firehose | IncomingBytes rate | 80% | Quota utilization |
| Firehose | IncomingRecords rate | 80% | Quota utilization |
| Firehose | IncomingPutRequests rate | 80% | Quota utilization |
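
The SQS-side alarms follow the same pattern as the DLQ alarm shown next. A minimal sketch for the first row of the table (the construct ID is illustrative):

const queueAgeAlarm = queue
  .metricApproximateAgeOfOldestMessage()
  .createAlarm(this, 'QueueAgeAlarm', {
    threshold: 300, // seconds; messages older than 5 minutes indicate a backlog
    evaluationPeriods: 1,
  });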

DLQ Alarm (Most Critical)

See: stacks/sqs-lambda-firehose-stack.ts#L369

const dlqAlarm = deadLetterQueue
  .metricApproximateNumberOfMessagesVisible()
  .createAlarm(this, 'DlqAlarm', {
    threshold: 1,  // Alert even with 1 message
    evaluationPeriods: 1,
  });
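Since CDK's default comparison operator is GreaterThanOrEqualToThreshold, a single message arriving in the DLQ is enough to fire this alarm on the next evaluation period.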

Firehose Quota Monitoring (Math Expression)

See: stacks/sqs-lambda-firehose-stack.ts#L390

const incomingBytesRateAlarm = new cw.Alarm(this, 'IncomingBytesRateAlarm', {
  threshold: 80, // Alert at 80% of the service quota
  evaluationPeriods: 1,
  metric: new cw.MathExpression({
    // Sum of bytes over the period, divided by 300 s, gives bytes per second;
    // dividing by the account limit and scaling by 100 expresses it as a %
    expression: '100*(m1/300/m2)',
    usingMetrics: {
      m1: deliveryStream.metric('IncomingBytes', { statistic: 'Sum' }),
      m2: deliveryStream.metric('BytesPerSecondLimit', { statistic: 'Minimum' }),
    },
  }),
});
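The hard-coded 300 matches the MathExpression's default 5-minute period: summing IncomingBytes over 300 seconds and dividing by 300 yields a bytes-per-second rate that is directly comparable to BytesPerSecondLimit.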

📝 SNS Notification Integration Code

const topic = new sns.Topic(this, 'AlertTopic', {
  displayName: 'SQS-Firehose Alerts',
});

[dlqAlarm, sqsAgeAlarm, firehoseFreshnessAlarm, ...otherAlarms].forEach(alarm => {
  alarm.addAlarmAction(new cw_actions.SnsAction(topic));
});
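
The topic needs at least one subscription before notifications reach anyone. A minimal sketch assuming an email endpoint (the address is a placeholder):

import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';

// The address receives a confirmation email that must be accepted
// before alarm notifications are delivered.
topic.addSubscription(new subscriptions.EmailSubscription('oncall@example.com'));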

Deployment & Verification

npm run stage:deploy:all -w workspaces/sqs-lambda-firehose --project=myproject --env=dev

See: test-scripts/send-messages.sh

# Send test messages
./test-scripts/send-messages.sh --env dev --project myproject

# Verify data is saved in S3
./test-scripts/check-s3.sh --env dev --project myproject
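
If you want to send test messages without the shell scripts, a minimal producer sketch using the AWS SDK for JavaScript v3 does the same job (the queue URL is a placeholder):

import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

const client = new SQSClient({});

// Send one JSON test message to the main queue.
await client.send(new SendMessageCommand({
  QueueUrl: 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/MainQueue', // placeholder
  MessageBody: JSON.stringify({ id: 1, event: 'test', ts: Date.now() }),
}));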

Best Practices Summary

| Component | Recommended | Avoid |
| --- | --- | --- |
| SQS | Long polling, DLQ configuration, SSL enforcement | Short polling, no DLQ |
| Lambda | ReportBatchItemFailures, appropriate batch size (5-10) | Oversized batches, no error handling |
| Firehose | Partitioning, 1-5 MB buffer | No partitioning, excessively long buffer intervals |
| S3 | Lifecycle rules, encryption | No lifecycle rules, public access |

Cost Estimation

💰 Monthly Estimate (Tokyo Region, Low-to-Medium Usage)

| Service | Usage | Monthly Cost |
| --- | --- | --- |
| SQS | 1M requests | $0.40 |
| Lambda | 1M requests, 256 MB | $0.83 |
| Firehose | 1 GB delivered | $0.03 |
| S3 | 10-60 GB stored | $0.50-1.00 |
| CloudWatch | 5 GB logs | $0.27 |

Total: roughly $2-3/month for the services listed above; alarms, X-Ray traces, and SNS add a small amount on top.


Summary

What we learned from this pattern:

  1. SQS + DLQ: Reliable message processing
  2. ReportBatchItemFailures: Efficient handling of partial failures
  3. Firehose Partitioning: Analytics-friendly data storage
  4. CloudWatch Alarms: Essential monitoring for production


Let's continue learning practical AWS CDK patterns through the 100 drill exercises!
If you found this helpful, please ⭐ the repository!

📌 You can see the entire code in my GitHub repository.
