Renaldi for AWS Community Builders

Serverless ETL/ELT Architecture with S3, EventBridge, Lambda, Step Functions, and Glue

In this post, I will walk through a production-style serverless ETL/ELT architecture on AWS using Amazon S3, Amazon EventBridge, AWS Lambda, AWS Step Functions, and AWS Glue. I will cover the full flow from event-driven ingestion to validation, quarantine handling, orchestration, schema drift handling, data quality checks, and replay.

I am intentionally designing this as a pattern that can support both ETL and ELT:

  • ETL when I perform transformations in Glue before landing curated outputs
  • ELT when I land validated/raw data first and defer transformation to downstream query engines or warehouse jobs

This architecture is a strong fit for data lake ingestion pipelines where I want:

  • event-driven automation
  • low operational overhead
  • clear failure handling
  • replayability
  • observability
  • and enough flexibility to survive real-world data messiness

A few implementation choices in this post are deliberate:

  • I use Step Functions Standard (not Express) because I want durable, auditable executions and because the common Glue .sync integration pattern is not supported in Express workflows. AWS documents the Standard vs Express execution semantics and notes that Express does not support .sync job-run service integration patterns. (docs.aws.amazon.com)
  • I use S3 Event Notifications via EventBridge for flexible routing and filtering. AWS also notes that after enabling EventBridge delivery on an S3 bucket, it can take around five minutes for changes to take effect, which is worth remembering during testing. (docs.aws.amazon.com)

Why this architecture works well in practice

When I build data ingestion pipelines, the biggest problems are usually not the happy path. They are:

  • malformed files
  • duplicate deliveries
  • late files
  • schema drift
  • partial failures
  • silently bad data
  • and the inability to replay safely

This pattern addresses those operational concerns directly:

  • S3 + EventBridge gives me flexible event routing
  • Lambda handles fast validation/classification
  • Step Functions gives me explicit orchestration, branching, retries, and auditability
  • Glue handles scalable transformation and schema normalization
  • Glue Data Quality (or equivalent checks) enforces quality gates before data becomes trusted

AWS Glue Data Quality is a managed, serverless capability built on the open-source Deequ library and uses DQDL (Data Quality Definition Language), which is useful when I want rules-as-code for quality checks. (docs.aws.amazon.com)


Architecture Overview

At a high level, I split the data lake into zones and make each stage explicit:

  • raw-ingest bucket: original landing location (immutable input)
  • validated bucket: files that passed lightweight ingest validation
  • quarantine bucket: invalid files, bad schema changes, or failed quality checks
  • curated bucket: transformed and query-ready outputs
  • dq-results bucket: data quality evaluation outputs and reports
  • replay-manifest bucket: manifests used to reprocess files safely

The flow is:

  1. A producer drops a file in raw-ingest
  2. S3 emits an event to EventBridge
  3. EventBridge triggers a validation Lambda
  4. Lambda validates and stages valid files into validated
  5. Lambda starts Step Functions with a normalized payload
  6. Step Functions runs Glue transformation
  7. Step Functions runs data quality checks
  8. Pass -> publish success / expose curated data
  9. Fail -> quarantine and notify
  10. Replay can be triggered later using replay manifests


End-to-End Walkthrough

1) Event-driven ingest (S3 -> EventBridge)

I start by enabling EventBridge notifications on the raw ingest bucket. This gives me centralized event filtering/routing in EventBridge rather than hardwiring everything as direct S3 notifications.

AWS documents S3 EventBridge integration and bucket-level enablement for EventBridge event delivery. (docs.aws.amazon.com)

Example: enable EventBridge on the raw ingest bucket

aws s3api put-bucket-notification-configuration \
  --bucket my-raw-ingest-bucket \
  --notification-configuration '{
    "EventBridgeConfiguration": {}
  }'

2) Event filtering in EventBridge

I usually create one or more EventBridge rules to target datasets by path prefix and file type. This lets me route different datasets into different workflows without changing the producers.

Example EventBridge rule pattern (illustrative)

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["my-raw-ingest-bucket"]
    },
    "object": {
      "key": [
        { "prefix": "landing/orders/" }
      ]
    }
  }
}

From here, I send the event to a Validation Lambda (or directly to Step Functions if I want validation inside the state machine).
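When I have more than a couple of datasets, I prefer to generate these event patterns from a routing table rather than hand-writing JSON per rule. A minimal sketch — the bucket name, prefixes, and helper are illustrative, not part of any AWS API:

```python
import json


def build_s3_object_created_pattern(bucket: str, key_prefix: str) -> dict:
    """Build an EventBridge event pattern for S3 'Object Created' events
    under a given key prefix (suitable for events:PutRule)."""
    return {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": [bucket]},
            "object": {"key": [{"prefix": key_prefix}]},
        },
    }


# One rule per dataset, generated from a simple routing table.
routing = {
    "orders": "landing/orders/",
    "customers": "landing/customers/",
}

patterns = {
    dataset: build_s3_object_created_pattern("my-raw-ingest-bucket", prefix)
    for dataset, prefix in routing.items()
}

print(json.dumps(patterns["orders"], indent=2))
```

This keeps producers unchanged: routing a new dataset is one new entry in the table plus a deployed rule.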


3) Validation and quarantine buckets

I like to separate lightweight ingest validation from heavy transforms:

  • file naming / suffix checks
  • minimum size checks
  • header checks (CSV/JSON)
  • partition key extraction from path
  • duplicate detection / idempotency signal generation
  • metadata enrichment (dataset, source, arrival timestamp, run ID)

If validation passes, I stage the object into a validated bucket.
If it fails, I route it to quarantine with a reason code.

This gives me two important benefits:

  1. I preserve the original raw object for traceability
  2. I avoid sending obviously bad data into expensive Glue runs

Example validation Lambda (Python)

import json
import os
import uuid
import urllib.parse
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")
sfn = boto3.client("stepfunctions")

VALIDATED_BUCKET = os.environ["VALIDATED_BUCKET"]
STATE_MACHINE_ARN = os.environ["STATE_MACHINE_ARN"]

ALLOWED_SUFFIXES = (".csv", ".json", ".jsonl")
MIN_BYTES = 10  # example threshold


def _extract_dataset(key: str) -> str:
    # Example key: landing/orders/2026/02/25/orders_001.csv
    parts = key.split("/")
    if len(parts) < 2 or parts[0] != "landing":
        raise ValueError("invalid_key_prefix")
    return parts[1]


def _validate_object(bucket: str, key: str) -> dict:
    head = s3.head_object(Bucket=bucket, Key=key)
    size = head["ContentLength"]

    if not key.lower().endswith(ALLOWED_SUFFIXES):
        return {"valid": False, "reason": "unsupported_extension"}

    if size < MIN_BYTES:
        return {"valid": False, "reason": "file_too_small"}

    try:
        dataset = _extract_dataset(key)
    except ValueError:
        return {"valid": False, "reason": "invalid_key_prefix"}

    # Optional: inspect a small header/sample for CSV/JSON sanity
    sample = s3.get_object(Bucket=bucket, Key=key, Range="bytes=0-1023")["Body"].read()
    if not sample:
        return {"valid": False, "reason": "empty_sample"}

    return {
        "valid": True,
        "dataset": dataset,
        "size_bytes": size,
        "content_type": head.get("ContentType", "application/octet-stream"),
        "etag": head.get("ETag", "").strip('"'),
    }


def _stage_validated_copy(src_bucket: str, src_key: str, dataset: str, run_id: str) -> str:
    # I usually add a run/date prefix for traceability
    now = datetime.now(timezone.utc)
    dst_key = (
        f"validated/{dataset}/"
        f"ingest_date={now:%Y-%m-%d}/"
        f"run_id={run_id}/"
        f"{src_key.split('/')[-1]}"
    )

    s3.copy_object(
        Bucket=VALIDATED_BUCKET,
        Key=dst_key,
        CopySource={"Bucket": src_bucket, "Key": src_key},
        MetadataDirective="REPLACE",
        Metadata={
            "source_bucket": src_bucket,
            "source_key": src_key,
            "run_id": run_id,
            "dataset": dataset,
            "validated_at": now.isoformat()
        }
    )
    return dst_key


def lambda_handler(event, context):
    """
    Supports direct EventBridge S3 events.
    """
    detail = event["detail"]
    src_bucket = detail["bucket"]["name"]
    src_key = urllib.parse.unquote_plus(detail["object"]["key"])

    run_id = str(uuid.uuid4())
    validation = _validate_object(src_bucket, src_key)

    payload = {
        "runId": run_id,
        "source": {
            "bucket": src_bucket,
            "key": src_key
        },
        "validation": validation,
        "receivedAt": datetime.now(timezone.utc).isoformat()
    }

    if validation["valid"]:
        dst_key = _stage_validated_copy(src_bucket, src_key, validation["dataset"], run_id)
        payload["validated"] = {
            "bucket": VALIDATED_BUCKET,
            "key": dst_key,
            "s3Uri": f"s3://{VALIDATED_BUCKET}/{dst_key}"
        }

    # Start orchestration (Step Functions Standard)
    sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        name=run_id,
        input=json.dumps(payload)
    )

    return {"statusCode": 202, "body": json.dumps({"runId": run_id})}

Implementation notes for validation

A few things I recommend in production:

  • Do not rely on ETag as a universal checksum for multipart uploads
  • Keep validation Lambda fast and deterministic
  • Record a consistent run ID and propagate it through every stage
  • Treat the raw bucket as immutable evidence
  • Add bucket lifecycle policies for quarantine and intermediate zones
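On the ETag point: multipart-upload ETags are not a hash of the object content (they carry a part-count suffix), so I derive a separate stable identity for idempotency. A sketch of the helpers I would use — the naming is mine, not an S3 API:

```python
import hashlib


def is_multipart_etag(etag: str) -> bool:
    """Multipart ETags look like '<md5-of-part-md5s>-<part_count>',
    so they cannot be compared against an MD5 of the whole object."""
    return "-" in etag.strip('"')


def source_object_id(bucket: str, key: str, version_id: str = "") -> str:
    """Deterministic identity for an ingested object. Include versionId
    when producers can overwrite keys on a versioned bucket."""
    raw = f"s3://{bucket}/{key}"
    if version_id:
        raw += f"?versionId={version_id}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# Same input always yields the same ID; a new version yields a new ID.
assert source_object_id("b", "k") == source_object_id("b", "k")
assert source_object_id("b", "k") != source_object_id("b", "k", "v1")
```

The resulting ID is what I propagate as the idempotency signal mentioned in the validation checklist above.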

4) Orchestration for transforms (Step Functions)

This is where the pipeline becomes operationally manageable.

I use Step Functions to:

  • branch on validation status
  • invoke quarantine logic on failure
  • invoke Glue for transform
  • run data quality checks
  • branch on DQ result
  • emit success/failure events/notifications
  • maintain an auditable execution trail

AWS documents the Step Functions Glue integration for starting Glue jobs, which is exactly what I use here. (docs.aws.amazon.com)

Example Step Functions state machine (ASL JSON, simplified)

{
  "Comment": "Serverless ETL/ELT pipeline with validation, transform, DQ, and quarantine",
  "StartAt": "ValidationPassed?",
  "States": {
    "ValidationPassed?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.validation.valid",
          "BooleanEquals": true,
          "Next": "PersistMetadata"
        }
      ],
      "Default": "QuarantineInvalidInput"
    },

    "PersistMetadata": {
      "Type": "Pass",
      "Parameters": {
        "runId.$": "$.runId",
        "source.$": "$.source",
        "validated.$": "$.validated",
        "validation.$": "$.validation",
        "receivedAt.$": "$.receivedAt"
      },
      "Next": "RunGlueTransform"
    },

    "RunGlueTransform": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "orders-curation-job",
        "Arguments": {
          "--run_id.$": "$.runId",
          "--input_s3_uri.$": "$.validated.s3Uri",
          "--output_s3_uri": "s3://my-curated-bucket/curated/orders/",
          "--dataset": "orders"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["Glue.ConcurrentRunsExceededException", "States.TaskFailed"],
          "IntervalSeconds": 10,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "QuarantineTransformFailure"
        }
      ],
      "Next": "RunDataQualityChecks"
    },

    "RunDataQualityChecks": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "dq-evaluator-lambda",
        "Payload": {
          "runId.$": "$.runId",
          "dataset": "orders",
          "curatedS3Prefix": "s3://my-curated-bucket/curated/orders/"
        }
      },
      "ResultPath": "$.dqInvoke",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "QuarantineDQFailure"
        }
      ],
      "Next": "DQPassed?"
    },

    "DQPassed?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.dqInvoke.Payload.pass",
          "BooleanEquals": true,
          "Next": "PublishSuccess"
        }
      ],
      "Default": "QuarantineDQFailure"
    },

    "PublishSuccess": {
      "Type": "Pass",
      "Result": {
        "status": "SUCCEEDED"
      },
      "End": true
    },

    "QuarantineInvalidInput": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "quarantine-lambda",
        "Payload": {
          "runId.$": "$.runId",
          "source.$": "$.source",
          "reason.$": "$.validation.reason",
          "stage": "INGEST_VALIDATION"
        }
      },
      "End": true
    },

    "QuarantineTransformFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "quarantine-lambda",
        "Payload": {
          "runId.$": "$.runId",
          "source.$": "$.source",
          "reason": "GLUE_TRANSFORM_FAILURE",
          "error.$": "$.error",
          "stage": "TRANSFORM"
        }
      },
      "End": true
    },

    "QuarantineDQFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "quarantine-lambda",
        "Payload": {
          "runId.$": "$.runId",
          "source.$": "$.source",
          "reason": "DATA_QUALITY_FAILURE",
          "dqResult.$": "$.dqInvoke.Payload",
          "stage": "DATA_QUALITY"
        }
      },
      "End": true
    }
  }
}

Why I prefer Standard over Express here

This pattern tends to include:

  • Glue jobs
  • multiple branches
  • retries
  • audit needs
  • potential human investigation/replay

Step Functions Standard gives me durability and auditability, and AWS documents that Standard and Express have different execution semantics, durations, and integration pattern support (including the .sync limitation in Express). (docs.aws.amazon.com)


Glue transformation implementation (ETL core)

For the transformation layer, I use AWS Glue because it scales well and integrates naturally with S3 and the Data Catalog.

AWS Glue’s DynamicFrame is especially useful when incoming schemas are not cleanly stable because records are self-describing and schema inconsistencies can be represented as choice/union types. AWS also documents resolveChoice() as a recommended approach to handle multi-typed fields. (docs.aws.amazon.com)

Example Glue job script (PySpark, simplified)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

args = getResolvedOptions(sys.argv, [
    "JOB_NAME",
    "run_id",
    "input_s3_uri",
    "output_s3_uri",
    "dataset"
])

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

run_id = args["run_id"]
input_s3_uri = args["input_s3_uri"]
output_s3_uri = args["output_s3_uri"]
dataset = args["dataset"]

# Example assumes CSV; in production I usually parameterize format/options
raw_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [input_s3_uri], "recurse": True},
    format="csv",
    format_options={
        "withHeader": True,
        "separator": ",",
        "quoteChar": "\""
    },
    transformation_ctx="raw_dyf"
)

# Handle schema drift / ambiguous types
# Example: amount might arrive as string in some files and numeric in others
resolved_dyf = raw_dyf.resolveChoice(specs=[
    ("amount", "cast:double"),
    ("order_id", "cast:string"),
    ("customer_id", "cast:string")
])

# Convert to Spark DataFrame for richer transforms
df = resolved_dyf.toDF()

# Add operational metadata
df = (
    df
    .withColumn("pipeline_run_id", F.lit(run_id))
    .withColumn("dataset_name", F.lit(dataset))
    .withColumn("processed_at_utc", F.current_timestamp())
)

# Example normalization
if "currency" in df.columns:
    df = df.withColumn("currency", F.upper(F.col("currency")))

if "event_ts" in df.columns:
    df = df.withColumn("event_ts", F.to_timestamp("event_ts"))

# Example partition column
df = df.withColumn("ingest_date", F.to_date(F.col("processed_at_utc")))

# Optional: enforce target schema / select only known columns (ELT-friendly alternative is to keep extras)
target_columns = [c for c in [
    "order_id", "customer_id", "amount", "currency", "event_ts",
    "pipeline_run_id", "dataset_name", "processed_at_utc", "ingest_date"
] if c in df.columns]

df_out = df.select(*target_columns)

# Write curated data
(
    df_out.write
    .mode("append")
    .format("parquet")
    .partitionBy("ingest_date")
    .save(output_s3_uri)
)

job.commit()

Schema drift handling (the part that usually breaks first)

Schema drift is not one problem. It is several different problems:

  • additive drift (new columns appear)
  • type drift (same column changes type)
  • structural drift (nested shape changes)
  • semantic drift (same field name, different meaning)
  • contract drift (breaking changes without notice)

I recommend treating them differently.

My practical schema drift policy

1) Additive columns (usually safe)

If a producer adds a new column and my downstream consumers can tolerate it, I usually:

  • allow the file
  • retain the new column in a bronze/validated or semi-curated layer
  • update downstream mappings later

2) Type drift (common and dangerous)

Example:

  • amount was numeric yesterday
  • today it arrives as "12.34" string
  • next week it arrives as "N/A"

This is where Glue DynamicFrame + resolveChoice() helps because I can explicitly coerce or split logic for ambiguous types. AWS documents DynamicFrame and resolveChoice() for schema inconsistencies and choice types. (docs.aws.amazon.com)

My preference:

  • cast when safe
  • preserve raw/original field if coercion may lose information
  • quarantine only when the drift breaks contractual expectations for trusted datasets
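The "cast when safe, preserve the original" idea for the amount example can be sketched as a plain Python helper; in the Glue job the same logic applies per column, and the field names here are illustrative:

```python
def coerce_amount(value):
    """Try to cast to float; on failure keep the raw value alongside a
    null typed value so no information is silently lost."""
    if value is None:
        return {"amount": None, "amount_raw": None, "coerced": False}
    try:
        # Safe cast: numeric strings and numbers both succeed
        return {"amount": float(value), "amount_raw": str(value), "coerced": True}
    except (TypeError, ValueError):
        # e.g. "N/A": keep the original for investigation, null the typed field
        return {"amount": None, "amount_raw": str(value), "coerced": False}


print(coerce_amount("12.34"))  # clean cast
print(coerce_amount("N/A"))    # preserved raw, flagged as not coerced
```

Whether a failed coercion is a warning or a quarantine trigger is then a per-dataset policy decision, not something buried in the transform.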

3) Breaking structural changes (often quarantine-worthy)

If a file violates the expected envelope (for example, missing required keys, wrong delimiter, broken nesting), I quarantine it and emit an alert.

4) Catalog and schema tracking

I typically track schema evolution using one or both:

  • Glue Data Catalog + crawler/table versioning
  • a dataset contract file (JSON/YAML) in a config repo or S3

For higher maturity pipelines, I define a per-dataset schema policy:

  • allow_additive_columns: true
  • allow_type_coercion: limited
  • strict_required_columns: ["order_id", "event_ts"]
  • quarantine_on_breaking_change: true
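A sketch of how such a per-dataset policy could drive an automated decision. The policy keys mirror the list above; the function and its decision vocabulary are illustrative:

```python
def evaluate_schema_drift(policy: dict, expected: set, observed: set) -> dict:
    """Classify drift between expected and observed column sets and decide
    whether to proceed or quarantine, according to the dataset policy."""
    missing_required = set(policy.get("strict_required_columns", [])) - observed
    added = observed - expected

    if missing_required:
        return {"action": "quarantine", "reason": "missing_required",
                "columns": sorted(missing_required)}
    if added and not policy.get("allow_additive_columns", False):
        return {"action": "quarantine", "reason": "unexpected_columns",
                "columns": sorted(added)}
    return {"action": "allow", "additive_columns": sorted(added)}


policy = {
    "allow_additive_columns": True,
    "strict_required_columns": ["order_id", "event_ts"],
}

decision = evaluate_schema_drift(
    policy,
    expected={"order_id", "event_ts", "amount"},
    observed={"order_id", "event_ts", "amount", "discount_code"},
)
print(decision)  # additive-only drift is allowed under this policy
```

This is the kind of check I run in the validation Lambda (or early in the Glue job) so that breaking changes quarantine with an explicit reason code instead of failing mid-transform.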

Data quality checks (quality gates before trust)

Schema validity is not data quality.

A file can be structurally valid and still be unusable:

  • duplicate IDs
  • null business keys
  • stale data
  • impossible values
  • code list mismatches
  • row counts far outside expected ranges

I usually place data quality checks after transformation and before I publish the data as trusted/curated.

AWS Glue Data Quality supports rules via DQDL and can be used in Glue ETL jobs or against Data Catalog datasets. (docs.aws.amazon.com)

Example DQDL ruleset (illustrative)

Rules = [
  RowCount > 0,
  IsComplete "order_id",
  IsUnique "order_id",
  ColumnExists "event_ts",
  ColumnValues "currency" in ["AUD","USD","EUR"],
  ColumnDataType "amount" = "DOUBLE"
]

Example DQ evaluator Lambda (simple custom approach)

In some environments, I start with a lightweight custom Lambda check before moving to full Glue DQ. This is especially useful for small curated datasets or when I want fast control over business-specific rules. Note that pyarrow and s3fs are not included in the default Lambda runtime, so this approach needs a Lambda layer or container image.

import json
import os
import re

import boto3
import pyarrow.parquet as pq
import s3fs

s3 = boto3.client("s3")

def _parse_s3_uri(uri: str):
    m = re.match(r"^s3://([^/]+)/?(.*)$", uri)
    if not m:
        raise ValueError("Invalid S3 URI")
    return m.group(1), m.group(2)

def lambda_handler(event, context):
    run_id = event["runId"]
    curated_prefix = event["curatedS3Prefix"]

    bucket, prefix = _parse_s3_uri(curated_prefix)
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1000)

    objects = [o["Key"] for o in resp.get("Contents", []) if o["Key"].endswith(".parquet")]
    if not objects:
        return {"pass": False, "reason": "NO_PARQUET_OUTPUT", "runId": run_id}

    # Example: check first file quickly (for large jobs, use Glue DQ or Athena-based checks)
    fs = s3fs.S3FileSystem()
    first_uri = f"{bucket}/{objects[0]}"
    table = pq.read_table(first_uri, filesystem=fs)
    df = table.to_pandas()

    required_cols = ["order_id", "amount"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        return {"pass": False, "reason": "MISSING_COLUMNS", "missing": missing, "runId": run_id}

    if df["order_id"].isnull().any():
        return {"pass": False, "reason": "NULL_ORDER_ID", "runId": run_id}

    if (df["amount"].astype(float) < 0).any():
        return {"pass": False, "reason": "NEGATIVE_AMOUNT", "runId": run_id}

    return {"pass": True, "runId": run_id, "checkedFiles": len(objects)}

My recommendation in production

  • Start with a small ruleset that reflects actual business criticality
  • Separate hard-fail rules (must quarantine) from warn rules (monitor only)
  • Persist DQ results to an S3 prefix and emit metrics (pass rate, failed rules, affected rows)
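The hard-fail/warn split can be as simple as tagging each rule with a severity and only failing the gate on hard rules. A sketch with in-memory rows; the rule names and callables are illustrative stand-ins for DQDL or custom checks:

```python
def run_rules(rows: list, rules: list) -> dict:
    """Evaluate (name, severity, check) rules over the dataset. Only 'hard'
    failures fail the gate; 'warn' failures are reported for monitoring."""
    failures = [
        {"name": name, "severity": severity}
        for name, severity, check in rules
        if not check(rows)
    ]
    hard = [f for f in failures if f["severity"] == "hard"]
    warn = [f for f in failures if f["severity"] == "warn"]
    return {"pass": not hard, "hardFailures": hard, "warnings": warn}


rules = [
    ("row_count_gt_0", "hard", lambda rows: len(rows) > 0),
    ("order_id_complete", "hard", lambda rows: all(r.get("order_id") for r in rows)),
    ("currency_known", "warn",
     lambda rows: all(r.get("currency") in {"AUD", "USD", "EUR"} for r in rows)),
]

rows = [
    {"order_id": "o1", "currency": "AUD"},
    {"order_id": "o2", "currency": "GBP"},  # unknown currency: warn only
]
result = run_rules(rows, rules)
print(result["pass"], result["warnings"])
```

The same pass/warn structure maps cleanly onto the `$.dqInvoke.Payload.pass` branch in the state machine while keeping warnings visible in the persisted DQ results.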

Replay design (safe reprocessing without chaos)

Replay is one of the most under-designed parts of data pipelines.

If I cannot replay safely, operations become manual and risky very quickly.

What I want from replay

  • deterministic input selection
  • idempotent processing
  • clear audit trail
  • no accidental duplicate publishing
  • support for single-file and bulk replays

Replay pattern used here

I use a replay manifest dropped into a replay bucket. That manifest triggers the replay dispatcher Lambda (via EventBridge), which then starts the same Step Functions state machine with replay metadata.

This gives me:

  • a consistent path for normal and replay runs
  • a clear replay artifact I can review/version
  • easy automation from scripts or ops tooling

Example replay manifest

{
  "replayId": "replay-2026-02-25-orders-fix-01",
  "reason": "Reprocess files after schema mapping fix",
  "dataset": "orders",
  "files": [
    {
      "bucket": "my-raw-ingest-bucket",
      "key": "landing/orders/2026/02/24/orders_001.csv"
    },
    {
      "bucket": "my-raw-ingest-bucket",
      "key": "landing/orders/2026/02/24/orders_002.csv"
    }
  ],
  "options": {
    "force": false,
    "dryRun": false
  }
}

Example replay dispatcher Lambda (conceptual)

import json
import os
import uuid
import urllib.parse

import boto3

s3 = boto3.client("s3")
sfn = boto3.client("stepfunctions")

STATE_MACHINE_ARN = os.environ["STATE_MACHINE_ARN"]

def lambda_handler(event, context):
    detail = event["detail"]
    bucket = detail["bucket"]["name"]
    key = urllib.parse.unquote_plus(detail["object"]["key"])

    obj = s3.get_object(Bucket=bucket, Key=key)
    manifest = json.loads(obj["Body"].read())

    replay_id = manifest["replayId"]

    for i, f in enumerate(manifest["files"]):
        run_id = f"{replay_id}-{i:04d}"
        payload = {
            "runId": run_id,
            "source": {
                "bucket": f["bucket"],
                "key": f["key"]
            },
            "replay": {
                "replayId": replay_id,
                "reason": manifest.get("reason", ""),
                "options": manifest.get("options", {})
            },
            "validation": {
                "valid": True,
                "reason": None
            }
            # In a fuller implementation, I would re-run validation or call the same validator Lambda
        }

        try:
            # Using the deterministic run_id as the execution name lets
            # Step Functions reject accidental duplicate submissions
            sfn.start_execution(
                stateMachineArn=STATE_MACHINE_ARN,
                name=run_id,
                input=json.dumps(payload)
            )
        except sfn.exceptions.ExecutionAlreadyExists:
            continue

    return {"statusCode": 202, "started": len(manifest["files"])}

Replay best practices I strongly recommend

  • Use a business idempotency key (for example, source file URI + dataset + version)
  • Tag replay outputs with replay metadata
  • Keep replay behavior explicit (force, dryRun, skipIfPublished)
  • Never mutate raw inputs during replay
  • Record replay requests and outcomes in a durable audit log (DynamoDB or S3 append logs)
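The idempotency-key bullet pairs naturally with deterministic run IDs: if the same manifest entry always maps to the same ID, re-submitting a manifest cannot start the same work twice. A sketch — the in-memory set stands in for the durable registry (DynamoDB or S3 append log) mentioned above:

```python
import hashlib


def replay_run_id(replay_id: str, bucket: str, key: str) -> str:
    """Deterministic per-file run ID: the same manifest entry always maps
    to the same ID, which can double as the execution name."""
    digest = hashlib.sha256(f"s3://{bucket}/{key}".encode()).hexdigest()[:12]
    return f"{replay_id}-{digest}"


def dispatch(manifest: dict, started: set) -> list:
    """Start only not-yet-started entries; 'started' stands in for a
    durable registry (e.g. a DynamoDB table with a conditional put)."""
    launched = []
    for f in manifest["files"]:
        rid = replay_run_id(manifest["replayId"], f["bucket"], f["key"])
        if rid in started:
            continue  # already replayed; skip unless options.force is set
        started.add(rid)
        launched.append(rid)
    return launched


manifest = {
    "replayId": "replay-01",
    "files": [{"bucket": "b", "key": "landing/orders/f1.csv"}],
}
registry = set()
print(dispatch(manifest, registry))  # first call launches the entry
print(dispatch(manifest, registry))  # second call is a no-op
```

A `force: true` option would simply bypass the registry check while still recording the new attempt.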

ETL vs ELT: how I support both in the same architecture

This pattern is intentionally hybrid.

ETL mode (transform before publish)

I use this when:

  • downstream consumers need consistent schema fast
  • I want centralized transformation logic
  • I need quality gates before data is widely consumed

Flow:
validated -> Glue transform -> DQ -> curated

ELT mode (load first, transform later)

I use this when:

  • I need rapid ingestion of many datasets
  • transforms are evolving quickly
  • downstream teams own transformations (Athena/warehouse/dbt/etc.)

Flow:
validated -> catalog/register -> downstream SQL transforms
with the same validation/quarantine/replay capabilities still in place.

I can even run both:

  • land curated “v1” in Glue
  • also preserve validated data for future ELT workloads

Implementation discussion (what matters beyond the happy path)

1) Idempotency and duplicates

At-least-once delivery can show up in many places (producer retries, EventBridge fan-out, replays, manual operations). I assume duplicates can happen.

My approach:

  • derive a stable source_object_id (bucket + key + versionId if enabled)
  • persist processing status (optional DynamoDB table)
  • make publishing steps idempotent
  • partition outputs carefully to avoid duplicate row amplification

If your producers can overwrite keys, enable S3 versioning and include versionId in your identity model.
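A sketch of what "make publishing steps idempotent" can look like: a conditional registration keyed by the stable object ID. The in-memory dict stands in for a DynamoDB conditional put; class and field names are illustrative:

```python
class PublishRegistry:
    """Tracks published source objects so a retry or replay cannot
    publish the same input twice."""

    def __init__(self):
        # Stand-in for a DynamoDB table written with a condition
        # expression like attribute_not_exists(source_object_id)
        self._published = {}

    def try_publish(self, source_object_id: str, output_uri: str) -> bool:
        if source_object_id in self._published:
            return False  # duplicate delivery or replay without force
        self._published[source_object_id] = output_uri
        return True


registry = PublishRegistry()
ok = registry.try_publish("sha256:abc123", "s3://my-curated-bucket/curated/orders/run1/")
dup = registry.try_publish("sha256:abc123", "s3://my-curated-bucket/curated/orders/run2/")
print(ok, dup)  # True False
```

The publish step in the state machine then checks the registry first, so duplicate EventBridge deliveries and replays converge on one published output.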


2) Observability

I treat observability as part of the pipeline design, not an add-on.

I emit:

  • ValidationPassed, ValidationFailed
  • GlueJobSucceeded, GlueJobFailed
  • DQPassed, DQFailed
  • Quarantined
  • ReplayStarted, ReplayCompleted

I also propagate:

  • runId
  • dataset
  • source bucket/key
  • replayId (if applicable)

This makes CloudWatch Logs searches and alarms much more useful.
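In practice I emit those events as structured JSON lines so CloudWatch Logs Insights can filter on runId and dataset. A minimal builder — the field names are my convention, not an AWS schema:

```python
import json
from datetime import datetime, timezone


def pipeline_event(event_type: str, run_id: str, dataset: str, **extra) -> str:
    """One consistent JSON line per pipeline event; runId and dataset are
    always present so every stage is correlatable across services."""
    record = {
        "eventType": event_type,
        "runId": run_id,
        "dataset": dataset,
        "emittedAt": datetime.now(timezone.utc).isoformat(),
        **extra,
    }
    return json.dumps(record)


line = pipeline_event("DQFailed", "run-123", "orders",
                      failedRules=["IsUnique order_id"])
print(line)
```

Every Lambda and Glue job logs through the same builder, so a single Logs Insights query on `runId` reconstructs a run end to end.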


3) Security and access control

At minimum, I lock down:

  • S3 bucket policies to only the required principals
  • KMS encryption for all buckets (SSE-KMS)
  • Least-privilege IAM roles for Lambda, Step Functions, and Glue
  • Explicit separation between raw, validated, curated, and quarantine write permissions

I also recommend:

  • object ownership settings to avoid ACL surprises
  • S3 lifecycle policies for quarantine retention
  • VPC configuration for Glue/Lambda only when needed (avoid unnecessary complexity)

4) Cost and performance tuning

This serverless pattern scales well, but costs can drift if I do not tune it.

Where I watch cost first

  • too many tiny files causing too many executions
  • over-frequent Glue job runs for tiny payloads
  • verbose DQ checks on every micro-batch
  • repeated replay runs without dedupe

Practical optimizations

  • batch small files by dataset/time window before heavy transforms
  • use prefix routing to isolate noisy datasets
  • right-size Glue workers and concurrency
  • use Step Functions Express only for truly short, idempotent, high-volume orchestration (not this .sync Glue-driven path)
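The small-file batching idea can be sketched as grouping pending keys by dataset and arrival window before launching one Glue run per group. The window size and key layout are assumptions:

```python
from collections import defaultdict
from datetime import datetime


def batch_keys(keys: list, window_minutes: int = 15) -> dict:
    """Group (key, arrived_at) pairs like 'validated/<dataset>/...' by
    (dataset, time window) so one transform run processes a whole window
    instead of one tiny file."""
    batches = defaultdict(list)
    for key, arrived_at in keys:
        dataset = key.split("/")[1]
        bucket_minute = (arrived_at.minute // window_minutes) * window_minutes
        window = arrived_at.replace(minute=bucket_minute, second=0, microsecond=0)
        batches[(dataset, window.isoformat())].append(key)
    return dict(batches)


pending = [
    ("validated/orders/f1.csv", datetime(2026, 2, 25, 10, 2)),
    ("validated/orders/f2.csv", datetime(2026, 2, 25, 10, 9)),
    ("validated/customers/f3.csv", datetime(2026, 2, 25, 10, 4)),
]
batches = batch_keys(pending)
print({k: len(v) for k, v in batches.items()})
```

A scheduled dispatcher (or an SQS-buffered Lambda) can run this grouping and start one Glue job run per batch, cutting both job-start overhead and per-run cost.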

5) Deployment strategy

I normally deploy this stack with AWS CDK, SAM, or Terraform.

A clean decomposition is:

  • Foundation stack
    • S3 buckets
    • KMS keys
    • IAM roles
    • EventBridge bus/rules
  • Compute stack
    • Lambda functions
    • Glue jobs
    • Step Functions
  • Data contracts stack/config
    • dataset configs
    • DQ rulesets
    • schema policies

This reduces blast radius when I update Glue scripts or state machine logic.


6) Testing strategy (worth doing early)

I test this pipeline at three levels:

Unit tests

  • Lambda validation logic
  • path parsing
  • schema policy decisions
  • quarantine payload generation

Integration tests

  • sample files dropped into raw bucket
  • expected state machine transitions
  • curated output creation
  • quarantine routing and reason codes

Replay tests

  • replay one file
  • replay a batch
  • replay a previously quarantined file after a code fix
  • verify no duplicate publish side effects

I strongly recommend keeping a small set of golden input files:

  • valid file
  • missing required column
  • type drift file
  • malformed file
  • duplicate file
  • late-arriving file
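Those golden files pair naturally with plain unit tests over the validation helpers. A sketch using the suffix/dataset logic from the validation Lambda, inlined here so the test is self-contained:

```python
ALLOWED_SUFFIXES = (".csv", ".json", ".jsonl")


def extract_dataset(key: str) -> str:
    # Example key: landing/orders/2026/02/25/orders_001.csv
    parts = key.split("/")
    if len(parts) < 2 or parts[0] != "landing":
        raise ValueError("invalid_key_prefix")
    return parts[1]


def classify_key(key: str) -> dict:
    """Pure classification step, easy to exercise with golden inputs
    and no S3 access."""
    if not key.lower().endswith(ALLOWED_SUFFIXES):
        return {"valid": False, "reason": "unsupported_extension"}
    try:
        dataset = extract_dataset(key)
    except ValueError:
        return {"valid": False, "reason": "invalid_key_prefix"}
    return {"valid": True, "dataset": dataset}


# Golden inputs: valid file, wrong extension, wrong prefix
assert classify_key("landing/orders/2026/02/25/f.csv") == {"valid": True, "dataset": "orders"}
assert classify_key("landing/orders/f.parquet")["reason"] == "unsupported_extension"
assert classify_key("tmp/orders/f.csv")["reason"] == "invalid_key_prefix"
print("golden-file checks passed")
```

Because the classification is pure, these checks run in milliseconds in CI, while the S3-dependent paths are covered by the integration tests above.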

Common failure modes and how I handle them

Failure mode: “Pipeline is green but data is wrong”

This usually means schema validation exists but business DQ does not.
Fix: add DQ gates and track DQ outcomes as first-class metrics.

Failure mode: “Replays create duplicate records”

This is almost always an idempotency design issue.
Fix: make output writes and publish steps idempotent using stable keys and checkpointing.

Failure mode: “Glue job keeps failing after producer change”

This is often schema drift or delimiter/header changes.
Fix: detect earlier in validation, classify drift type, and quarantine with explicit reason.

Failure mode: “Debugging is painful”

This happens when correlation IDs are not propagated.
Fix: carry runId across EventBridge -> Lambda -> Step Functions -> Glue -> DQ -> Notifications.


Closing thoughts

This architecture works well because it is not just a data flow. It is an operational flow.

The key idea is simple: I do not let “data landed in S3” mean “data is trusted.” I explicitly separate:

  • landing
  • validation
  • transformation
  • quality enforcement
  • quarantine
  • replay

That separation gives me reliability, observability, and a much better developer experience when the inevitable producer-side surprises happen.

If I were extending this next, I would add:

  • a dataset contract registry (versioned)
  • a metadata/status table (DynamoDB)
  • automated replay tooling for quarantined objects
  • a producer scorecard (quality and schema stability over time)

References

  • Amazon S3 EventBridge integration and enablement (S3 User Guide) (docs.aws.amazon.com)
  • Step Functions workflow type selection (Standard vs Express semantics, durations, integration pattern differences) (docs.aws.amazon.com)
  • Step Functions integration with AWS Glue (startJobRun / Glue service integration) (docs.aws.amazon.com)
  • AWS Glue Data Quality (DQDL, DeeQu-based managed service) (docs.aws.amazon.com)
  • AWS Glue DynamicFrame and schema inconsistency handling (docs.aws.amazon.com)
  • AWS Glue resolveChoice() guidance for ambiguous types in DynamicFrame (docs.aws.amazon.com)
