Introduction
In today's digital world, organizations are drowning in unstructured data. Documents, images, and PDFs contain valuable information that often remains untapped due to the manual effort required to extract and analyze it. What if we could automatically process these documents, extract meaningful insights, and store structured data for further analysis?
This blog post will guide you through building a complete intelligent document processing pipeline using AWS services. Our pipeline will automatically:
- Extract text from images and PDFs using Amazon Textract
- Analyze content for entities, key phrases, and sentiment using Amazon Comprehend
- Store structured results in DynamoDB for easy querying and analysis
- Process documents automatically when uploaded to S3
What We're Building
Our pipeline creates a seamless flow where:
- Documents are uploaded to an S3 bucket (images, PDFs, etc.)
- Lambda function triggers automatically when new files arrive
- Textract extracts text and identifies document layout
- Comprehend analyzes the extracted text for insights
- Results are stored in DynamoDB with structured metadata
Architecture Overview
The flow is event-driven: a document uploaded to S3 triggers a Lambda function, which calls Textract to extract the text, Comprehend to analyze it, and finally writes the structured results to DynamoDB.
System Requirements
- Lambda Runtime: Python 3.10
- Memory: 1024 MB (recommended)
- Timeout: 120 seconds
- Environment variables:
  - DDB_TABLE: SmartDocResults (default)
  - LANG: en (default)
Step-by-Step Implementation
Step 1: Create DynamoDB Table
- Navigate to AWS Console → DynamoDB
- Click "Create table"
- Configure:
  - Table name: SmartDocResults
  - Partition key: doc_id (String)
  - Sort key: paragraph_id (String)
- Click "Create table"
- Wait for the table status to become "Active"
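If you prefer to script this step instead of using the console, a minimal boto3 sketch (assuming your AWS credentials and region are already configured) looks like this:

```python
import boto3

# Minimal sketch: create the SmartDocResults table with on-demand billing.
# Assumes AWS credentials and a default region are already configured.
dynamodb = boto3.client("dynamodb")

dynamodb.create_table(
    TableName="SmartDocResults",
    AttributeDefinitions=[
        {"AttributeName": "doc_id", "AttributeType": "S"},
        {"AttributeName": "paragraph_id", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "doc_id", "KeyType": "HASH"},        # partition key
        {"AttributeName": "paragraph_id", "KeyType": "RANGE"}  # sort key
    ],
    BillingMode="PAY_PER_REQUEST",
)

# Wait until the table is ACTIVE before wiring up the Lambda function.
dynamodb.get_waiter("table_exists").wait(TableName="SmartDocResults")
```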
Step 2: Create S3 Bucket
- AWS Console → S3
- Click "Create bucket"
- Configure:
  - Bucket name: your-smart-doc-bucket (change to a globally unique name)
  - Region: choose your preferred region
- Click "Create bucket"
- Note the bucket name; you will need it for the IAM policy in step 3
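The equivalent boto3 sketch, with the bucket name as a placeholder you must change:

```python
import boto3

# Minimal sketch: create the upload bucket programmatically.
# "your-smart-doc-bucket" is a placeholder; S3 bucket names are globally unique.
region = "us-east-1"
s3 = boto3.client("s3", region_name=region)

if region == "us-east-1":
    # us-east-1 is the default region and must not be passed as a LocationConstraint
    s3.create_bucket(Bucket="your-smart-doc-bucket")
else:
    s3.create_bucket(
        Bucket="your-smart-doc-bucket",
        CreateBucketConfiguration={"LocationConstraint": region},
    )
```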
Step 3: Create IAM Policy
- AWS Console → IAM → Policies
- Click "Create policy"
- Switch to "JSON" tab
- Copy the content from iam_policy.json (shown below) and replace the placeholders:
  - ACCOUNT_ID: your AWS account ID
  - REGION: your region (e.g., us-east-1)
  - BUCKET_NAME: the S3 bucket name from step 2
- Click "Next: Tags" → "Next: Review"
- Name the policy: SmartDocLambdaPolicy
- Click "Create policy"
Least-privilege IAM Policy
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3Access",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::BUCKET_NAME/*"
    },
    {
      "Sid": "TextractAccess",
      "Effect": "Allow",
      "Action": [
        "textract:AnalyzeDocument"
      ],
      "Resource": "*"
    },
    {
      "Sid": "ComprehendAccess",
      "Effect": "Allow",
      "Action": [
        "comprehend:DetectEntities",
        "comprehend:DetectKeyPhrases",
        "comprehend:DetectSentiment"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DynamoDBAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem",
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:REGION:ACCOUNT_ID:table/SmartDocResults"
    },
    {
      "Sid": "CloudWatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:REGION:ACCOUNT_ID:*"
    }
  ]
}
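If you'd rather create the policy programmatically, here is a minimal sketch that fills in the placeholders and calls IAM; the account ID, region, and bucket values are examples you must replace, and it assumes iam_policy.json sits next to the script:

```python
import boto3

# Placeholder values - replace with your own.
ACCOUNT_ID = "123456789012"
REGION = "us-east-1"
BUCKET_NAME = "your-smart-doc-bucket"

# Read the policy template and substitute the placeholders.
with open("iam_policy.json") as f:
    policy_doc = (
        f.read()
        .replace("ACCOUNT_ID", ACCOUNT_ID)
        .replace("REGION", REGION)
        .replace("BUCKET_NAME", BUCKET_NAME)
    )

iam = boto3.client("iam")
response = iam.create_policy(
    PolicyName="SmartDocLambdaPolicy",
    PolicyDocument=policy_doc,
)
print(response["Policy"]["Arn"])  # note this ARN for step 4
```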
Step 4: Create IAM Role for Lambda
- AWS Console → IAM → Roles
- Click "Create role"
- Select "AWS service" → "Lambda"
- Click "Next"
- In the "Permissions" tab, find and check the SmartDocLambdaPolicy you just created
- Click "Next: Tags"
- Name the role: SmartDocLambdaRole
- Click "Create role"
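A minimal boto3 sketch of the same step (the policy ARN is a placeholder; use the one printed when you created SmartDocLambdaPolicy):

```python
import json
import boto3

POLICY_ARN = "arn:aws:iam::123456789012:policy/SmartDocLambdaPolicy"  # placeholder

# Trust policy that lets the Lambda service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam = boto3.client("iam")
iam.create_role(
    RoleName="SmartDocLambdaRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.attach_role_policy(RoleName="SmartDocLambdaRole", PolicyArn=POLICY_ARN)
```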
Step 5: Create Lambda Function
- AWS Console → Lambda → Functions
- Click "Create function"
- Select "Author from scratch"
- Configure:
  - Function name: SmartDocProcessor
  - Runtime: Python 3.10
  - Architecture: x86_64
  - Change default execution role: select "Use an existing role" → SmartDocLambdaRole
- Click "Create function"
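For a scripted alternative, a minimal boto3 sketch that creates the function and also applies the memory, timeout, and environment settings from step 6 (the role ARN is a placeholder, and the code is assumed to be saved locally as lambda_function.py):

```python
import io
import zipfile
import boto3

ROLE_ARN = "arn:aws:iam::123456789012:role/SmartDocLambdaRole"  # placeholder

# Package lambda_function.py into an in-memory zip archive.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.write("lambda_function.py")
buf.seek(0)

lambda_client = boto3.client("lambda")
# If this fails right after creating the role, wait a few seconds for IAM
# propagation and retry.
lambda_client.create_function(
    FunctionName="SmartDocProcessor",
    Runtime="python3.10",
    Role=ROLE_ARN,
    Handler="lambda_function.lambda_handler",
    Code={"ZipFile": buf.read()},
    MemorySize=1024,
    Timeout=120,
    Environment={"Variables": {"DDB_TABLE": "SmartDocResults", "LANG": "en"}},
)
```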
Step 6: Configure Lambda Function
- In Lambda function, scroll to "Code" section
- Delete the default code and paste in the content of lambda_function.py
- Click "Deploy"
- Under "Configuration", set:
  - General configuration:
    - Memory: 1024 MB
    - Timeout: 2 minutes
  - Environment variables:
    - DDB_TABLE: SmartDocResults
    - LANG: en
Here is the full lambda_function.py:
import json
import boto3
import os
from datetime import datetime
from urllib.parse import unquote_plus
from typing import List, Dict, Any, Optional
import logging
from decimal import Decimal
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Initialize AWS clients
textract = boto3.client('textract')
comprehend = boto3.client('comprehend')
dynamodb = boto3.resource('dynamodb')
# Environment variables
DDB_TABLE = os.environ.get('DDB_TABLE', 'SmartDocResults')
# Note: the Lambda runtime pre-sets LANG to "en_US.UTF-8", so normalize it to a
# two-letter Comprehend language code (e.g. "en")
LANG = os.environ.get('LANG', 'en').split('.')[0].split('_')[0]
def lambda_handler(event, context):
    """
    Main Lambda handler for S3 → Textract → Comprehend → DynamoDB pipeline
    """
    logger.info(f"Processing event: {json.dumps(event)}")

    # Get DynamoDB table
    table = dynamodb.Table(DDB_TABLE)

    # Process each S3 record
    for record in event.get('Records', []):
        try:
            # Extract S3 information
            bucket = record['s3']['bucket']['name']
            key = unquote_plus(record['s3']['object']['key'])
            doc_id = os.path.basename(key)
            logger.info(f"Processing document: {doc_id} from bucket: {bucket}")

            # Step 1: Extract text using Textract
            text_lines = extract_text_from_s3(bucket, key)
            if not text_lines:
                logger.warning(f"No text extracted from {doc_id}")
                continue

            # Step 2: Split into paragraphs
            paragraphs = split_paragraphs(text_lines)
            logger.info(f"Found {len(paragraphs)} paragraphs in {doc_id}")

            # Step 3: Process each paragraph with Comprehend
            for paragraph_id, paragraph in enumerate(paragraphs, 1):
                if len(paragraph) >= 20:  # Only process paragraphs with >= 20 characters
                    logger.info(f"Processing paragraph {paragraph_id} (length: {len(paragraph)})")

                    # Analyze with Comprehend
                    entities = detect_entities_safe(paragraph, LANG)
                    key_phrases = detect_key_phrases_safe(paragraph, LANG)
                    sentiment = safe_detect_sentiment(paragraph, LANG)

                    # Convert float values to Decimal for DynamoDB
                    entities = convert_floats_to_decimal(entities)
                    key_phrases = convert_floats_to_decimal(key_phrases)

                    # Save to DynamoDB
                    item = {
                        'doc_id': doc_id,
                        'paragraph_id': str(paragraph_id),  # Convert to string
                        'content': paragraph,
                        'entities': entities,
                        'key_phrases': key_phrases,
                        'sentiment': sentiment,
                        'created_at': datetime.utcnow().isoformat() + 'Z'
                    }
                    table.put_item(Item=item)
                    logger.info(f"Saved paragraph {paragraph_id} to DynamoDB")
                else:
                    logger.info(f"Skipping paragraph {paragraph_id} (too short: {len(paragraph)} chars)")

            logger.info(f"Successfully processed document: {doc_id}")

        except Exception as e:
            logger.error(f"Error processing record {record}: {str(e)}")
            # Continue processing other records
            continue

    return {
        'statusCode': 200,
        'body': json.dumps('Processing completed')
    }
def extract_text_from_s3(bucket: str, key: str) -> List[str]:
    """
    Extract text from S3 object using Textract synchronous API
    """
    try:
        response = textract.analyze_document(
            Document={
                'S3Object': {
                    'Bucket': bucket,
                    'Name': key
                }
            },
            FeatureTypes=['LAYOUT']
        )

        # Extract LINE blocks and sort by reading order
        lines = []
        line_blocks = [block for block in response['Blocks'] if block['BlockType'] == 'LINE']

        # Sort by reading order (top to bottom, left to right)
        line_blocks.sort(key=lambda x: (x['Geometry']['BoundingBox']['Top'],
                                        x['Geometry']['BoundingBox']['Left']))

        for block in line_blocks:
            if 'Text' in block:
                lines.append(block['Text'])

        logger.info(f"Extracted {len(lines)} lines from document")
        return lines

    except Exception as e:
        logger.error(f"Error extracting text from {bucket}/{key}: {str(e)}")
        return []
def split_paragraphs(lines: List[str]) -> List[str]:
    """
    Split lines into paragraphs based on spacing and punctuation rules
    """
    if not lines:
        return []

    paragraphs = []
    current_paragraph = []

    for line in lines:
        line = line.strip()
        if not line:
            # Empty line - end current paragraph
            if current_paragraph:
                paragraphs.append(' '.join(current_paragraph))
                current_paragraph = []
        elif line.endswith('.') and len(line) > 1:
            # Line ends with period - add to current paragraph and end it
            current_paragraph.append(line)
            paragraphs.append(' '.join(current_paragraph))
            current_paragraph = []
        else:
            # Regular line - add to current paragraph
            current_paragraph.append(line)

    # Add final paragraph if exists
    if current_paragraph:
        paragraphs.append(' '.join(current_paragraph))

    return paragraphs
def detect_entities_safe(text: str, language_code: str) -> List[Dict[str, Any]]:
    """
    Safely detect entities using Comprehend with error handling
    """
    try:
        response = comprehend.detect_entities(
            Text=text,
            LanguageCode=language_code
        )
        return response['Entities']
    except Exception as e:
        logger.error(f"Error detecting entities: {str(e)}")
        return []

def detect_key_phrases_safe(text: str, language_code: str) -> List[Dict[str, Any]]:
    """
    Safely detect key phrases using Comprehend with error handling
    """
    try:
        response = comprehend.detect_key_phrases(
            Text=text,
            LanguageCode=language_code
        )
        return response['KeyPhrases']
    except Exception as e:
        logger.error(f"Error detecting key phrases: {str(e)}")
        return []
def safe_detect_sentiment(text: str, language_code: str) -> str:
    """
    Safely detect sentiment with chunking for large texts
    Comprehend has a 5000 byte limit for DetectSentiment
    """
    try:
        # Check if text is small enough for single call
        if len(text.encode('utf-8')) <= 4500:
            response = comprehend.detect_sentiment(
                Text=text,
                LanguageCode=language_code
            )
            return response['Sentiment']

        # For large texts, chunk and aggregate
        logger.info("Text too large for single sentiment call, chunking...")
        chunks = chunk_text(text, 4000)  # Leave some buffer
        sentiments = []

        for chunk in chunks:
            try:
                response = comprehend.detect_sentiment(
                    Text=chunk,
                    LanguageCode=language_code
                )
                sentiments.append(response['Sentiment'])
            except Exception as e:
                logger.error(f"Error detecting sentiment for chunk: {str(e)}")
                sentiments.append('UNKNOWN')

        # Aggregate sentiments (simple majority vote)
        return aggregate_sentiments(sentiments)

    except Exception as e:
        logger.error(f"Error detecting sentiment: {str(e)}")
        return 'UNKNOWN'
def chunk_text(text: str, max_bytes: int) -> List[str]:
    """
    Split text into chunks that don't exceed max_bytes
    """
    chunks = []
    current_chunk = ""

    for word in text.split():
        test_chunk = current_chunk + " " + word if current_chunk else word
        if len(test_chunk.encode('utf-8')) <= max_bytes:
            current_chunk = test_chunk
        else:
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = word

    if current_chunk:
        chunks.append(current_chunk)

    return chunks

def aggregate_sentiments(sentiments: List[str]) -> str:
    """
    Aggregate multiple sentiment results into a single sentiment
    """
    if not sentiments:
        return 'UNKNOWN'

    # Count sentiment occurrences
    sentiment_counts = {}
    for sentiment in sentiments:
        sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1

    # Return the most common sentiment
    return max(sentiment_counts, key=sentiment_counts.get)

def convert_floats_to_decimal(data: Any) -> Any:
    """
    Recursively convert float values to Decimal for DynamoDB compatibility
    """
    if isinstance(data, dict):
        return {key: convert_floats_to_decimal(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [convert_floats_to_decimal(item) for item in data]
    elif isinstance(data, float):
        return Decimal(str(data))
    else:
        return data
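Before wiring up the S3 trigger, you can sanity-check the handler locally. A minimal sketch, assuming the code above is saved as lambda_function.py, that you have AWS credentials configured locally, and that a sample object already exists in the bucket (names are placeholders):

```python
# Local test sketch (not part of lambda_function.py): build a fake S3 event
# and invoke the handler directly.
from lambda_function import lambda_handler

fake_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "your-smart-doc-bucket"},
                "object": {"key": "sample-document.pdf"},
            }
        }
    ]
}

print(lambda_handler(fake_event, None))
```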
Step 7: Add S3 Trigger
- In Lambda function → "Add trigger"
- Select "S3"
- Configure:
  - Bucket: select the bucket created in step 2
  - Event type: All object create events
  - Prefix: (leave empty, or add a folder path)
  - Suffix: (leave empty)
- Acknowledge the "Recursive invocation" warning (this function only reads from the bucket and writes to DynamoDB, so it cannot invoke itself in a loop)
- Click "Add"
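The same trigger can be configured with the SDK; a minimal sketch (ARNs and bucket name are placeholders):

```python
import boto3

FUNCTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:SmartDocProcessor"  # placeholder
BUCKET = "your-smart-doc-bucket"  # placeholder

# Allow S3 to invoke the function...
lambda_client = boto3.client("lambda")
lambda_client.add_permission(
    FunctionName="SmartDocProcessor",
    StatementId="AllowS3Invoke",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{BUCKET}",
)

# ...then point the bucket's "object created" events at it.
# Note: this call replaces any existing notification configuration on the bucket.
s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {"LambdaFunctionArn": FUNCTION_ARN, "Events": ["s3:ObjectCreated:*"]}
        ]
    },
)
```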
Step 8: Test with Sample File
- Upload a test file to your S3 bucket:
  - Supported file types: JPG, PNG, PDF (single page for the synchronous API)
  - Size: < 10 MB (Textract synchronous limit)
- Check CloudWatch Logs:
  - Lambda function → "Monitor" → "View CloudWatch logs"
  - Look for the log group /aws/lambda/SmartDocProcessor
- Check DynamoDB:
  - DynamoDB → Tables → SmartDocResults → "Items"
  - View the created items
Step 9: Verify Results
Open DynamoDB → Tables → SmartDocResults → "Items" and inspect the stored records.
Sample DynamoDB Item
{
  "doc_id": "sample-document.pdf",
  "paragraph_id": "1",
  "content": "This is the first paragraph of the document with important information.",
  "entities": [
    {
      "Text": "sample-document.pdf",
      "Type": "OTHER",
      "Score": 0.99
    }
  ],
  "key_phrases": [
    {
      "Text": "important information",
      "Score": 0.99
    }
  ],
  "sentiment": "POSITIVE",
  "created_at": "2024-01-15T10:30:00.000Z"
}
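To pull all analyzed paragraphs for a document back out of the table, a minimal boto3 query (names match the example item above) looks like:

```python
import boto3
from boto3.dynamodb.conditions import Key

# Query every paragraph stored for one document by its partition key.
table = boto3.resource("dynamodb").Table("SmartDocResults")
response = table.query(
    KeyConditionExpression=Key("doc_id").eq("sample-document.pdf")
)
for item in response["Items"]:
    print(item["paragraph_id"], item["sentiment"], item["content"][:60])
```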
How to Verify
- CloudWatch Logs: View logs for debugging and monitoring processing
- DynamoDB Console: View items with complete information
- S3 Console: Confirm file was uploaded
Troubleshooting
Common Issues:
- Permission denied: Check IAM role has sufficient permissions
- Textract error: File too large or unsupported format
- DynamoDB error: Check table name and permissions
- Timeout: Increase memory or timeout for Lambda
- Comprehend error: Check text language and size limits
Debug Tips:
- View CloudWatch Logs for the specific error message
- Verify the IAM policy placeholders were replaced with your real account ID, region, and bucket name
- Confirm the S3 bucket and DynamoDB table exist
- Test Textract and Comprehend separately (a quick sketch follows)
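A minimal debugging sketch that exercises both services directly against a file already in S3 (bucket and object names are placeholders):

```python
import boto3

textract = boto3.client("textract")
comprehend = boto3.client("comprehend")

# Call Textract directly on the object the pipeline failed to process.
resp = textract.analyze_document(
    Document={"S3Object": {"Bucket": "your-smart-doc-bucket", "Name": "sample-document.pdf"}},
    FeatureTypes=["LAYOUT"],
)
text = " ".join(b["Text"] for b in resp["Blocks"] if b["BlockType"] == "LINE")
print(f"Textract returned {len(text)} characters")

# Call Comprehend on the extracted text; the rough character cap keeps
# mostly-ASCII text under the 5 KB DetectSentiment limit.
print(comprehend.detect_sentiment(Text=text[:4500], LanguageCode="en")["Sentiment"])
```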
Next Steps & Advanced Features
1. Multi-page PDF Processing
- Use the asynchronous Textract API (StartDocumentAnalysis)
- Combine it with SNS for completion notifications
- Modify the Lambda function to handle the SNS completion events (see the sketch below)
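A rough sketch of what the asynchronous flow could look like; the SNS topic and IAM role ARNs are placeholders, and the result-fetching function would live in a second Lambda subscribed to that topic:

```python
import boto3

textract = boto3.client("textract")

# Kick off an async analysis job; Textract publishes to the SNS topic when done.
job = textract.start_document_analysis(
    DocumentLocation={"S3Object": {"Bucket": "your-smart-doc-bucket", "Name": "multi-page.pdf"}},
    FeatureTypes=["LAYOUT"],
    NotificationChannel={
        "SNSTopicArn": "arn:aws:sns:us-east-1:123456789012:SmartDocTextractDone",  # placeholder
        "RoleArn": "arn:aws:iam::123456789012:role/TextractSNSPublishRole",        # placeholder
    },
)

def fetch_all_lines(job_id: str) -> list:
    """Page through the async job results and collect all LINE blocks."""
    lines, next_token = [], None
    while True:
        kwargs = {"JobId": job_id}
        if next_token:
            kwargs["NextToken"] = next_token
        resp = textract.get_document_analysis(**kwargs)
        lines += [b["Text"] for b in resp["Blocks"] if b["BlockType"] == "LINE"]
        next_token = resp.get("NextToken")
        if not next_token:
            return lines
```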
2. Custom Comprehend Models
- Create custom entity recognizers for specific domains
- Use custom classification models
- Improve analysis accuracy
3. Security Enhancements
- Use KMS for DynamoDB data encryption
- Enable S3 server-side encryption
- Deploy Lambda in VPC for high security
- Use IAM roles instead of access keys
4. Monitoring & Alerting
- Set up CloudWatch alarms
- Create dashboards to monitor pipeline
- Use X-Ray for request tracing
Conclusion
You've successfully built an intelligent document processing pipeline that can automatically extract and analyze content from various document types. This solution provides:
- Automated processing of uploaded documents
- Intelligent text extraction using Amazon Textract
- Content analysis with entity recognition, key phrase extraction, and sentiment analysis
- Structured data storage in DynamoDB for easy querying
- Scalable architecture that can handle varying workloads
This pipeline can be extended for various use cases such as:
- Document classification and routing
- Content moderation
- Data extraction for business intelligence
- Automated compliance checking
- Customer support ticket analysis
The combination of AWS services provides a robust, scalable solution that can grow with your needs while maintaining security and cost-effectiveness.