Seenivasa Ramadurai

I Built an ETL Pipeline That Actually Thinks, and Cut Token Costs by 52% (And Here's What I Learned)

LangGraph + LLM + TOON: What happens when you give your data pipeline a brain and teach it to be efficient?

Look, I'll be honest with you. I got a little carried away last weekend. You know that feeling when you discover a shiny new technology and think "I wonder if I can use this for everything?" Yeah, that happened to me with LangGraph, GPT-4o-mini, and a token optimization format called TOON.

I built an ETL pipeline that uses AI to make decisions. Not just running predefined rules, but actually thinking about the data flowing through it. And then I made it 52% cheaper by using TOON instead of JSON.

Let me walk you through what I built, what worked surprisingly well, and where I probably went a bit too far.

The Problem with Traditional ETL Pipelines
We've all been there. You write an ETL pipeline that looks something like this:

if customer_field is None:
    raise ValidationError("Missing customer name")

if amount > 3000:
    segment = "premium"
elif amount > 1000:
    segment = "standard"
else:
    segment = "basic"

# And then it gets worse...
if email.endswith("@techcorp.com"):
    industry = "Technology"
elif email.endswith("@bank.com"):
    industry = "Finance"
# ... 47 more conditions

It works fine... until it doesn't. Then you're stuck adding endless if statements for every edge case. Your "simple" pipeline becomes a 2,000-line monster of business logic that nobody wants to touch.

What If Your Pipeline Could Actually Understand Context?

Here's where things get interesting. Instead of hardcoding "if email contains @techcorp.com then industry = Technology," what if the AI could look at the email domain, the company name, the transaction patterns, and infer what industry they're in?

That's exactly what I built. And then I made it cost-efficient with TOON.

Wait, What's TOON?

Before I show you the pipeline, let me explain the secret sauce that made this affordable.

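TOON (Token-Oriented Object Notation) is a compact data format that declares the field names once per table instead of repeating them in every record. On my batches of uniform customer records, that cut the payload sent to the LLM by about 50%. Here's the magic: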

JSON format (3,780 chars for a full batch; three records shown):

{
  "records": [
    {"id": 1, "customer": "Krishna K", "amount": 2450.50, "email": "krishna@techcorp.com"},
    {"id": 2, "customer": "Navin T", "amount": 1875.25, "email": "navin@retail.net"},
    {"id": 3, "customer": "Aaron B", "amount": 3200.00, "email": "aaron@global.io"}
  ]
}

TOON format (1,893 chars for the same batch, 50% smaller!):

records[3]{id,customer,amount,email}:
1,Krishna K,2450.5,krishna@techcorp.com
2,Navin T,1875.25,navin@retail.net
3,Aaron B,3200.0,aaron@global.io

Same data. About half the tokens. About half the cost. The LLM understands both just fine.
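To make the format concrete, here is a minimal hand-rolled sketch that turns a list of flat, same-shaped dicts into the tabular layout shown above. It is not the toon-python library and it skips quoting, escaping, and nesting; `encode_tabular` is a hypothetical helper name, and the un-indented rows simply mirror the example in this post.

```python
import json


def encode_tabular(name, rows):
    """Encode a non-empty list of flat dicts with identical keys as a TOON-style table."""
    if not rows:
        raise ValueError("this sketch only handles non-empty lists")
    fields = list(rows[0].keys())
    for row in rows:
        if list(row.keys()) != fields:
            raise ValueError("all rows must share the same fields in the same order")
    cols = ",".join(fields)
    # The header names the table, its row count, and the columns exactly once
    header = f"{name}[{len(rows)}]{{{cols}}}:"
    # Each record becomes one comma-separated line of values (no escaping here)
    lines = [",".join(str(row[f]) for f in fields) for row in rows]
    return "\n".join([header] + lines)


records = [
    {"id": 1, "customer": "Krishna K", "amount": 2450.50, "email": "krishna@techcorp.com"},
    {"id": 2, "customer": "Navin T", "amount": 1875.25, "email": "navin@retail.net"},
    {"id": 3, "customer": "Aaron B", "amount": 3200.00, "email": "aaron@global.io"},
]

encoded = encode_tabular("records", records)
print(encoded)
print("JSON chars:", len(json.dumps({"records": records}, indent=2)))
print("TOON-style chars:", len(encoded))
```

The saving comes almost entirely from not repeating the keys, so it shrinks for deeply nested or irregular data where rows don't share a shape.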

The Architecture: Where AI Actually Adds Value
My pipeline has six stages, and here's where I let the AI do its thing:

1. Extract Node (Traditional, No AI)

Just pulls 100 customer records from a JSON file. Nothing fancy here. Sometimes boring is good.

2. Validate Node (🤖 AI-Powered + TOON)

Instead of writing regex patterns and null checks, I send batches of records to GPT-4o-mini with a simple prompt: "Hey, look at these records and tell me what's wrong with them."

# 💰 TOON saves about 50% on every batch sent to the LLM
batch_data = toon_encode({"records": batch})

# JSON: 3,780 chars → TOON: 1,893 chars

validation_prompt = f"""Analyze these customer records for quality issues.
Records (TOON format):
{batch_data}
"""

The AI catches things I didn't expect:

"This looks like test data with dummy values"
"Email format is invalid, missing domain"
"Amount is suspiciously round, might be placeholder data"

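The validator's reply still has to be turned back into "keep these, reject those". Here is a minimal sketch of that step, assuming the model answers with a JSON object holding `valid_ids` and `errors` and sometimes wraps it in a Markdown code fence. `parse_llm_json` and `split_batch` are hypothetical helper names, and the reply string is made up for illustration.

```python
import json

FENCE = "`" * 3  # a Markdown code fence marker


def parse_llm_json(text):
    """Return the JSON payload from an LLM reply, with or without a code fence."""
    content = text.strip()
    if content.startswith(FENCE):
        # Take what sits between the first pair of fence markers
        content = content.split(FENCE)[1]
        if content.startswith("json"):
            content = content[len("json"):]
    return json.loads(content.strip())


def split_batch(batch, reply_text):
    """Split a batch into accepted records and readable error messages."""
    result = parse_llm_json(reply_text)
    valid_ids = set(result.get("valid_ids", []))
    valid = [r for r in batch if r["id"] in valid_ids]
    errors = [f"Record {e['id']}: {e['error']}" for e in result.get("errors", [])]
    return valid, errors


batch = [
    {"id": 1, "email": "krishna@techcorp.com"},
    {"id": 2, "email": "navin@"},
]
reply = (
    FENCE + "json\n"
    '{"valid_ids": [1], "errors": [{"id": 2, "error": "Email is missing a domain"}]}\n'
    + FENCE
)

valid, errors = split_batch(batch, reply)
print(valid)
print(errors)
```

A record the model mentions in neither list is silently dropped by this logic, so in practice it is worth counting what went in against what came out.
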
3. Transform Node (🤖 AI-Powered, The Star of the Show)

This is where the magic happens. For each customer record, the AI:

Infers industry from email domains: Sees @fintech-startup.io and understands it's a technology/finance startup
Estimates company size: Recognizes patterns like @freelance.me (freelancer) vs @globalcorp.com (enterprise)
Scores risk intelligently: Looks at purchase patterns, company size, and transaction amounts together
Generates business insights: "High-value enterprise client, strong upsell potential"

The AI isn't just following rules; it's using contextual understanding. That's genuinely useful.

4. Load Node (Traditional, No AI)

Writes the data to the destination. Again, boring is beautiful here.

5. Monitor Node (🤖 AI-Powered + TOON)

After processing, the AI analyzes the entire pipeline run and tells me:

Data quality score (0-100)
Top issues it found
Actionable recommendations
Business insights from the data

It's like having a data analyst review your pipeline execution every single time.

6. Report Node (Traditional)

Generates JSON and text reports. No AI needed.

The Visual Flow
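
The six stages and the single branch point can be written down as a small routing table. This is a plain-Python sketch with no LangGraph involved; `EDGES`, `next_stage`, and `path` are hypothetical names. The one conditional edge mirrors the full listing at the end of this post: validation goes straight to monitoring when no record survives.

```python
# Unconditional edges between stages; None marks the end of the run
EDGES = {
    "extract": "validate",
    "transform": "load",
    "load": "monitor",
    "monitor": "report",
    "report": None,
}


def next_stage(stage, valid_count):
    """Return the stage that follows `stage`, given how many records passed validation."""
    if stage == "validate":
        # The only conditional edge: skip transform and load when nothing is valid
        return "transform" if valid_count > 0 else "monitor"
    return EDGES[stage]


def path(valid_count):
    """List the stages visited for a run with `valid_count` valid records."""
    stage, visited = "extract", []
    while stage is not None:
        visited.append(stage)
        stage = next_stage(stage, valid_count)
    return visited


print(" -> ".join(path(valid_count=87)))
print(" -> ".join(path(valid_count=0)))
```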

Real Results That Made Me Go "Whoa"
When I ran this on 100 customer records, here's what the AI figured out without any hardcoded rules:

🧑 Krishna K
💰 Amount: $2,450.50
📊 Segment: standard
🏢 Industry: Technology ← AI inferred from @techcorp.com
📈 Company Size: mid-market ← AI analyzed domain pattern
⚠️ Risk: low
🤖 AI Insight: "Repeat enterprise buyer, upsell potential"

🧑 Navin T
💰 Amount: $4,500.00
📊 Segment: premium
🏢 Industry: Finance ← AI understood fintech-startup.io
📈 Company Size: startup
⚠️ Risk: medium
🤖 AI Insight: "Fintech founder, high growth potential"

🧑 Aaron B
💰 Amount: $3,200.00
📊 Segment: premium
🏢 Industry: Consulting ← From consulting.biz domain
📈 Company Size: enterprise
⚠️ Risk: low
🤖 AI Insight: "Enterprise client, strong retention"

The AI connected dots that would've taken me dozens of if-statements to code.

The TOON Savings: Real Numbers
Here's what my pipeline printed at the end:

💰 TOON TOKEN SAVINGS SUMMARY

📊 JSON format would use: 21,230 characters
✨ TOON format used: 10,179 characters
💰 Characters saved: 11,051 (52.1% reduction)
🎯 Estimated token savings: ~2,762 tokens

🚀 At 10,000 runs/day: ~$1,512/year saved!

52% reduction. Just by changing the data format. The LLM still understands it perfectly.
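
The summary numbers can be reproduced with a few lines of arithmetic. The sketch below assumes roughly 4 characters per token (an approximation, inferred from 11,051 characters mapping to ~2,762 tokens) and the GPT-4o-mini input price of $0.15 per 1M tokens quoted in this post. It only covers input tokens, and a real tokenizer would give somewhat different counts.

```python
json_chars = 21_230
toon_chars = 10_179

CHARS_PER_TOKEN = 4               # assumption: rough average, not a real tokenizer
PRICE_PER_MILLION_TOKENS = 0.15   # USD, GPT-4o-mini input price quoted in this post
RUNS_PER_DAY = 10_000

chars_saved = json_chars - toon_chars
reduction_pct = 100 * chars_saved / json_chars
tokens_saved_per_run = chars_saved // CHARS_PER_TOKEN
yearly_savings_usd = (
    tokens_saved_per_run * RUNS_PER_DAY * 365 * PRICE_PER_MILLION_TOKENS / 1_000_000
)

print(f"Characters saved: {chars_saved} ({reduction_pct:.1f}% reduction)")
print(f"Estimated tokens saved per run: ~{tokens_saved_per_run}")
print(f"Estimated yearly savings at {RUNS_PER_DAY} runs/day: ~${yearly_savings_usd:,.0f}")
```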

The Honest Truth: When This Actually Makes Sense
Okay, real talk time. After building this, here's my brutally honest assessment:

The Cost Reality Check

Let me show you the math that made me reconsider everything:

Without TOON:

100 records × 3 LLM calls × ~1000 tokens = 300,000 tokens
GPT-4o-mini: ~$0.15 per 1M input tokens
Total: ~$0.05 per run

With TOON (52% reduction):

100 records × 3 LLM calls × ~500 tokens = 150,000 tokens
Total: ~$0.02 per run
TOON cuts the token cost roughly in half!
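
Here is the same per-run math as a short sketch, using the record counts, token estimates, and price from the lines above; `run_cost` is a hypothetical helper. Before rounding, the two costs are $0.045 and $0.0225, a 50% difference. Comparing the rounded $0.05 and $0.02 instead would overstate the saving.

```python
PRICE_PER_MILLION_INPUT_TOKENS = 0.15  # USD, GPT-4o-mini input price quoted above


def run_cost(records, calls_per_record, tokens_per_call):
    """Return (total input tokens, input cost in USD) for one pipeline run."""
    total_tokens = records * calls_per_record * tokens_per_call
    cost = total_tokens * PRICE_PER_MILLION_INPUT_TOKENS / 1_000_000
    return total_tokens, cost


json_tokens, json_cost = run_cost(records=100, calls_per_record=3, tokens_per_call=1000)
toon_tokens, toon_cost = run_cost(records=100, calls_per_record=3, tokens_per_call=500)
saving_pct = 100 * (json_cost - toon_cost) / json_cost

print(f"Without TOON: {json_tokens:,} tokens, ${json_cost:.4f} per run")
print(f"With TOON:    {toon_tokens:,} tokens, ${toon_cost:.4f} per run")
print(f"Saving:       {saving_pct:.0f}%")
```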

At Scale:

What I'd Do Differently: The Hybrid Approach

If I were building this for production, here's what I'd change:

def smart_validate(records):
    # Use CODE for simple, fast checks (FREE!)
    basic_errors = []
    for record in records:
        # `is None` keeps a legitimate amount of 0 from being flagged as missing
        if record.get('amount') is None:
            basic_errors.append("Missing amount")
        if not record.get('email') or '@' not in record['email']:
            basic_errors.append("Invalid email")

    # Only use AI for complex analysis
    if not basic_errors:
        return None  # Clean batch: no LLM call, no cost

    # ONE AI call with TOON to understand patterns
    errors_toon = toon_encode({"errors": basic_errors[:10]})
    ai_analysis = llm.invoke(
        f"Analyze these validation errors and suggest root causes:\n{errors_toon}"
    )
    return ai_analysis

The hybrid principle:

  • Use TOON format - 50%+ token reduction, instant savings
  • Use code for simple checks - Free, fast, deterministic
  • Use AI for complex analysis - Where it adds real value
  • Batch AI calls aggressively - 1 call vs 100 calls
  • Cache results - Don't re-process same patterns
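
The batching and caching points can be combined in one small sketch: collect the distinct email domains in a batch, ask about only the ones not seen before, and reuse the answers afterwards. Everything here is hypothetical, including `DomainIndustryCache` and `fake_classify`, which stands in for a single batched LLM call. It also assumes industry depends on the domain alone, which is a simplification.

```python
def domain_of(email):
    """Return the lower-cased part of an email address after the last @."""
    return email.split("@")[-1].lower()


class DomainIndustryCache:
    """Remember industry answers per domain so each domain is classified once."""

    def __init__(self, classify):
        self._classify = classify  # callable: list of domains -> {domain: industry}
        self._cache = {}
        self.calls = 0             # how many times the (expensive) classifier ran

    def enrich(self, records):
        domains = {domain_of(r["email"]) for r in records}
        missing = sorted(d for d in domains if d not in self._cache)
        if missing:
            # One batched call for all unseen domains instead of one call per record
            self._cache.update(self._classify(missing))
            self.calls += 1
        return [
            {**r, "industry": self._cache.get(domain_of(r["email"]), "Unknown")}
            for r in records
        ]


def fake_classify(domains):
    # Hypothetical stand-in for one batched LLM call
    return {d: "Technology" if "tech" in d else "Retail" for d in domains}


cache = DomainIndustryCache(fake_classify)
first = cache.enrich([
    {"id": 1, "email": "krishna@techcorp.com"},
    {"id": 2, "email": "navin@retail.net"},
    {"id": 3, "email": "someone@TechCorp.com"},
])
second = cache.enrich([{"id": 4, "email": "another@techcorp.com"}])
print(first)
print(second)
print("classifier calls:", cache.calls)
```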

The Comparison: Traditional vs LLM vs Hybrid

Why This Is Still Worth Building

Despite the limitations, here's why I'm glad I built this:

Proof of concept - It shows what's possible
Educational - LangGraph's framework is excellent for learning agent orchestration
Small-scale production - Perfect for internal tools processing hundreds of records
Data exploration - Amazing for understanding messy datasets
Cost-optimized with TOON - 52% cheaper than naive implementation

The Setup (If You Want to Try It)

pip install langgraph langchain-openai python-dotenv toon-python
export OPENAI_API_KEY='your-key-here'

Start simple:

  1. Use AI for one enrichment step (like industry classification)
  2. Add TOON to cut costs in half
  3. Measure the cost and latency
  4. Decide if the value justifies the expense
  5. Scale from there
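
For step 3, a thin wrapper around the call is enough to start collecting numbers. This is a minimal sketch: `measure_call` and `fake_llm` are hypothetical names, the 4 characters per token ratio is a rough assumption rather than a real tokenizer, and the price is the GPT-4o-mini input rate quoted earlier. Output tokens are not counted.

```python
import time

CHARS_PER_TOKEN = 4               # assumption: rough average, not a real tokenizer
PRICE_PER_MILLION_TOKENS = 0.15   # USD, GPT-4o-mini input price quoted earlier


def measure_call(call, prompt):
    """Run call(prompt) and return (result, stats) with latency and a rough input cost."""
    start = time.perf_counter()
    result = call(prompt)
    seconds = time.perf_counter() - start
    est_tokens = len(prompt) / CHARS_PER_TOKEN
    stats = {
        "seconds": seconds,
        "est_input_tokens": est_tokens,
        "est_input_cost_usd": est_tokens * PRICE_PER_MILLION_TOKENS / 1_000_000,
    }
    return result, stats


def fake_llm(prompt):
    # Hypothetical stand-in so the example runs without an API key
    return "Technology"


prompt = "Classify the industry for this email domain: techcorp.com"
result, stats = measure_call(fake_llm, prompt)
print(result, stats)
```

Swapping `fake_llm` for a function that calls the real model gives per-call latency. If the client reports actual token usage, prefer that over the character-based estimate.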

The Bigger Picture

The ETL landscape is being transformed by AI and multi-agent systems. Companies like LinkedIn, Elastic, and Replit are using LangGraph for production agents in 2024, but they're using it strategically, for specific problems where AI adds real value.

The future isn't "AI-powered ETL" or "traditional ETL." It's knowing when to use each.

Your data pipeline doesn't need to think about everything. It just needs to think about the right things.

And when it does think, make sure it's doing so efficiently. That's where TOON comes in.

Setup and Initialization

"""
Author: Sreeni Ramadurai
Date: DEC-13-
"""

from typing import TypedDict, List, Dict, Any
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from datetime import datetime
from dotenv import load_dotenv
import json
import os

# TOON - Token Oriented Object Notation for 45-70% token reduction
try:
    from toon_python import encode as toon_encode
    TOON_AVAILABLE = True
    print("✅ TOON (Token Oriented Object Notation) loaded - 45-70% token savings!")
except ImportError:
    TOON_AVAILABLE = False
    print("⚠️  TOON not available. Install with: pip install toon-python")
    def toon_encode(data):
        return json.dumps(data, indent=2)  # Fallback to JSON

# Load environment variables from TOON .env file
load_dotenv("/Users/sreenir/TOON/.env", override=True)

# Get API key and strip any quotes
api_key = os.getenv("OPENAI_API_KEY", "")
if api_key:
    api_key = api_key.strip('"').strip("'")

if not api_key:
    print("⚠️  No OPENAI_API_KEY found. Set it with: export OPENAI_API_KEY='your-key'")
    print("   Or run: OPENAI_API_KEY='your-key' python etl_pipeline.py")
    llm = None
else:
    print("✅ OpenAI API key loaded successfully")
    # Initialize the LLM (OpenAI gpt-4o-mini)
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, api_key=api_key)

# Define the pipeline state - this tracks data through all stages
class ETLState(TypedDict):
    raw_data: List[Dict[str, Any]]
    validated_data: List[Dict[str, Any]]
    transformed_data: List[Dict[str, Any]]
    loaded_count: int
    validation_errors: List[str]
    transformation_log: List[str]
    pipeline_status: str
    execution_time: Dict[str, float]
    ai_analysis: Dict[str, Any]  # LLM-generated insights and recommendations
    report_path: str  # Path to generated ETL report
    toon_savings: Dict[str, int]  # Track TOON token savings

# Global tracker for TOON savings
toon_savings_tracker = {"json_chars": 0, "toon_chars": 0}

# ============== ETL NODES ==============

def extract_node(state: ETLState) -> ETLState:
    """
    EXTRACT: Reads data from JSON file containing customer records
    In production, this would connect to real data sources (APIs, databases, etc.)
    """
    print("📥 EXTRACTING data from source...")

    # Get the directory where this script is located
    script_dir = os.path.dirname(os.path.abspath(__file__))
    data_file = os.path.join(script_dir, "customers.json")

    # Load customer data from JSON file
    with open(data_file, "r") as f:
        raw_data = json.load(f)

    print(f"✅ Extracted {len(raw_data)} records from customers.json")

    return {
        "raw_data": raw_data,
        "execution_time": {"extract": 0.5}
    }

def validate_node(state: ETLState) -> ETLState:
    """
    VALIDATE: Uses LLM for intelligent data quality validation
    The LLM analyzes each record and identifies quality issues
    """
    print("\n🔍 VALIDATING data quality with LLM intelligence...")

    validated_data = []
    validation_errors = []
    raw_data = state["raw_data"]

    # Check if LLM is available
    if llm is None:
        print("⚠️  LLM not available - using basic validation")
        # Fallback to basic validation
        for record in raw_data:
            errors = []
            if not record.get("customer"):
                errors.append(f"Record {record['id']}: Missing customer name")
            if record.get("amount") is None:
                errors.append(f"Record {record['id']}: Missing amount")
            elif record.get("amount", 0) < 0:
                errors.append(f"Record {record['id']}: Negative amount")
            if errors:
                validation_errors.extend(errors)
            else:
                validated_data.append(record)
    else:
        # 🤖 LLM-POWERED VALIDATION
        batch_size = 20
        print(f"   📝 Validating {len(raw_data)} records using AI...")

        for i in range(0, len(raw_data), batch_size):
            batch = raw_data[i:i + batch_size]

            # 🤖 LLM PROMPT FOR INTELLIGENT VALIDATION
            # Use TOON format for 45-70% token reduction!
            batch_data = toon_encode({"records": batch}) if TOON_AVAILABLE else json.dumps(batch, indent=2)
            json_size = len(json.dumps(batch, indent=2))
            toon_size = len(batch_data)
            toon_savings_tracker["json_chars"] += json_size
            toon_savings_tracker["toon_chars"] += toon_size
            if TOON_AVAILABLE:
                print(f"      💰 TOON saved {json_size - toon_size} chars ({100*(1-toon_size/json_size):.0f}% reduction)")

            validation_prompt = f"""You are a data quality validator. Analyze these customer records and identify data quality issues.

Records to validate (TOON format - compact notation):
{batch_data}

For each record, check:
1. Is "customer" name present and valid (not empty, not just spaces)?
2. Is "amount" present and a valid positive number?
3. Is "email" a valid email format (contains @ and domain)?
4. Is "date" a valid date in YYYY-MM-DD format?
5. Are there any suspicious patterns (like test data, dummy values)?

Respond with a JSON object containing:
{{
    "valid_ids": [list of record IDs that passed all validations],
    "errors": [
        {{"id": record_id, "error": "description of the issue"}},
        ...
    ]
}}

Be strict but fair. Only flag real issues."""

            messages = [
                SystemMessage(content="You are a strict data quality validator. Identify all data quality issues. Respond with valid JSON only."),
                HumanMessage(content=validation_prompt)
            ]

            try:
                # 🤖 ACTUAL LLM CALL FOR VALIDATION
                response = llm.invoke(messages)

                # Parse JSON from response
                content = response.content.strip()
                if content.startswith("```"):
                    content = content.split("```")[1]
                    if content.startswith("json"):
                        content = content[4:]
                content = content.strip()

                validation_result = json.loads(content)
                valid_ids = set(validation_result.get("valid_ids", []))

                # Process validation results
                for record in batch:
                    if record["id"] in valid_ids:
                        validated_data.append(record)

                # Collect errors
                for error in validation_result.get("errors", []):
                    validation_errors.append(f"Record {error['id']}: {error['error']}")

                print(f"   🤖 AI validated batch {i//batch_size + 1}: {len(valid_ids)} valid, {len(validation_result.get('errors', []))} errors")

            except Exception as e:
                print(f"⚠️  LLM validation failed for batch: {e}")
                # Fallback: add all records from failed batch
                for record in batch:
                    if record.get("customer") and record.get("amount") is not None and record.get("amount", 0) >= 0:
                        validated_data.append(record)
                    else:
                        validation_errors.append(f"Record {record['id']}: Failed validation (fallback)")

    print(f"✅ Validated {len(validated_data)} records")
    print(f"⚠️  Found {len(validation_errors)} validation errors")

    return {
        "validated_data": validated_data,
        "validation_errors": validation_errors,
        "execution_time": {**state["execution_time"], "validate": 1.5}
    }

def transform_node(state: ETLState) -> ETLState:
    """
    TRANSFORM: Uses LLM for intelligent data enrichment and categorization
    The LLM analyzes customer data and provides smart insights
    """
    print("\n🔄 TRANSFORMING data with LLM intelligence...")

    transformed_data = []
    transformation_log = []

    # Check if LLM is available
    if llm is None:
        print("⚠️  LLM not available - using rule-based transformation")

    # Process only first 10 records for faster demo (remove limit for production)
    batch_size = 10
    validated_data = state["validated_data"][:10]  # Limit to 10 for fast demo
    print(f"   📝 Processing {len(validated_data)} records (limited for demo speed)")

    for i in range(0, len(validated_data), batch_size):
        batch = validated_data[i:i + batch_size]
        enrichment_map = {}

        # Only call LLM if available
        if llm is not None:
            # Use TOON format for 45-70% token reduction!
            batch_data = toon_encode({"records": batch}) if TOON_AVAILABLE else json.dumps(batch, indent=2)
            json_size = len(json.dumps(batch, indent=2))
            toon_size = len(batch_data)
            toon_savings_tracker["json_chars"] += json_size
            toon_savings_tracker["toon_chars"] += toon_size
            if TOON_AVAILABLE:
                print(f"      💰 TOON saved {json_size - toon_size} chars ({100*(1-toon_size/json_size):.0f}% reduction)")

            # Use LLM to analyze and enrich the batch
            prompt = f"""Analyze these customer order records and provide enrichment data.
For each record, determine:
1. customer_segment: "premium" (amount > 3000), "standard" (1000-3000), "basic" (< 1000)
2. industry: Based on the email domain and notes, classify the industry (e.g., "Technology", "Healthcare", "Finance", "Retail", "Manufacturing", "Services", etc.)
3. company_size: Based on email domain, estimate company size ("enterprise", "mid-market", "small-business", "freelancer")
4. risk_score: "low", "medium", or "high" based on order patterns and notes

Records to analyze (TOON format - compact notation):
{batch_data}

Respond ONLY with a JSON array containing objects with these fields for each record:
- id: the record id
- customer_segment: string
- industry: string (the industry classification)
- company_size: string  
- risk_score: string
- insight: a brief 5-10 word business insight about this customer

Example response format:
[{{"id": 1, "customer_segment": "premium", "industry": "Technology", "company_size": "enterprise", "risk_score": "low", "insight": "High-value enterprise client, upsell potential"}}]
"""

            messages = [
                SystemMessage(content="You are a data enrichment AI. Analyze customer data and provide structured insights. Always respond with valid JSON only."),
                HumanMessage(content=prompt)
            ]

            # Keep the call inside the `llm is not None` branch so a missing LLM
            # skips enrichment cleanly instead of raising and being caught below
            try:
                response = llm.invoke(messages)
                # Extract JSON from response (handle markdown code blocks)
                content = response.content.strip()
                if content.startswith("```"):
                    content = content.split("```")[1]
                    if content.startswith("json"):
                        content = content[4:]
                content = content.strip()
                llm_enrichments = json.loads(content)
                enrichment_map = {e["id"]: e for e in llm_enrichments}
                print(f"   🤖 AI enriched batch {i//batch_size + 1}")
            except Exception as e:
                print(f"⚠️  LLM enrichment failed for batch: {e}")

        # Merge LLM insights with transformed data
        for record in batch:
            enrichment = enrichment_map.get(record["id"], {})

            transformed_record = {
                "order_id": record["id"],
                "customer_name": record["customer"].strip().title(),
                "order_amount": round(float(record["amount"]), 2),
                "order_date": record["date"],
                "customer_email": record["email"].lower(),
                "processed_at": datetime.now().isoformat(),
                # LLM-powered enrichments
                "customer_segment": enrichment.get("customer_segment", "unknown"),
                "industry": enrichment.get("industry", "Unknown"),
                "company_size": enrichment.get("company_size", "unknown"),
                "risk_score": enrichment.get("risk_score", "medium"),
                "ai_insight": enrichment.get("insight", "No insight available")
            }

            transformed_data.append(transformed_record)
            transformation_log.append(f"🤖 AI-enriched record {record['id']}: {enrichment.get('insight', 'processed')}")

    print(f"✅ Transformed and AI-enriched {len(transformed_data)} records")

    return {
        "transformed_data": transformed_data,
        "transformation_log": transformation_log,
        "execution_time": {**state["execution_time"], "transform": 2.0}
    }

def load_node(state: ETLState) -> ETLState:
    """
    LOAD: Writes transformed data to destination (database, data warehouse, etc.)
    """
    print("\n💾 LOADING data to destination...")

    # In production, this would write to a database or data warehouse
    # For demo, we'll just simulate the load
    loaded_count = len(state["transformed_data"])

    print(f"✅ Loaded {loaded_count} records to destination")

    return {
        "loaded_count": loaded_count,
        "pipeline_status": "success",
        "execution_time": {**state["execution_time"], "load": 0.6}
    }

def monitor_node(state: ETLState) -> ETLState:
    """
    MONITOR: Uses LLM to analyze pipeline execution and provide intelligent insights
    """
    print("\n📊 MONITORING pipeline with AI analysis...")

    total_time = sum(state["execution_time"].values())
    validation_errors = state.get("validation_errors", [])
    ai_analysis = None

    # Only use LLM if available
    if llm is not None:
        # Use TOON format for 45-70% token reduction!
        errors_data = toon_encode({"errors": validation_errors[:10]}) if TOON_AVAILABLE else json.dumps(validation_errors[:10], indent=2)
        sample_data = toon_encode({"samples": state.get('transformed_data', [])[:3]}) if TOON_AVAILABLE else json.dumps(state.get('transformed_data', [])[:3], indent=2)

        json_size = len(json.dumps(validation_errors[:10], indent=2)) + len(json.dumps(state.get('transformed_data', [])[:3], indent=2))
        toon_size = len(errors_data) + len(sample_data)
        toon_savings_tracker["json_chars"] += json_size
        toon_savings_tracker["toon_chars"] += toon_size
        if TOON_AVAILABLE:
            print(f"   💰 TOON saved {json_size - toon_size} chars ({100*(1-toon_size/json_size):.0f}% reduction) for analysis")

        # Use LLM to analyze the pipeline execution and provide recommendations
        analysis_prompt = f"""Analyze this ETL pipeline execution and provide actionable insights:

PIPELINE METRICS:
- Total Records Extracted: {len(state.get('raw_data', []))}
- Valid Records: {len(state.get('validated_data', []))}
- Validation Errors: {len(validation_errors)}
- Records Successfully Loaded: {state.get('loaded_count', 0)}
- Success Rate: {(state.get('loaded_count', 0) / max(len(state.get('raw_data', [])), 1)) * 100:.1f}%

VALIDATION ERRORS (TOON format):
{errors_data}

SAMPLE TRANSFORMED DATA (TOON format):
{sample_data}

Provide a JSON response with:
1. "health_status": "healthy", "warning", or "critical"
2. "data_quality_score": 0-100
3. "top_issues": list of top 3 data quality issues found
4. "recommendations": list of 3 actionable recommendations to improve data quality
5. "business_insight": one key business insight from the data
6. "next_steps": suggested next steps for the data team
"""

        messages = [
            SystemMessage(content="You are a data quality analyst AI. Analyze ETL pipeline results and provide actionable insights. Respond with valid JSON only."),
            HumanMessage(content=analysis_prompt)
        ]

        try:
            response = llm.invoke(messages)
            # Extract JSON from response (handle markdown code blocks)
            content = response.content.strip()
            if content.startswith("```"):
                content = content.split("```")[1]
                if content.startswith("json"):
                    content = content[4:]
            content = content.strip()
            ai_analysis = json.loads(content)
            print("   🤖 AI analysis completed successfully!")
        except Exception as e:
            print(f"⚠️  AI analysis failed: {e}")

    # Fallback if LLM not available or failed
    if ai_analysis is None:
        ai_analysis = {
            "health_status": "unknown",
            "data_quality_score": 0,
            "top_issues": ["LLM analysis not available"],
            "recommendations": ["Set OPENAI_API_KEY to enable AI insights"],
            "business_insight": "Enable AI for intelligent insights",
            "next_steps": "Configure API key and re-run"
        }

    # Print the comprehensive report
    print(f"\n{'='*70}")
    print("🤖 AI-POWERED PIPELINE EXECUTION REPORT")
    print(f"{'='*70}")
    print(f"\n📈 METRICS:")
    print(f"   Total Records Extracted: {len(state.get('raw_data', []))}")
    print(f"   Valid Records: {len(state.get('validated_data', []))}")
    print(f"   Validation Errors: {len(validation_errors)}")
    print(f"   Records Loaded: {state.get('loaded_count', 0)}")
    print(f"   Total Execution Time: {total_time:.2f}s")
    print(f"   Pipeline Status: {state.get('pipeline_status', 'unknown')}")

    print(f"\n🤖 AI ANALYSIS:")
    print(f"   Health Status: {ai_analysis.get('health_status', 'unknown').upper()}")
    print(f"   Data Quality Score: {ai_analysis.get('data_quality_score', 'N/A')}/100")

    print(f"\n⚠️  TOP DATA QUALITY ISSUES:")
    for issue in ai_analysis.get('top_issues', []):
        print(f"{issue}")

    print(f"\n💡 AI RECOMMENDATIONS:")
    for rec in ai_analysis.get('recommendations', []):
        print(f"{rec}")

    print(f"\n📊 BUSINESS INSIGHT:")
    print(f"   {ai_analysis.get('business_insight', 'No insight available')}")

    print(f"\n🚀 SUGGESTED NEXT STEPS:")
    print(f"   {ai_analysis.get('next_steps', 'No suggestions available')}")

    if validation_errors:
        print(f"\n⚠️  VALIDATION ERRORS (showing first 10):")
        for error in validation_errors[:10]:
            print(f"   - {error}")
        if len(validation_errors) > 10:
            print(f"   ... and {len(validation_errors) - 10} more errors")

    print(f"{'='*70}\n")

    return {
        **state,
        "ai_analysis": ai_analysis
    }

def report_node(state: ETLState) -> ETLState:
    """
    REPORT: Generates a comprehensive ETL status report and saves it to file
    """
    print("\n📄 GENERATING ETL STATUS REPORT...")

    # Get current timestamp for report
    report_time = datetime.now()

    # Calculate metrics
    total_records = len(state.get('raw_data', []))
    valid_records = len(state.get('validated_data', []))
    loaded_records = state.get('loaded_count', 0)
    error_count = len(state.get('validation_errors', []))
    success_rate = (loaded_records / max(total_records, 1)) * 100

    ai_analysis = state.get('ai_analysis', {})

    # Build the report
    report = {
        "report_metadata": {
            "report_id": f"ETL-{report_time.strftime('%Y%m%d-%H%M%S')}",
            "generated_at": report_time.isoformat(),
            "pipeline_name": "Modern ETL Pipeline with LangGraph + LLM"
        },
        "execution_summary": {
            "status": state.get('pipeline_status', 'unknown'),
            "total_records_extracted": total_records,
            "valid_records": valid_records,
            "records_loaded": loaded_records,
            "validation_errors": error_count,
            "success_rate_percent": round(success_rate, 2),
            "execution_time": state.get('execution_time', {})
        },
        "ai_analysis": {
            "health_status": ai_analysis.get('health_status', 'unknown'),
            "data_quality_score": ai_analysis.get('data_quality_score', 0),
            "top_issues": ai_analysis.get('top_issues', []),
            "recommendations": ai_analysis.get('recommendations', []),
            "business_insight": ai_analysis.get('business_insight', ''),
            "next_steps": ai_analysis.get('next_steps', '')
        },
        "validation_errors": state.get('validation_errors', []),
        "sample_transformed_data": state.get('transformed_data', [])[:5]
    }

    # Save report to JSON file
    script_dir = os.path.dirname(os.path.abspath(__file__))
    report_filename = f"etl_report_{report_time.strftime('%Y%m%d_%H%M%S')}.json"
    report_path = os.path.join(script_dir, report_filename)

    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2, default=str)

    print(f"✅ Report saved to: {report_path}")

    # Also create a human-readable summary
    summary_filename = f"etl_summary_{report_time.strftime('%Y%m%d_%H%M%S')}.txt"
    summary_path = os.path.join(script_dir, summary_filename)

    with open(summary_path, 'w') as f:
        f.write("=" * 70 + "\n")
        f.write("ETL PIPELINE STATUS REPORT\n")
        f.write("=" * 70 + "\n\n")
        f.write(f"Report ID: {report['report_metadata']['report_id']}\n")
        f.write(f"Generated: {report_time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
        f.write("-" * 70 + "\n")
        f.write("EXECUTION SUMMARY\n")
        f.write("-" * 70 + "\n")
        f.write(f"Status: {report['execution_summary']['status'].upper()}\n")
        f.write(f"Total Records: {total_records}\n")
        f.write(f"Valid Records: {valid_records}\n")
        f.write(f"Loaded Records: {loaded_records}\n")
        f.write(f"Errors: {error_count}\n")
        f.write(f"Success Rate: {success_rate:.1f}%\n\n")
        f.write("-" * 70 + "\n")
        f.write("AI ANALYSIS\n")
        f.write("-" * 70 + "\n")
        f.write(f"Health Status: {ai_analysis.get('health_status', 'unknown').upper()}\n")
        f.write(f"Data Quality Score: {ai_analysis.get('data_quality_score', 0)}/100\n\n")
        f.write("Top Issues:\n")
        for issue in ai_analysis.get('top_issues', []):
            f.write(f"{issue}\n")
        f.write("\nRecommendations:\n")
        for rec in ai_analysis.get('recommendations', []):
            f.write(f"{rec}\n")
        f.write(f"\nBusiness Insight:\n  {ai_analysis.get('business_insight', 'N/A')}\n")
        f.write("\n" + "=" * 70 + "\n")

    print(f"✅ Summary saved to: {summary_path}")

    return {
        **state,
        "report_path": report_path
    }

# ============== CONDITIONAL ROUTING ==============

def should_continue_to_transform(state: ETLState) -> str:
    """
    Decides whether to proceed to transformation or skip to monitoring
    based on data quality
    """
    valid_count = len(state.get("validated_data", []))
    total_count = len(state.get("raw_data", []))

    # If we have at least some valid data, proceed with transformation
    if valid_count > 0:
        print(f"✅ Quality check passed: {valid_count}/{total_count} records valid")
        return "has_valid_data"  # Edge label for the diagram
    else:
        print("❌ Quality check failed: No valid records to process")
        return "no_valid_data"   # Edge label for the diagram

# ============== BUILD THE GRAPH ==============

def create_etl_pipeline():
    """
    Constructs the LangGraph ETL pipeline
    """
    # Initialize the graph with our state schema
    workflow = StateGraph(ETLState)

    # Add all nodes
    workflow.add_node("extract", extract_node)
    workflow.add_node("validate", validate_node)
    workflow.add_node("transform", transform_node)
    workflow.add_node("load", load_node)
    workflow.add_node("monitor", monitor_node)
    workflow.add_node("report", report_node)

    # Define the flow
    workflow.add_edge(START, "extract")
    workflow.add_edge("extract", "validate")

    # Conditional routing based on validation results with labeled edges
    workflow.add_conditional_edges(
        "validate",
        should_continue_to_transform,
        {
            "has_valid_data": "transform",   # Shows "has_valid_data" label on edge
            "no_valid_data": "monitor"       # Shows "no_valid_data" label on edge
        }
    )

    workflow.add_edge("transform", "load")
    workflow.add_edge("load", "monitor")
    workflow.add_edge("monitor", "report")
    workflow.add_edge("report", END)

    return workflow.compile()

# ============== RUN THE PIPELINE ==============

if __name__ == "__main__":
    print("🚀 Starting MODERN ETL Pipeline with LangGraph + LLM\n")
    print("=" * 70)
    print("This pipeline uses OpenAI GPT-4 for intelligent data transformation")
    print("and analysis - making it truly MODERN and adaptive!")
    print("=" * 70 + "\n")

    # Create and compile the pipeline
    pipeline = create_etl_pipeline()

    # Initialize state
    initial_state = {
        "raw_data": [],
        "validated_data": [],
        "transformed_data": [],
        "loaded_count": 0,
        "validation_errors": [],
        "transformation_log": [],
        "pipeline_status": "running",
        "execution_time": {},
        "ai_analysis": {},
        "report_path": ""
    }

    # Execute the pipeline
    final_state = pipeline.invoke(initial_state)

    print("✨ Pipeline execution completed!")

    # Display AI-enriched transformed data preview
    print("\n" + "=" * 70)
    print("📋 AI-ENRICHED DATA PREVIEW (First 5 Records)")
    print("=" * 70)
    for record in final_state.get("transformed_data", [])[:5]:
        print(f"\n🧑 {record.get('customer_name', 'Unknown')}")
        print(f"   💰 Amount: ${record.get('order_amount', 0):,.2f}")
        print(f"   📊 Segment: {record.get('customer_segment', 'unknown')}")
        print(f"   🏢 Industry: {record.get('industry', 'Unknown')}")
        print(f"   📈 Company Size: {record.get('company_size', 'unknown')}")
        print(f"   ⚠️  Risk: {record.get('risk_score', 'unknown')}")
        print(f"   🤖 AI Insight: {record.get('ai_insight', 'N/A')}")

    # Print TOON savings summary
    if TOON_AVAILABLE and toon_savings_tracker["json_chars"] > 0:
        print("\n" + "=" * 70)
        print("💰 TOON TOKEN SAVINGS SUMMARY")
        print("=" * 70)
        json_total = toon_savings_tracker["json_chars"]
        toon_total = toon_savings_tracker["toon_chars"]
        saved = json_total - toon_total
        pct = 100 * (1 - toon_total / json_total)
        print(f"   📊 JSON format would use:  {json_total:,} characters")
        print(f"   ✨ TOON format used:       {toon_total:,} characters")
        print(f"   💰 Characters saved:       {saved:,} ({pct:.1f}% reduction)")
        est_tokens = saved // 4  # rough heuristic: ~4 characters per token
        cost_per_run = est_tokens * 0.15 / 1_000_000
        print(f"   🎯 Estimated token savings: ~{est_tokens:,} tokens")
        print(f"   💵 At $0.15/1M tokens:     ~${cost_per_run:.4f} saved per run")
        print(f"\n   🚀 At 10,000 runs/day: ~${cost_per_run * 10000 * 365:.2f}/year saved!")

    # Generate and save the LangGraph diagram
    print("\n" + "=" * 70)
    print("📊 GENERATING LANGGRAPH DIAGRAM")
    print("=" * 70)
    try:
        # Get the graph image
        graph_image = pipeline.get_graph().draw_mermaid_png()

        # Save to file
        graph_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "etl_pipeline_graph.png")
        with open(graph_path, "wb") as f:
            f.write(graph_image)
        print(f"✅ Graph saved to: {graph_path}")

        # Also print Mermaid diagram text
        print("\n📝 Mermaid Diagram Code:")
        print(pipeline.get_graph().draw_mermaid())
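    except Exception as e:
        print(f"⚠️  Could not generate graph image: {e}")
        # draw_mermaid_png() renders through the mermaid.ink web API by default,
        # so the usual cause is missing network access (grandalf is only needed for draw_ascii())
        print("   Check your network connection, or use draw_mermaid() to get the diagram as text")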


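If you want to sanity-check the savings math outside the pipeline, here is a minimal sketch of the same arithmetic as a standalone function. The function name `estimate_savings`, its parameters, and the sample character counts are hypothetical and only for illustration. The 4-characters-per-token ratio is the same rough heuristic the script uses, and $0.15 per 1M tokens is the price assumed in the summary above, so treat the output as a ballpark figure and not a billing number.

```python
def estimate_savings(json_chars: int, toon_chars: int,
                     price_per_million_tokens: float = 0.15,
                     chars_per_token: int = 4,
                     runs_per_day: int = 10_000) -> dict:
    """Rough cost estimate from character counts (hypothetical helper, not part of the pipeline)."""
    if json_chars <= 0:
        raise ValueError("json_chars must be positive")

    saved_chars = json_chars - toon_chars
    pct_reduction = 100 * (1 - toon_chars / json_chars)

    # Assumption: ~4 characters per token. Real tokenizers vary with the content.
    saved_tokens = saved_chars // chars_per_token

    usd_per_run = saved_tokens * price_per_million_tokens / 1_000_000
    usd_per_year = usd_per_run * runs_per_day * 365

    return {
        "saved_chars": saved_chars,
        "pct_reduction": pct_reduction,
        "saved_tokens": saved_tokens,
        "usd_per_run": usd_per_run,
        "usd_per_year": usd_per_year,
    }


if __name__ == "__main__":
    # Made-up character counts, chosen only to illustrate the calculation
    result = estimate_savings(json_chars=10_000, toon_chars=4_800)
    for key, value in result.items():
        print(f"{key}: {value}")
```

Keeping the price and the characters-per-token ratio as parameters makes it easy to rerun the estimate when you switch models or measure real token counts with a tokenizer.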

Report (output)

Final Thoughts

I spent a weekend building an ETL pipeline that uses GPT-4 to make intelligent decisions about data, and then optimized it with TOON to cut costs by 52%.

It's probably overkill for most use cases, still too expensive for massive scale, and definitely slower than traditional approaches.

But for the right problem (when you need genuine contextual understanding, when you're working with ambiguous data, when you're processing reasonable volumes), it's genuinely magical to watch an AI understand your data instead of just processing it.

The key is knowing the difference between "this is cool" and "this is the right tool for the job."

Sometimes they're the same thing. Usually they're not.

But when they are? That's when the magic happens.

Thanks
Sreeni Ramadorai
