DEV Community


AI Enrichment Pipeline: From Sample Classification to 100K-File Metadata Search with Bedrock and OpenSearch NextGen

Quick Recap: What We Built in Part 1

In Part 1, we built a metadata catalog on Apache Iceberg (S3 Tables) that makes unstructured files on FSx for ONTAP instantly searchable via Athena SQL — in under 2 seconds, at $5-15/month for 100K files, without bulk-copying raw files.

But basic metadata (file name, size, type) only gets you so far. What if you could ask: "Find all invoices from Q4" or "Show me files similar to this contract"?

That requires AI enrichment: automatically classifying files and generating vector embeddings for similarity search.

What We're Building

File on FSx for ONTAP
       │
       │ S3 Access Point (read)
       ▼
┌─────────────────────────────────────────┐
│  Bedrock Claude Vision                  │
│  "What is this file?"                   │
│  → classification: "invoice"            │
│  → confidence: 0.95                     │
│  → summary: "Invoice #INV-2024-..."     │
└──────────────────┬──────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────┐
│  Titan Embeddings V2                    │
│  "Represent this file as a vector"      │
│  → 1024-dimensional embedding           │
│  → normalized for cosine similarity     │
└──────────────────┬──────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────┐
│  S3 Tables (Iceberg)                    │
│  classification, confidence_score,      │
│  summary, embedding_vector              │
└──────────────────┬──────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────┐
│  OpenSearch Serverless NextGen          │
│  kNN vector search                      │
│  "Find files similar to X"              │
│  Scale-to-zero: $0 when idle            │
└─────────────────────────────────────────┘

AI Classification: Bedrock Claude Vision

How It Works

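For image files (PNG, JPEG, TIFF), we send the file to Claude Vision with a simple prompt. Claude's image input accepts JPEG, PNG, GIF, and WebP, so TIFF files must be converted (for example, to PNG) before the call: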

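import base64
import json
import boto3

s3 = boto3.client("s3")
bedrock = boto3.client("bedrock-runtime")

# Read the file through the S3 Access Point; the access point alias is accepted as the Bucket name
# (ap_alias and object_key are placeholders)
image_bytes = s3.get_object(Bucket=ap_alias, Key=object_key)["Body"].read()
image_b64 = base64.b64encode(image_bytes).decode("utf-8")

response = bedrock.invoke_model(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "messages": [{"role": "user", "content": [
            {"type": "image", "source": {
                "type": "base64",
                "media_type": "image/png",  # must match the real format (png, jpeg, gif, webp)
                "data": image_b64
            }},
            {"type": "text", "text":
                'Classify this image. Respond JSON only: '
                '{"classification":"...","confidence_score":0.X,"summary":"..."}'}
        ]}]
    })
)
# The model's reply text sits in the first content block of the Messages response
reply_text = json.loads(response["body"].read())["content"][0]["text"]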
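Even with a "JSON only" instruction, the reply can arrive wrapped in prose or a Markdown fence, or with an out-of-range confidence value. A small defensive parser is worth having before anything reaches the catalog. This is a sketch: `parse_classification` is a hypothetical helper, and the 0.3 low-confidence cutoff is borrowed from the error-handling table later in this post.

```python
import json

def parse_classification(reply_text, low_confidence_threshold=0.3):
    """Hypothetical helper: validate Claude's JSON reply before writing it to the catalog."""
    failed = {"classification": None, "confidence_score": 0.0,
              "summary": None, "enrichment_status": "failed"}
    # Take the outermost {...} span, tolerating prose or Markdown fences around it
    start, end = reply_text.find("{"), reply_text.rfind("}")
    if start == -1 or end < start:
        return failed
    try:
        data = json.loads(reply_text[start:end + 1])
    except json.JSONDecodeError:
        return failed
    # Clamp the self-reported confidence into [0, 1]; treat non-numeric values as 0
    try:
        score = min(max(float(data.get("confidence_score", 0.0)), 0.0), 1.0)
    except (TypeError, ValueError):
        score = 0.0
    status = "low_confidence" if score < low_confidence_threshold else "completed"
    return {"classification": data.get("classification"),
            "confidence_score": score,
            "summary": data.get("summary"),
            "enrichment_status": status}
```

Records marked `failed` or `low_confidence` can then follow the retry and human-review paths instead of being indexed as if they were reliable.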

Results (Measured)

| File | Classification | Confidence | Latency | Cost |
|---|---|---|---|---|
| invoice_sample.png | Invoice | 0.95 | ~3s | $0.01 |
| product_inspection.png | Pie Chart | 1.0 | ~2s | $0.01 |
| sensor_dashboard.png | IoT Sensor Dashboard | 0.9 | ~3s | $0.01 |

Key insight: In this demo, Claude 3 Haiku classified sample images in ~2-3 seconds at roughly $0.01/image. Production accuracy and cost depend on image size, prompt length, model version, and document type.

Model version note: Model ID anthropic.claude-3-haiku-20240307-v1:0 was used at time of testing. Check Bedrock model IDs for the latest available version.

For Non-Image Files

| File Type | Enrichment Strategy | Cost per file |
|---|---|---|
| PDF | Extract text → summarize with Claude | $0.01-0.05 |
| CSV/Parquet | Schema extraction + row count | ~$0 (metadata only) |
| Audio | Transcribe → summarize | $0.02-0.10 |
| Video | Frame sampling → Vision | $0.05-0.50 |
| CAD/3D | Metadata extraction only | ~$0 |
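The table amounts to a per-file routing decision. A minimal sketch, assuming routing by file extension alone (a real pipeline might sniff content types instead); the extension list and strategy names are hypothetical. TIFF is routed to vision on the assumption that it is converted to PNG or JPEG before the Claude call, since Claude's image input does not accept TIFF.

```python
from pathlib import PurePosixPath

# Hypothetical routing table mirroring the enrichment strategies above
STRATEGY_BY_EXTENSION = {
    ".png": "vision", ".jpg": "vision", ".jpeg": "vision",
    ".tif": "vision", ".tiff": "vision",  # assumes conversion to PNG/JPEG first
    ".pdf": "text_summary",
    ".csv": "metadata_only", ".parquet": "metadata_only",
    ".mp3": "transcribe_summary", ".wav": "transcribe_summary",
    ".mp4": "frame_sampling", ".mov": "frame_sampling",
    ".dwg": "metadata_only", ".step": "metadata_only", ".stl": "metadata_only",
}

def enrichment_strategy(path):
    """Pick a strategy from the file extension; unknown types fall back to metadata-only."""
    return STRATEGY_BY_EXTENSION.get(PurePosixPath(path).suffix.lower(), "metadata_only")
```

Defaulting unknown types to `metadata_only` keeps AI spend opt-in: a new file type costs nothing until someone adds it to the table.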

Vector Embeddings: Titan Embeddings V2

Every file gets a 1024-dimensional vector embedding based on its content or AI-generated description:

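import json
import boto3

bedrock = boto3.client("bedrock-runtime")

response = bedrock.invoke_model(
    modelId="amazon.titan-embed-text-v2:0",
    body=json.dumps({
        "inputText": summary_text,  # AI-generated description (classification + summary)
        "dimensions": 1024,         # Titan V2 accepts 256, 512, or 1024
        "normalize": True           # unit-length output, so dot product = cosine similarity
    })
)
embedding = json.loads(response["body"].read())["embedding"]
# → [0.023, -0.041, 0.089, ...] (1024 floats)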

Why 1024 Dimensions?

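| Dimensions | Downstream cost | Accuracy | Storage (float32) | Use Case |
|---|---|---|---|---|
| 256 | Lowest | Good | 1KB/file | High-volume, cost-sensitive |
| 512 | Low | Better | 2KB/file | General purpose |
| 1024 | Medium | High | 4KB/file | Recommended balance (Titan V2 maximum) |
| 1536 | Higher | Model-dependent | 6KB/file | Not available in Titan V2; requires a different model, e.g. Titan Embeddings G1 - Text |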

1024 dimensions was a practical default for this PoC. Validate 256, 512, and 1024 dimensions (the sizes Titan Embeddings V2 supports) against your own top-k relevance and storage/cost targets (~4KB per file × 100K files ≈ 400MB total at 1024-dim).

Pricing note: Titan Embeddings V2 charges per 1K input tokens ($0.00002). The Bedrock cost is the same whether you request 256, 512, or 1024 dimensions, so there is no invocation penalty for choosing higher dimensions. Storage and OpenSearch memory, however, grow linearly with the dimension count.
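Both effects fit in a back-of-the-envelope estimate: the Bedrock bill depends only on input tokens, while storage scales with dimensions × 4 bytes. A sketch using the price from the note above; `tokens_per_file` is an assumed average for an AI-generated summary, not a measured value.

```python
PRICE_PER_1K_TOKENS = 0.00002  # Titan Text Embeddings V2 input price (USD), from the note above
BYTES_PER_FLOAT32 = 4

def embedding_estimate(num_files, tokens_per_file, dimensions=1024):
    """Return (Bedrock cost in USD, raw vector storage in bytes) for one embedding pass."""
    cost_usd = num_files * tokens_per_file / 1000 * PRICE_PER_1K_TOKENS
    storage_bytes = num_files * dimensions * BYTES_PER_FLOAT32
    return cost_usd, storage_bytes
```

With an assumed 200 tokens per summary, `embedding_estimate(100_000, 200)` comes to about $0.40 and 409,600,000 bytes (~410MB). Dropping to 256 dimensions cuts storage to a quarter and leaves the cost unchanged.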

Embedding Storage in Iceberg

Embeddings are stored in a binary column of the Iceberg table as packed float32 values (4 bytes per dimension, so 4KB per 1024-dim vector):

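import struct
import pyarrow as pa

# Convert the float list to little-endian float32 bytes for the binary column
embedding_bytes = struct.pack(f"<{len(embedding)}f", *embedding)

# Write to the Iceberg table (`table` is a PyIceberg table loaded from the S3 Tables catalog)
arrow_table = pa.table({
    "file_id": [file_id],
    "embedding_vector": pa.array([embedding_bytes], type=pa.binary()),
    "enrichment_status": ["completed"],
    # ...remaining catalog columns (classification, confidence_score, summary, ...)
})
table.append(arrow_table)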
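Reading the column back (to index it in OpenSearch or to spot-check a vector) needs the inverse operation. A sketch that pins the byte order to little-endian with the `<` prefix; plain `f` uses the host's native order, which happens to be little-endian on both x86_64 and arm64 Lambda, but an explicit prefix keeps the format portable across readers.

```python
import struct

def pack_embedding(embedding):
    """Floats -> little-endian float32 bytes (4 bytes per dimension)."""
    return struct.pack(f"<{len(embedding)}f", *embedding)

def unpack_embedding(blob):
    """Little-endian float32 bytes -> list of floats."""
    count = len(blob) // 4
    return list(struct.unpack(f"<{count}f", blob))
```

Values round-trip through float32 precision, so compare with a tolerance unless the inputs are exactly representable.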

Important: Append-Only Writes and Deduplication

This pipeline writes to Iceberg on S3 Tables with append-only operations (`table.append()`). If you enrich the same file twice (e.g., after a retry), you'll get duplicate records. Use this dedup pattern in queries:

WITH ranked AS (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY file_id ORDER BY modified_at DESC) as rn
  FROM "s3tablescatalog/fsxn-metadata-catalog"."metadata"."unstructured_files"
)
SELECT * FROM ranked WHERE rn = 1 AND is_deleted = false;

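S3 Tables automatic compaction merges small data files, but it does not remove logical duplicate rows. Duplicates remain until you rewrite the table (for example, with a periodic MERGE INTO or an overwrite keyed on file_id), so keep the dedup query in place.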
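The same "latest record wins" rule is handy in Python when handling a batch of rows before indexing. One detail matters: a retried enrichment of an unchanged file carries the same `modified_at`, and `ROW_NUMBER()` breaks such ties arbitrarily. This sketch adds a tiebreaker on `enriched_at` (one of the lineage columns recommended later in this post, assumed present here) and assumes timestamps are ISO-8601 strings so they compare correctly.

```python
def latest_per_file(records):
    """Keep the newest record per file_id, then drop tombstones (mirrors the ROW_NUMBER query)."""
    def sort_key(rec):
        # modified_at first, then enriched_at so a re-enrichment of an unchanged file wins
        return (rec["modified_at"], rec.get("enriched_at") or "")

    latest = {}
    for rec in records:
        current = latest.get(rec["file_id"])
        if current is None or sort_key(rec) > sort_key(current):
            latest[rec["file_id"]] = rec
    # Deletion is decided by the newest record, exactly like "rn = 1 AND is_deleted = false"
    return [r for r in latest.values() if not r.get("is_deleted", False)]
```

In SQL, the equivalent change is `ORDER BY modified_at DESC, enriched_at DESC` in the window, provided the column exists in your table.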

Vector Search: OpenSearch Serverless NextGen

The Scale-to-Zero Revolution

Before May 2026, OpenSearch Serverless had a minimum cost of ~$350/month (2 OCUs always running). This made it impractical for PoC and dev environments.

OpenSearch Serverless NextGen (GA May 2026) introduces scale-to-zero:

| State | Cost | Latency |
|---|---|---|
| Idle (no queries) | $0/month | n/a |
| Cold start (first query) | $0.24/OCU-hour | 10-30 seconds |
| Warm (subsequent queries) | $0.24/OCU-hour | ~54ms |

This changes the economics completely: you can keep vector search compute cost near zero until you actually need it.
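The arithmetic behind that shift fits in one line. In this sketch, the $0.24/OCU-hour rate comes from the table above, while 730 hours per month and the active-hours figure in the usage note are assumptions.

```python
OCU_HOUR_USD = 0.24   # rate quoted in the table above
HOURS_PER_MONTH = 730  # average hours in a month (assumption)

def monthly_ocu_cost(ocus, active_hours_per_month=HOURS_PER_MONTH):
    """OCU compute cost for a month in which `ocus` units run for the given number of hours."""
    return ocus * active_hours_per_month * OCU_HOUR_USD
```

`monthly_ocu_cost(2)` reproduces the old always-on floor of about $350.40. If the same 2 OCUs were active only 60 hours a month (a hypothetical 2 hours per day), the bill would be about $28.80, and an idle month would cost $0.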

kNN Search Implementation

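import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

region = "ap-northeast-1"
creds = boto3.Session().get_credentials()
# OpenSearch Serverless requests are SigV4-signed for the "aoss" service
auth = AWS4Auth(creds.access_key, creds.secret_key, region, "aoss",
                session_token=creds.token)

client = OpenSearch(
    hosts=[{"host": collection_host, "port": 443}],  # placeholder: <collection-id>.<region>.aoss.amazonaws.com
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

# Generate the query embedding with the same Titan model and dimensions used at index time
# (get_embedding wraps the Titan invoke_model call shown earlier)
query_embedding = get_embedding("find invoice or payment documents")

# kNN search
results = client.search(index="fsxn-metadata-embeddings", body={
    "size": 5,
    "query": {
        "knn": {
            "embedding_vector": {
                "vector": query_embedding,
                "k": 5
            }
        }
    }
})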

Note: Vector search requires OpenSearch — you cannot perform kNN queries directly on the embedding_vector binary column in Athena. The Iceberg table stores embeddings for durability; OpenSearch provides the search interface.
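The kNN query only works if `embedding_vector` is mapped as a `knn_vector` of floats in OpenSearch, so the Iceberg binary column has to be decoded before indexing. Below is a sketch of an index body and a row-to-document converter. The field names come from this post; the `hnsw`/`faiss`/`innerproduct` method settings are assumptions, so check which engines and space types your collection supports. Because Titan vectors are normalized, inner product produces the same ranking as cosine similarity.

```python
import struct

EMBEDDING_DIM = 1024  # must match the Titan "dimensions" used at enrichment time

INDEX_BODY = {
    "settings": {"index.knn": True},
    "mappings": {"properties": {
        "file_id": {"type": "keyword"},
        "classification": {"type": "keyword"},
        "summary": {"type": "text"},
        "embedding_vector": {
            "type": "knn_vector",
            "dimension": EMBEDDING_DIM,
            # Assumed method settings; normalized vectors make inner product rank like cosine
            "method": {"name": "hnsw", "engine": "faiss", "space_type": "innerproduct"},
        },
    }},
}

def to_search_document(row):
    """Convert an Iceberg row (little-endian float32 binary embedding) into an OpenSearch document."""
    blob = row["embedding_vector"]
    vector = list(struct.unpack(f"<{len(blob) // 4}f", blob))
    if len(vector) != EMBEDDING_DIM:
        raise ValueError(f"expected {EMBEDDING_DIM} dims, got {len(vector)}")
    return {"file_id": row["file_id"],
            "classification": row.get("classification"),
            "summary": row.get("summary"),
            "embedding_vector": vector}
```

With the opensearch-py client, the index can be created with `client.indices.create(index="fsxn-metadata-embeddings", body=INDEX_BODY)` and documents written with `client.index(...)` or the bulk helpers. The dimension check catches a model or `dimensions` mismatch before it corrupts the index.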

Search Results (Measured)

Query: "find invoice or payment documents"

Results:
  1. invoice_sample.png (score: 0.6749)
     Classification: Invoice
     Summary: "Invoice #INV-2024-..."

  2. (other similar files ranked by cosine similarity)

Score interpretation:

  • 0.9+: Near-identical content
  • 0.7-0.9: Highly similar
  • 0.5-0.7: Related topic
  • < 0.5: Weak or no relation

Our score of 0.67 for "invoice or payment documents" → invoice_sample.png is reasonable — the query is broad, and the match is correct.

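Improving search scores: Use more specific queries ("Q4 2024 invoice from vendor ABC" vs "find invoices") and enrich files with more detailed summaries. Note that 1024 is already the maximum output size for Titan Embeddings V2. Going beyond it means switching embedding models and re-embedding every file, with proportionally higher storage cost (e.g., ~50% more at 1536 dimensions).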

These score bands are demo heuristics, not universal thresholds. Calibrate thresholds with labeled examples for each document type and business workflow.
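One more calibration trap: bands like these describe raw cosine similarity, but OpenSearch returns a `_score` that is a transform of the distance, and the transform depends on the index's `space_type`. The sketch below inverts the score formulas documented for the k-NN plugin, assuming unit-normalized vectors; verify the formulas against your OpenSearch version. Under `cosinesimil`, the measured 0.6749 corresponds to a cosine of about 0.35. Under `l2`, the same score would correspond to about 0.76.

```python
def score_to_cosine(score, space_type):
    """Invert OpenSearch k-NN score transforms back to cosine similarity (unit vectors assumed)."""
    if space_type == "cosinesimil":
        # score = (2 - d) / 2 with d = 1 - cos, i.e. score = (1 + cos) / 2
        return 2 * score - 1
    if space_type == "innerproduct":
        # d = -ip; score = 1 / (1 + d) when d >= 0, else 1 - d. For unit vectors ip == cos
        return score - 1 if score >= 1 else 1 - 1 / score
    if space_type == "l2":
        # score = 1 / (1 + d) with d = squared L2 distance = 2 - 2 * cos for unit vectors
        return 1 - (1 / score - 1) / 2
    raise ValueError(f"unsupported space_type: {space_type}")
```

Converting to cosine first means a threshold calibrated on one index keeps its meaning if the space type changes.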

The Complete Pipeline

Processing Flow

New file detected (FPolicy event or batch scan)
       │
       ▼
┌─ Is it an image? ──────────────────────────┐
│  YES → Claude Vision classification        │
│  NO  → Metadata-only (file type, size)     │
└────────────────────────────────────────────┘
       │
       ▼
┌─ Generate embedding ──────────────────────┐
│  Input: classification + summary text     │
│  Output: 1024-dim normalized vector       │
└───────────────────────────────────────────┘
       │
       ▼
┌─ Write to S3 Tables (Iceberg) ────────────┐
│  classification, confidence_score,        │
│  summary, embedding_vector,               │
│  enrichment_status = "completed"          │
│  index_status = "pending"                 │
└───────────────────────────────────────────┘
       │
       ▼
┌─ Index in OpenSearch ────────────────────────────┐
│  file_id, embedding_vector, metadata             │
│  (for kNN similarity search)                     │
│  index_status = "indexed" / "stale" / "failed"   │
└──────────────────────────────────────────────────┘

Error Handling

| Error | Strategy | Result |
|---|---|---|
| Bedrock ThrottlingException | Exponential backoff (1s, 2s, 4s) | Retry up to 3 times |
| Bedrock ModelNotReadyException | Wait 5s, retry | Model warming up (first invocation) |
| File read failure (S3 AP) | Mark as failed, retry next cycle | No data loss |
| Low confidence (< 0.3) | Mark as low_confidence | Human review queue |
| Lambda timeout (large files) | Fallback to ECS Fargate | No timeout limit |
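The two Bedrock rows translate into a small retry wrapper. A sketch, assuming errors surface as botocore `ClientError`s, which expose the AWS error code under `exc.response["Error"]["Code"]`. It uses duck typing, so it does not import botocore itself, and `sleep` is injectable for testing. `call_with_backoff` is a hypothetical helper name.

```python
import time

RETRYABLE_CODES = {"ThrottlingException", "ModelNotReadyException"}

def error_code(exc):
    """Read the AWS error code from a botocore ClientError-like exception, else None."""
    return (getattr(exc, "response", None) or {}).get("Error", {}).get("Code")

def call_with_backoff(fn, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(); retry throttling with 1s/2s/4s backoff and model warm-up with a fixed 5s wait."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as exc:
            code = error_code(exc)
            if code not in RETRYABLE_CODES or attempt == max_retries:
                raise  # non-retryable error, or retries exhausted
            sleep(5.0 if code == "ModelNotReadyException" else base_delay * 2 ** attempt)
```

Usage: `call_with_backoff(lambda: bedrock.invoke_model(...))`. botocore also has built-in retry modes (`botocore.config.Config(retries={"mode": "adaptive"})`). An explicit wrapper like this is mainly useful when you want the exact schedule from the table and per-error handling.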

Monitoring the Pipeline

How do you know when something goes wrong? Set up these CloudWatch alarms:

| Metric | Source | Alert Condition | Action |
|---|---|---|---|
| DLQ message count | CloudWatch (SQS) | > 0 | Inspect DLQ messages, redrive |
| Lambda error rate | CloudWatch (Lambda) | > 5% for 5 min | Check logs (Iceberg commit conflict?) |
| Bedrock throttling | CloudWatch (Bedrock) | > 10/min | Reduce request rate, adjust backoff |
| Enrichment backlog | Athena query (pending count) | > 1000 | Increase Lambda concurrency or batch size |
| OpenSearch index size | OpenSearch metrics | > 80% capacity | Add shards or rotate index |
# Quick health check: DLQ depth + Lambda errors over the last hour, in one command
# Queue depth is a gauge, so use Maximum rather than Sum for the DLQ query.
# `date -v-1H` is BSD/macOS syntax; on Linux use: date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S
aws cloudwatch get-metric-data --metric-data-queries '[
  {"Id":"dlq","MetricStat":{"Metric":{"Namespace":"AWS/SQS","MetricName":"ApproximateNumberOfMessagesVisible","Dimensions":[{"Name":"QueueName","Value":"fsxn-metadata-sync-dlq"}]},"Period":300,"Stat":"Maximum"}},
  {"Id":"errors","MetricStat":{"Metric":{"Namespace":"AWS/Lambda","MetricName":"Errors","Dimensions":[{"Name":"FunctionName","Value":"fsxn-metadata-sync"}]},"Period":300,"Stat":"Sum"}}
]' --start-time $(date -u -v-1H +%Y-%m-%dT%H:%M:%S) --end-time $(date -u +%Y-%m-%dT%H:%M:%S)

For detailed operational monitoring guidance, see the Operational Monitoring section in the architecture document.

Cost at Scale

| Volume | AI Cost | Embedding Cost | OpenSearch | Total |
|---|---|---|---|---|
| 100 files/day | $1/day | $0.002/day | $0 (idle) | ~$30/month |
| 1,000 files/day | $10/day | $0.02/day | ~$42/month | ~$342/month |
| 10,000 files/day | $100/day | $0.20/day | ~$84/month | ~$3,084/month |

At 10K files/day, consider batch processing during off-hours. Provisioned Throughput for Bedrock lowers the effective per-request cost only if you keep it highly utilized, because it is billed per hour regardless of traffic.

Cost optimization tip: Not all files need AI enrichment. A common pattern: images → Vision classification, documents → text extraction + embedding, data files (CSV/Parquet) → metadata only (no AI cost). This can reduce AI costs by 60-80% depending on your file mix.

Batch Inference: For initial bulk enrichment (10K+ files), Bedrock Batch Inference can reduce costs by ~50% compared to real-time invocations. Use real-time for incremental new files, batch for backfill.

# Batch Inference example: submit a batch job for bulk classification
import boto3

# Control-plane client ("bedrock"), not the "bedrock-runtime" client used for invoke_model
bedrock = boto3.client("bedrock", region_name="ap-northeast-1")

# 1. Prepare input JSONL file in S3 (one request per line)
# Each line: {"recordId":"file-001","modelInput":{"anthropic_version":"bedrock-2023-05-31",...}}

# 2. Create batch inference job
response = bedrock.create_model_invocation_job(
    jobName="metadata-enrichment-backfill",
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    roleArn="arn:aws:iam::<ACCOUNT>:role/BedrockBatchRole",
    inputDataConfig={
        "s3InputDataConfig": {
            "s3Uri": "s3://my-bucket/batch-input/enrichment-requests.jsonl"
        }
    },
    outputDataConfig={
        "s3OutputDataConfig": {
            "s3Uri": "s3://my-bucket/batch-output/"
        }
    }
)
job_arn = response["jobArn"]

# Job runs asynchronously; results are written to the output prefix when complete.
# Poll bedrock.get_model_invocation_job(jobIdentifier=job_arn)["status"] or use EventBridge.
# Typical processing: 10K files in ~30 minutes at ~50% cost reduction
# (wall-clock time varies with service load and is not guaranteed)

The batch input JSONL contains prompts, file references, or extracted/redacted text depending on your design. It does not require copying the original raw files from FSx for ONTAP to S3. If images are included as base64, treat the JSONL as temporary processing data.
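Each JSONL line pairs a `recordId` with the same Messages body used for real-time calls. Using the catalog's `file_id` as the `recordId` lets results be joined back to S3 Tables. A sketch under that assumption; `batch_record` and `to_jsonl` are hypothetical helpers, and uploading the string (for example with `s3.put_object`) is left out.

```python
import json

PROMPT = ('Classify this image. Respond JSON only: '
          '{"classification":"...","confidence_score":0.X,"summary":"..."}')

def batch_record(record_id, image_b64, media_type="image/png"):
    """One Bedrock batch input record: recordId plus the real-time Messages request body."""
    return {
        "recordId": record_id,
        "modelInput": {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 512,
            "messages": [{"role": "user", "content": [
                {"type": "image", "source": {"type": "base64",
                                             "media_type": media_type,
                                             "data": image_b64}},
                {"type": "text", "text": PROMPT},
            ]}],
        },
    }

def to_jsonl(records):
    """JSON Lines: one compact JSON object per line, newline-terminated."""
    return "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in records)
```

Because base64 image data ends up in this file, it falls under the "temporary processing data" handling described above. Delete it once the job's output has been written back.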

Batch job monitoring: Use EventBridge rules to detect Bedrock batch job state changes (COMPLETED, FAILED). Route to SNS → Lambda to automatically write results back to S3 Tables.

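Prompt Caching: If you use the same system prompt across all classifications (recommended), Bedrock prompt caching can reduce input token costs for the cached prefix by up to 90%. Caching only applies to supported models and to prompt prefixes above a minimum token count, so confirm that your model supports it and that your shared prompt is long enough to be cached.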

EMR Spark for large-scale backfill: For initial backfill or re-enrichment of 100K+ files, Spark on EMR Serverless or EMR on EC2 can be used as an alternative to Lambda/Fargate. EMR 7.13.0+ supports Glue as an Iceberg REST catalog, enabling distributed metadata writes with Lake Formation governance. Verified 2026-06-02: SELECT, COUNT, and time travel all work on EMR Serverless 7.13.0. Use Lambda for incremental (event-driven) processing and Spark for bulk operations.

Search Index Consistency

OpenSearch is a derived index, not the system of record. S3 Tables / Iceberg remains the metadata source of truth.

Recommended controls:

  • Store iceberg_snapshot_id in OpenSearch documents for traceability
  • Store embedding_model_id and prompt_version in both Iceberg and OpenSearch
  • Reconcile OpenSearch index against latest Iceberg view periodically
  • Mark index_status: pending / indexed / stale / failed
  • If search returns a stale result, fall back to Athena query on the base table
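A periodic reconciliation job can derive `index_status` by comparing the latest Iceberg view with what OpenSearch holds. The sketch compares lineage fields (`embedding_model_id`, `enriched_at`) rather than `iceberg_snapshot_id`, because the snapshot ID changes on every commit and would mark unrelated documents stale. Inputs are assumed to be dicts keyed by `file_id`. `failed` is not derived here: it comes from indexing errors.

```python
def reconcile(iceberg_rows, indexed_docs):
    """Return ({file_id: index_status}, [orphan file_ids]) for the latest Iceberg view vs OpenSearch."""
    lineage = ("embedding_model_id", "enriched_at")
    status = {}
    for file_id, row in iceberg_rows.items():
        doc = indexed_docs.get(file_id)
        if doc is None:
            status[file_id] = "pending"  # never indexed
        elif any(doc.get(f) != row.get(f) for f in lineage):
            status[file_id] = "stale"    # re-enriched or re-embedded since indexing
        else:
            status[file_id] = "indexed"
    # Documents whose file is gone from the latest Iceberg view should be deleted from the index
    orphans = sorted(set(indexed_docs) - set(iceberg_rows))
    return status, orphans
```

Since Iceberg is the source of truth, the fix always flows one way: reindex `pending`/`stale` files and delete orphans, never the reverse.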

FPolicy Event Design

For incremental metadata sync via ONTAP FPolicy:

  • Use batch scan for initial backfill (not FPolicy)
  • Use FPolicy only for incremental changes after initial catalog population
  • Prefer create / close-with-modification / rename / delete events
  • Avoid read / open events (excessive volume, no catalog value)
  • Apply path and extension filters to reduce event noise
  • Add backpressure via SQS batching (not fan-out Lambda per event)

FPolicy can significantly impact file system throughput if configured too broadly. Filter to only the operations and paths that matter for catalog updates.
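On the consumer side, those rules reduce to a filter plus fixed-size batching. A sketch: the operation names, path prefix, and excluded extensions are hypothetical, and real FPolicy event payloads will use different field names. The batch size of 10 reflects the SQS `SendMessageBatch` limit of 10 messages per call.

```python
from pathlib import PurePosixPath

# Hypothetical filter values; adapt to your FPolicy policy and event format
CATALOG_OPERATIONS = {"create", "close_modified", "rename", "delete"}
INCLUDED_PREFIXES = ("/vol1/projects/",)
EXCLUDED_EXTENSIONS = {".tmp", ".swp", ".lock"}
SQS_MAX_BATCH = 10  # SendMessageBatch accepts at most 10 entries

def wanted(event):
    """Keep only catalog-relevant operations on included paths, skipping temp files."""
    path = event["path"]
    return (event["operation"] in CATALOG_OPERATIONS
            and path.startswith(INCLUDED_PREFIXES)
            and PurePosixPath(path).suffix.lower() not in EXCLUDED_EXTENSIONS)

def batches(events, size=SQS_MAX_BATCH):
    """Yield filtered events in chunks sized for one SQS SendMessageBatch call each."""
    chunk = []
    for event in filter(wanted, events):
        chunk.append(event)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
```

Filtering in ONTAP is still preferable where possible. This layer is a second line of defense that keeps noisy events from ever reaching Lambda.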

Hybrid Search Pattern

For production discovery, vector search should be combined with lexical filters and keyword search:

  • Lexical search: file_name, path, classification, summary, tags
  • Vector search: embedding similarity (kNN)
  • Filters: tenant_id, sensitivity_level, file_type, path_classification, last_modified

OpenSearch supports both search and vector collection types. Use a single index with both text fields and vector fields for hybrid queries. S3 Tables / Iceberg remains the metadata source of truth; OpenSearch is the serving index.
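A single bool query can combine the three layers. In this sketch, field names follow the lists above and `hybrid_query` is a hypothetical helper. Two caveats apply. First, once a `filter` clause is present, `should` clauses become optional unless `minimum_should_match` is set, hence the explicit 1. Second, a bool-level filter is applied after kNN retrieves its `k` candidates, so fewer than `k` hits can come back.

```python
def hybrid_query(text, vector, filters, k=5):
    """Lexical match + kNN similarity, restricted by exact-match filters (e.g. tenant_id)."""
    return {
        "size": k,
        "query": {
            "bool": {
                "should": [
                    {"multi_match": {"query": text,
                                     "fields": ["file_name", "classification", "summary", "tags"]}},
                    {"knn": {"embedding_vector": {"vector": vector, "k": k}}},
                ],
                "minimum_should_match": 1,  # require a lexical or vector hit, not just a filter match
                "filter": [{"term": {field: value}} for field, value in filters.items()],
            }
        },
    }
```

BM25 and kNN scores sit on different scales, so the summed score here is a rough blend. Where your deployment supports it, OpenSearch's `hybrid` query with a score-normalization search pipeline is the more principled way to combine them.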

For sensitive workloads, use VPC interface endpoints for Bedrock Runtime and S3 VPC endpoints for batch input/output. See genai/bedrock-private-connectivity.md.

Storage Tier Impact During Backfill

Initial AI enrichment may read cold files from capacity pool storage, causing higher latency and throughput consumption.

Recommended controls:

  • Run backfill during off-hours (minimize impact on production NFS/SMB)
  • Limit Lambda concurrency during backfill
  • Enrich only selected file types first (images → documents → data files)
  • Monitor FSx capacity pool read activity via CloudWatch
  • Separate backfill cost from steady-state cost in planning

Backfill vs Incremental Cost Model

Separate cost planning for:

| Phase | Scope | Cost driver | Optimization |
|---|---|---|---|
| Initial backfill | All existing files (e.g., 100K) | Bedrock AI at scale | Batch Inference (~50% savings) |
| Daily incremental | New/modified files (e.g., 1000/day) | Real-time Lambda + Bedrock | Selective enrichment by file type |
| Re-enrichment | After prompt/model change | Full re-scan of enriched files | Batch + compare confidence delta |
| OpenSearch reindex | After schema/embedding change | Index rebuild | Off-hours, parallel shards |

The largest cost spike is typically the initial backfill, not steady-state. Plan Bedrock Batch Inference and off-peak scheduling for the first catalog population.

For adjustable assumptions, see verification-evidence/cost-assumptions.yaml.

Try It Yourself

# Enrich pending files with AI
python3 demo/scripts/demo-enrich.py \
  --table-bucket-arn <TABLE_BUCKET_ARN> \
  --ap-alias <AP_ALIAS> \
  --max-files 10

# Search by natural language
python3 demo/scripts/demo-search.py \
  --query "find documents about contracts or agreements"

AI Safety and Human Review Boundary

AI enrichment should not be treated as authoritative classification for regulated data without human review.

For regulated industries: AI enrichment is assistive metadata generation, not authoritative classification. Final regulatory classification must be confirmed by data owners, security, legal, and compliance teams. This system provides AI-generated signals to accelerate human review — it does not replace it.

Deterministic vs AI boundary: AI generates classifications and summaries, but pipeline state transitions, retry logic, deduplication, access controls, and audit evidence are deterministic and version-controlled. The deterministic pipeline guarantees reproducibility; AI provides enrichment quality.

Recommended controls:

  • Human review queue for low-confidence classifications (< 0.7)
  • Sampling review for high-confidence results (periodic spot-check)
  • False negative testing for PII detection
  • Model/prompt version recorded in metadata (enriched_at + model ID)
  • Re-enrichment policy when model or prompt changes

Recommended metadata columns for AI lineage:

  • classification_model_id — which model produced the classification
  • embedding_model_id — which model produced the embedding
  • prompt_version — version of the classification prompt
  • enrichment_code_version — version of the enrichment Lambda/script
  • enriched_at — timestamp of enrichment
  • human_review_status — pending / approved / rejected
  • human_reviewed_by — reviewer identity (if applicable)
  • human_reviewed_at — review timestamp

Evaluation Plan

For production use, do not rely only on model-reported confidence. Create a labeled validation set and measure:

  • Classification accuracy (overall and per document type)
  • Precision / recall per category
  • False positive rate for PII detection
  • False negative rate for PII detection
  • Embedding search top-k relevance (nDCG@5, MRR)
  • Human review acceptance rate
  • Cost per accepted classification
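Several of these metrics are easy to compute from a labeled validation set. A minimal, dependency-free sketch: `precision_recall` works on parallel lists of true and predicted labels, and `mrr`/`ndcg_at_k` use binary relevance (a hit is either relevant or not). The function names are illustrative.

```python
import math
from collections import Counter

def precision_recall(y_true, y_pred):
    """Per-category (precision, recall) from parallel lists of labels."""
    tp, fp, fn = Counter(), Counter(), Counter()
    for t, p in zip(y_true, y_pred):
        if t == p:
            tp[t] += 1
        else:
            fp[p] += 1  # predicted p, but it was wrong
            fn[t] += 1  # missed the true label t
    labels = set(y_true) | set(y_pred)
    return {c: (tp[c] / (tp[c] + fp[c]) if tp[c] + fp[c] else 0.0,
                tp[c] / (tp[c] + fn[c]) if tp[c] + fn[c] else 0.0) for c in labels}

def mrr(ranked_lists, relevant_sets):
    """Mean reciprocal rank of the first relevant hit per query (0 if none)."""
    total = 0.0
    for ranked, relevant in zip(ranked_lists, relevant_sets):
        total += next((1 / (i + 1) for i, doc in enumerate(ranked) if doc in relevant), 0.0)
    return total / len(ranked_lists)

def ndcg_at_k(ranked, relevant, k=5):
    """Binary-relevance nDCG@k for a single query."""
    dcg = sum(1 / math.log2(i + 2) for i, doc in enumerate(ranked[:k]) if doc in relevant)
    ideal = sum(1 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / ideal if ideal else 0.0
```

For PII detection, treat "contains PII" as the positive class: the false-negative rate is then 1 minus its recall, the number the list above calls out separately.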

Business acceptance metrics (beyond model accuracy):

  • Time saved per analyst for file discovery
  • Dataset discovery lead-time reduction (days → hours target)
  • Business owner approval rate for AI classifications
  • Cost per useful search result
  • False negative risk by document category (which misses matter most?)
  • Governance coverage (% of assets searchable in BI/AI tools)

The 7/7 PII detection result was measured on a controlled synthetic sample. Production use requires evaluation with domain-specific documents, false-positive/false-negative review, human approval workflow, and legal/compliance sign-off.

Snowflake users: Snowflake can now directly query S3 Tables Iceberg metadata via Glue REST + VENDED_CREDENTIALS (verified 2026-06-05). Additionally, you can sync redacted metadata into Snowflake-managed tables for Cortex Search / Snowflake Intelligence business-facing discovery. In this PoC, OpenSearch remains the AWS-native vector search component.

What's Next

In Part 3, we'll cover:

  • Lake Formation governance: Column-level access control on metadata
  • PII detection and anonymization: Comprehend (English) + Bedrock Claude (Japanese)
  • Cross-platform access: What works and what doesn't with Databricks and Snowflake
  • Data Clean Room pattern: Separate tables for sensitive vs. anonymized metadata

Full code: github.com/Yoshiki0705/fsxn-lakehouse-integrations
