DEV Community

tokozen
Building a RAG Pipeline That Stays Fresh with Live Web Data

You build a RAG pipeline, embed your documents, stand up a vector store, and it works great. Then three months later, users start complaining that the answers are wrong. Your product pricing changed. A regulation was updated. A library released a breaking version. The documents you indexed at setup time are now lying to your users.

The fix is not to re-index more aggressively. The fix is to stop treating the web as a one-time data source and start treating it as a live feed that your pipeline can query at retrieval time.

Here is how to wire that up.

The Core Problem with Static RAG

Standard RAG looks like this: ingest documents, chunk them, embed them, store vectors, retrieve on query, generate. Every step happens at ingest time except retrieval and generation. That is fine for a corporate knowledge base with a weekly update cycle. It breaks down when:

  • You are answering questions about prices, regulations, or news
  • Your users ask about things that happened after your last ingest
  • The authoritative source is a website that updates continuously

The mental model shift is to treat retrieval as a two-stage process. First, check your local vector store for stable reference content (your docs, FAQs, internal data). Second, run a live web query for anything time-sensitive and merge those results into the context window before generation.

What the Pipeline Looks Like

Here is the revised flow:

  1. User sends a query
  2. Classify the query: does it need fresh data?
  3. If yes, fire a web search, scrape the top results, pull clean text
  4. Combine local retrieval results with fresh web content
  5. Pack both into the prompt context and generate

The classification step does not need to be fancy. A simple heuristic works: if the query contains words like "current", "latest", "today", "now", "price", or a named entity that changes frequently, route it to the live path. You can also ask the LLM itself with a one-shot classifier prompt.
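
The LLM-based variant could look something like the sketch below. It assumes a hypothetical `ask_llm` callable (not part of any library) that takes a prompt string and returns the model's text reply. The prompt wording and the YES/NO convention are illustrative choices rather than a tested recipe.

```python
from typing import Callable

# Illustrative one-shot prompt: one worked example, then the real query.
CLASSIFIER_PROMPT = (
    "Decide whether answering the question requires up-to-date information "
    "from the web. Reply with exactly YES or NO.\n\n"
    "Question: What is the latest stable version of Python?\n"
    "Answer: YES\n\n"
    "Question: {query}\n"
    "Answer:"
)


def needs_fresh_data_llm(query: str, ask_llm: Callable[[str], str]) -> bool:
    """Return True when the model says the query needs live data.

    `ask_llm` is a hypothetical callable wrapping whatever chat API you use.
    Any reply that does not start with YES is treated as NO, so a malformed
    reply falls back to the cheaper local-only path.
    """
    reply = ask_llm(CLASSIFIER_PROMPT.format(query=query))
    return reply.strip().upper().startswith("YES")
```

Passing the model call in as a function keeps the classifier independent of any one provider and makes it easy to test with a stub. Defaulting to NO on an unexpected reply is a design choice; if missing fresh data is worse than an extra web call in your domain, flip the default.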

The scraping step is where most implementations fall apart. Raw HTML is terrible context. You want clean, structured text. Anakin's Scrape API handles this well: you give it a URL and it returns the page content as clean markdown or plain text, stripping nav, ads, and boilerplate. That matters a lot because every token of garbage HTML in your context window is a token not used for actual reasoning.

A Concrete Implementation

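import os
import re

import requests
from openai import OpenAI

ANAKIN_API_KEY = os.environ["ANAKIN_API_KEY"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

client = OpenAI(api_key=OPENAI_API_KEY)

# Match whole words only. A plain substring check lets "now" fire on "know".
# Common inflections are listed explicitly so "prices" and "currently" still match.
TRIGGER_PATTERN = re.compile(
    r"\b(current(ly)?|latest|today|now|prices?|pricing|recent(ly)?|2024|2025)\b",
    re.IGNORECASE,
)

def needs_fresh_data(query: str) -> bool:
    """Cheap heuristic: route to the live path if any trigger word appears."""
    return TRIGGER_PATTERN.search(query) is not None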

def web_search(query: str, num_results: int = 3) -> list[dict]:
    resp = requests.get(
        "https://api.anakin.ai/v1/search",
        headers={"Authorization": f"Bearer {ANAKIN_API_KEY}"},
        params={"q": query, "limit": num_results},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json().get("results", [])

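def scrape_url(url: str) -> str:
    """Return cleaned page text for a URL, or "" if the scrape fails."""
    try:
        resp = requests.post(
            "https://api.anakin.ai/v1/scrape",
            headers={"Authorization": f"Bearer {ANAKIN_API_KEY}", "Content-Type": "application/json"},
            json={"url": url, "format": "markdown"},
            timeout=15,
        )
    except requests.RequestException:
        return ""  # timeout or connection error: skip this source
    if resp.status_code != 200:
        return ""
    # Cap characters (not tokens) per source; also guards against a null "content" field.
    return (resp.json().get("content") or "")[:3000]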

def build_context(query: str, local_chunks: list[str]) -> str:
    sections = []

    if local_chunks:
        sections.append("## Internal Knowledge\n" + "\n\n".join(local_chunks))

    if needs_fresh_data(query):
        search_results = web_search(query)
        web_sections = []
        for result in search_results:
            url = result.get("url", "")
            if not url:
                continue  # nothing to scrape for a result without a URL
            title = result.get("title", url)
            content = scrape_url(url)
            if content:
                web_sections.append(f"### {title}\nSource: {url}\n\n{content}")
        if web_sections:
            sections.append("## Live Web Results\n" + "\n\n".join(web_sections))

    return "\n\n".join(sections)

def answer(query: str, local_chunks: list[str]) -> str:
    context = build_context(query, local_chunks)
    system_prompt = (
        "You are a helpful assistant. Answer the user's question using the provided context. "
        "If you cite information from a web source, mention the source URL. "
        "If the context does not contain enough information, say so clearly."
    )
    user_message = f"Context:\n{context}\n\nQuestion: {query}"
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        temperature=0.2,
    )
    return response.choices[0].message.content

# Example usage
local_knowledge = [
    "Our refund policy allows returns within 30 days of purchase.",
    "Enterprise plans include SSO and audit logs.",
]
query = "What is the current price of GPT-4o API calls?"
print(answer(query, local_knowledge))

A few things worth noting in this code:

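  • The scrape_url function caps content at 3000 characters per source. You will want to tune this based on how many sources you pull and what your context window budget looks like. Three sources at 3000 characters each come to 9,000 characters, which is roughly 2,000 to 2,500 tokens at the usual rule of thumb of about four characters per English token, so even with your local chunks you are comfortably under 16k tokens.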
  • needs_fresh_data is intentionally simple. In production you would replace this with a proper classifier or even a quick LLM call with a boolean output schema.
  • The system prompt tells the model to cite sources when using web content. This matters for user trust and for debugging when something goes wrong.

Keeping Latency Manageable

The obvious concern with adding live web requests to a RAG pipeline is latency. A search call plus three scrape calls adds up. A few approaches that help:

  • Run search and scrape in parallel using asyncio or concurrent.futures. The three scrape calls should happen simultaneously, not serially.
  • Cache search results for identical or near-identical queries with a short TTL, maybe 5 to 15 minutes depending on how time-sensitive your domain is.
  • Set aggressive timeouts. A scrape that takes more than 10 seconds is probably not worth waiting for, so the 15-second scrape timeout in the example above is on the generous side. Fail gracefully and use whatever you got.
  • Limit scraping to the top two results rather than five (the example defaults to three). Marginal sources past the second result rarely change the answer quality, but they do add latency.
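
The parallel-scrape suggestion can be sketched with concurrent.futures from the standard library. This is a minimal version under a few assumptions: `scrape_many` is a hypothetical helper name, and `scrape` is any function shaped like scrape_url from the implementation above (takes a URL, returns text, enforces its own request timeout).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable


def scrape_many(
    urls: list[str],
    scrape: Callable[[str], str],
    max_workers: int = 4,
) -> dict[str, str]:
    """Scrape all URLs concurrently and return {url: content} for successes.

    A source that raises or returns empty text is dropped instead of
    failing the whole batch.
    """
    results: dict[str, str] = {}
    if not urls:
        return results
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit every scrape up front so the requests overlap.
        futures = {pool.submit(scrape, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                content = future.result()
            except Exception:
                continue  # fail gracefully: skip this source
            if content:
                results[url] = content
    return results
```

Total wall time is then roughly the slowest single scrape rather than the sum of all of them. The executor still waits for every worker before returning, so the per-request timeout inside `scrape` is what bounds the batch. Results arrive in completion order; re-sort by search rank if ordering matters in your prompt.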

With parallelism and reasonable timeouts, you can usually keep the live web path under two seconds of added latency for the retrieval step.
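
The short-TTL cache from the list above could be as small as the sketch below. `TTLCache` and `get_or_fetch` are hypothetical names, the 600-second default is only a placeholder inside the 5 to 15 minute range, and "near-identical" here means nothing more than case and whitespace normalization.

```python
import time
from typing import Any, Callable


class TTLCache:
    """Tiny in-memory cache whose entries expire after `ttl_seconds`."""

    def __init__(
        self,
        ttl_seconds: float = 600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store: dict[str, tuple[float, Any]] = {}

    @staticmethod
    def _key(query: str) -> str:
        # Lowercase and collapse whitespace so trivially different
        # spellings of the same query share one entry.
        return " ".join(query.lower().split())

    def get_or_fetch(self, query: str, fetch: Callable[[str], Any]) -> Any:
        """Return the cached value for `query`, calling `fetch` on a miss or expiry."""
        key = self._key(query)
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]
        value = fetch(query)
        self._store[key] = (now, value)
        return value
```

Wrapping the search call would then look like `cache.get_or_fetch(query, web_search)`. This version is per-process, not thread-safe, and never evicts expired entries, so a shared store with native expiry is likely a better fit once you run more than one worker.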

Where to Go from Here

The pattern above handles the common case: detect, search, scrape, merge. But there is a more interesting direction when you need depth rather than breadth. Anakin's Agentic Search does the research loop for you, returning a synthesized answer with sources rather than raw search results. That can be a better fit when the query needs multi-step reasoning across sources rather than a simple "what does this page say."

The thing I would tackle next is making the freshness classifier smarter. Right now it is a keyword list. A better version would use embeddings to detect semantic similarity to a set of "time-sensitive topics" defined for your specific domain. That gets you higher precision without much more code.
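
As a rough sketch of that embedding-based classifier: embed a handful of domain-specific "time-sensitive" topic phrases once, then flag any query whose cosine similarity to one of them clears a threshold. The `embed` callable, the function names, and the 0.5 threshold are all assumptions for illustration; the threshold in particular would need tuning against real queries and your actual embedding model.

```python
import math
from typing import Callable, Sequence

Vector = Sequence[float]


def cosine(a: Vector, b: Vector) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def make_freshness_classifier(
    topics: list[str],
    embed: Callable[[str], Vector],
    threshold: float = 0.5,
) -> Callable[[str], bool]:
    """Build a classifier that flags queries close to any time-sensitive topic.

    `embed` is a hypothetical callable that maps text to a vector, for
    example a thin wrapper around your embedding provider.
    """
    topic_vectors = [embed(t) for t in topics]  # embedded once, reused per query

    def needs_fresh_data(query: str) -> bool:
        q = embed(query)
        return any(cosine(q, tv) >= threshold for tv in topic_vectors)

    return needs_fresh_data
```

The returned function has the same signature as the keyword version, so it can be swapped in without touching the rest of the pipeline. It does add one embedding call per query, which is worth weighing against the latency budget discussed earlier.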

Static RAG is a starting point, not a destination. The web is your database. Treat retrieval accordingly.
