This is one of my favorite AWS patterns to demo because it is both visually compelling and production-relevant. It shows event-driven architecture, orchestration, asynchronous AI/ML service integration, scale-out processing, human-in-the-loop review, and operational discipline in one workflow.
In this post, I will walk through an end-to-end implementation of a document processing pipeline built with:
- Amazon S3 for document ingress and result storage
- Amazon Textract for OCR and structured extraction
- AWS Step Functions for orchestration (including Distributed Map for batch scale)
- Amazon EventBridge for event routing and downstream integration
I will also cover:
- Async Textract orchestration
- Batch scaling with Distributed Map
- Result storage and audit trail
- Human review step
- Cost and throughput tuning
- Architecture and code walkthrough
Why this pattern is so effective
In real teams, document processing is rarely just “OCR a file and store JSON.” We usually need to handle:
- Multi-page PDFs and asynchronous processing
- Bursty uploads (for example, end-of-day batch drops)
- Traceability for compliance and audits
- Human review for low-confidence or ambiguous fields
- Clean integration points for downstream systems
This architecture solves those concerns in a way that is easy to demonstrate and scale.
What we are building
I am designing the pipeline around two operating modes:
Mode 1: Single-document event-driven processing
- A file lands in S3
- EventBridge triggers the workflow
- The workflow runs end-to-end for that document
Mode 2: Batch processing
- A manifest (JSON/CSV/list of document keys) is provided
- Step Functions uses Distributed Map to fan out child workflows
- Each child workflow processes one document independently
This keeps the core processing logic consistent while letting me scale from demos to production.
Architecture Overview
At a high level, the flow is:
- A document is uploaded to an S3 input bucket
- S3 emits an event to EventBridge
- EventBridge starts a Step Functions parent workflow
- The parent workflow prepares a manifest (single item or batch list)
- A Distributed Map launches child workflows for each document
- Each child workflow:
- validates input
- starts async Textract
- waits/polls for completion
- retrieves and normalizes results
- stores raw + normalized outputs
- writes audit records
- routes low-confidence cases to human review
- emits a processed event to EventBridge
- Downstream systems consume the processed event
End-to-End Walkthrough
1) Document ingress with S3 + EventBridge
I use S3 as the system of record for inbound documents. The upload path usually looks like:
s3://my-docs-incoming/raw/YYYY/MM/DD/<tenant>/<document>.pdf
I prefer a structured key naming convention because it helps with:
- lifecycle policies
- tenant scoping
- troubleshooting
- cost attribution
- replay operations
S3 can publish object events to EventBridge, and I use an EventBridge rule to filter only the prefixes and file types that should trigger processing (for example, PDFs in raw/).
Example EventBridge rule pattern (S3 object created)
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["my-docs-incoming"]
    },
    "object": {
      "key": [{ "prefix": "raw/" }]
    }
  }
}
From there, EventBridge starts the Step Functions parent state machine.
2) Parent orchestration with Step Functions
I use a Standard workflow for the parent because:
- document jobs can run for a while
- I may have retries and waits
- I may pause for human review (callback pattern)
- I want richer execution history
The parent workflow does a few things:
- Parses the incoming event (or batch request)
- Creates a manifest for processing (even if only one document)
- Sets concurrency controls (important for Textract/Lambda quotas)
- Launches a Distributed Map
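A BuildManifest Lambda along these lines can normalize both entry points (single S3 event or batch request) into one manifest shape. This is a sketch: the event shapes and the default concurrency variable are illustrative assumptions, not the exact payloads from this stack:

```python
import os

# Assumed default; in practice this would be tuned to Textract/Lambda quotas.
DEFAULT_MAX_CONCURRENCY = int(os.environ.get("DEFAULT_MAX_CONCURRENCY", "10"))

def lambda_handler(event, context):
    """Normalize a single S3 event or a batch request into one manifest shape."""
    if "detail" in event:
        # S3 "Object Created" event routed via EventBridge: one-document manifest
        documents = [{
            "bucket": event["detail"]["bucket"]["name"],
            "key": event["detail"]["object"]["key"],
        }]
    else:
        # Batch request shape (assumed): {"documents": [{"bucket": ..., "key": ...}, ...]}
        documents = event["documents"]
    return {
        "documents": documents,
        "maxConcurrency": int(event.get("maxConcurrency", DEFAULT_MAX_CONCURRENCY)),
        "batchContext": {"source": "event" if "detail" in event else "batch"},
    }
```

Either way, the Distributed Map downstream sees the same `documents` list and `maxConcurrency` value.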
Why Distributed Map matters
For real batches, a normal Map state can become limiting because of concurrency and history size. Distributed Map gives me child workflow executions, better scaling, and cleaner observability for each document.
3) Child workflow per document (core pipeline)
Each child workflow processes a single document from start to finish.
Core steps in the child workflow
1. Validate input
- file type
- bucket/key exists
- metadata (tenant, doc type, correlation ID)
- idempotency key check
2. Start Textract asynchronously
- call StartDocumentAnalysis
- capture the JobId
- store the correlation in the audit trail
3. Wait for completion
- polling loop (simple and explicit for Step Functions)
- or callback/SNS-based completion (production optimization)
4. Retrieve paginated results
- call GetDocumentAnalysis until NextToken is exhausted
5. Normalize + enrich
- convert Textract blocks into domain JSON
- score confidence thresholds
- apply business rules
6. Store outputs
- raw output
- normalized output
- processing summary
- audit entries
7. Human review (if needed)
- pause with task token callback
- reviewer approves/edits/rejects
8. Emit completion event
- publish a custom EventBridge event
- downstream systems subscribe independently
4) Async Textract orchestration (practical implementation)
Textract async processing is the correct choice for multi-page documents and larger workloads. The key point is that I do not try to synchronously block a Lambda while waiting for Textract.
Instead, I let Step Functions own the waiting and retry logic.
Two patterns I use in practice
Pattern A (implemented in this post): Step Functions polling loop
This is the simplest pattern to explain and demo.
StartDocumentAnalysis -> Wait -> GetDocumentAnalysis (status check) -> loop until SUCCEEDED/FAILED
Pros:
- Easy to understand
- No extra callback plumbing
- Great for demos and many production cases
Cons:
- More state transitions
- Not the most efficient if jobs are very long
Pattern B (production optimization): SNS/SQS + callback
Textract async operations publish a completion notification to SNS. I can correlate the JobId with a stored task token, then call SendTaskSuccess to resume the Step Functions execution.
Pros:
- Fewer polling transitions
- More event-driven
Cons:
- More moving parts (SNS/SQS/Lambda/token correlation)
For this article, I am implementing Pattern A for clarity, and I will still preserve auditability and scalability.
5) Result storage and audit trail (what I store and why)
This is where many demos stop too early. I do not want a pipeline that only prints extraction results to logs.
I store four classes of artifacts:
A. Raw input (immutable)
- Original document in S3 (raw/)
B. Raw Textract result snapshot
- Either Textract output JSON (if using OutputConfig)
- Or consolidated raw blocks written by my retrieval Lambda
C. Normalized business result
A clean JSON model that downstream systems can actually use, for example:
{
  "documentId": "INV-2026-000123",
  "documentType": "invoice",
  "vendorName": "Acme Pty Ltd",
  "invoiceDate": "2026-02-25",
  "totalAmount": 1285.40,
  "currency": "AUD",
  "fields": [
    {
      "name": "invoice_number",
      "value": "INV-2026-000123",
      "confidence": 98.7,
      "source": "textract"
    }
  ],
  "review": {
    "required": false,
    "reason": null
  }
}
D. Audit trail (DynamoDB + events)
I keep an audit table with entries like:
- correlation ID
- S3 bucket/key/version
- execution ARN
- Textract JobId
- timestamps (started/completed)
- status transitions
- retry count
- reviewer ID + decision (if reviewed)
This makes troubleshooting and compliance conversations much easier.
6) Human review step (callback pattern)
I like to add a human review step because it turns the demo into a real workflow.
When I trigger human review
I send documents for human review if:
- confidence is below threshold for required fields
- required fields are missing
- document type classification is ambiguous
- business validations fail (for example, total does not match line items)
How the callback works
- Step Functions reaches HumanReviewRequired
- It invokes a Lambda using waitForTaskToken
- That Lambda stores a review task (with the task token) in DynamoDB and optionally emits a notification event
- A reviewer UI or ops tool loads the pending task
- Reviewer approves/edits/rejects
- API Gateway + Lambda calls:
  - SendTaskSuccess (approve / approved edits)
  - SendTaskFailure (reject / unrecoverable issue)
This keeps the workflow cleanly paused without any polling from the UI side.
7) EventBridge for decoupled downstream integration
I use EventBridge in two places:
Ingress routing
- S3 object-created events trigger the parent workflow
Egress publishing
- The child workflow publishes custom events like:
  - document.processed
  - document.review.required
  - document.failed
This is a huge win because downstream systems can subscribe independently:
- search indexing
- analytics pipelines
- case management
- notifications
- data quality dashboards
No tight coupling to the core document processing workflow.
Implementation Discussion
Below is a concrete implementation outline with code.
State machine design (parent + child)
I prefer splitting the logic into:
- Parent workflow: batching and fan-out
- Child workflow: per-document processing
This keeps each workflow easier to reason about and test.
Parent workflow (ASL fragment with Distributed Map)
{
  "Comment": "Parent workflow for document batch orchestration",
  "StartAt": "BuildManifest",
  "States": {
    "BuildManifest": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${BuildManifestFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "ProcessDocumentsDistributed"
    },
    "ProcessDocumentsDistributed": {
      "Type": "Map",
      "Label": "ProcessDocuments",
      "ItemsPath": "$.documents",
      "MaxConcurrencyPath": "$.maxConcurrency",
      "ItemSelector": {
        "document.$": "$$.Map.Item.Value",
        "batchContext.$": "$.batchContext"
      },
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "StartChildExecution",
        "States": {
          "StartChildExecution": {
            "Type": "Task",
            "Resource": "arn:aws:states:::states:startExecution.sync:2",
            "Parameters": {
              "StateMachineArn": "${ChildStateMachineArn}",
              "Input.$": "$"
            },
            "End": true
          }
        }
      },
      "Next": "BatchSummary"
    },
    "BatchSummary": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${BatchSummaryFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    }
  }
}
Notes on this design
- MaxConcurrencyPath lets me tune concurrency dynamically per batch.
- I can set lower concurrency for production if Textract quotas are tighter than my desired fan-out.
- The ItemSelector passes each document plus the shared batchContext into the child execution.
- Child executions give me better isolation and clearer failure analysis.
Child workflow (ASL fragment for async Textract + human review + EventBridge)
{
  "Comment": "Child workflow for processing one document",
  "StartAt": "ValidateDocument",
  "States": {
    "ValidateDocument": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${ValidateDocumentFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "StartTextract"
    },
    "StartTextract": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:textract:startDocumentAnalysis",
      "Parameters": {
        "DocumentLocation": {
          "S3Object": {
            "Bucket.$": "$.document.bucket",
            "Name.$": "$.document.key"
          }
        },
        "FeatureTypes": ["FORMS", "TABLES"],
        "ClientRequestToken.$": "$.idempotencyKey",
        "JobTag.$": "$.jobTag"
      },
      "ResultPath": "$.textractStart",
      "Next": "WaitBeforeStatusCheck",
      "Retry": [
        {
          "ErrorEquals": [
            "Textract.ThrottlingException",
            "Textract.ProvisionedThroughputExceededException",
            "States.TaskFailed"
          ],
          "IntervalSeconds": 2,
          "BackoffRate": 2.0,
          "MaxAttempts": 5
        }
      ]
    },
    "WaitBeforeStatusCheck": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "CheckTextractStatus"
    },
    "CheckTextractStatus": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:textract:getDocumentAnalysis",
      "Parameters": {
        "JobId.$": "$.textractStart.JobId",
        "MaxResults": 1
      },
      "ResultPath": "$.textractStatus",
      "Next": "TextractComplete?"
    },
    "TextractComplete?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.textractStatus.JobStatus",
          "StringEquals": "SUCCEEDED",
          "Next": "CollectTextractPages"
        },
        {
          "Variable": "$.textractStatus.JobStatus",
          "StringEquals": "FAILED",
          "Next": "MarkFailed"
        },
        {
          "Variable": "$.textractStatus.JobStatus",
          "StringEquals": "PARTIAL_SUCCESS",
          "Next": "CollectTextractPages"
        }
      ],
      "Default": "WaitBeforeStatusCheck"
    },
    "CollectTextractPages": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${CollectTextractPagesFnArn}",
        "Payload": {
          "jobId.$": "$.textractStart.JobId",
          "document.$": "$.document",
          "executionArn.$": "$$.Execution.Id"
        }
      },
      "OutputPath": "$.Payload",
      "Next": "NormalizeAndScore"
    },
    "NormalizeAndScore": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${NormalizeAndScoreFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "Next": "NeedsHumanReview?"
    },
    "NeedsHumanReview?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.review.required",
          "BooleanEquals": true,
          "Next": "CreateHumanReviewTask"
        }
      ],
      "Default": "PublishProcessedEvent"
    },
    "CreateHumanReviewTask": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${CreateHumanReviewTaskFnArn}",
        "Payload": {
          "taskToken.$": "$$.Task.Token",
          "document.$": "$.document",
          "result.$": "$",
          "executionArn.$": "$$.Execution.Id"
        }
      },
      "TimeoutSeconds": 604800,
      "ResultPath": "$.humanReviewDecision",
      "Next": "ApplyHumanReviewDecision"
    },
    "ApplyHumanReviewDecision": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${ApplyHumanReviewDecisionFnArn}",
        "Payload": {
          "pipelineResult.$": "$",
          "reviewDecision.$": "$.humanReviewDecision"
        }
      },
      "OutputPath": "$.Payload",
      "Next": "PublishProcessedEvent"
    },
    "PublishProcessedEvent": {
      "Type": "Task",
      "Resource": "arn:aws:states:::events:putEvents",
      "Parameters": {
        "Entries": [
          {
            "Source": "com.example.documents",
            "DetailType": "document.processed",
            "EventBusName": "${EventBusName}",
            "Detail": {
              "documentId.$": "$.document.id",
              "bucket.$": "$.document.bucket",
              "key.$": "$.document.key",
              "status": "PROCESSED",
              "review.$": "$.review",
              "output.$": "$.output"
            }
          }
        ]
      },
      "ResultPath": null,
      "Next": "FinalizeAudit"
    },
    "FinalizeAudit": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${FinalizeAuditFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    },
    "MarkFailed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${MarkFailedFnArn}",
        "Payload.$": "$"
      },
      "OutputPath": "$.Payload",
      "End": true
    }
  }
}
Lambda: Collect paginated Textract results and store raw output (Python)
This Lambda consolidates paginated GetDocumentAnalysis responses, writes a raw artifact to S3, and returns a pointer for downstream normalization.
import json
import os
from datetime import datetime, timezone

import boto3

textract = boto3.client("textract")
s3 = boto3.client("s3")

RAW_RESULTS_BUCKET = os.environ["RAW_RESULTS_BUCKET"]
RAW_RESULTS_PREFIX = os.environ.get("RAW_RESULTS_PREFIX", "textract-raw")


def lambda_handler(event, context):
    job_id = event["jobId"]
    document = event["document"]
    execution_arn = event["executionArn"]

    blocks = []
    next_token = None
    first_page_metadata = None
    final_status = None
    warnings = []

    while True:
        kwargs = {"JobId": job_id, "MaxResults": 1000}
        if next_token:
            kwargs["NextToken"] = next_token

        resp = textract.get_document_analysis(**kwargs)
        final_status = resp.get("JobStatus", final_status)

        if first_page_metadata is None:
            first_page_metadata = {
                "DocumentMetadata": resp.get("DocumentMetadata", {}),
                "AnalyzeDocumentModelVersion": resp.get("AnalyzeDocumentModelVersion"),
            }

        blocks.extend(resp.get("Blocks", []))
        warnings.extend(resp.get("Warnings", []))

        next_token = resp.get("NextToken")
        if not next_token:
            break

    now = datetime.now(timezone.utc).isoformat()
    raw_key = (
        f"{RAW_RESULTS_PREFIX}/"
        f"{document.get('tenant', 'unknown')}/"
        f"{document['id']}/"
        f"{job_id}.json"
    )

    payload = {
        "jobId": job_id,
        "document": document,
        "executionArn": execution_arn,
        "collectedAt": now,
        "jobStatus": final_status,
        "metadata": first_page_metadata,
        "warnings": warnings,
        "blocks": blocks,
    }

    s3.put_object(
        Bucket=RAW_RESULTS_BUCKET,
        Key=raw_key,
        Body=json.dumps(payload).encode("utf-8"),
        ContentType="application/json",
    )

    return {
        "document": document,
        "textract": {
            "jobId": job_id,
            "jobStatus": final_status,
            "rawResultsS3": {
                "bucket": RAW_RESULTS_BUCKET,
                "key": raw_key,
            },
            "blockCount": len(blocks),
            "warningsCount": len(warnings),
        },
    }
Lambda: Normalize and score extracted data (Python)
This is where I transform Textract’s generic block model into a domain result that downstream systems can consume.
import json
import os

import boto3

s3 = boto3.client("s3")

NORMALIZED_BUCKET = os.environ["NORMALIZED_BUCKET"]
NORMALIZED_PREFIX = os.environ.get("NORMALIZED_PREFIX", "normalized")


def _load_json_from_s3(bucket: str, key: str) -> dict:
    obj = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(obj["Body"].read())


def _extract_lines_and_words(blocks):
    # Minimal example; in production I usually build maps by block Id and relationships.
    lines = []
    for b in blocks:
        if b.get("BlockType") == "LINE":
            lines.append({
                "text": b.get("Text", ""),
                "confidence": b.get("Confidence", 0.0),
            })
    return lines


def _simple_invoice_heuristics(lines):
    # Demo heuristics only. Replace with rule engine / ML classifier as needed.
    text_blob = "\n".join([l["text"] for l in lines])
    avg_conf = sum(l["confidence"] for l in lines) / max(len(lines), 1)

    result = {
        "documentType": "unknown",
        "fields": [],
        "review": {
            "required": False,
            "reason": None,
        },
    }

    if "invoice" in text_blob.lower():
        result["documentType"] = "invoice"

    # Example field extraction heuristic placeholder.
    # In production, I'd parse key-value pairs and tables from FORM/TABLE blocks.
    result["fields"].append({
        "name": "avg_line_confidence",
        "value": round(avg_conf, 2),
        "confidence": round(avg_conf, 2),
        "source": "textract",
    })

    # Review policy example
    if avg_conf < 90:
        result["review"]["required"] = True
        result["review"]["reason"] = "Average OCR confidence below threshold"

    return result


def lambda_handler(event, context):
    document = event["document"]
    textract_ptr = event["textract"]["rawResultsS3"]

    raw = _load_json_from_s3(textract_ptr["bucket"], textract_ptr["key"])
    blocks = raw["blocks"]

    lines = _extract_lines_and_words(blocks)
    derived = _simple_invoice_heuristics(lines)

    normalized = {
        "document": document,
        "textract": {
            "jobId": raw["jobId"],
            "jobStatus": raw["jobStatus"],
            "warnings": raw.get("warnings", []),
            "blockCount": len(blocks),
        },
        "classification": {
            "documentType": derived["documentType"],
        },
        "fields": derived["fields"],
        "review": derived["review"],
        "output": {
            "rawResultsS3": textract_ptr,
        },
    }

    out_key = f"{NORMALIZED_PREFIX}/{document.get('tenant', 'unknown')}/{document['id']}.json"
    s3.put_object(
        Bucket=NORMALIZED_BUCKET,
        Key=out_key,
        Body=json.dumps(normalized).encode("utf-8"),
        ContentType="application/json",
    )

    normalized["output"]["normalizedS3"] = {
        "bucket": NORMALIZED_BUCKET,
        "key": out_key,
    }
    return normalized
Lambda: Create human review task (callback token storage) (Python)
This Lambda stores the task token so a reviewer can resume the workflow later.
import json
import os
import time
from datetime import datetime, timezone

import boto3

ddb = boto3.resource("dynamodb")
events = boto3.client("events")

REVIEW_TABLE = ddb.Table(os.environ["REVIEW_TABLE"])
EVENT_BUS_NAME = os.environ["EVENT_BUS_NAME"]


def lambda_handler(event, context):
    task_token = event["taskToken"]
    document = event["document"]
    result = event["result"]
    execution_arn = event["executionArn"]

    review_id = f"{document['id']}#{int(time.time())}"
    now = datetime.now(timezone.utc).isoformat()

    item = {
        "reviewId": review_id,
        "status": "PENDING",
        "createdAt": now,
        "documentId": document["id"],
        "executionArn": execution_arn,
        "taskToken": task_token,
        "document": document,
        "resultSummary": {
            "reason": result["review"]["reason"],
            "documentType": result.get("classification", {}).get("documentType", "unknown"),
            "normalizedOutput": result["output"].get("normalizedS3"),
        },
    }
    REVIEW_TABLE.put_item(Item=item)

    events.put_events(
        Entries=[
            {
                "Source": "com.example.documents",
                "DetailType": "document.review.required",
                "EventBusName": EVENT_BUS_NAME,
                "Detail": json.dumps({
                    "reviewId": review_id,
                    "documentId": document["id"],
                    "reason": result["review"]["reason"],
                }),
            }
        ]
    )

    # For waitForTaskToken, the Lambda can return immediately after storing task metadata.
    # The workflow stays paused until SendTaskSuccess/SendTaskFailure is called.
    return {
        "reviewId": review_id,
        "status": "PENDING",
    }
Lambda: Reviewer callback endpoint (API Gateway -> Lambda -> Step Functions) (Python)
This Lambda receives the human decision and resumes the waiting execution.
import json
import os
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

ddb = boto3.resource("dynamodb")
sfn = boto3.client("stepfunctions")

REVIEW_TABLE = ddb.Table(os.environ["REVIEW_TABLE"])


def _response(status_code, body):
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body),
    }


def lambda_handler(event, context):
    try:
        body = json.loads(event.get("body") or "{}")
        review_id = body["reviewId"]
        action = body["action"]  # approve | reject
        reviewer = body.get("reviewer", "unknown")
        corrected_fields = body.get("correctedFields", {})

        item_resp = REVIEW_TABLE.get_item(Key={"reviewId": review_id})
        item = item_resp.get("Item")
        if not item:
            return _response(404, {"message": "Review task not found"})
        if item["status"] != "PENDING":
            return _response(409, {"message": f"Review task already {item['status']}"})

        task_token = item["taskToken"]
        now = datetime.now(timezone.utc).isoformat()

        if action == "approve":
            sfn.send_task_success(
                taskToken=task_token,
                output=json.dumps({
                    "reviewId": review_id,
                    "decision": "APPROVED",
                    "reviewer": reviewer,
                    "reviewedAt": now,
                    "correctedFields": corrected_fields,
                }),
            )
            new_status = "APPROVED"
        elif action == "reject":
            sfn.send_task_failure(
                taskToken=task_token,
                error="HumanReviewRejected",
                cause=json.dumps({
                    "reviewId": review_id,
                    "reviewer": reviewer,
                    "reviewedAt": now,
                }),
            )
            new_status = "REJECTED"
        else:
            return _response(400, {"message": "Invalid action"})

        REVIEW_TABLE.update_item(
            Key={"reviewId": review_id},
            UpdateExpression="SET #s = :s, reviewer = :r, reviewedAt = :t",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={
                ":s": new_status,
                ":r": reviewer,
                ":t": now,
            },
        )
        return _response(200, {"message": "Review decision recorded", "status": new_status})

    except KeyError as e:
        return _response(400, {"message": f"Missing field: {e.args[0]}"})
    except ClientError as e:
        return _response(500, {"message": "AWS error", "detail": str(e)})
    except Exception as e:
        return _response(500, {"message": "Unexpected error", "detail": str(e)})
CDK example (Python) for S3 -> EventBridge -> Step Functions start
This is a simplified snippet to show the core wiring. In a production stack, I would break this into constructs (ingress, orchestration, review, storage, observability).
from aws_cdk import (
    Stack,
    aws_s3 as s3,
    aws_events as events,
    aws_events_targets as targets,
    aws_stepfunctions as sfn,
)
from constructs import Construct


class DocumentPipelineIngressStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs):
        super().__init__(scope, construct_id, **kwargs)

        input_bucket = s3.Bucket(
            self,
            "InputBucket",
            event_bridge_enabled=True,
        )

        parent_state_machine = sfn.StateMachine.from_state_machine_arn(
            self,
            "ParentStateMachine",
            "arn:aws:states:ap-southeast-2:123456789012:stateMachine:doc-parent",
        )

        rule = events.Rule(
            self,
            "S3ObjectCreatedRule",
            event_pattern=events.EventPattern(
                source=["aws.s3"],
                detail_type=["Object Created"],
                detail={
                    "bucket": {"name": [input_bucket.bucket_name]},
                    "object": {"key": [{"prefix": "raw/"}]},
                },
            ),
        )

        # Start an execution of the parent state machine on matching events
        rule.add_target(targets.SfnStateMachine(parent_state_machine))
In a real deployment, I also add:
- explicit execution input transformation
- IAM least privilege policies
- DLQs / retry policies on targets where appropriate
- environment-based prefix filtering (dev/test/prod)
Data model and audit design (recommended)
I strongly recommend treating auditability as a first-class feature.
DynamoDB audit table (example keys)
PK: DOC#<documentId>
SK: EVENT#<timestamp>#<eventType>
Example audit events
- INGESTED
- WORKFLOW_STARTED
- TEXTRACT_STARTED
- TEXTRACT_SUCCEEDED
- NORMALIZED
- HUMAN_REVIEW_REQUIRED
- HUMAN_REVIEW_APPROVED
- PUBLISHED
- FAILED
This pattern gives me an append-only lineage timeline per document.
Error handling and retries (what I do on purpose)
This pipeline is asynchronous and distributed, so failures are normal. I design for them.
Where I retry
- Textract throttling / throughput exceptions
- transient Lambda errors
- EventBridge PutEvents partial failures (Step Functions can fail the task and retry)
- downstream S3 write failures
Where I do not blindly retry
- invalid file format
- missing required metadata
- corrupted documents
- deterministic business-rule violations
Failure routing
I emit a document.failed event and write a terminal audit entry with:
- failure type
- step name
- error code
- correlation IDs
That makes operations and replay much easier.
Cost and Throughput Tuning
This is where the architecture becomes production-grade.
1) Choose Step Functions workflow type intentionally
I use Standard for the main orchestration because:
- async waits
- human callback pauses
- richer history and control
If I want to optimize cost for very high-volume short preprocessing tasks, I may split out a lightweight Express workflow in front, but I keep the long-running document path in Standard.
2) Tune Distributed Map concurrency to downstream capacity
Distributed Map can scale very high, but the bottleneck is usually downstream service quotas (Textract, Lambda concurrency, DynamoDB write capacity, etc.).
What I do:
- make concurrency configurable (MaxConcurrencyPath)
- start conservatively in production
- load test with realistic document sizes and page counts
- request quota increases before large launches
3) Reduce unnecessary state transitions
Polling is simple, but excessive polling increases state transitions and cost.
Practical tuning:
- increase wait interval after the first few polls (progressive backoff)
- use doc-size-aware polling intervals (longer waits for large PDFs)
- consider SNS/SQS callback for long-running jobs
4) Store outputs once, reuse many times
I avoid re-running Textract when:
- a replay is only needed for normalization logic changes
- a downstream consumer fails
- a human review needs to re-open a prior result
By storing raw output + normalized output + audit events, I can replay downstream steps without paying for re-extraction.
5) Separate hot path and review path
I do not force all documents through human review.
Most documents should:
- extract
- validate
- publish
- finish
Only low-confidence or exception cases enter the review path. This keeps cost and latency predictable.
6) Add lifecycle policies
For S3 cost management, I usually apply different retention windows:
- raw uploads: longer retention (compliance dependent)
- raw Textract artifacts: medium retention
- normalized results: longer (business value)
- debug-only artifacts: shorter
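Those retention tiers translate directly into an S3 lifecycle configuration. The day counts and prefixes below are illustrative assumptions; the dict is what would be passed to put_bucket_lifecycle_configuration:

```python
def lifecycle_config(raw_days: int = 2555, textract_days: int = 365,
                     debug_days: int = 30) -> dict:
    """Lifecycle rules matching the retention tiers above (illustrative values)."""
    return {
        "Rules": [
            {
                "ID": "raw-uploads",
                "Filter": {"Prefix": "raw/"},
                "Status": "Enabled",
                "Expiration": {"Days": raw_days},  # compliance-dependent
            },
            {
                "ID": "textract-raw",
                "Filter": {"Prefix": "textract-raw/"},
                "Status": "Enabled",
                "Expiration": {"Days": textract_days},
            },
            {
                "ID": "debug-artifacts",
                "Filter": {"Prefix": "debug/"},
                "Status": "Enabled",
                "Expiration": {"Days": debug_days},
            },
        ]
    }
```

Applied with `s3.put_bucket_lifecycle_configuration(Bucket=..., LifecycleConfiguration=lifecycle_config())`; transition rules to cheaper storage classes can be added the same way.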
Security, Compliance, and Operational Notes
IAM least privilege
Give each component only what it needs:
- Step Functions can call specific Lambdas / Textract / EventBridge
- Lambdas can read/write only the intended buckets/prefixes
- Reviewer API can only access review table + Step Functions callback APIs
Encryption
I enable encryption at rest for:
- S3 buckets
- DynamoDB tables
- logs (when applicable)
If needed, I also use KMS for tighter key control and auditability.
PII considerations
Document pipelines often contain sensitive data. I plan for:
- access controls by tenant
- minimal logging (no full document contents in logs)
- review UI redaction where needed
- retention policies aligned to policy/legal requirements
Observability
I instrument:
- Step Functions execution failure alarms
- Lambda error/throttle alarms
- custom metrics (documents processed, review rate, average pages/doc, latency)
- audit events for business-level tracing
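For the custom metrics, CloudWatch Embedded Metric Format lets any Lambda emit them as one structured log line, with no extra API calls. This builder is a sketch; the namespace and dimension names are my assumptions:

```python
import json
import time

def emf_record(metric: str, value: float, tenant: str,
               namespace: str = "DocumentPipeline") -> str:
    """Build a CloudWatch Embedded Metric Format log line for a custom metric."""
    return json.dumps({
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [{
                "Namespace": namespace,
                "Dimensions": [["Tenant"]],
                "Metrics": [{"Name": metric, "Unit": "Count"}],
            }],
        },
        "Tenant": tenant,
        metric: value,
    })
```

Calling `print(emf_record("DocumentsProcessed", 1, "acme"))` from a handler is enough; CloudWatch extracts the metric from the log stream asynchronously.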
Demo Tips (if you are presenting this live)
This pattern demos extremely well if you show all three views:
1. Upload a document
- drag-and-drop into S3 (or app UI)
2. Workflow progress
- Step Functions execution graph
- Distributed Map child runs for batches
3. Outputs
- raw Textract JSON pointer
- normalized JSON in S3
- audit trail item(s)
- human review queue (triggered by a low-confidence sample)
If I want a smooth live demo, I prepare:
- one “clean” invoice (no review needed)
- one noisy/scanned doc (human review path)
- one small batch manifest (3 to 10 docs) to show Distributed Map fan-out
Extensions I would add next
Once this base pipeline is running, it becomes a great platform for extensions:
- Document classification before extraction (route to different extraction templates)
- Schema validation per document type
- OpenSearch indexing for searchable archives
- Comprehend / Bedrock post-processing for summaries and entity normalization
- Tenant-specific extraction configs (thresholds, required fields, routing)
- Replay tooling for partial reprocessing from audit trail checkpoints
Closing Thoughts
I like this architecture because it is not just a toy OCR demo. It demonstrates a proper event-driven, auditable, scalable workflow that teams can actually build on.
The combination of:
- S3 for durable ingress
- Textract for extraction
- Step Functions for orchestration
- Distributed Map for batch scale
- EventBridge for decoupling
- callback-based human review for exceptions
gives a strong foundation for production document intelligence systems.
If I were publishing this as a developer advocacy post, I would also include a sample repository with:
- CDK deployment
- test documents
- reviewer UI mock
- load test script
- dashboards/alarms
That turns the article into something readers can run and evolve.
References
- Amazon Textract async APIs and async processing overview (start/get patterns, async job model, supported file types, SNS notification model for async operations)
- GetDocumentAnalysis pagination (MaxResults, NextToken) and paginated retrieval behavior
- Textract output storage and OutputConfig (custom S3 output and optional KMS for output)
- Textract quotas and Service Quotas guidance
- S3 -> EventBridge integration and S3 EventBridge event structure
- Step Functions Distributed Map (when to use it, high concurrency, child workflows, map runs, input/history limits)
- Step Functions callback/task token pattern and long waits for external/human steps
- Step Functions optimized EventBridge integration (events:putEvents) and failure handling behavior
- Step Functions pricing model (state transitions for Standard workflows)
