Grady Dillon

Build a RAG Pipeline That Actually Reads the Web

Transform web noise into AI knowledge. The flow shows how WellMarked strips away ads and cookie banners to convert raw HTML into clean data for your RAG pipeline.

Most RAG tutorials start with a folder of PDFs. That’s fine for demos, but the real world runs on URLs.

Your users want to ask questions about a competitor’s docs, a news article published this morning, a GitHub README, or a product page that didn’t exist when you trained your model. For all of that, you need to fetch and clean live web content before it ever touches an embedding model or an LLM.

The problem is that raw HTML is terrible LLM input. On a typical article page, some 80% of the markup is navigation, cookie banners, footers, ads, and tracking scripts. Feed that to an embedding model and you're wasting tokens, polluting your vector store, and setting your LLM up to hallucinate answers from sidebar text.

In this tutorial we’ll build a clean, production-ready RAG pipeline that:

  1. Fetches any URL and extracts just the article content as clean Markdown
  2. Chunks and embeds that content into a local vector store
  3. Answers natural-language questions grounded in what it actually read

We’ll use WellMarked for extraction, ChromaDB as the vector store, sentence-transformers for embeddings, and the OpenAI API for the final answer step. Everything except the OpenAI call is fully open-source and runs locally.

Prerequisites

pip install wellmarked chromadb sentence-transformers openai

You’ll need:

  • A WellMarked API key — free tier gives you 500 requests/month, no card required. Get one at wellmarked.io.
  • An OpenAI API key (only used at the final QA step — swap in any LLM you prefer).

Set both as environment variables:

export WELLMARKED_API_KEY="wm_..."
export OPENAI_API_KEY="sk-..."

The wellmarked SDK automatically picks up WELLMARKED_API_KEY, so you won't need to pass it explicitly in code.
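
If you'd rather fail fast than hit a confusing auth error mid-pipeline, a small sanity check at startup does the trick (plain os.environ, nothing SDK-specific):

import os

# Fail early with a clear message instead of a cryptic auth error later.
for key in ("WELLMARKED_API_KEY", "OPENAI_API_KEY"):
    if not os.environ.get(key):
        raise RuntimeError(f"Missing environment variable: {key}")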

Step 1: Extract a URL to Clean Markdown

Let’s start with a single URL to see what we’re working with.

from wellmarked import WellMarked

with WellMarked() as wm:
    result = wm.extract("https://en.wikipedia.org/wiki/Retrieval-augmented_generation")
    print(result.metadata.title)
    print(result.metadata.author)
    print(result.metadata.date)
    print()
    print(result.markdown[:500])

Output:

Retrieval-augmented generation
None
2023-11-05

# Retrieval-augmented generation

**Retrieval-augmented generation** ( **RAG** ) is a technique that enables large language models (LLMs) to retrieve and incorporate new information from external data sources.[1] With RAG, LLMs first refer to a specified set of documents, then respond to user queries...

That’s the article body — no menus, no “Read more” links, no cookie notice. The metadata object carries whatever structured information the page publishes (title, author, date). Fields can be None if the page doesn't include them, so always treat metadata as nullable.
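
A small guard with fallbacks keeps downstream code from tripping over missing fields:

title = result.metadata.title or "Untitled"
author = result.metadata.author or "Unknown"
published = result.metadata.date or "n/a"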

Step 2: Extract Many URLs at Once with Bulk

For a real pipeline you’ll have a list of URLs. Rather than looping and making one request per URL, use the bulk endpoint. WellMarked processes them concurrently server-side and returns all results in one job.

from wellmarked import WellMarked

URLS = [
    "https://en.wikipedia.org/wiki/Retrieval-augmented_generation",
    "https://en.wikipedia.org/wiki/Vector_database",
    "https://en.wikipedia.org/wiki/Sentence_embedding",
    "https://en.wikipedia.org/wiki/Large_language_model",
    "https://en.wikipedia.org/wiki/Transformer_(deep_learning_architecture)",
]

with WellMarked() as wm:
    # Check quota before submitting — bulk jobs are reserved atomically.
    # If you don't have enough requests left, the whole job is rejected.
    usage = wm.get_usage()
    print(f"Quota: {usage.used}/{usage.limit} used — {usage.remaining} remaining")
    if usage.remaining < len(URLS):
        raise RuntimeError("Not enough quota for this batch.")
    # Submit and block until all URLs are done.
    job = wm.bulk(URLS)
    job = wm.wait_for_job(job.job_id)

    # Separate successes from failures.
    documents = []
    for item in job.results:
        if item.ok:
            documents.append({
                "url": item.url,
                "title": item.metadata.title,
                "markdown": item.markdown,
            })
        else:
            print(f"[WARN] {item.url} failed: {item.error}")

    print(f"\nExtracted {len(documents)} documents successfully.")

wait_for_job polls every 2 seconds under the hood until status == "done". Each item in job.results has an ok property — True on success, False on failure. Per-URL failures (e.g. a page that times out or has no content) don't fail the whole job, so always check item.ok before accessing item.markdown.

Step 3: Chunk the Documents

Embedding models have a token limit, and long articles will exceed it. We need to split each document into overlapping chunks before embedding.

def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into word-count chunks with overlap."""
    words = text.split()
    chunks = []
    step = chunk_size - overlap
    for i in range(0, len(words), step):
        chunk = " ".join(words[i : i + chunk_size])
        if chunk:
            chunks.append(chunk)
    return chunks

all_chunks = []
all_metadata = []

for doc in documents:
    chunks = chunk_text(doc["markdown"], chunk_size=500, overlap=50)
    for i, chunk in enumerate(chunks):
        all_chunks.append(chunk)
        all_metadata.append({
            "url": doc["url"],
            "title": doc["title"] or "Untitled",
            "chunk_index": i,
        })

print(f"Total chunks to embed: {len(all_chunks)}")

Chunk size and overlap are worth tuning for your use case. 500 words with 50-word overlap works well for Wikipedia-style prose. For dense technical docs you may want smaller chunks; for narrative text, larger ones.
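
Since the output is already Markdown, another strategy worth trying is to split on headings first and only word-chunk sections that are still too long. A rough sketch to adapt, not part of the pipeline above:

import re

def chunk_markdown(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split on Markdown headings first, then word-chunk oversized sections."""
    # Split before any heading line so each section keeps its heading.
    sections = re.split(r"\n(?=#{1,6} )", text)
    chunks = []
    for section in sections:
        words = section.split()
        if len(words) <= chunk_size:
            if words:
                chunks.append(section.strip())
        else:
            chunks.extend(chunk_text(section, chunk_size, overlap))
    return chunks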

Step 4: Embed and Store in ChromaDB

import chromadb
from sentence_transformers import SentenceTransformer

# Local embedding model - no API key, runs on CPU.
embedder = SentenceTransformer("all-MiniLM-L6-v2")
# Local ChromaDB - data lives in ./chroma_db.
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(
    name="rag_demo",
    metadata={"hnsw:space": "cosine"},
)

# Embed in batches to avoid memory pressure.
BATCH_SIZE = 64
ids = [f"chunk_{i}" for i in range(len(all_chunks))]
for start in range(0, len(all_chunks), BATCH_SIZE):
    batch_texts = all_chunks[start : start + BATCH_SIZE]
    batch_meta = all_metadata[start : start + BATCH_SIZE]
    batch_ids = ids[start : start + BATCH_SIZE]
    batch_embeddings = embedder.encode(batch_texts, show_progress_bar=False).tolist()
    collection.upsert(
        ids=batch_ids,
        documents=batch_texts,
        embeddings=batch_embeddings,
        metadatas=batch_meta,
    )

print(f"Stored {collection.count()} chunks in ChromaDB.")

all-MiniLM-L6-v2 is fast, small (80 MB), and surprisingly good for retrieval tasks. If you need higher quality and don't mind slower inference, swap in all-mpnet-base-v2.
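
The swap is a one-line change, but note that all-mpnet-base-v2 produces 768-dimensional vectors instead of 384, so re-index into a fresh collection rather than mixing the two:

# Higher quality, slower, and a different embedding dimension (768 vs 384).
embedder = SentenceTransformer("all-mpnet-base-v2")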

Step 5: Query the Pipeline

Now the fun part. Given a natural-language question, we embed it, retrieve the most relevant chunks, and pass them to an LLM as context.

from openai import OpenAI

openai_client = OpenAI()

def ask(question: str, top_k: int = 5) -> str:
    # 1. Embed the question with the same model used at index time.
    question_embedding = embedder.encode(question).tolist()
    # 2. Retrieve the top-k most similar chunks.
    results = collection.query(
        query_embeddings=[question_embedding],
        n_results=top_k,
        include=["documents", "metadatas"],
    )
    chunks = results["documents"][0]
    metadatas = results["metadatas"][0]
    # 3. Build a grounded context block.
    context_parts = []
    for chunk, meta in zip(chunks, metadatas):
        source = f"[{meta['title']}]({meta['url']})"
        context_parts.append(f"Source: {source}\n\n{chunk}")
    context = "\n\n---\n\n".join(context_parts)
    # 4. Ask the LLM to answer using only the retrieved context.
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant. Answer the user's question using "
                    "only the provided context. If the context doesn't contain enough "
                    "information to answer, say so honestly. Cite your sources."
                ),
            },
            {
                "role": "user",
                "content": f"Context:\n\n{context}\n\nQuestion: {question}",
            },
        ],
    )
    return response.choices[0].message.content

# Try it out.
answer = ask("What problem does RAG solve compared to standard fine-tuning?")
print(answer)

Example output:

RAG addresses a core limitation of standard fine-tuning: knowledge staleness and
hallucination. Fine-tuning bakes information into model weights, meaning the model
can't be updated cheaply as the world changes and may confabulate facts it wasn't
trained on. RAG sidesteps this by retrieving relevant documents at inference time
and conditioning the answer on that retrieved content, keeping the model's
parametric knowledge as a reasoning engine rather than a fact store.
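
One refinement worth adding once this works: ChromaDB can also return distances alongside documents, so you can drop weakly related chunks instead of stuffing them into the prompt. A minimal sketch of the retrieval step inside ask(), where the 0.6 cutoff is an arbitrary starting point to tune on your own data:

results = collection.query(
    query_embeddings=[question_embedding],
    n_results=top_k,
    include=["documents", "metadatas", "distances"],
)
# With cosine space, distance = 1 - similarity; smaller means more relevant.
keep = [
    (doc, meta)
    for doc, meta, dist in zip(
        results["documents"][0], results["metadatas"][0], results["distances"][0]
    )
    if dist < 0.6
]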

Step 6: Handle Errors Gracefully

The wellmarked SDK raises typed exceptions for every failure mode. Here's how to handle the ones you'll encounter in production:

from wellmarked import (
    WellMarked,
    RateLimitError,
    UnprocessableEntityError,
    APIConnectionError,
)

def safe_extract(wm: WellMarked, url: str) -> str | None:
    try:
        result = wm.extract(url)
        return result.markdown
    except RateLimitError as e:
        # Monthly quota exhausted. retry_after is seconds until reset.
        print(f"Quota exhausted. Resets in {e.retry_after}s ({e.retry_after // 86400} days).")
        return None
    except UnprocessableEntityError as e:
        if e.code == "no_content":
            # Page exists but has no extractable article content.
            # Common for login walls, redirect chains, or blank SPAs.
            print(f"No content found at {url} - try render_js=True for JS-heavy pages.")
        elif e.code == "target_timeout":
            print(f"Target URL timed out: {url}")
        else:
            print(f"Extraction failed ({e.code}): {e.message}")
        return None
    except APIConnectionError:
        print(f"Network error reaching WellMarked - check connectivity.")
        return None

For JS-rendered pages (single-page apps, dynamic dashboards), add render_js=True to the extract call. This requires a Pro or Enterprise plan and uses Playwright under the hood:

result = wm.extract("https://some-spa.example.com/article", render_js=True)

Bonus: Async Version for Production

If you’re running this inside a FastAPI endpoint, an async agent loop, or any other async context, swap WellMarked for AsyncWellMarked:

import asyncio
from wellmarked import AsyncWellMarked

async def build_index(urls: list[str]) -> list[dict]:
    async with AsyncWellMarked() as wm:
        job = await wm.bulk(urls)
        job = await wm.wait_for_job(job.job_id)
    return [
        {"url": item.url, "title": item.metadata.title, "markdown": item.markdown}
        for item in job.results
        if item.ok
    ]

documents = asyncio.run(build_index(URLS))

Every method on AsyncWellMarked is a coroutine. The interface is otherwise identical to the sync client, so migrating is a drop-in swap.
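
For example, a minimal FastAPI endpoint that extracts a page on demand might look like this (FastAPI is an assumption here; any async framework works the same way):

from fastapi import FastAPI
from wellmarked import AsyncWellMarked

app = FastAPI()

@app.get("/extract")
async def extract_url(url: str):
    # One client per request keeps the example simple; in production,
    # create a shared client at startup and reuse it across requests.
    async with AsyncWellMarked() as wm:
        result = await wm.extract(url)
    return {"title": result.metadata.title, "markdown": result.markdown}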

Putting It All Together

Here’s the complete pipeline in one script:

import os
import asyncio
import chromadb
from sentence_transformers import SentenceTransformer
from openai import OpenAI
from wellmarked import AsyncWellMarked, RateLimitError

URLS = [
    "https://en.wikipedia.org/wiki/Retrieval-augmented_generation",
    "https://en.wikipedia.org/wiki/Vector_database",
    "https://en.wikipedia.org/wiki/Sentence_embedding",
    "https://en.wikipedia.org/wiki/Large_language_model",
    "https://en.wikipedia.org/wiki/Transformer_(deep_learning_architecture)",
]
embedder = SentenceTransformer("all-MiniLM-L6-v2")
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("rag_demo", metadata={"hnsw:space": "cosine"})
openai_client = OpenAI()

def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    words = text.split()
    return [
        " ".join(words[i : i + chunk_size])
        for i in range(0, len(words), chunk_size - overlap)
        if words[i : i + chunk_size]
    ]

async def ingest(urls: list[str]) -> None:
    async with AsyncWellMarked() as wm:
        try:
            usage = await wm.get_usage()
            print(f"Quota: {usage.remaining} requests remaining")
            if usage.remaining < len(urls):
                raise RuntimeError("Insufficient quota.")
            job = await wm.bulk(urls)
            job = await wm.wait_for_job(job.job_id)
        except RateLimitError as e:
            raise RuntimeError(f"Rate limited. Retry in {e.retry_after}s.") from e
    chunks, metadatas, ids = [], [], []
    for item in job.results:
        if not item.ok:
            print(f"[WARN] {item.url}: {item.error}")
            continue
        for i, chunk in enumerate(chunk_text(item.markdown)):
            chunks.append(chunk)
            metadatas.append({"url": item.url, "title": item.metadata.title or "Untitled", "chunk_index": i})
            ids.append(f"{item.url}::chunk_{i}")
    embeddings = embedder.encode(chunks, batch_size=64, show_progress_bar=True).tolist()
    collection.upsert(ids=ids, documents=chunks, embeddings=embeddings, metadatas=metadatas)
    print(f"Indexed {len(chunks)} chunks from {len(job.results)} documents.")

def ask(question: str, top_k: int = 5) -> str:
    results = collection.query(
        query_embeddings=[embedder.encode(question).tolist()],
        n_results=top_k,
        include=["documents", "metadatas"],
    )
    context = "\n\n---\n\n".join(
        f"Source: [{m['title']}]({m['url']})\n\n{doc}"
        for doc, m in zip(results["documents"][0], results["metadatas"][0])
    )
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Answer using only the provided context. Cite sources. If unsure, say so."},
            {"role": "user", "content": f"Context:\n\n{context}\n\nQuestion: {question}"},
        ],
    )
    return response.choices[0].message.content

if __name__ == "__main__":
    asyncio.run(ingest(URLS))
    print(ask("What problem does RAG solve compared to standard fine-tuning?"))
    print(ask("How do vector databases store and retrieve embeddings?"))

What’s Next

This pipeline reads from a static list of URLs, but the same pattern extends naturally:

  • Agent-driven ingestion  — let an LLM agent decide which URLs to fetch based on a user’s question, extract them on the fly, and answer from the fresh content.
  • Incremental updates  — ChromaDB’s upsert is idempotent on ID. Re-run ingest with updated URLs and chunks with the same IDs are overwritten in place.
  • JS-rendered pages  — add render_js=True to handle SPAs, paywalled previews, and dynamically loaded content (requires Pro or Enterprise).
  • Scheduled indexing  — wrap ingest() in a cron job to keep your index fresh as the web changes (see the sketch below).
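
For that last point, a bare-bones stand-in for a real cron job is just a loop around ingest(). Sketch only, assuming the full script above is importable:

import asyncio

async def refresh_loop(interval_hours: float = 24) -> None:
    # Re-running ingest reuses the same chunk IDs, so upsert overwrites in place.
    while True:
        await ingest(URLS)
        await asyncio.sleep(interval_hours * 3600)

# asyncio.run(refresh_loop())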

The extraction layer is intentionally thin. WellMarked’s job is to hand you clean Markdown; what you do with it — chunking strategy, embedding model, vector store, retrieval algorithm — is entirely up to you.

WellMarked is a URL-to-Markdown API built for AI pipelines. Free tier: 500 requests/month, no card required. wellmarked.io — pip install wellmarked
