Lam Bùi

Building an Intelligent Document Processing Pipeline with AWS: S3 → Textract → Comprehend → DynamoDB

Introduction

In today's digital world, organizations are drowning in unstructured data. Documents, images, and PDFs contain valuable information that often remains untapped due to the manual effort required to extract and analyze it. What if we could automatically process these documents, extract meaningful insights, and store structured data for further analysis?

This blog post will guide you through building a complete intelligent document processing pipeline using AWS services. Our pipeline will automatically:

  • Extract text from images and PDFs using Amazon Textract
  • Analyze content for entities, key phrases, and sentiment using Amazon Comprehend
  • Store structured results in DynamoDB for easy querying and analysis
  • Process documents automatically when uploaded to S3

What We're Building

Our pipeline creates a seamless flow where:

  1. Documents are uploaded to an S3 bucket (images, PDFs, etc.)
  2. Lambda function triggers automatically when new files arrive
  3. Textract extracts text and identifies document layout
  4. Comprehend analyzes the extracted text for insights
  5. Results are stored in DynamoDB with structured metadata

Architecture Overview
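
At a glance: a document lands in S3 → the S3 event invokes a Lambda function → Lambda calls Textract to extract the text → Comprehend analyzes each paragraph → the results are written to DynamoDB, with every step logged to CloudWatch.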

System Requirements

  • Lambda Runtime: Python 3.10
  • Memory: 1024 MB (recommended)
  • Timeout: 120 seconds
  • Environment Variables:
    • DDB_TABLE: SmartDocResults (default)
    • LANG: en (default)

Step-by-Step Implementation

Step 1: Create DynamoDB Table

  1. Navigate to AWS Console → DynamoDB
  2. Click "Create table"
  3. Configure:
    • Table name: SmartDocResults
    • Partition key: doc_id (String)
    • Sort key: paragraph_id (String)
  4. Click "Create table"
  5. Wait for table status = "Active"
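
If you prefer scripting your infrastructure, here is a minimal boto3 sketch of the same table (on-demand billing is my assumption; provisioned capacity works just as well):

import boto3

dynamodb = boto3.client('dynamodb')

dynamodb.create_table(
    TableName='SmartDocResults',
    AttributeDefinitions=[
        {'AttributeName': 'doc_id', 'AttributeType': 'S'},
        {'AttributeName': 'paragraph_id', 'AttributeType': 'S'},
    ],
    KeySchema=[
        {'AttributeName': 'doc_id', 'KeyType': 'HASH'},        # partition key
        {'AttributeName': 'paragraph_id', 'KeyType': 'RANGE'}, # sort key
    ],
    BillingMode='PAY_PER_REQUEST',  # assumption: on-demand keeps the demo simple
)

# Block until the table status is "Active"
dynamodb.get_waiter('table_exists').wait(TableName='SmartDocResults')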

Step 2: Create S3 Bucket

  1. AWS Console → S3
  2. Click "Create bucket"
  3. Configure:
    • Bucket name: your-smart-doc-bucket (change to a globally unique name)
    • Region: Choose your preferred region
  4. Click "Create bucket"
  5. Remember the bucket name for IAM policy
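
The same step as a boto3 sketch (the bucket name is a placeholder; substitute your own unique name):

import boto3

region = 'us-east-1'  # choose your preferred region
s3 = boto3.client('s3', region_name=region)

if region == 'us-east-1':
    # us-east-1 is the one region that must not receive a LocationConstraint
    s3.create_bucket(Bucket='your-smart-doc-bucket')
else:
    s3.create_bucket(
        Bucket='your-smart-doc-bucket',
        CreateBucketConfiguration={'LocationConstraint': region},
    )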

Step 3: Create IAM Policy

  1. AWS Console → IAM → Policies
  2. Click "Create policy"
  3. Switch to "JSON" tab
  4. Copy content from iam_policy.json and replace placeholders:
    • ACCOUNT_ID: Your AWS account ID
    • REGION: Your region (e.g., us-east-1)
    • BUCKET_NAME: S3 bucket name from step 2
  5. Click "Next: Tags""Next: Review"
  6. Name the policy: SmartDocLambdaPolicy
  7. Click "Create policy"

Least-privilege IAM Policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::BUCKET_NAME/*"
        },
        {
            "Sid": "TextractAccess",
            "Effect": "Allow",
            "Action": [
                "textract:AnalyzeDocument"
            ],
            "Resource": "*"
        },
        {
            "Sid": "ComprehendAccess",
            "Effect": "Allow",
            "Action": [
                "comprehend:DetectEntities",
                "comprehend:DetectKeyPhrases",
                "comprehend:DetectSentiment"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DynamoDBAccess",
            "Effect": "Allow",
            "Action": [
                "dynamodb:PutItem",
                "dynamodb:GetItem",
                "dynamodb:Query",
                "dynamodb:Scan"
            ],
            "Resource": "arn:aws:dynamodb:REGION:ACCOUNT_ID:table/SmartDocResults"
        },
        {
            "Sid": "CloudWatchLogs",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:REGION:ACCOUNT_ID:*"
        }
    ]
}

Step 4: Create IAM Role for Lambda

  1. AWS Console → IAM → Roles
  2. Click "Create role"
  3. Select "AWS service""Lambda"
  4. Click "Next"
  5. In "Permissions" tab:
    • Find and select the SmartDocLambdaPolicy you just created
    • Check the policy
  6. Click "Next: Tags"
  7. Name the role: SmartDocLambdaRole
  8. Click "Create role"

Step 5: Create Lambda Function

  1. AWS Console → Lambda → Functions
  2. Click "Create function"
  3. Select "Author from scratch"
  4. Configure:
    • Function name: SmartDocProcessor
    • Runtime: Python 3.10
    • Architecture: x86_64
    • Change default execution role: Select "Use an existing role" → SmartDocLambdaRole
  5. Click "Create function"

Step 6: Configure Lambda Function

  1. In the Lambda function, scroll to the "Code" section
  2. Delete the default code and paste in lambda_function.py (shown in full below)
  3. Click "Deploy"
  4. Under the "Configuration" tab, set:
    • General configuration:
      • Memory: 1024 MB
      • Timeout: 2 minutes
    • Environment variables:
      • DDB_TABLE: SmartDocResults
      • LANG: en

import json
import boto3
import os
from datetime import datetime
from urllib.parse import unquote_plus
from typing import List, Dict, Any, Optional
import logging
from decimal import Decimal

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
textract = boto3.client('textract')
comprehend = boto3.client('comprehend')
dynamodb = boto3.resource('dynamodb')

# Environment variables
DDB_TABLE = os.environ.get('DDB_TABLE', 'SmartDocResults')
LANG = os.environ.get('LANG', 'en')

def lambda_handler(event, context):
    """
    Main Lambda handler for S3 → Textract → Comprehend → DynamoDB pipeline
    """
    logger.info(f"Processing event: {json.dumps(event)}")

    # Get DynamoDB table
    table = dynamodb.Table(DDB_TABLE)

    # Process each S3 record
    for record in event.get('Records', []):
        try:
            # Extract S3 information
            bucket = record['s3']['bucket']['name']
            key = unquote_plus(record['s3']['object']['key'])
            doc_id = os.path.basename(key)

            logger.info(f"Processing document: {doc_id} from bucket: {bucket}")

            # Step 1: Extract text using Textract
            text_lines = extract_text_from_s3(bucket, key)
            if not text_lines:
                logger.warning(f"No text extracted from {doc_id}")
                continue

            # Step 2: Split into paragraphs
            paragraphs = split_paragraphs(text_lines)
            logger.info(f"Found {len(paragraphs)} paragraphs in {doc_id}")

            # Step 3: Process each paragraph with Comprehend
            for paragraph_id, paragraph in enumerate(paragraphs, 1):
                if len(paragraph) >= 20:  # Only process paragraphs with >= 20 characters
                    logger.info(f"Processing paragraph {paragraph_id} (length: {len(paragraph)})")

                    # Analyze with Comprehend
                    entities = detect_entities_safe(paragraph, LANG)
                    key_phrases = detect_key_phrases_safe(paragraph, LANG)
                    sentiment = safe_detect_sentiment(paragraph, LANG)

                    # Convert float values to Decimal for DynamoDB
                    entities = convert_floats_to_decimal(entities)
                    key_phrases = convert_floats_to_decimal(key_phrases)

                    # Save to DynamoDB
                    item = {
                        'doc_id': doc_id,
                        'paragraph_id': str(paragraph_id),  # Convert to string
                        'content': paragraph,
                        'entities': entities,
                        'key_phrases': key_phrases,
                        'sentiment': sentiment,
                        'created_at': datetime.utcnow().isoformat() + 'Z'
                    }

                    table.put_item(Item=item)
                    logger.info(f"Saved paragraph {paragraph_id} to DynamoDB")
                else:
                    logger.info(f"Skipping paragraph {paragraph_id} (too short: {len(paragraph)} chars)")

            logger.info(f"Successfully processed document: {doc_id}")

        except Exception as e:
            logger.error(f"Error processing record {record}: {str(e)}")
            # Continue processing other records
            continue

    return {
        'statusCode': 200,
        'body': json.dumps('Processing completed')
    }

def extract_text_from_s3(bucket: str, key: str) -> List[str]:
    """
    Extract text from S3 object using Textract synchronous API
    """
    try:
        response = textract.analyze_document(
            Document={
                'S3Object': {
                    'Bucket': bucket,
                    'Name': key
                }
            },
            FeatureTypes=['LAYOUT']
        )

        # Extract LINE blocks and sort by reading order
        lines = []
        line_blocks = [block for block in response['Blocks'] if block['BlockType'] == 'LINE']

        # Sort by reading order (top to bottom, left to right)
        line_blocks.sort(key=lambda x: (x['Geometry']['BoundingBox']['Top'], 
                                      x['Geometry']['BoundingBox']['Left']))

        for block in line_blocks:
            if 'Text' in block:
                lines.append(block['Text'])

        logger.info(f"Extracted {len(lines)} lines from document")
        return lines

    except Exception as e:
        logger.error(f"Error extracting text from {bucket}/{key}: {str(e)}")
        return []

def split_paragraphs(lines: List[str]) -> List[str]:
    """
    Split lines into paragraphs based on spacing and punctuation rules
    """
    if not lines:
        return []

    paragraphs = []
    current_paragraph = []

    for line in lines:
        line = line.strip()
        if not line:
            # Empty line - end current paragraph
            if current_paragraph:
                paragraphs.append(' '.join(current_paragraph))
                current_paragraph = []
        elif line.endswith('.') and len(line) > 1:
            # Line ends with period - add to current paragraph and end it
            current_paragraph.append(line)
            paragraphs.append(' '.join(current_paragraph))
            current_paragraph = []
        else:
            # Regular line - add to current paragraph
            current_paragraph.append(line)

    # Add final paragraph if exists
    if current_paragraph:
        paragraphs.append(' '.join(current_paragraph))

    return paragraphs

def detect_entities_safe(text: str, language_code: str) -> List[Dict[str, Any]]:
    """
    Safely detect entities using Comprehend with error handling
    """
    try:
        response = comprehend.detect_entities(
            Text=text,
            LanguageCode=language_code
        )
        return response['Entities']
    except Exception as e:
        logger.error(f"Error detecting entities: {str(e)}")
        return []

def detect_key_phrases_safe(text: str, language_code: str) -> List[Dict[str, Any]]:
    """
    Safely detect key phrases using Comprehend with error handling
    """
    try:
        response = comprehend.detect_key_phrases(
            Text=text,
            LanguageCode=language_code
        )
        return response['KeyPhrases']
    except Exception as e:
        logger.error(f"Error detecting key phrases: {str(e)}")
        return []

def safe_detect_sentiment(text: str, language_code: str) -> str:
    """
    Safely detect sentiment with chunking for large texts
    Comprehend has a 5000 byte limit for DetectSentiment
    """
    try:
        # Check if text is small enough for single call
        if len(text.encode('utf-8')) <= 4500:
            response = comprehend.detect_sentiment(
                Text=text,
                LanguageCode=language_code
            )
            return response['Sentiment']

        # For large texts, chunk and aggregate
        logger.info(f"Text too large for single sentiment call, chunking...")
        chunks = chunk_text(text, 4000)  # Leave some buffer
        sentiments = []

        for chunk in chunks:
            try:
                response = comprehend.detect_sentiment(
                    Text=chunk,
                    LanguageCode=language_code
                )
                sentiments.append(response['Sentiment'])
            except Exception as e:
                logger.error(f"Error detecting sentiment for chunk: {str(e)}")
                sentiments.append('UNKNOWN')

        # Aggregate sentiments (simple majority vote)
        return aggregate_sentiments(sentiments)

    except Exception as e:
        logger.error(f"Error detecting sentiment: {str(e)}")
        return 'UNKNOWN'

def chunk_text(text: str, max_bytes: int) -> List[str]:
    """
    Split text into chunks that don't exceed max_bytes
    """
    chunks = []
    current_chunk = ""

    for word in text.split():
        test_chunk = current_chunk + " " + word if current_chunk else word

        if len(test_chunk.encode('utf-8')) <= max_bytes:
            current_chunk = test_chunk
        else:
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = word

    if current_chunk:
        chunks.append(current_chunk)

    return chunks

def aggregate_sentiments(sentiments: List[str]) -> str:
    """
    Aggregate multiple sentiment results into a single sentiment
    """
    if not sentiments:
        return 'UNKNOWN'

    # Count sentiment occurrences
    sentiment_counts = {}
    for sentiment in sentiments:
        sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1

    # Return the most common sentiment
    return max(sentiment_counts, key=sentiment_counts.get)

def convert_floats_to_decimal(data: Any) -> Any:
    """
    Recursively convert float values to Decimal for DynamoDB compatibility
    """
    if isinstance(data, dict):
        return {key: convert_floats_to_decimal(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [convert_floats_to_decimal(item) for item in data]
    elif isinstance(data, float):
        return Decimal(str(data))
    else:
        return data
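
Before wiring up the S3 trigger, you can sanity-check the handler locally by hand-feeding it a minimal S3 event. A rough sketch, assuming your AWS credentials are configured and the object really exists in your bucket:

# local_test.py (run next to lambda_function.py)
from lambda_function import lambda_handler

fake_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'your-smart-doc-bucket'},  # placeholder
            'object': {'key': 'sample-document.pdf'},     # placeholder
        }
    }]
}

print(lambda_handler(fake_event, None))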

Step 7: Add S3 Trigger

  1. In Lambda function → "Add trigger"
  2. Select "S3"
  3. Configure:
    • Bucket: Select bucket created in step 2
    • Event type: All object create events
    • Prefix: (leave empty or add folder path)
    • Suffix: (leave empty)
  4. Check "Recursive invocation" if you want to process files in subfolders
  5. Click "Add"

Step 8: Test with Sample File

  1. Upload a test file to your S3 bucket:
    • Supported file types: JPG, PNG, PDF (1 page)
    • Size: < 10 MB (Textract synchronous limit)
  2. Check CloudWatch Logs:
    • Lambda function → "Monitor" → "View CloudWatch logs"
    • Look for log group: /aws/lambda/SmartDocProcessor
  3. Check DynamoDB:
    • DynamoDB → Tables → SmartDocResults → "Items"
    • View the created items

Step 9: Verify Results

Sample DynamoDB Item

{
  "doc_id": "sample-document.pdf",
  "paragraph_id": "1",
  "content": "This is the first paragraph of the document with important information.",
  "entities": [
    {
      "Text": "sample-document.pdf",
      "Type": "OTHER",
      "Score": 0.99
    }
  ],
  "key_phrases": [
    {
      "Text": "important information",
      "Score": 0.99
    }
  ],
  "sentiment": "POSITIVE",
  "created_at": "2024-01-15T10:30:00.000Z"
}
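
To pull a document's paragraphs back out of the table, a minimal query sketch:

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource('dynamodb').Table('SmartDocResults')

response = table.query(
    KeyConditionExpression=Key('doc_id').eq('sample-document.pdf')
)
for item in response['Items']:
    print(item['paragraph_id'], item['sentiment'], item['content'][:60])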

How to Verify

  1. CloudWatch Logs: View logs for debugging and monitoring processing
  2. DynamoDB Console: View items with complete information
  3. S3 Console: Confirm file was uploaded

Troubleshooting

Common Issues:

  1. Permission denied: Check that the IAM role has sufficient permissions
  2. Textract error: The file is too large or in an unsupported format
  3. DynamoDB error: Check the table name and permissions
  4. Timeout: Increase the Lambda memory allocation or timeout
  5. Comprehend error: Check the text language and size limits

Debug Tips:

  • View CloudWatch Logs for specific errors
  • Verify the IAM policy placeholders (ACCOUNT_ID, REGION, BUCKET_NAME) were replaced with real values
  • Confirm S3 bucket and DynamoDB table exist
  • Test Textract and Comprehend services separately

Next Steps & Advanced Features

1. Multi-page PDF Processing

  • Use Textract async API (StartDocumentAnalysis)
  • Combine with SNS for completion notifications
  • Modify Lambda to handle SNS events
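
A rough sketch of the async kickoff (the SNS topic and the role Textract publishes with are hypothetical; you would create both first):

import boto3

textract = boto3.client('textract')

response = textract.start_document_analysis(
    DocumentLocation={'S3Object': {'Bucket': 'your-smart-doc-bucket', 'Name': 'big-report.pdf'}},
    FeatureTypes=['LAYOUT'],
    NotificationChannel={
        'SNSTopicArn': 'arn:aws:sns:REGION:ACCOUNT_ID:textract-done',      # hypothetical
        'RoleArn': 'arn:aws:iam::ACCOUNT_ID:role/TextractSNSPublishRole',  # hypothetical
    },
)
job_id = response['JobId']

# A second Lambda, subscribed to the SNS topic, would then page through
# textract.get_document_analysis(JobId=job_id, NextToken=...) to collect all blocks.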

2. Custom Comprehend Models

  • Create custom entity recognizers for specific domains
  • Use custom classification models
  • Improve analysis accuracy

3. Security Enhancements

  • Use KMS for DynamoDB data encryption
  • Enable S3 server-side encryption
  • Deploy Lambda in a VPC for stricter network isolation
  • Use IAM roles instead of access keys

4. Monitoring & Alerting

  • Set up CloudWatch alarms
  • Create dashboards to monitor pipeline
  • Use X-Ray for request tracing

Conclusion

You've successfully built an intelligent document processing pipeline that can automatically extract and analyze content from various document types. This solution provides:

  • Automated processing of uploaded documents
  • Intelligent text extraction using AWS Textract
  • Content analysis with entity recognition, key phrase extraction, and sentiment analysis
  • Structured data storage in DynamoDB for easy querying
  • Scalable architecture that can handle varying workloads

This pipeline can be extended for various use cases such as:

  • Document classification and routing
  • Content moderation
  • Data extraction for business intelligence
  • Automated compliance checking
  • Customer support ticket analysis

The combination of AWS services provides a robust, scalable solution that can grow with your needs while maintaining security and cost-effectiveness.
