Renaldi for AWS Community Builders

Building a Document Processing Pipeline with S3, Textract, Step Functions and EventBridge

This is one of my favorite AWS patterns to demo because it is both visually compelling and production-relevant. It shows event-driven architecture, orchestration, asynchronous AI/ML service integration, scale-out processing, human-in-the-loop review, and operational discipline in one workflow.

In this post, I will walk through an end-to-end implementation of a document processing pipeline built with:

  • Amazon S3 for document ingress and result storage
  • Amazon Textract for OCR and structured extraction
  • AWS Step Functions for orchestration (including Distributed Map for batch scale)
  • Amazon EventBridge for event routing and downstream integration

I will also cover:

  • Async Textract orchestration
  • Batch scaling with Distributed Map
  • Result storage and audit trail
  • Human review step
  • Cost and throughput tuning
  • Architecture and code walkthrough

Why this pattern is so effective

In real teams, document processing is rarely just “OCR a file and store JSON.” We usually need to handle:

  • Multi-page PDFs and asynchronous processing
  • Bursty uploads (for example, end-of-day batch drops)
  • Traceability for compliance and audits
  • Human review for low-confidence or ambiguous fields
  • Clean integration points for downstream systems

This architecture solves those concerns in a way that is easy to demonstrate and scale.


What we are building

I am designing the pipeline around two operating modes:

  1. Single-document event-driven processing

    • A file lands in S3
    • EventBridge triggers the workflow
    • The workflow runs end-to-end for that document
  2. Batch processing mode

    • A manifest (JSON/CSV/list of document keys) is provided
    • Step Functions uses Distributed Map to fan out child workflows
    • Each child workflow processes one document independently

This keeps the core processing logic consistent while letting me scale from demos to production.


Architecture Overview

At a high level, the flow is:

  1. A document is uploaded to an S3 input bucket
  2. S3 emits an event to EventBridge
  3. EventBridge starts a Step Functions parent workflow
  4. The parent workflow prepares a manifest (single item or batch list)
  5. A Distributed Map launches child workflows for each document
  6. Each child workflow:
    • validates input
    • starts async Textract
    • waits/polls for completion
    • retrieves and normalizes results
    • stores raw + normalized outputs
    • writes audit records
    • routes low-confidence cases to human review
    • emits a processed event to EventBridge
  7. Downstream systems consume the processed event


End-to-End Walkthrough

1) Document ingress with S3 + EventBridge

I use S3 as the system of record for inbound documents. The upload path usually looks like:

  • s3://my-docs-incoming/raw/YYYY/MM/DD/<tenant>/<document>.pdf

I prefer a structured key naming convention because it helps with:

  • lifecycle policies
  • tenant scoping
  • troubleshooting
  • cost attribution
  • replay operations
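A small helper keeps key construction consistent across uploaders. This is a sketch of one way to generate the convention above; the function name is mine, not part of any AWS API:

```python
from datetime import datetime, timezone
from typing import Optional


def build_ingress_key(tenant: str, filename: str, when: Optional[datetime] = None) -> str:
    """Build a raw-ingress S3 key of the form raw/YYYY/MM/DD/<tenant>/<filename>."""
    when = when or datetime.now(timezone.utc)
    return f"raw/{when:%Y/%m/%d}/{tenant}/{filename}"
```

Because the date is embedded in the key, lifecycle rules and replay scripts can target a day's uploads with a simple prefix.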

S3 can publish object events to EventBridge, and I use an EventBridge rule to filter only the prefixes and file types that should trigger processing (for example, PDFs in raw/).

Example EventBridge rule pattern (S3 object created)

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["my-docs-incoming"]
    },
    "object": {
      "key": [{ "prefix": "raw/" }]
    }
  }
}

From there, EventBridge starts the Step Functions parent state machine.


2) Parent orchestration with Step Functions

I use a Standard workflow for the parent because:

  • document jobs can run for a while
  • I may have retries and waits
  • I may pause for human review (callback pattern)
  • I want richer execution history

The parent workflow does a few things:

  • Parses the incoming event (or batch request)
  • Creates a manifest for processing (even if only one document)
  • Sets concurrency controls (important for Textract/Lambda quotas)
  • Launches a Distributed Map
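A minimal BuildManifest handler might normalize both modes into one shape. This is a sketch; the output field names (`documents`, `maxConcurrency`, `batchContext`) are the ones the ASL fragments in this post expect, and the event shapes are assumptions:

```python
import os
import uuid

DEFAULT_MAX_CONCURRENCY = int(os.environ.get("DEFAULT_MAX_CONCURRENCY", "10"))


def lambda_handler(event, context):
    # Batch request: caller already provides a list of {bucket, key} items.
    if "documents" in event:
        documents = event["documents"]
    else:
        # Single-document mode: unwrap the S3-via-EventBridge event shape.
        detail = event["detail"]
        documents = [{
            "bucket": detail["bucket"]["name"],
            "key": detail["object"]["key"],
        }]

    # Assign a stable per-document id if the caller did not supply one.
    for doc in documents:
        doc.setdefault("id", uuid.uuid4().hex)

    return {
        "documents": documents,
        "maxConcurrency": event.get("maxConcurrency", DEFAULT_MAX_CONCURRENCY),
        "batchContext": {"batchId": uuid.uuid4().hex, "size": len(documents)},
    }
```

Emitting a one-element manifest for single uploads is what lets the same Distributed Map path serve both modes.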

Why Distributed Map matters

For real batches, an inline Map state can become limiting because of its concurrency ceiling and the execution history size limit of a single execution. Distributed Map gives me separate child workflow executions, much higher scale, and cleaner per-document observability.


3) Child workflow per document (core pipeline)

Each child workflow processes a single document from start to finish.

Core steps in the child workflow

  1. Validate input

    • file type
    • bucket/key exists
    • metadata (tenant, doc type, correlation ID)
    • idempotency key check
  2. Start Textract asynchronously

    • call StartDocumentAnalysis
    • capture JobId
    • store correlation in audit trail
  3. Wait for completion

    • polling loop (simple and explicit for Step Functions)
    • or callback/SNS-based completion (production optimization)
  4. Retrieve paginated results

    • call GetDocumentAnalysis until NextToken is exhausted
  5. Normalize + enrich

    • convert Textract blocks into domain JSON
    • score confidence thresholds
    • apply business rules
  6. Store outputs

    • raw output
    • normalized output
    • processing summary
    • audit entries
  7. Human review (if needed)

    • pause with task token callback
    • reviewer approves/edits/rejects
  8. Emit completion event

    • publish a custom EventBridge event
    • downstream systems subscribe independently
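One detail from steps 1 and 2 worth making concrete is the idempotency key: StartDocumentAnalysis accepts a ClientRequestToken, so deriving a deterministic token from the object identity makes retried starts safe. A sketch:

```python
import hashlib


def idempotency_token(bucket: str, key: str, version_id: str = "") -> str:
    """Derive a deterministic ClientRequestToken from the S3 object identity.

    A SHA-256 hex digest uses only [a-f0-9], stays within Textract's
    token length limit, and is stable across workflow retries.
    """
    digest = hashlib.sha256(f"{bucket}/{key}/{version_id}".encode("utf-8")).hexdigest()
    return digest[:64]
```

Including the object version means a re-uploaded document gets a fresh Textract job rather than a deduplicated one.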

4) Async Textract orchestration (practical implementation)

Textract async processing is the correct choice for multi-page documents and larger workloads. The key point is that I do not try to synchronously block a Lambda while waiting for Textract.

Instead, I let Step Functions own the waiting and retry logic.

Two patterns I use in practice

Pattern A (implemented in this post): Step Functions polling loop

This is the simplest pattern to explain and demo.

  • StartDocumentAnalysis
  • Wait
  • GetDocumentAnalysis (status check)
  • loop until SUCCEEDED / FAILED

Pros:

  • Easy to understand
  • No extra callback plumbing
  • Great for demos and many production cases

Cons:

  • More state transitions
  • Not the most efficient if jobs are very long

Pattern B (production optimization): SNS/SQS + callback

Textract async operations publish a completion notification to the SNS topic configured via NotificationChannel. I can correlate on JobId, then call SendTaskSuccess to resume the Step Functions execution.

Pros:

  • Fewer polling transitions
  • More event-driven

Cons:

  • More moving parts (SNS/SQS/Lambda/token correlation)

For this article, I am implementing Pattern A for clarity, and I will still preserve auditability and scalability.
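Even though this post implements Pattern A, the Pattern B completion handler is worth sketching. This assumes the task token was stored in a DynamoDB table keyed by JobId when the job started; the table name `textract-job-tokens` and the handler structure are hypothetical:

```python
import json


def parse_textract_notification(sns_record: dict) -> dict:
    """Extract JobId and Status from a Textract completion message delivered via SNS."""
    message = json.loads(sns_record["Sns"]["Message"])
    return {"jobId": message["JobId"], "status": message["Status"]}


def lambda_handler(event, context):
    # Import kept local so the parser above is testable without an AWS SDK.
    import boto3

    sfn = boto3.client("stepfunctions")
    table = boto3.resource("dynamodb").Table("textract-job-tokens")  # hypothetical table

    for record in event["Records"]:
        note = parse_textract_notification(record)
        # Look up the task token stored at StartTextract time, keyed by JobId.
        token = table.get_item(Key={"jobId": note["jobId"]})["Item"]["taskToken"]
        if note["status"] == "SUCCEEDED":
            sfn.send_task_success(taskToken=token, output=json.dumps(note))
        else:
            sfn.send_task_failure(
                taskToken=token, error="TextractJobFailed", cause=json.dumps(note)
            )
```

The correlation table is the extra moving part mentioned above: without it, there is no way to map an SNS notification back to the paused execution.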


5) Result storage and audit trail (what I store and why)

This is where many demos stop too early. I do not want a pipeline that only prints extraction results to logs.

I store four classes of artifacts:

A. Raw input (immutable)

  • Original document in S3 (raw/)

B. Raw Textract result snapshot

  • Either Textract output JSON (if using OutputConfig)
  • Or consolidated raw blocks written by my retrieval Lambda

C. Normalized business result

A clean JSON model that downstream systems can actually use, for example:

{
  "documentId": "INV-2026-000123",
  "documentType": "invoice",
  "vendorName": "Acme Pty Ltd",
  "invoiceDate": "2026-02-25",
  "totalAmount": 1285.40,
  "currency": "AUD",
  "fields": [
    {
      "name": "invoice_number",
      "value": "INV-2026-000123",
      "confidence": 98.7,
      "source": "textract"
    }
  ],
  "review": {
    "required": false,
    "reason": null
  }
}

D. Audit trail (DynamoDB + events)

I keep an audit table with entries like:

  • correlation ID
  • S3 bucket/key/version
  • execution ARN
  • Textract JobId
  • timestamps (started/completed)
  • status transitions
  • retry count
  • reviewer ID + decision (if reviewed)

This makes troubleshooting and compliance conversations much easier.


6) Human review step (callback pattern)

I like to add a human review step because it turns the demo into a real workflow.

When I trigger human review

I send documents for human review if:

  • confidence is below threshold for required fields
  • required fields are missing
  • document type classification is ambiguous
  • business validations fail (for example, total does not match line items)
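These triggers can be encoded as a small pure policy function, which keeps the review rules testable outside the pipeline. A sketch; thresholds and field names are illustrative:

```python
def review_decision(fields: list, required: set, threshold: float = 90.0) -> dict:
    """Decide whether a document needs human review.

    fields: list of {"name", "value", "confidence"} dicts from normalization.
    required: set of field names that must be present and confident.
    """
    by_name = {f["name"]: f for f in fields}

    missing = sorted(required - by_name.keys())
    if missing:
        return {"required": True, "reason": f"Missing required fields: {', '.join(missing)}"}

    low = sorted(n for n in required if by_name[n]["confidence"] < threshold)
    if low:
        return {"required": True, "reason": f"Low confidence: {', '.join(low)}"}

    return {"required": False, "reason": None}
```

The returned shape matches the `review` object in the normalized result, so the Choice state can branch on `$.review.required` directly.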

How the callback works

  1. Step Functions reaches HumanReviewRequired
  2. It invokes a Lambda using waitForTaskToken
  3. That Lambda stores a review task (with the task token) in DynamoDB and optionally emits a notification event
  4. A reviewer UI or ops tool loads the pending task
  5. Reviewer approves/edits/rejects
  6. API Gateway + Lambda calls:
    • SendTaskSuccess (approve / approved edits)
    • SendTaskFailure (reject / unrecoverable issue)

This lets me keep the workflow paused cleanly without polling the UI.


7) EventBridge for decoupled downstream integration

I use EventBridge in two places:

Ingress routing

  • S3 object-created events trigger the parent workflow

Egress publishing

  • The child workflow publishes custom events like:
    • document.processed
    • document.review.required
    • document.failed

This is a huge win because downstream systems can subscribe independently:

  • search indexing
  • analytics pipelines
  • case management
  • notifications
  • data quality dashboards

No tight coupling to the core document processing workflow.


Implementation Discussion

Below is a concrete implementation outline with code.


State machine design (parent + child)

I prefer splitting the logic into:

  • Parent workflow: batching and fan-out
  • Child workflow: per-document processing

This keeps each workflow easier to reason about and test.

Parent workflow (ASL fragment with Distributed Map)

{
  "Comment": "Parent workflow for document batch orchestration",
  "StartAt": "BuildManifest",
  "States": {
    "BuildManifest": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${BuildManifestFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "ProcessDocumentsDistributed"
    },
    "ProcessDocumentsDistributed": {
      "Type": "Map",
      "Label": "ProcessDocuments",
      "ItemsPath": "$.documents",
      "MaxConcurrencyPath": "$.maxConcurrency",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "StartChildExecution",
        "States": {
          "StartChildExecution": {
            "Type": "Task",
            "Resource": "arn:aws:states:::states:startExecution.sync:2",
            "Arguments": {
              "StateMachineArn": "${ChildStateMachineArn}",
              "Input": {
                "document.$": "$$.Map.Item.Value",
                "batchContext.$": "$.batchContext"
              }
            },
            "End": true
          }
        }
      },
      "Next": "BatchSummary"
    },
    "BatchSummary": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${BatchSummaryFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    }
  }
}

Notes on this design

  • MaxConcurrencyPath lets me tune concurrency dynamically per batch.
  • I can set lower concurrency for production if Textract quotas are tighter than my desired fan-out.
  • Child executions give me better isolation and clearer failure analysis.

Child workflow (ASL fragment for async Textract + human review + EventBridge)

{
  "Comment": "Child workflow for processing one document",
  "StartAt": "ValidateDocument",
  "States": {
    "ValidateDocument": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${ValidateDocumentFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "StartTextract"
    },
    "StartTextract": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:textract:startDocumentAnalysis",
      "Arguments": {
        "DocumentLocation": {
          "S3Object": {
            "Bucket.$": "$.document.bucket",
            "Name.$": "$.document.key"
          }
        },
        "FeatureTypes": ["FORMS", "TABLES"],
        "ClientRequestToken.$": "$.idempotencyKey",
        "JobTag.$": "$.jobTag"
      },
      "ResultPath": "$.textractStart",
      "Next": "WaitBeforeStatusCheck",
      "Retry": [
        {
          "ErrorEquals": [
            "Textract.ThrottlingException",
            "Textract.ProvisionedThroughputExceededException",
            "States.TaskFailed"
          ],
          "IntervalSeconds": 2,
          "BackoffRate": 2.0,
          "MaxAttempts": 5
        }
      ]
    },
    "WaitBeforeStatusCheck": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "CheckTextractStatus"
    },
    "CheckTextractStatus": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:textract:getDocumentAnalysis",
      "Arguments": {
        "JobId.$": "$.textractStart.JobId",
        "MaxResults": 1
      },
      "ResultPath": "$.textractStatus",
      "Next": "TextractComplete?"
    },
    "TextractComplete?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.textractStatus.JobStatus",
          "StringEquals": "SUCCEEDED",
          "Next": "CollectTextractPages"
        },
        {
          "Variable": "$.textractStatus.JobStatus",
          "StringEquals": "FAILED",
          "Next": "MarkFailed"
        },
        {
          "Variable": "$.textractStatus.JobStatus",
          "StringEquals": "PARTIAL_SUCCESS",
          "Next": "CollectTextractPages"
        }
      ],
      "Default": "WaitBeforeStatusCheck"
    },
    "CollectTextractPages": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${CollectTextractPagesFnArn}",
        "Payload": {
          "jobId.$": "$.textractStart.JobId",
          "document.$": "$.document",
          "executionArn.$": "$$.Execution.Id"
        }
      },
      "OutputPath": "$.Payload",
      "Next": "NormalizeAndScore"
    },
    "NormalizeAndScore": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${NormalizeAndScoreFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "NeedsHumanReview?"
    },
    "NeedsHumanReview?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.review.required",
          "BooleanEquals": true,
          "Next": "CreateHumanReviewTask"
        }
      ],
      "Default": "PublishProcessedEvent"
    },
    "CreateHumanReviewTask": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Arguments": {
        "FunctionName": "${CreateHumanReviewTaskFnArn}",
        "Payload": {
          "taskToken.$": "$$.Task.Token",
          "document.$": "$.document",
          "result.$": "$",
          "executionArn.$": "$$.Execution.Id"
        }
      },
      "TimeoutSeconds": 604800,
      "ResultPath": "$.humanReviewDecision",
      "Next": "ApplyHumanReviewDecision"
    },
    "ApplyHumanReviewDecision": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${ApplyHumanReviewDecisionFnArn}",
        "Payload": {
          "pipelineResult.$": "$",
          "reviewDecision.$": "$.humanReviewDecision.Payload"
        }
      },
      "OutputPath": "$.Payload",
      "Next": "PublishProcessedEvent"
    },
    "PublishProcessedEvent": {
      "Type": "Task",
      "Resource": "arn:aws:states:::events:putEvents",
      "Arguments": {
        "Entries": [
          {
            "Source": "com.example.documents",
            "DetailType": "document.processed",
            "EventBusName": "${EventBusName}",
            "Detail": {
              "documentId.$": "$.document.id",
              "bucket.$": "$.document.bucket",
              "key.$": "$.document.key",
              "status": "PROCESSED",
              "review.$": "$.review",
              "output.$": "$.output"
            }
          }
        ]
      },
      "Next": "FinalizeAudit"
    },
    "FinalizeAudit": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${FinalizeAuditFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    },
    "MarkFailed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Arguments": {
        "FunctionName": "${MarkFailedFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    }
  }
}

Lambda: Collect paginated Textract results and store raw output (Python)

This Lambda consolidates paginated GetDocumentAnalysis responses, writes a raw artifact to S3, and returns a pointer for downstream normalization.

import json
import os
from datetime import datetime, timezone

import boto3

textract = boto3.client("textract")
s3 = boto3.client("s3")

RAW_RESULTS_BUCKET = os.environ["RAW_RESULTS_BUCKET"]
RAW_RESULTS_PREFIX = os.environ.get("RAW_RESULTS_PREFIX", "textract-raw")


def lambda_handler(event, context):
    job_id = event["jobId"]
    document = event["document"]
    execution_arn = event["executionArn"]

    blocks = []
    next_token = None
    first_page_metadata = None
    final_status = None
    warnings = []

    while True:
        kwargs = {"JobId": job_id, "MaxResults": 1000}
        if next_token:
            kwargs["NextToken"] = next_token

        resp = textract.get_document_analysis(**kwargs)
        final_status = resp.get("JobStatus", final_status)

        if first_page_metadata is None:
            first_page_metadata = {
                "DocumentMetadata": resp.get("DocumentMetadata", {}),
                "AnalyzeDocumentModelVersion": resp.get("AnalyzeDocumentModelVersion"),
            }

        blocks.extend(resp.get("Blocks", []))
        warnings.extend(resp.get("Warnings", []))

        next_token = resp.get("NextToken")
        if not next_token:
            break

    now = datetime.now(timezone.utc).isoformat()
    raw_key = (
        f"{RAW_RESULTS_PREFIX}/"
        f"{document.get('tenant', 'unknown')}/"
        f"{document['id']}/"
        f"{job_id}.json"
    )

    payload = {
        "jobId": job_id,
        "document": document,
        "executionArn": execution_arn,
        "collectedAt": now,
        "jobStatus": final_status,
        "metadata": first_page_metadata,
        "warnings": warnings,
        "blocks": blocks,
    }

    s3.put_object(
        Bucket=RAW_RESULTS_BUCKET,
        Key=raw_key,
        Body=json.dumps(payload).encode("utf-8"),
        ContentType="application/json",
    )

    return {
        "document": document,
        "textract": {
            "jobId": job_id,
            "jobStatus": final_status,
            "rawResultsS3": {
                "bucket": RAW_RESULTS_BUCKET,
                "key": raw_key,
            },
            "blockCount": len(blocks),
            "warningsCount": len(warnings),
        },
    }

Lambda: Normalize and score extracted data (Python)

This is where I transform Textract’s generic block model into a domain result that downstream systems can consume.

import json
import os

import boto3

s3 = boto3.client("s3")

NORMALIZED_BUCKET = os.environ["NORMALIZED_BUCKET"]
NORMALIZED_PREFIX = os.environ.get("NORMALIZED_PREFIX", "normalized")


def _load_json_from_s3(bucket: str, key: str) -> dict:
    obj = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(obj["Body"].read())


def _extract_lines_and_words(blocks):
    # Minimal example; in production I usually build maps by block Id and relationships.
    lines = []
    for b in blocks:
        if b.get("BlockType") == "LINE":
            lines.append({
                "text": b.get("Text", ""),
                "confidence": b.get("Confidence", 0.0)
            })
    return lines


def _simple_invoice_heuristics(lines):
    # Demo heuristics only. Replace with rule engine / ML classifier as needed.
    text_blob = "\n".join([l["text"] for l in lines])
    avg_conf = sum(l["confidence"] for l in lines) / max(len(lines), 1)

    result = {
        "documentType": "unknown",
        "fields": [],
        "review": {
            "required": False,
            "reason": None
        }
    }

    if "invoice" in text_blob.lower():
        result["documentType"] = "invoice"

    # Example field extraction heuristic placeholder
    # In production, I'd parse key-value pairs and tables from FORM/TABLE blocks
    result["fields"].append({
        "name": "avg_line_confidence",
        "value": round(avg_conf, 2),
        "confidence": round(avg_conf, 2),
        "source": "textract"
    })

    # Review policy example
    if avg_conf < 90:
        result["review"]["required"] = True
        result["review"]["reason"] = "Average OCR confidence below threshold"

    return result


def lambda_handler(event, context):
    document = event["document"]
    textract_ptr = event["textract"]["rawResultsS3"]

    raw = _load_json_from_s3(textract_ptr["bucket"], textract_ptr["key"])
    blocks = raw["blocks"]

    lines = _extract_lines_and_words(blocks)
    derived = _simple_invoice_heuristics(lines)

    normalized = {
        "document": document,
        "textract": {
            "jobId": raw["jobId"],
            "jobStatus": raw["jobStatus"],
            "warnings": raw.get("warnings", []),
            "blockCount": len(blocks)
        },
        "classification": {
            "documentType": derived["documentType"]
        },
        "fields": derived["fields"],
        "review": derived["review"],
        "output": {
            "rawResultsS3": textract_ptr
        }
    }

    out_key = f"{NORMALIZED_PREFIX}/{document.get('tenant', 'unknown')}/{document['id']}.json"
    s3.put_object(
        Bucket=NORMALIZED_BUCKET,
        Key=out_key,
        Body=json.dumps(normalized).encode("utf-8"),
        ContentType="application/json"
    )

    normalized["output"]["normalizedS3"] = {
        "bucket": NORMALIZED_BUCKET,
        "key": out_key
    }

    return normalized
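The normalization Lambda above only reads LINE blocks; extracting actual key-value pairs from the FORMS output follows the relationship-walking approach its comment alludes to. A minimal sketch, assuming the standard Textract KEY_VALUE_SET/WORD block shapes:

```python
def extract_key_values(blocks):
    """Pair KEY_VALUE_SET blocks into {key_text: value_text} using Relationships."""
    by_id = {b["Id"]: b for b in blocks}

    def text_of(block):
        # Concatenate the text of a block's CHILD words (and checkbox states).
        words = []
        for rel in block.get("Relationships", []):
            if rel["Type"] == "CHILD":
                for cid in rel["Ids"]:
                    child = by_id.get(cid, {})
                    if child.get("BlockType") == "WORD":
                        words.append(child.get("Text", ""))
                    elif child.get("BlockType") == "SELECTION_ELEMENT":
                        words.append(child.get("SelectionStatus", ""))
        return " ".join(words)

    pairs = {}
    for b in blocks:
        if b.get("BlockType") == "KEY_VALUE_SET" and "KEY" in b.get("EntityTypes", []):
            value_text = ""
            for rel in b.get("Relationships", []):
                if rel["Type"] == "VALUE":
                    for vid in rel["Ids"]:
                        value_text = text_of(by_id[vid])
            pairs[text_of(b)] = value_text
    return pairs
```

In production you would also carry each pair's confidence through, so the review policy can act on individual fields rather than an average.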

Lambda: Create human review task (callback token storage) (Python)

This Lambda stores the task token so a reviewer can resume the workflow later.

import json
import os
import time
from datetime import datetime, timezone

import boto3

ddb = boto3.resource("dynamodb")
events = boto3.client("events")

REVIEW_TABLE = ddb.Table(os.environ["REVIEW_TABLE"])
EVENT_BUS_NAME = os.environ["EVENT_BUS_NAME"]


def lambda_handler(event, context):
    task_token = event["taskToken"]
    document = event["document"]
    result = event["result"]
    execution_arn = event["executionArn"]

    review_id = f"{document['id']}#{int(time.time())}"
    now = datetime.now(timezone.utc).isoformat()

    item = {
        "reviewId": review_id,
        "status": "PENDING",
        "createdAt": now,
        "documentId": document["id"],
        "executionArn": execution_arn,
        "taskToken": task_token,
        "document": document,
        "resultSummary": {
            "reason": result["review"]["reason"],
            "documentType": result.get("classification", {}).get("documentType", "unknown"),
            "normalizedOutput": result["output"].get("normalizedS3")
        }
    }

    REVIEW_TABLE.put_item(Item=item)

    events.put_events(
        Entries=[
            {
                "Source": "com.example.documents",
                "DetailType": "document.review.required",
                "EventBusName": EVENT_BUS_NAME,
                "Detail": json.dumps({
                    "reviewId": review_id,
                    "documentId": document["id"],
                    "reason": result["review"]["reason"]
                })
            }
        ]
    )

    # For waitForTaskToken, Lambda can return immediately after storing task metadata.
    # The workflow stays paused until SendTaskSuccess/SendTaskFailure is called.
    return {
        "reviewId": review_id,
        "status": "PENDING"
    }

Lambda: Reviewer callback endpoint (API Gateway -> Lambda -> Step Functions) (Python)

This Lambda receives the human decision and resumes the waiting execution.

import json
import os
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

ddb = boto3.resource("dynamodb")
sfn = boto3.client("stepfunctions")

REVIEW_TABLE = ddb.Table(os.environ["REVIEW_TABLE"])


def _response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body)
    }


def lambda_handler(event, context):
    try:
        body = json.loads(event.get("body") or "{}")
        review_id = body["reviewId"]
        action = body["action"]  # approve | reject
        reviewer = body.get("reviewer", "unknown")
        corrected_fields = body.get("correctedFields", {})

        item_resp = REVIEW_TABLE.get_item(Key={"reviewId": review_id})
        item = item_resp.get("Item")
        if not item:
            return _response(404, {"message": "Review task not found"})

        if item["status"] != "PENDING":
            return _response(409, {"message": f"Review task already {item['status']}"})

        task_token = item["taskToken"]
        now = datetime.now(timezone.utc).isoformat()

        if action == "approve":
            sfn.send_task_success(
                taskToken=task_token,
                output=json.dumps({
                    "reviewId": review_id,
                    "decision": "APPROVED",
                    "reviewer": reviewer,
                    "reviewedAt": now,
                    "correctedFields": corrected_fields
                })
            )
            new_status = "APPROVED"
        elif action == "reject":
            sfn.send_task_failure(
                taskToken=task_token,
                error="HumanReviewRejected",
                cause=json.dumps({
                    "reviewId": review_id,
                    "reviewer": reviewer,
                    "reviewedAt": now
                })
            )
            new_status = "REJECTED"
        else:
            return _response(400, {"message": "Invalid action"})

        REVIEW_TABLE.update_item(
            Key={"reviewId": review_id},
            UpdateExpression="SET #s = :s, reviewer = :r, reviewedAt = :t",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={
                ":s": new_status,
                ":r": reviewer,
                ":t": now
            }
        )

        return _response(200, {"message": "Review decision recorded", "status": new_status})

    except KeyError as e:
        return _response(400, {"message": f"Missing field: {e.args[0]}"})
    except ClientError as e:
        return _response(500, {"message": "AWS error", "detail": str(e)})
    except Exception as e:
        return _response(500, {"message": "Unexpected error", "detail": str(e)})

CDK example (Python) for S3 -> EventBridge -> Step Functions start

This is a simplified snippet to show the core wiring. In a production stack, I would break this into constructs (ingress, orchestration, review, storage, observability).

from aws_cdk import (
    Stack,
    aws_s3 as s3,
    aws_events as events,
    aws_events_targets as targets,
    aws_stepfunctions as sfn,
    aws_iam as iam,
)
from constructs import Construct


class DocumentPipelineIngressStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        input_bucket = s3.Bucket(
            self,
            "InputBucket",
            event_bridge_enabled=True
        )

        parent_state_machine = sfn.StateMachine.from_state_machine_arn(
            self,
            "ParentStateMachine",
            "arn:aws:states:ap-southeast-2:123456789012:stateMachine:doc-parent"
        )

        rule = events.Rule(
            self,
            "S3ObjectCreatedRule",
            event_pattern=events.EventPattern(
                source=["aws.s3"],
                detail_type=["Object Created"],
                detail={
                    "bucket": {"name": [input_bucket.bucket_name]},
                    "object": {"key": [{"prefix": "raw/"}]}
                }
            )
        )

        # Start execution on matching events
        rule.add_target(targets.SfnStateMachine(parent_state_machine))

In a real deployment, I also add:

  • explicit execution input transformation
  • IAM least privilege policies
  • DLQs / retry policies on targets where appropriate
  • environment-based prefix filtering (dev/test/prod)

Data model and audit design (recommended)

I strongly recommend treating auditability as a first-class feature.

DynamoDB audit table (example keys)

PK: DOC#<documentId>

SK: EVENT#<timestamp>#<eventType>

Example audit events

  • INGESTED
  • WORKFLOW_STARTED
  • TEXTRACT_STARTED
  • TEXTRACT_SUCCEEDED
  • NORMALIZED
  • HUMAN_REVIEW_REQUIRED
  • HUMAN_REVIEW_APPROVED
  • PUBLISHED
  • FAILED

This pattern gives me an append-only lineage timeline per document.
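Writing one of these audit events is then just building an item with the composite key and calling put_item. A sketch of the item builder; the attribute names beyond PK/SK are mine:

```python
from datetime import datetime, timezone
from typing import Optional


def audit_item(document_id: str, event_type: str, attributes: Optional[dict] = None) -> dict:
    """Build an append-only audit entry keyed as DOC#<documentId> / EVENT#<ts>#<eventType>."""
    ts = datetime.now(timezone.utc).isoformat()
    item = {
        "PK": f"DOC#{document_id}",
        "SK": f"EVENT#{ts}#{event_type}",
        "eventType": event_type,
        "at": ts,
    }
    # Attach step-specific context (JobId, execution ARN, reviewer, ...).
    item.update(attributes or {})
    return item
```

Because the sort key starts with an ISO-8601 timestamp, a single Query on `DOC#<documentId>` returns the lineage in chronological order.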


Error handling and retries (what I do on purpose)

This pipeline is asynchronous and distributed, so failures are normal. I design for them.

Where I retry

  • Textract throttling / throughput exceptions
  • transient Lambda errors
  • EventBridge PutEvents partial failures (Step Functions can fail the task and retry)
  • downstream S3 write failures

Where I do not blindly retry

  • invalid file format
  • missing required metadata
  • corrupted documents
  • deterministic business-rule violations

Failure routing

I emit a document.failed event and write a terminal audit entry with:

  • failure type
  • step name
  • error code
  • correlation IDs

That makes operations and replay much easier.


Cost and Throughput Tuning

This is where the architecture becomes production-grade.

1) Choose Step Functions workflow type intentionally

I use Standard for the main orchestration because:

  • async waits
  • human callback pauses
  • richer history and control

If I want to optimize cost for very high-volume short preprocessing tasks, I may split out a lightweight Express workflow in front, but I keep the long-running document path in Standard.

2) Tune Distributed Map concurrency to downstream capacity

Distributed Map can scale very high, but the bottleneck is usually downstream service quotas (Textract, Lambda concurrency, DynamoDB write capacity, etc.).

What I do:

  • make concurrency configurable (MaxConcurrencyPath)
  • start conservatively in production
  • load test with realistic document sizes and page counts
  • request quota increases before large launches

3) Reduce unnecessary state transitions

Polling is simple, but excessive polling increases state transitions and cost.

Practical tuning:

  • increase wait interval after the first few polls (progressive backoff)
  • use doc-size-aware polling intervals (longer waits for large PDFs)
  • for long-running jobs, replace polling with the Textract SNS completion notification (optionally fanned into SQS)
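Here is a sketch of a wait-time function combining both ideas above: exponential growth across polls, plus a page-count-aware floor so large PDFs are never polled aggressively. The specific constants are illustrative tuning knobs, not recommendations.

```python
def poll_wait_seconds(attempt: int, page_count: int) -> int:
    """Seconds to wait before the next GetDocumentAnalysis poll."""
    base = 5 * (2 ** min(attempt, 4))       # progressive backoff: 5, 10, 20, 40, 80
    size_floor = max(5, page_count * 2)     # large docs never poll faster than ~2s/page
    return min(max(base, size_floor), 120)  # cap at 2 minutes

# Small doc: backoff dominates. Large doc: the size floor dominates early polls.
assert poll_wait_seconds(0, 1) == 5
assert poll_wait_seconds(3, 1) == 40
assert poll_wait_seconds(0, 50) == 100
```

In the state machine this value would feed a `Wait` state's `SecondsPath`, so the interval is computed per iteration rather than hard-coded.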

4) Store outputs once, reuse many times

I avoid re-running Textract when:

  • a replay is only needed for normalization logic changes
  • a downstream consumer fails
  • a human review needs to re-open a prior result

By storing raw output + normalized output + audit events, I can replay downstream steps without paying for re-extraction.
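A replay planner can make that reuse explicit: given which artifacts already exist for a document, only a missing raw Textract output forces a paid re-extraction. Artifact and reason names here are illustrative.

```python
def replay_plan(stored: set[str], reason: str) -> list[str]:
    """Decide which steps a replay must actually run."""
    steps = []
    if "raw_textract_output" not in stored:
        steps.append("extract")  # only case where we pay Textract again
    if reason == "normalization_change" or "normalized_output" not in stored:
        steps.append("normalize")
    steps.append("publish")
    return steps

# Normalization logic changed: reuse the raw output, skip extraction.
assert replay_plan({"raw_textract_output", "normalized_output"},
                   "normalization_change") == ["normalize", "publish"]
# A downstream consumer failed: just republish the stored result.
assert replay_plan({"raw_textract_output", "normalized_output"},
                   "downstream_failure") == ["publish"]
```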

5) Separate hot path and review path

I do not force all documents through human review.
Most documents should:

  • extract
  • validate
  • publish
  • finish

Only low-confidence or exception cases enter the review path. This keeps cost and latency predictable.
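The routing decision itself can be a tiny pure function: publish when every required field is present and confident, otherwise divert to review. The threshold and field names are illustrative (in practice the threshold would be tenant-configurable).

```python
REVIEW_THRESHOLD = 0.85  # illustrative; would be per-tenant config in practice

def route(fields: dict[str, float], required: set[str]) -> str:
    """Return 'PUBLISH' for the hot path, 'HUMAN_REVIEW' for exceptions."""
    if not required.issubset(fields):           # a required field is missing entirely
        return "HUMAN_REVIEW"
    if any(fields[name] < REVIEW_THRESHOLD for name in required):
        return "HUMAN_REVIEW"                   # low-confidence required field
    return "PUBLISH"

assert route({"total": 0.99, "date": 0.97}, {"total", "date"}) == "PUBLISH"
assert route({"total": 0.99, "date": 0.60}, {"total", "date"}) == "HUMAN_REVIEW"
assert route({"total": 0.99}, {"total", "date"}) == "HUMAN_REVIEW"
```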

6) Add lifecycle policies

For S3 cost management, I usually apply different retention windows:

  • raw uploads: longer retention (compliance dependent)
  • raw Textract artifacts: medium retention
  • normalized results: longer (business value)
  • debug-only artifacts: shorter
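Those tiers map directly onto an S3 lifecycle configuration. The dict below is in the shape boto3's `put_bucket_lifecycle_configuration` expects; prefixes and day counts are illustrative and compliance-dependent.

```python
lifecycle = {
    "Rules": [
        {"ID": "raw-uploads", "Filter": {"Prefix": "uploads/"},
         "Status": "Enabled", "Expiration": {"Days": 2555}},   # ~7 years, compliance-driven
        {"ID": "textract-raw", "Filter": {"Prefix": "textract-raw/"},
         "Status": "Enabled", "Expiration": {"Days": 365}},    # medium retention
        {"ID": "normalized", "Filter": {"Prefix": "normalized/"},
         "Status": "Enabled", "Expiration": {"Days": 2555}},   # long-lived business value
        {"ID": "debug", "Filter": {"Prefix": "debug/"},
         "Status": "Enabled", "Expiration": {"Days": 30}},     # short-lived debug artifacts
    ]
}
# Applied with:
# s3.put_bucket_lifecycle_configuration(
#     Bucket="docs-bucket", LifecycleConfiguration=lifecycle)
```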

Security, Compliance, and Operational Notes

IAM least privilege

Give each component only what it needs:

  • Step Functions can call specific Lambdas / Textract / EventBridge
  • Lambdas can read/write only the intended buckets/prefixes
  • Reviewer API can only access review table + Step Functions callback APIs
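As one concrete example, a least-privilege policy for the extraction Lambda could scope S3 access to exactly the prefixes it touches. Bucket name and prefixes are illustrative; note that Textract actions do not support resource-level ARNs, so their `Resource` stays `"*"`.

```python
import json

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["s3:GetObject"],
         "Resource": "arn:aws:s3:::docs-bucket/uploads/*"},      # read ingress only
        {"Effect": "Allow", "Action": ["s3:PutObject"],
         "Resource": "arn:aws:s3:::docs-bucket/normalized/*"},   # write results only
        {"Effect": "Allow",
         "Action": ["textract:StartDocumentAnalysis",
                    "textract:GetDocumentAnalysis"],
         "Resource": "*"},  # Textract has no resource-level permissions
    ],
}

print(json.dumps(policy, indent=2))
```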

Encryption

I enable encryption at rest for:

  • S3 buckets
  • DynamoDB tables
  • logs (when applicable)

If needed, I also use KMS for tighter key control and auditability.

PII considerations

Document pipelines often contain sensitive data. I plan for:

  • access controls by tenant
  • minimal logging (no full document contents in logs)
  • review UI redaction where needed
  • retention policies aligned to policy/legal requirements

Observability

I instrument:

  • Step Functions execution failure alarms
  • Lambda error/throttle alarms
  • custom metrics (documents processed, review rate, average pages/doc, latency)
  • audit events for business-level tracing
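For the custom business metrics, a helper can build the keyword arguments for CloudWatch's `put_metric_data`. The namespace and metric name are illustrative.

```python
def review_rate_metric(reviewed: int, total: int) -> dict:
    """kwargs for cloudwatch.put_metric_data: % of documents sent to review."""
    rate = (100.0 * reviewed / total) if total else 0.0
    return {
        "Namespace": "DocPipeline",      # illustrative namespace
        "MetricData": [{
            "MetricName": "ReviewRate",
            "Value": rate,
            "Unit": "Percent",
        }],
    }

m = review_rate_metric(7, 100)
assert m["MetricData"][0]["Value"] == 7.0
# Emitted with: cloudwatch.put_metric_data(**review_rate_metric(7, 100))
```

An alarm on this metric catches regressions fast: a sudden spike in review rate usually means upstream document quality or an extraction template changed.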

Demo Tips (if you are presenting this live)

This pattern demos extremely well if you show all three views:

  1. Upload a document

    • drag-and-drop into S3 (or app UI)
  2. Workflow progress

    • Step Functions execution graph
    • Distributed Map child runs for batches
  3. Outputs

    • raw Textract JSON pointer
    • normalized JSON in S3
    • audit trail item(s)
    • human review queue (triggered by a low-confidence sample)

If I want a smooth live demo, I prepare:

  • one “clean” invoice (no review needed)
  • one noisy/scanned doc (human review path)
  • one small batch manifest (3 to 10 docs) to show Distributed Map fan-out

Extensions I would add next

Once this base pipeline is running, it becomes a great platform for extensions:

  • Document classification before extraction (route to different extraction templates)
  • Schema validation per document type
  • OpenSearch indexing for searchable archives
  • Comprehend / Bedrock post-processing for summaries and entity normalization
  • Tenant-specific extraction configs (thresholds, required fields, routing)
  • Replay tooling for partial reprocessing from audit trail checkpoints

Closing Thoughts

I like this architecture because it is not just a toy OCR demo. It demonstrates a proper event-driven, auditable, scalable workflow that teams can actually build on.

The combination of:

  • S3 for durable ingress
  • Textract for extraction
  • Step Functions for orchestration
  • Distributed Map for batch scale
  • EventBridge for decoupling
  • callback-based human review for exceptions

gives a strong foundation for production document intelligence systems.

If I were publishing this as a developer advocacy post, I would also include a sample repository with:

  • CDK deployment
  • test documents
  • reviewer UI mock
  • load test script
  • dashboards/alarms

That turns the article into something readers can run and evolve.


References

  • Amazon Textract async APIs and async processing overview (start/get patterns, async job model, supported file types, SNS notification model for async operations)
  • GetDocumentAnalysis pagination (MaxResults, NextToken) and paginated retrieval behavior
  • Textract output storage and OutputConfig (custom S3 output and optional KMS for output)
  • Textract quotas and Service Quotas guidance
  • S3 -> EventBridge integration and S3 EventBridge event structure
  • Step Functions Distributed Map (when to use it, high concurrency, child workflows, map runs, input/history limits)
  • Step Functions callback/task token pattern and long waits for external/human steps
  • Step Functions optimized EventBridge integration (events:putEvents) and failure handling behavior
  • Step Functions pricing model (state transitions for Standard workflows)
