Gabriel Henrique

Your ETL Pipeline Wasn't Built for AI — Here's How to Fix It in 2026


You've got a beautiful data pipeline. It extracts from your sources, transforms everything cleanly, loads into the warehouse on schedule. Tests pass. Stakeholders are happy. Life is good.

Then someone says: "Can we plug this into our LLM?"

And suddenly your beautiful pipeline is useless.

Not because it's broken — it works perfectly for what it was designed to do. The problem is that traditional ETL was designed for SQL queries, dashboards, and human analysts. LLMs need something fundamentally different: context, meaning, and vectors. And if your pipeline doesn't produce those, your AI is flying blind.

This is the silent crisis in data engineering right now. Companies are spending millions on LLM infrastructure while their underlying data pipelines are still shipping rows and columns to a warehouse that an AI can barely reason about.

Let's fix that.


What Does an LLM Actually Need From Your Data?

When a human analyst queries your warehouse, they write SQL. They're smart enough to know that status = 'churned' means a customer who cancelled their subscription. They bring their own context.

An LLM doesn't have that luxury, at least not without help. When you ask a model "why are enterprise customers churning?", it can't just run SELECT * FROM churn_events. It needs semantically relevant context: passages, records, or summaries that are close in meaning to the question being asked.

That's where RAG (Retrieval-Augmented Generation) comes in.

Think of RAG like this: instead of the LLM trying to remember everything (it can't — its context window is finite), you build a library. Every time the LLM needs to answer a question, it walks into that library, finds the most relevant pages, and reads them before answering.

Your job as a data engineer is to build and maintain that library.

And a library isn't a database. A library is organized by meaning, not by rows and columns.
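The "find the relevant pages, then read them before answering" step can be sketched in a few lines. This is a minimal illustration, not a full RAG implementation: the function name `build_rag_prompt` and the prompt wording are assumptions, and the passages are hard-coded where a real system would retrieve them.

```python
def build_rag_prompt(question: str, passages: list[str]) -> str:
    """Place the retrieved passages ahead of the question so the model reads them first."""
    numbered = [f"[{i}] {p}" for i, p in enumerate(passages, start=1)]
    context = "\n".join(numbered)
    return (
        "Use only the context below to answer. "
        "If the context is not enough, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )


# Hypothetical passages standing in for whatever the retrieval step returns.
prompt = build_rag_prompt(
    "Why are enterprise customers churning?",
    [
        "Ticket 001: dashboard returns error 502 after the June 3rd deploy (Enterprise plan).",
        "Ticket 002: CSV export button is greyed out (Basic plan).",
    ],
)
print(prompt)
```

The resulting string is what gets sent to the model; everything the pipeline does upstream exists to decide which passages end up in that context section.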


The AI-Native Pipeline: What's Different

Here's the shift in mindset. Traditional ETL produces:

raw data → clean tables → warehouse → SQL queries

An AI-native pipeline produces:

raw data → cleaned chunks → embeddings → vector store → semantic retrieval

The new ingredients are chunks, embeddings, and a vector store. Let's break each down.

Chunks

You can't feed an entire database table into an LLM. Even if you could, it would be wasteful and noisy. Instead, you break your data into chunks — small, meaningful pieces of text that can be retrieved independently.

A chunk might be:

  • A paragraph from a customer support ticket
  • A 3-sentence description of a product
  • A summarized row of metadata about a sales event

The art here is in the chunking strategy. Too small, and a chunk loses its context. Too large, and you're wasting tokens and retrieval precision. In practice, 512–1024 tokens with ~10% overlap between chunks is a solid starting point.
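To make the size and overlap numbers concrete, here is a minimal sketch of a sliding-window chunker. It approximates tokens with whitespace-separated words, which is an assumption for illustration only; a real pipeline would count tokens with the tokenizer of its embedding model. The name `chunk_words` and its defaults are hypothetical.

```python
def chunk_words(text: str, chunk_size: int = 512, overlap: int = 51) -> list[str]:
    """Split text into windows of `chunk_size` words.

    Each window repeats the last `overlap` words of the previous one,
    so a sentence cut at a boundary still appears whole in one chunk.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    words = text.split()
    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


# Tiny demo: 10 "words", windows of 4 with 1 word of overlap.
demo = chunk_words(" ".join(str(i) for i in range(10)), chunk_size=4, overlap=1)
print(demo)
```

With the defaults, the overlap is about 10% of the chunk size, matching the starting point suggested above.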

Embeddings

An embedding is a list of numbers — a vector — that represents the meaning of a piece of text. Two texts with similar meanings will have vectors that are close together in space, even if they use completely different words.

"Customer stopped paying" and "subscription was cancelled due to billing failure" have very different words. But in vector space, they're neighbors.

That's the magic. And it's what makes semantic search possible.
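"Close together" is usually measured with cosine similarity. The sketch below uses invented three-dimensional vectors purely for illustration; real embeddings have hundreds or thousands of dimensions and come from a model, not from hand-picked numbers.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


# Made-up vectors standing in for real embeddings.
stopped_paying = [0.9, 0.1, 0.2]   # "Customer stopped paying"
billing_cancel = [0.8, 0.2, 0.3]   # "subscription was cancelled due to billing failure"
export_bug = [0.1, 0.9, 0.1]       # "cannot export reports to CSV"

print(cosine_similarity(stopped_paying, billing_cancel))
print(cosine_similarity(stopped_paying, export_bug))
```

The two billing-related vectors score far higher against each other than either does against the export bug, which is the property semantic search relies on.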

Vector Store

A vector store is a database optimized for one special kind of query: "give me the N vectors most similar to this query vector." Systems like pgvector, Qdrant, Chroma, and Weaviate are built exactly for this.
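Stripped of indexing and persistence, that query is a scored sort. The sketch below is an exact, brute-force version over an in-memory dict; it is not how any of the systems named above are implemented, and the ids and vectors are invented for the example.

```python
import heapq
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def top_k_similar(query: list[float], store: dict[str, list[float]], k: int = 2) -> list[str]:
    """Exact nearest-neighbor search: score every stored vector, keep the k best ids."""
    scored = ((cosine_similarity(query, vec), chunk_id) for chunk_id, vec in store.items())
    return [chunk_id for _, chunk_id in heapq.nlargest(k, scored)]


# Hypothetical store: chunk id -> embedding (toy 3-dimensional vectors).
store = {
    "ticket_001_chunk_0": [0.9, 0.1, 0.2],
    "ticket_002_chunk_0": [0.1, 0.9, 0.1],
    "ticket_003_chunk_0": [0.7, 0.3, 0.3],
}
nearest = top_k_similar([0.8, 0.2, 0.2], store, k=2)
print(nearest)
```

Scoring every vector is fine for thousands of rows. Dedicated indexes exist because it stops being fine at millions.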


Building the Pipeline: A Practical Walkthrough

Let's get concrete. Here's a complete AI-native ingestion pipeline in Python.

Step 1: Load and Chunk Your Data

# Recent LangChain releases ship the splitters in the langchain-text-splitters package
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Imagine this comes from your warehouse, S3, or an API
raw_documents = [
    {"id": "ticket_001", "text": "Customer says dashboard is not loading. Error 502. Happened after the deploy on June 3rd. They're on the Enterprise plan."},
    {"id": "ticket_002", "text": "User cannot export reports to CSV. The button is greyed out. They say it worked last week. Basic plan."},
    # ... thousands more
]

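splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,       # counted in characters here, because length_function=len
    chunk_overlap=50,     # roughly 10% overlap between neighbouring chunks
    length_function=len,
)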

chunks = []
for doc in raw_documents:
    splits = splitter.split_text(doc["text"])
    for i, chunk in enumerate(splits):
        chunks.append({
            "id": f"{doc['id']}_chunk_{i}",
            "text": chunk,
            "source_id": doc["id"]
        })

print(f"Created {len(chunks)} chunks from {len(raw_documents)} documents")

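The RecursiveCharacterTextSplitter works through a list of separators from coarse to fine: by default it tries paragraph breaks first, then line breaks, then spaces, and only then falls back to splitting between characters. That keeps semantic boundaries intact wherever it can. Note that with length_function=len, chunk_size and chunk_overlap are counted in characters, not tokens, so 512 here is a much smaller chunk than 512 tokens.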
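The idea of trying coarse separators before fine ones can be sketched in plain Python. This is a simplified illustration, not LangChain's implementation: it has no overlap, it measures length in characters, and the name `recursive_split` is hypothetical.

```python
def recursive_split(text: str, max_len: int, separators=("\n\n", "\n", " ")) -> list[str]:
    """Split on the coarsest separator present; recurse with finer ones for oversized pieces."""
    if len(text) <= max_len:
        return [text]

    for i, sep in enumerate(separators):
        if sep not in text:
            continue
        chunks, current = [], ""
        for part in text.split(sep):
            candidate = part if not current else current + sep + part
            if len(candidate) <= max_len:
                current = candidate          # keep packing pieces into this chunk
                continue
            if current:
                chunks.append(current)       # close the chunk that was being built
            if len(part) > max_len:
                # The piece alone is too big: retry it with finer separators.
                chunks.extend(recursive_split(part, max_len, separators[i + 1:]))
                current = ""
            else:
                current = part
        if current:
            chunks.append(current)
        return chunks

    # No separator left to try: fall back to a hard cut between characters.
    return [text[i:i + max_len] for i in range(0, len(text), max_len)]


pieces = recursive_split("aaa bbb\n\nccc ddd eee", max_len=7)
print(pieces)
```

The first paragraph fits and stays whole; the second is too long, so only that paragraph is split again on spaces.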

Step 2: Generate Embeddings

from openai import OpenAI

client = OpenAI()  # uses OPENAI_API_KEY env var

def embed_batch(texts: list[str], model="text-embedding-3-large") -> list[list[float]]:
    """Embed a batch of texts. Returns list of vectors."""
    response = client.embeddings.create(
        input=texts,
        model=model,
        dimensions=1024  # trade-off: smaller vectors = less storage and faster search, slightly less precise
    )
    return [item.embedding for item in response.data]

# Process in batches to respect rate limits
BATCH_SIZE = 100
embedded_chunks = []

for i in range(0, len(chunks), BATCH_SIZE):
    batch = chunks[i:i + BATCH_SIZE]
    texts = [c["text"] for c in batch]
    vectors = embed_batch(texts)

    for chunk, vector in zip(batch, vectors):
        embedded_chunks.append({**chunk, "vector": vector})

print(f"Embedded {len(embedded_chunks)} chunks")

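Two things to notice here. First, we're batching: each request carries up to 100 texts, which means fewer round trips and fewer requests counted against OpenAI's rate limits. Second, we're using dimensions=1024 instead of the model's default of 3072. Embedding calls are billed by input tokens, so this does not lower the API bill; what it cuts is storage and search cost, to roughly a third, usually for a small loss in retrieval precision. Benchmark on your own data before committing.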
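The storage side of the dimension trade-off is simple arithmetic. The sketch below assumes the figure given in the pgvector documentation, 4 bytes per dimension plus 8 bytes of overhead per vector, and ignores index size and other row overhead; the function name and the one-million-row count are made up for the example.

```python
def vector_storage_bytes(dimensions: int, rows: int) -> int:
    """Approximate raw storage for `rows` single-precision vectors.

    Assumes 4 bytes per dimension plus 8 bytes of per-vector overhead.
    """
    return rows * (4 * dimensions + 8)


ROWS = 1_000_000  # hypothetical corpus size
full = vector_storage_bytes(3072, ROWS)
reduced = vector_storage_bytes(1024, ROWS)

print(f"3072 dims: {full / 1e9:.1f} GB")
print(f"1024 dims: {reduced / 1e9:.1f} GB")
print(f"ratio: {reduced / full:.2f}")
```

For a million chunks that is roughly 12.3 GB versus 4.1 GB of raw vector data, before any index is built on top.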

Step 3: Store in a Vector Database

Here's how to store those embeddings with pgvector (PostgreSQL with vector support), a great choice if you're already running Postgres and don't want another managed service:

import psycopg2
import json

conn = psycopg2.connect("postgresql://user:password@localhost:5432/mydb")
cur = conn.cursor()

# One-time setup: enable the extension and create the table
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cur.execute("""
    CREATE TABLE IF NOT EXISTS doc_chunks (
        id TEXT PRIMARY KEY,
        source_id TEXT,
        content TEXT,
        embedding vector(1024),
        created_at TIMESTAMPTZ DEFAULT NOW()
    );
""")
conn.commit()

# Insert the embedded chunks (or update them if the id already exists)
for chunk in embedded_chunks:
    cur.execute("""
        INSERT INTO doc_chunks (id, source_id, content, embedding)
        VALUES (%s, %s, %s, %s::vector)
        ON CONFLICT (id) DO UPDATE SET
            source_id = EXCLUDED.source_id,
            content = EXCLUDED.content,
            embedding = EXCLUDED.embedding;
    """, (
        chunk["id"],
        chunk["source_id"],
        chunk["text"],
        json.dumps(chunk["vector"])  # sent as the text form '[0.1, 0.2, ...]'
    ))
conn.commit()

# Build the index after loading: ivfflat derives its clusters from the rows
# that exist at build time, so an index created on an empty table is poor.
cur.execute("""
    CREATE INDEX IF NOT EXISTS doc_chunks_embedding_idx
    ON doc_chunks USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
""")
conn.commit()

cur.close()
conn.close()
print("All chunks stored in pgvector!")

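The ivfflat index is what keeps queries fast at scale. Without it, every query compares the query vector against every row, which is exact but slow on large tables. With it, Postgres clusters vectors into "lists" and searches only the most promising ones. That is approximate nearest neighbor search: much faster, at the price of occasionally missing a true neighbor. Because the clusters are computed from the rows present when the index is built, create (or rebuild) the index once the table holds representative data.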
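The value lists = 100 is worth revisiting as the table grows. The sketch below encodes the starting points suggested in the pgvector documentation as I understand them: rows / 1000 lists up to one million rows, the square root of the row count beyond that, and the square root of lists for the number of lists probed per query. The helper name is hypothetical, and these are starting values to tune, not guarantees.

```python
import math


def ivfflat_starting_params(row_count: int) -> dict:
    """Suggest initial `lists` (index build) and `probes` (query time) values."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    if row_count <= 1_000_000:
        lists = max(1, row_count // 1000)
    else:
        lists = math.isqrt(row_count)
    probes = max(1, math.isqrt(lists))
    return {"lists": lists, "probes": probes}


print(ivfflat_starting_params(100_000))
print(ivfflat_starting_params(4_000_000))
```

The lists value goes into the WITH (lists = ...) clause when the index is created, and probes is applied per session with SET ivfflat.probes = ...; before querying. Raising probes improves recall and slows queries.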

Step 4: Retrieval at Query Time

def retrieve_relevant_chunks(query: str, top_k: int = 5) -> list[dict]:
    """Given a natural language query, find the most relevant stored chunks."""

    # Embed the query using the same model
    query_vector = embed_batch([query])[0]

    conn = psycopg2.connect("postgresql://user:password@localhost:5432/mydb")
    cur = conn.cursor()

    cur.execute("""
        SELECT id, source_id, content,
               1 - (embedding <=> %s::vector) AS similarity
        FROM doc_chunks
        ORDER BY embedding <=> %s::vector
        LIMIT %s;
    """, (json.dumps(query_vector), json.dumps(query_vector), top_k))

    results = [
        {"id": row[0], "source_id": row[1], "content": row[2], "similarity": row[3]}
        for row in cur.fetchall()
    ]

    cur.close()
    conn.close()
    return results

# Try it out
results = retrieve_relevant_chunks("why are enterprise users reporting errors after deploys?")
for r in results:
    print(f"[{r['similarity']:.3f}] {r['content'][:100]}...")

The <=> operator is pgvector's cosine distance. 1 - cosine_distance = cosine_similarity. The results will be the chunks most semantically close to your query — even if they don't share a single keyword with it.
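The same query can be narrowed with ordinary WHERE clauses, since the vectors live next to regular columns. The sketch below only builds the SQL and its parameters; it assumes a hypothetical plan column that the doc_chunks table above does not have, and it uses psycopg2's named-parameter style. The caller would add qvec (the JSON-encoded query vector) and top_k to the returned parameters before executing.

```python
from typing import Optional


def build_filtered_query(plan: Optional[str] = None,
                         max_age_days: Optional[int] = None) -> tuple[str, dict]:
    """Return (sql, params) for a similarity search with optional metadata filters."""
    conditions = []
    params: dict = {}
    if plan is not None:
        conditions.append("plan = %(plan)s")  # hypothetical metadata column
        params["plan"] = plan
    if max_age_days is not None:
        conditions.append("created_at >= NOW() - make_interval(days => %(max_age_days)s)")
        params["max_age_days"] = max_age_days

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    sql = f"""
        SELECT id, source_id, content,
               1 - (embedding <=> %(qvec)s::vector) AS similarity
        FROM doc_chunks
        {where}
        ORDER BY embedding <=> %(qvec)s::vector
        LIMIT %(top_k)s;
    """
    return sql, params


sql, params = build_filtered_query(plan="enterprise", max_age_days=30)
print(sql)
print(params)
```

One thing to watch: with an approximate index, filters are applied after the index scan, so a selective filter can return fewer than top_k rows. Raising the number of probes or indexing the filter column are the usual remedies.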


Practical Tips for 2026

1. Don't skip metadata. Store the source ID, timestamp, author, and any other context alongside your vectors. Metadata filtering (e.g., "only search tickets from Enterprise customers in the last 30 days") is often more important than the semantic search itself.

2. Re-embed when the model changes. If you upgrade from text-embedding-3-small to text-embedding-3-large, you need to re-embed everything. Different models produce incompatible vector spaces. Build this into your pipeline versioning from day one.

3. Evaluate retrieval quality separately from generation quality. The #1 mistake is blaming the LLM when the real problem is your retrieval. If the right chunks aren't being retrieved, the best model in the world will give you garbage. Use tools like RAGAS to measure retrieval precision/recall independently.

4. pgvector is enough for most teams. Unless you're storing hundreds of millions of vectors, you don't need a dedicated vector database. pgvector in your existing Postgres is simpler to operate, cheaper, and lets you join vectors with your regular tables. Optimize later if you need to.

5. Chunking is your most impactful lever. As a rough illustration, changing the LLM might give you 5% better answers, while fixing your chunking strategy might give you 40%. The exact numbers depend on your data, so measure them. It's unglamorous, but it's where the results are.
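For tip 3, retrieval quality can be scored without calling an LLM at all, given a small hand-labelled set of questions and the chunk ids that should come back. The sketch below is a hand-rolled precision@k and recall@k, not the RAGAS API; the ids are invented.

```python
def precision_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the top-k retrieved ids that are actually relevant."""
    top = retrieved[:k]
    if not top:
        return 0.0
    return sum(1 for chunk_id in top if chunk_id in relevant) / len(top)


def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the relevant ids that appear in the top-k results."""
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)


# Hypothetical labelled example: what the retriever returned vs. what it should find.
retrieved = ["a", "b", "c", "d"]
relevant = {"a", "d", "e"}

print(precision_at_k(retrieved, relevant, k=2))  # 0.5
print(recall_at_k(retrieved, relevant, k=4))
```

Running this before and after a chunking change is a cheap way to check tip 5 on your own data: if recall@k does not move, the new strategy is not retrieving better context.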


The Bigger Picture

The shift to AI-native data engineering isn't about throwing away what you've built. It's about extending it.

Your bronze/silver/gold lakehouse layers? Still valid — but add a "semantic layer" where data is chunked, embedded, and indexed for retrieval. Your Airflow DAGs? Still valid — add a daily job that re-embeds new documents and updates the vector store. Your data quality checks? Still valid — add checks for embedding freshness and retrieval coverage.
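The daily re-embedding job and the freshness check can share one rule: a chunk needs embedding if it is new, its text changed, or it was embedded with a different model or dimension setting. Here is a minimal sketch of that rule, assuming each stored row keeps a content hash plus the model name and dimensions used. Those stored fields and all function names are hypothetical additions, not part of the table defined earlier.

```python
import hashlib
from typing import Optional

CURRENT_MODEL = "text-embedding-3-large"  # the model used in the walkthrough
CURRENT_DIMENSIONS = 1024


def content_hash(text: str) -> str:
    """Stable fingerprint of a chunk's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def needs_embedding(chunk: dict, stored: Optional[dict]) -> bool:
    """True if the chunk is new, changed, or embedded under another model/dimension setting."""
    if stored is None:
        return True
    return (
        stored["content_hash"] != content_hash(chunk["text"])
        or stored["model"] != CURRENT_MODEL
        or stored["dimensions"] != CURRENT_DIMENSIONS
    )


def select_for_embedding(chunks: list[dict], stored_by_id: dict) -> list[dict]:
    """Keep only the chunks that the scheduled job has to send to the embedding API."""
    return [c for c in chunks if needs_embedding(c, stored_by_id.get(c["id"]))]


chunks = [
    {"id": "t1_chunk_0", "text": "unchanged text"},
    {"id": "t2_chunk_0", "text": "edited text"},
    {"id": "t3_chunk_0", "text": "brand new"},
]
stored_by_id = {
    "t1_chunk_0": {"content_hash": content_hash("unchanged text"),
                   "model": CURRENT_MODEL, "dimensions": CURRENT_DIMENSIONS},
    "t2_chunk_0": {"content_hash": content_hash("old text"),
                   "model": CURRENT_MODEL, "dimensions": CURRENT_DIMENSIONS},
}
todo = select_for_embedding(chunks, stored_by_id)
print([c["id"] for c in todo])
```

The same function doubles as a data quality check: the count of rows it flags is a direct measure of how stale the vector store is.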

Think of it as adding a new output format to your pipelines. You've always produced clean tables. Now you also produce vector indexes. Same discipline, new artifact.

The engineers who learn to build both will be the ones building the AI systems that actually work — the ones where the model has the context it needs to be genuinely useful, not just impressively fluent.

Your pipeline deserves to be as smart as the AI it's feeding.


Cheers,

Gabriel Henrique Cardoso Antonio
🔗 gabrielh.dev
