How I Built a Smart Q&A Bot for a Local Music Venue in a Weekend — A Practical Introduction to RAG Systems
A step-by-step, beginner-friendly tutorial covering embeddings, vector databases, LLM orchestration, API design, automated evaluation, and Docker deployment — all from scratch with Python.
The Story Behind This Project
A few weeks ago I picked up a small freelance gig on Upwork. The client was "GigFinder", a network of independent music venues across Berlin. Their problem was simple to describe but hard to solve: hundreds of upcoming shows in a database, yet visitors kept emailing the same questions — "Any blues shows next Friday?", "What's on at Kreuzberg venues this month?", "Are there any free acoustic gigs for families?"
They wanted a smart assistant that could answer those questions in plain English, pulling facts from their real, constantly updated show listings — not a generic chatbot that would hallucinate fake events.
I told them I could build a proof-of-concept over the weekend. Spoiler: I delivered it Sunday evening, running inside a Docker container, accessible through a clean REST API, with automated quality metrics.
The technology that made it possible is called RAG — Retrieval-Augmented Generation. This article walks you through every single step, explains every concept, and by the end you'll have a fully working system you can adapt to any domain.
Table of Contents
- What Exactly Is RAG?
- The Toolbox: LangChain, FAISS, Mistral, FastAPI
- Understanding Embeddings — The Key Idea Behind Everything
- Setting Up the Project
- Fetching Real Data from an External API
- Cleaning and Structuring Messy Real-World Data
- Chunking: Why Size Matters in Retrieval
- Building the Vector Index with FAISS
- The RAG Chain: Connecting Retrieval to Generation
- Wrapping It in a REST API with FastAPI
- Evaluating Quality with RAGAS
- Writing Unit Tests Across the Entire Pipeline
- Packaging Everything in Docker
- Architecture Overview
- Full File Recap — Copy-Paste-Ready Code
- Resources and Further Reading
1. What Exactly Is RAG? (And Why Should You Care?)
The Open-Book Exam Analogy
The easiest way to understand RAG is to think about two kinds of exams.
Closed-book exam (= a plain LLM). You rely only on what you memorized. You might remember the gist of things, but details can be fuzzy or just plain wrong. That's what happens when you ask ChatGPT about your specific, private, rapidly changing data — it sounds right, but it might be making things up.
Open-book exam (= a RAG system). You bring a perfectly organized binder. Before answering each question, you flip to the right page, find the relevant facts, and then write your answer using both your general knowledge AND the specific information in front of you. That's RAG.
RAG = Retrieval-Augmented Generation. Two steps:
- Retrieve the most relevant pieces of information from your data.
- Generate a natural-language answer using those pieces as context.
A RAG system has two phases: Retrieval (finding relevant documents) and Generation (producing an answer). Source: Azure Databricks
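The two steps above can be sketched in a few lines of plain Python. This is a toy illustration only — word overlap stands in for real semantic retrieval, and a template stands in for the LLM. The corpus entries are made up:

```python
# Toy sketch of the two RAG steps. Word overlap stands in for
# semantic retrieval; a string template stands in for the LLM.
CORPUS = [
    "Blues Night at Kantine, Friday 21:00, 12 euros",
    "Techno marathon at Tresor, Saturday from 23:00",
    "Free acoustic session in Mauerpark, Sunday 15:00",
]

def retrieve(question: str, k: int = 1) -> list[str]:
    """Step 1: score each document by how many words it shares with the question."""
    q_words = set(question.lower().split())
    scored = sorted(
        CORPUS,
        key=lambda doc: len(q_words & set(doc.lower().split())),
        reverse=True,
    )
    return scored[:k]

def generate(question: str, context: list[str]) -> str:
    """Step 2: a real system asks an LLM to write the answer from the context."""
    return f"Based on our listings: {context[0]}"

question = "Any blues shows on Friday?"
print(generate(question, retrieve(question)))
```

A real RAG system swaps word overlap for embedding similarity and the template for an LLM call — but the retrieve-then-generate shape stays exactly the same.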
Why Not Just Use an LLM Directly?
| Problem | Without RAG | With RAG |
|---|---|---|
| Hallucination | Invents plausible-sounding but false info | Answers grounded in real data |
| Stale knowledge | Training cutoff — doesn't know new events | Fresh data injected at query time |
| No specificity | Generic answers about "music events in general" | Precise answers about YOUR events |
RAG vs. Traditional Search
Traditional search uses keyword matching: the word "blues" in your query must literally appear in the document. RAG uses semantic search: the system understands that "any live blues acts?" and "upcoming rhythm and blues performances" mean the same thing, even without shared words. This is possible thanks to embeddings.
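You can see the keyword-matching failure mode in a couple of lines. The query and document below are invented for illustration — they mean the same thing yet share zero words, so any keyword-based scorer gives them a score of 0:

```python
# Illustration of why keyword matching misses paraphrases.
# The query and document share no words despite matching in meaning.
def keyword_overlap(query: str, doc: str) -> int:
    """Count distinct words shared between a query and a document."""
    return len(set(query.lower().split()) & set(doc.lower().split()))

query = "gigs for kids this weekend"
doc = "family-friendly saturday matinee concert"

print(keyword_overlap(query, doc))    # 0 — keyword search finds nothing
print(keyword_overlap(query, query))  # 5 — only literal word matches score
```

An embedding model would place those two phrases close together in vector space, which is exactly the gap semantic search closes.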
2. The Toolbox: LangChain, FAISS, Mistral, FastAPI
Here's each tool and what it does in our project:
LangChain — The orchestrator. It chains together retrieval and generation into a clean pipeline. Without it, you'd write a lot of glue code yourself.
FAISS (Facebook AI Similarity Search) — The memory. Stores all event vectors and finds the most similar ones in milliseconds. Free, local, no cloud needed. See also: FAISS: The Missing Manual (Pinecone).
Mistral AI — The brain. We use it for two things: converting text into vectors (embeddings) and generating natural-language answers. Free tier available.
FastAPI — The interface. Turns our Python code into a REST API with automatic Swagger documentation.
Supporting cast: Docker (packaging), RAGAS (evaluation), pytest (testing).
3. Understanding Embeddings — The Key Idea Behind Everything
This concept is so important it needs its own section. If you understand embeddings, everything else clicks.
What Is an Embedding?
An embedding is a list of numbers (a vector) that represents the meaning of a piece of text. The crucial property: texts with similar meanings produce vectors that are close together.
"blues concert Friday night" → [0.82, -0.14, 0.51, ...]
"live blues music this weekend" → [0.79, -0.12, 0.49, ...] ← Very close!
"pottery workshop for beginners" → [-0.33, 0.71, -0.28, ...] ← Far away
Similar meanings cluster together. Source: Google ML Crash Course
A Tiny Code Example: Cosine Similarity
Let's see how we measure "closeness" between two vectors:
import numpy as np
First we define the function. Cosine similarity measures the angle between two vectors — if they point in the same direction, they have similar meaning:
def cosine_similarity(a, b):
"""
Returns a value between -1 and 1.
1.0 = identical direction (same meaning)
0.0 = perpendicular (unrelated)
"""
dot_product = np.dot(a, b)
magnitude = np.linalg.norm(a) * np.linalg.norm(b)
if magnitude == 0:
return 0.0
return dot_product / magnitude
Now let's test it with some fake vectors:
v_blues = np.array([0.82, -0.14, 0.51, 0.23])
v_music = np.array([0.79, -0.12, 0.49, 0.21])
v_pottery = np.array([-0.33, 0.71, -0.28, 0.64])
print(f"Blues ↔ Music: {cosine_similarity(v_blues, v_music):.4f}") # ~1.00
print(f"Blues ↔ Pottery: {cosine_similarity(v_blues, v_pottery):.4f}") # ~-0.35
See? "Blues" and "Music" are nearly identical in vector space, while "Pottery" is far away. That's how semantic search works — we don't match keywords, we match meaning.
Don't worry — you won't implement this yourself in the actual project. FAISS handles it. But understanding the concept makes debugging much easier.
Deeper reading: Google's Embeddings Guide and Pinecone's What Are Embeddings.
4. Setting Up the Project
Let's create a clean, reproducible project. Anyone who clones your repo should be able to run everything just by reading the README.
Create the folder structure
mkdir music-venue-rag && cd music-venue-rag
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
mkdir -p src tests data
touch src/__init__.py src/data_fetcher.py src/preprocessing.py
touch src/chunking.py src/vectorstore.py src/rag_chain.py
touch src/api.py src/evaluation.py
touch tests/__init__.py tests/test_pipeline.py
touch .env .gitignore requirements.txt Dockerfile README.md
Install dependencies
pip install \
langchain==0.3.25 \
langchain-community==0.3.24 \
langchain-mistralai==0.2.13 \
faiss-cpu==1.11.0 \
fastapi==0.115.12 \
uvicorn==0.34.3 \
requests==2.32.3 \
pandas==2.2.3 \
python-dotenv==1.1.0 \
httpx==0.28.1 \
ragas==0.2.15 \
pytest==8.3.5 \
beautifulsoup4==4.13.4
Then freeze your versions so the project is reproducible:
pip freeze > requirements.txt
Set up your secret keys
Create a .env file (this file should never be committed to Git):
# .env
MISTRAL_API_KEY=your_key_here
OPENAGENDA_API_KEY=your_key_here
And protect it with .gitignore:
venv/
.env
__pycache__/
data/faiss_index/
*.pyc
Why faiss-cpu? Portability. It works everywhere — your laptop, Docker, a colleague's machine. For a few thousand vectors, it's plenty fast. Use faiss-gpu only if you have millions of vectors and an NVIDIA GPU.
5. Fetching Real Data from an External API
For this project I used the OpenAgenda API — a free, open platform that aggregates event listings from thousands of cultural organizations. Let's build the data fetcher step by step.
Step 5.1 — The imports and setup
# src/data_fetcher.py
import os
import json
import time
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Load API keys from .env file
load_dotenv()
Nothing fancy here — requests for HTTP calls, dotenv to load our API key safely.
Step 5.2 — Fetching events from one agenda
An "agenda" on OpenAgenda is a collection of events (a venue, a city, etc.). The API returns max 100 events per page, so we need to paginate:
def fetch_events_from_agenda(
agenda_uid: int,
api_key: str,
max_events: int = 300,
months_back: int = 12,
) -> list[dict]:
"""
Retrieve events from one OpenAgenda agenda.
Parameters:
agenda_uid — Find this in the URL on openagenda.com
api_key — Your OpenAgenda API key
max_events — Cap to keep things manageable during dev
months_back — How far back to look (12 months max recommended)
"""
# Only fetch events from the last N months
since = (
datetime.now() - timedelta(days=30 * months_back)
).strftime("%Y-%m-%dT00:00:00Z")
url = f"https://api.openagenda.com/v2/agendas/{agenda_uid}/events"
collected = []
offset = 0
Now the main loop. We keep fetching pages until we run out of events or hit our limit:
while len(collected) < max_events:
# Build query parameters for this page
params = {
"key": api_key,
"timings[gte]": since, # Only events after this date
"size": min(100, max_events - len(collected)),
"offset": offset,
"sort": "timingsStart.desc", # Newest first
}
try:
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status() # Will raise an error for 4xx/5xx
payload = resp.json()
except requests.exceptions.HTTPError as e:
print(f" [HTTP ERROR] {e}")
break
except requests.exceptions.ConnectionError:
print(" [CONNECTION ERROR] Can't reach OpenAgenda")
break
except requests.exceptions.Timeout:
print(" [TIMEOUT] Request took too long")
break
except requests.exceptions.RequestException as e:
print(f" [ERROR] {e}")
break
Why so many except blocks? Because real APIs fail in different ways, and you want to know why.
Now we extract the events from the response and decide whether to keep paginating:
events = payload.get("events", [])
if not events:
break # No more events available
collected.extend(events)
offset += len(events)
# Did we fetch everything?
if offset >= payload.get("total", 0):
break
# Be nice — don't hammer the API
time.sleep(0.5)
print(f" -> Fetched {len(collected)} events from agenda {agenda_uid}")
return collected
Step 5.3 — Fetching from multiple agendas
A city usually has several agendas (one per venue, one per cultural association, etc.). We loop over all of them:
def fetch_all_events(
agenda_uids: list[int],
max_per_agenda: int = 200,
) -> list[dict]:
"""Fetch events from multiple agendas and combine them."""
api_key = os.getenv("OPENAGENDA_API_KEY")
if not api_key:
raise ValueError(
"OPENAGENDA_API_KEY is missing! "
"Get one free at https://openagenda.com and add it to .env"
)
all_events = []
for uid in agenda_uids:
print(f"Fetching agenda {uid}...")
all_events.extend(
fetch_events_from_agenda(uid, api_key, max_per_agenda)
)
print(f"\nTotal events collected: {len(all_events)}")
return all_events
Step 5.4 — Saving raw data to disk
Always save your raw data. You don't want to re-fetch from the API every time you test something:
if __name__ == "__main__":
# Replace with real agenda UIDs from openagenda.com
AGENDAS = [82837550]
events = fetch_all_events(AGENDAS, max_per_agenda=250)
os.makedirs("data", exist_ok=True)
with open("data/raw_events.json", "w", encoding="utf-8") as f:
json.dump(events, f, ensure_ascii=False, indent=2)
print(f"Saved {len(events)} events to data/raw_events.json")
What does the raw data look like?
Each event comes back as a rich JSON object:
{
"uid": 48291037,
"title": { "fr": "Nuit du Blues", "en": "Blues Night" },
"description": { "fr": "<p>Soirée <strong>blues</strong> avec...</p>" },
"location": {
"name": "Zenith Arena",
"address": "15 Berliner Str.",
"city": "Berlin",
"latitude": 52.52, "longitude": 13.40
},
"timings": [
{ "begin": "2025-08-22T20:00:00+0200", "end": "2025-08-23T01:00:00+0200" }
],
"keywords": { "fr": ["blues", "concert", "live"] }
}
Notice the HTML in descriptions, the multilingual fields, the nested objects. That's real-world data — messy. Let's clean it up.
6. Cleaning and Structuring Messy Real-World Data
Golden rule: garbage in, garbage out. If your descriptions still have <strong> tags when they get embedded, the model will encode that noise.
Step 6.1 — Stripping HTML
# src/preprocessing.py
import re
import json
from datetime import datetime
from bs4 import BeautifulSoup
The first utility removes HTML tags. We use BeautifulSoup because regex fails on edge cases (nested tags, unclosed tags, etc.):
def strip_html(text: str) -> str:
"""
Remove all HTML tags and decode entities.
Examples:
"<p>A <b>great</b> show!&amp;</p>" → "A great show!&"
"" → ""
None → ""
"""
if not text:
return ""
cleaned = BeautifulSoup(text, "html.parser").get_text(separator=" ")
# Collapse multiple spaces into one
return re.sub(r"\s+", " ", cleaned).strip()
Step 6.2 — Handling multilingual fields
OpenAgenda stores titles and descriptions as dictionaries like {"fr": "...", "en": "..."}. We need a helper that extracts the French version (or falls back to whatever's available):
def extract_multilingual(field, preferred_lang: str = "fr") -> str:
"""
Extract text from a potentially multilingual field.
Handles three cases:
- dict like {"fr": "Bonjour", "en": "Hello"} → returns "Bonjour"
- plain string → returns as-is
- None → returns ""
"""
if isinstance(field, dict):
return field.get(preferred_lang, "") or next(iter(field.values()), "")
if isinstance(field, str):
return field
return ""
Step 6.3 — Parsing dates
Events have timing data in ISO format. We parse it into something human-readable:
def parse_timings(timings: list[dict]) -> dict:
"""
Convert raw timing data into usable date info.
Returns both the raw ISO string (for filtering)
and a friendly formatted string (for display in answers).
"""
fallback = {
"start_iso": None, "end_iso": None,
"weekday": None, "display": "Date not available",
}
if not timings:
return fallback
try:
begin = datetime.fromisoformat(timings[0]["begin"])
weekdays = [
"Monday", "Tuesday", "Wednesday", "Thursday",
"Friday", "Saturday", "Sunday",
]
return {
"start_iso": timings[0]["begin"],
"end_iso": timings[0].get("end", ""),
"weekday": weekdays[begin.weekday()],
"display": begin.strftime(
f"{weekdays[begin.weekday()]}, %B %d %Y at %H:%M"
),
}
except (ValueError, KeyError, IndexError):
return fallback
Notice the try/except — real data has missing or malformed dates. We don't crash; we just say "Date not available."
Step 6.4 — Processing a single event
Now we bring it all together. This function takes one raw API event and produces a clean document with two parts:
- text — a single string combining all info, optimized for embedding
- metadata — structured fields for filtering and display
def process_event(raw: dict) -> dict | None:
"""
Transform one raw event into a clean document.
Returns None if there's not enough data (no title AND no description).
"""
# Extract and clean text fields
title = strip_html(extract_multilingual(raw.get("title")))
description = strip_html(extract_multilingual(raw.get("description")))
details = strip_html(extract_multilingual(raw.get("longDescription")))
# Skip events with nothing useful
if not title and not description:
return None
# Extract location info
loc = raw.get("location") or {}
venue = loc.get("name", "Unknown venue")
address = loc.get("address", "")
city = loc.get("city", "")
lat = loc.get("latitude")
lon = loc.get("longitude")
# Parse dates
dates = parse_timings(raw.get("timings", []))
# Extract keywords/tags
kw_field = raw.get("keywords")
keywords = []
if isinstance(kw_field, dict):
keywords = kw_field.get("fr", []) or []
elif isinstance(kw_field, list):
keywords = kw_field
Now the important part — building the unified text. This is what FAISS will index. We want it to contain ALL searchable information:
# Build the text that will be embedded
# Including everything means a search for "jazz Friday Kreuzberg"
# can match on title, description, venue, date, or tags
parts = [f"Event: {title}"]
if description: parts.append(f"Description: {description}")
if details: parts.append(f"Details: {details}")
if venue: parts.append(f"Venue: {venue}")
if city: parts.append(f"City: {city}")
if address: parts.append(f"Address: {address}")
parts.append(f"Date: {dates['display']}")
if keywords: parts.append(f"Tags: {', '.join(keywords)}")
return {
"text": "\n".join(parts),
"metadata": {
"uid": raw.get("uid"),
"title": title,
"venue": venue,
"city": city,
"address": address,
"start_date": dates["start_iso"],
"end_date": dates["end_iso"],
"weekday": dates["weekday"],
"formatted_date": dates["display"],
"keywords": keywords,
"lat": lat,
"lon": lon,
},
}
Step 6.5 — Processing all events at once
def process_all_events(raw_events: list[dict]) -> list[dict]:
"""Process a batch, filtering out incomplete ones."""
results = []
skipped = 0
for ev in raw_events:
doc = process_event(ev)
if doc:
results.append(doc)
else:
skipped += 1
print(f"Processed: {len(results)} events | Skipped: {skipped}")
return results
And the standalone runner:
if __name__ == "__main__":
with open("data/raw_events.json", encoding="utf-8") as f:
raw = json.load(f)
processed = process_all_events(raw)
with open("data/processed_events.json", "w", encoding="utf-8") as f:
json.dump(processed, f, ensure_ascii=False, indent=2)
    print("Saved to data/processed_events.json")
7. Chunking: Why Size Matters in Retrieval
Why do we need to split text?
Two reasons:
- Embedding models have input limits (often 512–8192 tokens).
- Smaller chunks = more precise retrieval. Imagine embedding a 2000-word description as one vector. A query like "blues concert" might match it, but what if "blues" was just mentioned once in a paragraph about food options? You'd retrieve a mostly irrelevant block. Smaller chunks mean the match is more targeted.
Overlapping chunks prevent info loss at boundaries. Source: Pinecone
Step 7.1 — The chunking function
# src/chunking.py
from langchain.text_splitter import RecursiveCharacterTextSplitter
We use LangChain's RecursiveCharacterTextSplitter. It tries to split at paragraph breaks first, then sentences, then words — respecting natural text boundaries:
def create_chunks(
processed_events: list[dict],
chunk_size: int = 500,
chunk_overlap: int = 100,
) -> list[dict]:
"""
Split each event's text into overlapping chunks.
chunk_size: Max characters per chunk (500 is good for event texts)
chunk_overlap: Characters shared between consecutive chunks (prevents
losing info that sits at a boundary)
"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
# Where to prefer splitting:
separators=["\n\n", "\n", ". ", ", ", " ", ""],
length_function=len,
)
Now we loop over events. Each chunk inherits all metadata from its parent event, so we can always trace back where it came from:
all_chunks = []
for event in processed_events:
pieces = splitter.split_text(event["text"])
for idx, piece_text in enumerate(pieces):
all_chunks.append({
"text": piece_text,
"metadata": {
**event["metadata"], # Copy all parent metadata
"chunk_index": idx, # Which chunk is this?
"total_chunks": len(pieces), # How many total?
},
})
avg = len(all_chunks) / max(len(processed_events), 1)
print(f"Created {len(all_chunks)} chunks from "
f"{len(processed_events)} events (avg {avg:.1f} per event)")
return all_chunks
How to choose chunk_size?
| chunk_size | Precision | Context per chunk | Best for |
|---|---|---|---|
| 200 | Very high | Limited | FAQs, short docs |
| 500 | Good balance | Good | Event descriptions |
| 1000 | Lower | Rich | Long articles |
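To get a feel for the size/overlap mechanics, here's a deliberately naive character-window splitter — LangChain's splitter in Step 7.1 is smarter about natural boundaries, but the arithmetic behind the table above is the same. The 1200-character string is a stand-in for a long event description:

```python
# Naive fixed-window chunking with overlap. Each window starts
# (size - overlap) characters after the previous one, so consecutive
# chunks share `overlap` characters and boundary info isn't lost.
def naive_chunks(text: str, size: int, overlap: int) -> list[str]:
    step = size - overlap
    return [text[i:i + size] for i in range(0, len(text), step)]

text = "A" * 1200  # stand-in for a long event description
for size in (200, 500, 1000):
    pieces = naive_chunks(text, size=size, overlap=100)
    print(f"chunk_size={size}: {len(pieces)} chunks")
```

Smaller chunks mean more vectors per event (more precise hits, more storage and API calls); larger chunks mean fewer, richer vectors. The table above is the trade-off in words.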
8. Building the Vector Index with FAISS
Now we convert text chunks into vectors and store them. This is the heart of the retrieval system.
Quick overview: FAISS index types
| Index | How it works | When to use |
|---|---|---|
| `IndexFlatL2` | Compares query to every vector (exact) | < 50K vectors (our case) |
| `IndexIVFFlat` | Clusters vectors, searches only nearby | 50K–5M vectors |
| `IndexHNSWFlat` | Graph-based approximate search | Speed-critical, any size |
For our POC, IndexFlatL2 (the default in LangChain) is perfect — exact results, zero configuration.
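Conceptually, IndexFlatL2 is just "compute the L2 distance to every stored vector, keep the k smallest." Here's that idea in plain Python — an illustrative sketch, not FAISS's actual (heavily optimized) implementation:

```python
# What IndexFlatL2 does conceptually: an exact, exhaustive scan.
# Real FAISS does this with SIMD-optimized C++, not Python loops.
def l2_sq(a: list[float], b: list[float]) -> float:
    """Squared L2 distance (ranking is the same with or without the sqrt)."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def flat_search(index: list[list[float]], query: list[float], k: int) -> list[int]:
    """Return indices of the k nearest stored vectors, nearest first."""
    distances = [(l2_sq(v, query), i) for i, v in enumerate(index)]
    return [i for _, i in sorted(distances)[:k]]

index = [[0.9, 0.1], [0.1, 0.9], [0.85, 0.15]]
print(flat_search(index, [0.88, 0.12], k=2))  # [0, 2] — vectors 0 and 2 are closest
```

Because it compares against everything, results are exact — which is why it's the right default until the index grows into the tens of thousands of vectors.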
Step 8.1 — Create the embedding model
# src/vectorstore.py
import os
import json
from dotenv import load_dotenv
from langchain_mistralai import MistralAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
load_dotenv()
First, a function to initialize the Mistral embedding model:
def get_embeddings() -> MistralAIEmbeddings:
"""
Create the Mistral embedding model.
Uses 'mistral-embed' which outputs 1024-dimensional vectors.
"""
key = os.getenv("MISTRAL_API_KEY")
if not key:
raise ValueError("Set MISTRAL_API_KEY in your .env file")
return MistralAIEmbeddings(model="mistral-embed", api_key=key)
Step 8.2 — Build the index
This is where text becomes vectors. LangChain handles the API batching and index creation:
def build_index(chunks: list[dict], save_dir: str = "data/faiss_index") -> FAISS:
"""
Build a FAISS vector store and save it to disk.
Steps:
1. Wrap each chunk as a LangChain Document (text + metadata)
2. Call Mistral API to generate embeddings
3. FAISS creates the index automatically
4. Save to disk so we don't re-embed on restart
"""
# Step 1: Wrap as Documents
docs = [
Document(page_content=c["text"], metadata=c["metadata"])
for c in chunks
]
print(f"Building index from {len(docs)} documents...")
# Step 2 + 3: Embed and build index
embeddings = get_embeddings()
store = FAISS.from_documents(documents=docs, embedding=embeddings)
# Step 4: Save to disk (creates index.faiss + index.pkl)
os.makedirs(save_dir, exist_ok=True)
store.save_local(save_dir)
print(f"Index saved ({store.index.ntotal} vectors)")
return store
Step 8.3 — Load from disk (for fast restarts)
def load_index(save_dir: str = "data/faiss_index") -> FAISS:
"""Load a previously saved index. Much faster than re-embedding."""
embeddings = get_embeddings()
store = FAISS.load_local(
save_dir, embeddings, allow_dangerous_deserialization=True
)
print(f"Loaded index: {store.index.ntotal} vectors")
return store
Step 8.4 — Search function
def search(store: FAISS, query: str, k: int = 5) -> list[tuple]:
    """Find the k most similar documents. Scores are L2 distances: lower = more similar."""
    return store.similarity_search_with_score(query, k=k)
Step 8.5 — Test it works
if __name__ == "__main__":
from chunking import create_chunks
with open("data/processed_events.json", encoding="utf-8") as f:
processed = json.load(f)
chunks = create_chunks(processed)
store = build_index(chunks)
# Quick test
print("\n--- Search Test ---")
for q in ["blues concert", "family outdoor", "electronic late night"]:
hits = search(store, q, k=2)
print(f'\n"{q}"')
for doc, score in hits:
print(f" [{score:.3f}] {doc.metadata.get('title', '?')}")
9. The RAG Chain: Connecting Retrieval to Generation
Now we connect FAISS (retrieval) to Mistral (generation) through LangChain.
The flow
User Question → Embed → FAISS Search → Top-k chunks → Prompt + Context → Mistral → Answer
Step 9.1 — The prompt template
This is arguably the most important piece of the entire system. It tells the LLM how to behave:
# src/rag_chain.py
import os
from dotenv import load_dotenv
from langchain_mistralai import ChatMistralAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain_community.vectorstores import FAISS
load_dotenv()
The prompt:
PROMPT = """You are a helpful assistant for a network of music venues.
Your job is to answer questions about upcoming shows based ONLY on
the context provided below.
Rules:
1. Use ONLY information from the context. Do not use prior knowledge.
2. If the context doesn't have enough info, say so. Never invent events.
3. Include specifics when available: names, dates, venues, prices.
4. Be friendly and conversational.
5. If several events match, list them clearly.
CONTEXT:
{context}
QUESTION:
{input}
ANSWER:"""
Key design decisions:
- "Use ONLY information from the context" → prevents hallucination
- "If the context doesn't have enough info, say so" → avoids making stuff up when data is thin
- "Include specifics" → pushes the model to cite real facts
- "Be friendly and conversational" → good user experience
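It helps to see what the LLM actually receives after LangChain fills in `{context}` and `{input}`. The condensed template and sample event below are made up for illustration — plain `str.format` shows the same substitution that `ChatPromptTemplate` performs at query time:

```python
# Condensed stand-in for the PROMPT above, rendered the way the
# chain fills {context} and {input} before calling the model.
TEMPLATE = (
    "Answer ONLY from the context below.\n\n"
    "CONTEXT:\n{context}\n\nQUESTION:\n{input}\n\nANSWER:"
)

context = (
    "Event: Blues Night\nVenue: Kantine\n"
    "Date: Friday, August 22 2025 at 20:00"
)
rendered = TEMPLATE.format(context=context, input="Any blues shows Friday?")
print(rendered)
```

The model never "searches" anything itself — by the time it runs, the retrieved chunks are already pasted into the prompt as plain text.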
Step 9.2 — Create the LLM
def create_llm(
model: str = "mistral-small-latest",
temperature: float = 0.2,
) -> ChatMistralAI:
"""
Initialize the Mistral LLM.
Temperature = randomness:
0.0-0.3 = factual, focused (good for Q&A)
0.7-1.0 = creative (good for stories)
"""
return ChatMistralAI(
model=model,
api_key=os.getenv("MISTRAL_API_KEY"),
temperature=temperature,
max_tokens=1024,
)
Step 9.3 — Assemble the chain
def build_chain(vectorstore: FAISS, top_k: int = 5):
"""
Assemble the full RAG chain.
top_k: how many chunks to retrieve per question.
More = richer context, but also more noise and cost.
"""
# Turn the FAISS store into a LangChain retriever
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": top_k},
)
llm = create_llm()
prompt = ChatPromptTemplate.from_template(PROMPT)
# "Stuff" = concatenate all retrieved docs into the prompt
doc_chain = create_stuff_documents_chain(llm=llm, prompt=prompt)
# Connect retriever → doc_chain → LLM
return create_retrieval_chain(
retriever=retriever, combine_docs_chain=doc_chain
)
Step 9.4 — The ask function
def ask(chain, question: str) -> dict:
"""Send a question through the pipeline, get a structured answer."""
result = chain.invoke({"input": question})
return {
"question": question,
"answer": result.get("answer", "Sorry, couldn't generate an answer."),
"sources": [
{
"text": doc.page_content,
"title": doc.metadata.get("title", ""),
"venue": doc.metadata.get("venue", ""),
"date": doc.metadata.get("formatted_date", ""),
}
for doc in result.get("context", [])
],
}
Step 9.5 — Test it
if __name__ == "__main__":
from vectorstore import load_index
store = load_index()
chain = build_chain(store)
for q in ["Any blues shows Friday?", "Free family concerts this weekend?"]:
print(f"\n{'='*50}\nQ: {q}\n{'='*50}")
resp = ask(chain, q)
print(f"A: {resp['answer']}")
print(f"({len(resp['sources'])} sources)")
10. Wrapping It in a REST API with FastAPI
A Python script is great for development. But stakeholders need an API they can call. FastAPI gives us input validation, error handling, and automatic Swagger documentation.
Step 10.1 — Define request/response schemas
These Pydantic models do double duty: validate input AND generate the API docs.
# src/api.py
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from vectorstore import load_index, build_index
from rag_chain import build_chain, ask
from data_fetcher import fetch_all_events
from preprocessing import process_all_events
from chunking import create_chunks
class AskRequest(BaseModel):
question: str = Field(
...,
min_length=3, # Reject empty or trivial questions
max_length=500,
description="Your question about shows and events.",
examples=["Any blues shows this Friday?"],
)
num_results: int = Field(default=5, ge=1, le=20)
class Source(BaseModel):
title: str | None = None
venue: str | None = None
city: str | None = None
date: str | None = None
excerpt: str
class AskResponse(BaseModel):
question: str
answer: str
sources: list[Source]
num_sources: int
class HealthResponse(BaseModel):
status: str
index_size: int
message: str
Step 10.2 — Load resources at startup (not per request)
We load the heavy stuff (vector store, RAG chain) ONCE when the server starts. This makes responses fast:
_store = None
_chain = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _store, _chain
idx = os.getenv("FAISS_INDEX_PATH", "data/faiss_index")
try:
_store = load_index(idx)
_chain = build_chain(_store)
print("[READY] RAG system loaded.")
except Exception as e:
print(f"[WARN] Could not load index: {e}")
yield
print("[SHUTDOWN]")
Step 10.3 — Create the app
app = FastAPI(
title="GigFinder — Music Venue RAG API",
description="Ask questions about upcoming shows. Powered by FAISS + Mistral + LangChain.",
version="1.0.0",
lifespan=lifespan,
)
Step 10.4 — The /ask endpoint
This is the main endpoint. Notice the error handling:
@app.post("/ask", response_model=AskResponse)
async def handle_ask(req: AskRequest):
"""Ask a question. Returns an answer + source documents."""
if _chain is None:
raise HTTPException(503, "System not ready. Use POST /rebuild first.")
try:
result = ask(_chain, req.question)
sources = [
Source(
title=s.get("title"),
venue=s.get("venue"),
date=s.get("date"),
excerpt=s.get("text", "")[:200],
)
for s in result["sources"]
]
return AskResponse(
question=req.question,
answer=result["answer"],
sources=sources,
num_sources=len(sources),
)
except Exception as e:
raise HTTPException(500, f"Processing error: {e}")
Step 10.5 — Health check and metadata
@app.get("/health", response_model=HealthResponse)
async def health():
"""Is the system running? How many vectors are indexed?"""
if _store is None:
return HealthResponse(status="degraded", index_size=0,
message="No index loaded.")
return HealthResponse(
status="healthy",
index_size=_store.index.ntotal,
message="System operational.",
)
@app.get("/metadata")
async def metadata():
"""System info for monitoring."""
return {
"status": "loaded" if _store else "empty",
"index_vectors": _store.index.ntotal if _store else 0,
}
Step 10.6 — The /rebuild endpoint
This lets you refresh the data without restarting the server:
@app.post("/rebuild")
async def rebuild(agenda_uids: list[int] = [82837550], max_events: int = 200):
"""Re-fetch data from OpenAgenda and rebuild the FAISS index."""
global _store, _chain
try:
raw = fetch_all_events(agenda_uids, max_per_agenda=max_events)
processed = process_all_events(raw)
chunks = create_chunks(processed)
_store = build_index(chunks)
_chain = build_chain(_store)
return {
"status": "success",
"events_processed": len(processed),
"chunks_indexed": len(chunks),
"total_vectors": _store.index.ntotal,
}
except Exception as e:
raise HTTPException(500, f"Rebuild failed: {e}")
Testing your API
# Start the server
uvicorn src.api:app --host 0.0.0.0 --port 8000 --reload
# Test with curl
curl -X POST http://localhost:8000/ask \
-H "Content-Type: application/json" \
-d '{"question": "Any blues shows this Friday?"}'
# Test with Python
import requests
r = requests.post("http://localhost:8000/ask",
json={"question": "Free gigs this weekend?"})
print(r.json()["answer"])
# Or open http://localhost:8000/docs for interactive Swagger UI!
11. Evaluating Quality with RAGAS
Your system seems to work. But how well? "Looks good" isn't a metric. RAGAS provides purpose-built metrics for RAG systems.
The metrics
| Metric | What It Measures | Catches |
|---|---|---|
| Faithfulness | Is the answer grounded in the retrieved context? | Hallucinations |
| Answer Relevancy | Is the answer relevant to the question? | Off-topic responses |
| Context Precision | Were the retrieved docs actually useful? | Bad retrieval |
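The intuition behind faithfulness is simple: what fraction of the answer's claims are actually supported by the retrieved context? RAGAS uses an LLM to extract and verify claims; the substring check below is only a crude sketch of the idea, with invented example data:

```python
# Crude illustration of the faithfulness metric: supported claims
# divided by total claims. RAGAS does this with an LLM judge, not
# substring matching — this is the idea, not the implementation.
def naive_faithfulness(answer_claims: list[str], context: str) -> float:
    supported = sum(
        1 for claim in answer_claims if claim.lower() in context.lower()
    )
    return supported / len(answer_claims)

context = "Blues Night at Kantine, Friday at 21:00, entry 12 euros."
claims = ["Blues Night", "Friday at 21:00", "free entry"]  # last one hallucinated
print(naive_faithfulness(claims, context))  # 2/3 ≈ 0.67
```

A score well below 1.0 on real traffic is a red flag that the model is padding answers with facts the retriever never surfaced.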
Step 11.1 — Create an annotated test dataset
This requires manual work but it's the only way to evaluate meaningfully. Each entry has a question and the expected correct answer:
# src/evaluation.py
import os
import json
from datetime import datetime
from dotenv import load_dotenv
from ragas import evaluate
from ragas.metrics import (
Faithfulness, ResponseRelevancy,
LLMContextPrecisionWithoutReference,
)
from ragas.dataset_schema import SingleTurnSample, EvaluationDataset
from langchain_mistralai import ChatMistralAI
from ragas.llms import LangchainLLMWrapper
load_dotenv()
TEST_SET = [
{
"question": "Any blues shows this Friday night?",
"ground_truth": "The Muddy Waters Tribute Band plays at Kantine "
"this Friday at 21:00. Entry is 12 euros.",
},
{
"question": "What free concerts are happening outdoors?",
"ground_truth": "Free acoustic session in Mauerpark on Saturday "
"at 15:00 with local singer-songwriters.",
},
{
"question": "Family-friendly shows on Sunday?",
"ground_truth": "Sunday Matinee at SO36, all-ages folk music "
"starting at 14:00. Tickets 5 euros.",
},
{
"question": "Electronic music venues tonight?",
"ground_truth": "Tresor has techno at 23:00; Berghain opens "
"at midnight with minimal techno.",
},
{
"question": "Jazz events in Kreuzberg this month?",
"ground_truth": "Kreuzberg Jazz Festival from the 15th to 18th "
"at Lido and Bi Nuu, evenings at 20:00.",
},
]
Step 11.2 — Run each question through the RAG chain
def run_evaluation(rag_chain, test_data=None) -> dict:
test_data = test_data or TEST_SET
print(f"Evaluating {len(test_data)} questions...")
# Generate answers for all test questions
samples = []
for i, item in enumerate(test_data):
print(f" [{i+1}/{len(test_data)}] {item['question'][:40]}...")
result = rag_chain.invoke({"input": item["question"]})
samples.append(SingleTurnSample(
user_input=item["question"],
response=result.get("answer", ""),
retrieved_contexts=[
doc.page_content for doc in result.get("context", [])
],
reference=item["ground_truth"],
))
Step 11.3 — Compute RAGAS metrics
# Set up the evaluator LLM
eval_llm = LangchainLLMWrapper(ChatMistralAI(
model="mistral-small-latest",
api_key=os.getenv("MISTRAL_API_KEY"),
temperature=0.1,
))
metrics = [
Faithfulness(llm=eval_llm),
ResponseRelevancy(llm=eval_llm),
LLMContextPrecisionWithoutReference(llm=eval_llm),
]
print("Computing RAGAS metrics...")
dataset = EvaluationDataset(samples=samples)
results = evaluate(dataset=dataset, metrics=metrics)
Step 11.4 — Build and save the report
report = {
"timestamp": datetime.now().isoformat(),
"num_questions": len(test_data),
"aggregate": {
"faithfulness": float(results["faithfulness"]),
"answer_relevancy": float(results["response_relevancy"]),
"context_precision": float(
results["llm_context_precision_without_reference"]
),
},
"per_question": [],
}
df = results.to_pandas()
for idx, row in df.iterrows():
report["per_question"].append({
"question": test_data[idx]["question"],
"faithfulness": float(row.get("faithfulness", 0)),
"relevancy": float(row.get("response_relevancy", 0)),
})
return report
Step 11.5 — Display and save
def print_report(report):
print("\n" + "=" * 55)
print(" RAG EVALUATION REPORT")
print("=" * 55)
m = report["aggregate"]
print(f" Faithfulness: {m['faithfulness']:.3f}")
print(f" Answer Relevancy: {m['answer_relevancy']:.3f}")
print(f" Context Precision: {m['context_precision']:.3f}")
for d in report["per_question"]:
print(f"\n Q: {d['question'][:45]}...")
print(f" Faith: {d['faithfulness']:.2f} | Relev: {d['relevancy']:.2f}")
print("=" * 55)
def save_report(report, path="data/eval_report.json"):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to {path}")
Interpreting scores
| Range | Meaning | What to do |
|---|---|---|
| 0.8–1.0 | Excellent | Ship it! |
| 0.6–0.8 | Solid | Tune prompts or chunking |
| 0.4–0.6 | Weak | Rethink retrieval + prompt |
| < 0.4 | Broken | Something fundamental is off |
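A natural next step (my addition, not part of the article's codebase) is turning those thresholds into a CI gate: read the saved report and fail the pipeline when any aggregate score drops below the "Solid" floor. The path and threshold values here are assumptions — tune them to your project.

```python
# Hypothetical CI gate for the RAGAS report saved by save_report().
import json

THRESHOLDS = {
    "faithfulness": 0.6,
    "answer_relevancy": 0.6,
    "context_precision": 0.6,
}

def gate(report_path: str = "data/eval_report.json") -> list[str]:
    """Return the names of failing metrics (empty list means pass)."""
    with open(report_path) as f:
        aggregate = json.load(f)["aggregate"]
    return [
        name for name, floor in THRESHOLDS.items()
        if aggregate.get(name, 0.0) < floor
    ]
```

In CI you'd call `sys.exit(1)` when `gate()` returns a non-empty list, so a quality regression breaks the build just like a failing unit test.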
12. Writing Unit Tests Across the Entire Pipeline
Tests catch regressions before your users do. We test every stage.
Step 12.1 — Sample data for tests
# tests/test_pipeline.py
import pytest
SAMPLE_RAW_EVENT = {
"uid": 99999,
"title": {"fr": "Test Blues Night"},
"description": {"fr": "<p>A <strong>great</strong> blues show&</p>"},
"longDescription": {"fr": "Three bands perform live."},
"location": {
"name": "Test Club", "address": "42 Test St",
"city": "Berlin", "latitude": 52.52, "longitude": 13.40,
},
"timings": [
{"begin": "2025-08-22T20:00:00+0200",
"end": "2025-08-22T23:00:00+0200"}
],
"keywords": {"fr": ["blues", "live"]},
}
Step 12.2 — Test HTML cleaning
class TestHTMLCleaning:
def test_strips_tags(self):
from src.preprocessing import strip_html
assert strip_html("<p>Hello <b>world</b></p>") == "Hello world"
def test_decodes_entities(self):
from src.preprocessing import strip_html
assert "&" in strip_html("Rock &amp; Roll")
def test_handles_empty(self):
from src.preprocessing import strip_html
assert strip_html("") == ""
assert strip_html(None) == ""
Step 12.3 — Test date parsing
class TestDateParsing:
def test_valid_date(self):
from src.preprocessing import parse_timings
result = parse_timings([{
"begin": "2025-08-22T20:00:00+0200",
"end": "2025-08-22T23:00:00+0200"
}])
assert result["weekday"] == "Friday"
assert "Date not available" not in result["display"]
def test_empty_timings(self):
from src.preprocessing import parse_timings
assert parse_timings([])["display"] == "Date not available"
def test_malformed_date(self):
from src.preprocessing import parse_timings
assert parse_timings([{"begin": "nope"}])["display"] == "Date not available"
Step 12.4 — Test event processing
class TestEventProcessing:
def test_complete_event(self):
from src.preprocessing import process_event
doc = process_event(SAMPLE_RAW_EVENT)
assert doc is not None
assert "Test Blues Night" in doc["text"]
assert doc["metadata"]["city"] == "Berlin"
def test_missing_data_returns_none(self):
from src.preprocessing import process_event
assert process_event({"uid": 1}) is None
def test_batch_filters_incomplete(self):
from src.preprocessing import process_all_events
result = process_all_events([SAMPLE_RAW_EVENT, {"uid": 2}])
assert len(result) == 1
Step 12.5 — Test chunking
class TestChunking:
def test_metadata_preserved(self):
from src.chunking import create_chunks
events = [{"text": "Short text.", "metadata": {"title": "T"}}]
chunks = create_chunks(events, chunk_size=500)
assert chunks[0]["metadata"]["title"] == "T"
def test_long_text_splits(self):
from src.chunking import create_chunks
long = "A sentence. " * 100
chunks = create_chunks(
[{"text": long, "metadata": {"title": "X"}}],
chunk_size=200, chunk_overlap=50
)
assert len(chunks) > 1
def test_chunk_index_tracked(self):
from src.chunking import create_chunks
long = "Word " * 300
chunks = create_chunks(
[{"text": long, "metadata": {"title": "X"}}],
chunk_size=200
)
for c in chunks:
assert "chunk_index" in c["metadata"]
Step 12.6 — Test vector store
class TestVectorStore:
def test_embedding_produces_vector(self):
from src.vectorstore import get_embeddings
vec = get_embeddings().embed_query("test")
assert len(vec) > 0
def test_build_and_search(self):
from src.vectorstore import build_index, search
chunks = [
{"text": "Blues concert Friday night",
"metadata": {"title": "Blues Night"}},
{"text": "Sunday yoga in the park",
"metadata": {"title": "Yoga"}},
]
store = build_index(chunks, save_dir="/tmp/test_idx")
results = search(store, "live blues music", k=1)
assert "Blues" in results[0][0].metadata["title"]
Step 12.7 — Test API
class TestAPI:
def test_health_ok(self):
from fastapi.testclient import TestClient
from src.api import app
r = TestClient(app).get("/health")
assert r.status_code == 200
def test_empty_question_rejected(self):
from fastapi.testclient import TestClient
from src.api import app
r = TestClient(app).post("/ask", json={"question": ""})
assert r.status_code == 422 # Validation error
def test_too_short_rejected(self):
from fastapi.testclient import TestClient
from src.api import app
r = TestClient(app).post("/ask", json={"question": "Hi"})
assert r.status_code == 422
Run the tests
pytest tests/test_pipeline.py -v
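One caveat: the vector-store tests above call the real Mistral embedding API, so they need `MISTRAL_API_KEY` and a network connection. A sketch of a `tests/conftest.py` (my addition, not from the article) swaps in a deterministic fake so most of the suite runs offline — though hash-based vectors carry no semantic meaning, so ranking assertions like `test_build_and_search` still need the real embeddings:

```python
# tests/conftest.py — opt-in fake embeddings for offline test runs.
import hashlib

import pytest

class FakeEmbeddings:
    """Deterministic stand-in: hash each text into a 32-dim vector."""

    def embed_query(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.lower().encode()).digest()
        return [b / 255.0 for b in digest]  # 32 bytes -> 32 floats

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return [self.embed_query(t) for t in texts]

@pytest.fixture
def fake_embeddings(monkeypatch):
    """Patch src.vectorstore.get_embeddings for tests that request this."""
    monkeypatch.setattr(
        "src.vectorstore.get_embeddings", lambda: FakeEmbeddings()
    )
```

Tests that only care about index mechanics (build, save, metadata round-trips) can take `fake_embeddings` as a parameter; tests that assert semantic relevance should keep hitting the real API, ideally behind a `@pytest.mark.integration` marker.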
13. Packaging Everything in Docker
Docker turns "it works on my machine" into "it works everywhere."
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && \
apt-get install -y --no-install-recommends build-essential curl && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY data/ ./data/
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000"]
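A related nicety worth adding next to the Dockerfile: a `.dockerignore` keeps secrets and clutter out of the build context. A minimal sketch (entries are suggestions — adjust to your repo):

```text
# .dockerignore — never bake .env (API keys!) into the image
.env
.git
__pycache__/
*.pyc
tests/
data/raw_events.json
```

The `.env` exclusion matters most: the keys are injected at runtime with `-e` or `--env-file`, not copied into the image.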
Build and run
docker build -t gigfinder-rag .
docker run -p 8000:8000 \
-e MISTRAL_API_KEY=your_key \
-e OPENAGENDA_API_KEY=your_key \
gigfinder-rag
# Verify
curl http://localhost:8000/health
# Ask a question
curl -X POST http://localhost:8000/ask \
-H "Content-Type: application/json" \
-d '{"question": "Any blues shows this Friday?"}'
Index rebuild script
Run this before docker build to populate data/faiss_index/:
#!/bin/bash
# build_index.sh
set -e
echo "=== Building FAISS index ==="
python -c "
from src.data_fetcher import fetch_all_events
from src.preprocessing import process_all_events
from src.chunking import create_chunks
from src.vectorstore import build_index
import json
raw = fetch_all_events([82837550], max_per_agenda=250)
processed = process_all_events(raw)
with open('data/processed_events.json', 'w') as f:
json.dump(processed, f, ensure_ascii=False, indent=2)
chunks = create_chunks(processed)
build_index(chunks)
print('Done!')
"
echo "=== Index ready ==="
14. Architecture Overview
┌──────────────────────────────────────────────────┐
│ USER REQUEST │
│ "Any blues shows Friday?" │
└─────────────────────┬────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ FastAPI (POST /ask) │
│ Validation · Error handling │
└─────────────────────┬────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ LangChain RAG Chain │
│ │
│ Mistral Embed → FAISS Search → Mistral LLM │
│ (query→vector) (top-k) (generate answer) │
└─────────────────────┬────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ JSON RESPONSE │
│ { answer: "...", sources: [...] } │
└──────────────────────────────────────────────────┘
Project structure
music-venue-rag/
├── src/
│ ├── __init__.py
│ ├── data_fetcher.py # Fetch from OpenAgenda
│ ├── preprocessing.py # Clean + structure
│ ├── chunking.py # Split into chunks
│ ├── vectorstore.py # FAISS build/load/search
│ ├── rag_chain.py # LangChain pipeline
│ ├── api.py # FastAPI endpoints
│ └── evaluation.py # RAGAS metrics
├── tests/
│ └── test_pipeline.py # Unit tests
├── data/
│ ├── raw_events.json
│ ├── processed_events.json
│ ├── faiss_index/
│ └── eval_report.json
├── .env
├── .gitignore
├── requirements.txt
├── Dockerfile
├── build_index.sh
└── README.md
15. Full File Recap — Copy-Paste-Ready Code
Here's every file, assembled into complete, copy-paste-ready form without interruptions. It's the code from the sections above, with some docstrings and test strings lightly condensed.
src/data_fetcher.py
"""Fetches event data from the OpenAgenda REST API."""
import os
import json
import time
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
def fetch_events_from_agenda(
agenda_uid: int,
api_key: str,
max_events: int = 300,
months_back: int = 12,
) -> list[dict]:
since = (
datetime.now() - timedelta(days=30 * months_back)
).strftime("%Y-%m-%dT00:00:00Z")
url = f"https://api.openagenda.com/v2/agendas/{agenda_uid}/events"
collected = []
offset = 0
while len(collected) < max_events:
params = {
"key": api_key,
"timings[gte]": since,
"size": min(100, max_events - len(collected)),
"offset": offset,
"sort": "timingsStart.desc",
}
try:
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
payload = resp.json()
except requests.exceptions.HTTPError as e:
print(f" [HTTP ERROR] {e}")
break
except requests.exceptions.ConnectionError:
print(" [CONNECTION ERROR] Can't reach OpenAgenda")
break
except requests.exceptions.Timeout:
print(" [TIMEOUT] Request took too long")
break
except requests.exceptions.RequestException as e:
print(f" [ERROR] {e}")
break
events = payload.get("events", [])
if not events:
break
collected.extend(events)
offset += len(events)
if offset >= payload.get("total", 0):
break
time.sleep(0.5)
print(f" -> Fetched {len(collected)} events from agenda {agenda_uid}")
return collected
def fetch_all_events(
agenda_uids: list[int], max_per_agenda: int = 200
) -> list[dict]:
api_key = os.getenv("OPENAGENDA_API_KEY")
if not api_key:
raise ValueError("OPENAGENDA_API_KEY is missing!")
all_events = []
for uid in agenda_uids:
print(f"Fetching agenda {uid}...")
all_events.extend(
fetch_events_from_agenda(uid, api_key, max_per_agenda)
)
print(f"\nTotal events collected: {len(all_events)}")
return all_events
if __name__ == "__main__":
AGENDAS = [82837550]
events = fetch_all_events(AGENDAS, max_per_agenda=250)
os.makedirs("data", exist_ok=True)
with open("data/raw_events.json", "w", encoding="utf-8") as f:
json.dump(events, f, ensure_ascii=False, indent=2)
print(f"Saved {len(events)} events to data/raw_events.json")
src/preprocessing.py
"""Cleans and structures raw event data."""
import re
import json
from datetime import datetime
from bs4 import BeautifulSoup
def strip_html(text: str) -> str:
if not text:
return ""
cleaned = BeautifulSoup(text, "html.parser").get_text(separator=" ")
return re.sub(r"\s+", " ", cleaned).strip()
def extract_multilingual(field, preferred_lang="fr") -> str:
if isinstance(field, dict):
return field.get(preferred_lang, "") or next(iter(field.values()), "")
if isinstance(field, str):
return field
return ""
def parse_timings(timings: list[dict]) -> dict:
fallback = {
"start_iso": None, "end_iso": None,
"weekday": None, "display": "Date not available",
}
if not timings:
return fallback
try:
begin = datetime.fromisoformat(timings[0]["begin"])
weekdays = ["Monday","Tuesday","Wednesday","Thursday",
"Friday","Saturday","Sunday"]
return {
"start_iso": timings[0]["begin"],
"end_iso": timings[0].get("end", ""),
"weekday": weekdays[begin.weekday()],
"display": begin.strftime(
f"{weekdays[begin.weekday()]}, %B %d %Y at %H:%M"
),
}
except (ValueError, KeyError, IndexError):
return fallback
def process_event(raw: dict) -> dict | None:
title = strip_html(extract_multilingual(raw.get("title")))
description = strip_html(extract_multilingual(raw.get("description")))
details = strip_html(extract_multilingual(raw.get("longDescription")))
if not title and not description:
return None
loc = raw.get("location") or {}
venue = loc.get("name", "Unknown venue")
address = loc.get("address", "")
city = loc.get("city", "")
lat = loc.get("latitude")
lon = loc.get("longitude")
dates = parse_timings(raw.get("timings", []))
kw_field = raw.get("keywords")
keywords = []
if isinstance(kw_field, dict):
keywords = kw_field.get("fr", []) or []
elif isinstance(kw_field, list):
keywords = kw_field
parts = [f"Event: {title}"]
if description: parts.append(f"Description: {description}")
if details: parts.append(f"Details: {details}")
if venue: parts.append(f"Venue: {venue}")
if city: parts.append(f"City: {city}")
if address: parts.append(f"Address: {address}")
parts.append(f"Date: {dates['display']}")
if keywords: parts.append(f"Tags: {', '.join(keywords)}")
return {
"text": "\n".join(parts),
"metadata": {
"uid": raw.get("uid"), "title": title, "venue": venue,
"city": city, "address": address,
"start_date": dates["start_iso"], "end_date": dates["end_iso"],
"weekday": dates["weekday"],
"formatted_date": dates["display"],
"keywords": keywords, "lat": lat, "lon": lon,
},
}
def process_all_events(raw_events: list[dict]) -> list[dict]:
results, skipped = [], 0
for ev in raw_events:
doc = process_event(ev)
if doc:
results.append(doc)
else:
skipped += 1
print(f"Processed: {len(results)} | Skipped: {skipped}")
return results
if __name__ == "__main__":
with open("data/raw_events.json", encoding="utf-8") as f:
raw = json.load(f)
processed = process_all_events(raw)
with open("data/processed_events.json", "w", encoding="utf-8") as f:
json.dump(processed, f, ensure_ascii=False, indent=2)
src/chunking.py
"""Splits event texts into overlapping chunks."""
from langchain.text_splitter import RecursiveCharacterTextSplitter
def create_chunks(
processed_events: list[dict],
chunk_size: int = 500,
chunk_overlap: int = 100,
) -> list[dict]:
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", ", ", " ", ""],
length_function=len,
)
all_chunks = []
for event in processed_events:
pieces = splitter.split_text(event["text"])
for idx, text in enumerate(pieces):
all_chunks.append({
"text": text,
"metadata": {
**event["metadata"],
"chunk_index": idx,
"total_chunks": len(pieces),
},
})
avg = len(all_chunks) / max(len(processed_events), 1)
print(f"Created {len(all_chunks)} chunks ({avg:.1f}/event)")
return all_chunks
src/vectorstore.py
"""FAISS vector index: build, save, load, search."""
import os, json
from dotenv import load_dotenv
from langchain_mistralai import MistralAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
load_dotenv()
def get_embeddings() -> MistralAIEmbeddings:
key = os.getenv("MISTRAL_API_KEY")
if not key:
raise ValueError("Set MISTRAL_API_KEY in .env")
return MistralAIEmbeddings(model="mistral-embed", api_key=key)
def build_index(chunks, save_dir="data/faiss_index") -> FAISS:
docs = [Document(page_content=c["text"], metadata=c["metadata"])
for c in chunks]
print(f"Building index from {len(docs)} documents...")
store = FAISS.from_documents(docs, get_embeddings())
os.makedirs(save_dir, exist_ok=True)
store.save_local(save_dir)
print(f"Index saved ({store.index.ntotal} vectors)")
return store
def load_index(save_dir="data/faiss_index") -> FAISS:
store = FAISS.load_local(
save_dir, get_embeddings(), allow_dangerous_deserialization=True
)
print(f"Loaded index: {store.index.ntotal} vectors")
return store
def search(store, query, k=5):
return store.similarity_search_with_score(query, k=k)
if __name__ == "__main__":
from chunking import create_chunks
with open("data/processed_events.json", encoding="utf-8") as f:
processed = json.load(f)
chunks = create_chunks(processed)
store = build_index(chunks)
for q in ["blues concert", "family outdoor"]:
hits = search(store, q, k=2)
print(f'\n"{q}"')
for doc, score in hits:
print(f" [{score:.3f}] {doc.metadata.get('title','?')}")
src/rag_chain.py
"""RAG pipeline: Retrieve + Augment + Generate."""
import os
from dotenv import load_dotenv
from langchain_mistralai import ChatMistralAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
load_dotenv()
PROMPT = """You are a helpful assistant for a network of music venues.
Answer questions about upcoming shows based ONLY on the context below.
Rules:
1. Use ONLY context information. Never invent events.
2. If context is insufficient, say so honestly.
3. Include specifics: names, dates, venues, prices.
4. Be friendly and conversational.
5. List multiple matches clearly.
CONTEXT:
{context}
QUESTION:
{input}
ANSWER:"""
def create_llm(model="mistral-small-latest", temperature=0.2):
return ChatMistralAI(
model=model, api_key=os.getenv("MISTRAL_API_KEY"),
temperature=temperature, max_tokens=1024,
)
def build_chain(vectorstore, top_k=5):
retriever = vectorstore.as_retriever(
search_type="similarity", search_kwargs={"k": top_k}
)
llm = create_llm()
prompt = ChatPromptTemplate.from_template(PROMPT)
doc_chain = create_stuff_documents_chain(llm=llm, prompt=prompt)
return create_retrieval_chain(retriever=retriever,
combine_docs_chain=doc_chain)
def ask(chain, question: str) -> dict:
result = chain.invoke({"input": question})
return {
"question": question,
"answer": result.get("answer", "Sorry, no answer."),
"sources": [
{"text": d.page_content,
"title": d.metadata.get("title",""),
"venue": d.metadata.get("venue",""),
"date": d.metadata.get("formatted_date","")}
for d in result.get("context", [])
],
}
if __name__ == "__main__":
from vectorstore import load_index
store = load_index()
chain = build_chain(store)
for q in ["Blues shows Friday?", "Free family concerts?"]:
r = ask(chain, q)
print(f"\nQ: {q}\nA: {r['answer']}")
src/api.py
"""FastAPI REST API exposing the RAG system."""
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from src.vectorstore import load_index, build_index
from src.rag_chain import build_chain, ask
from src.data_fetcher import fetch_all_events
from src.preprocessing import process_all_events
from src.chunking import create_chunks
class AskRequest(BaseModel):
question: str = Field(..., min_length=3, max_length=500,
examples=["Any blues shows this Friday?"])
num_results: int = Field(default=5, ge=1, le=20)
class Source(BaseModel):
title: str | None = None
venue: str | None = None
city: str | None = None
date: str | None = None
excerpt: str
class AskResponse(BaseModel):
question: str
answer: str
sources: list[Source]
num_sources: int
class HealthResponse(BaseModel):
status: str
index_size: int
message: str
_store = None
_chain = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _store, _chain
try:
_store = load_index(os.getenv("FAISS_INDEX_PATH","data/faiss_index"))
_chain = build_chain(_store)
print("[READY]")
except Exception as e:
print(f"[WARN] {e}")
yield
app = FastAPI(
title="GigFinder RAG API",
description="Ask questions about upcoming shows.",
version="1.0.0", lifespan=lifespan,
)
@app.post("/ask", response_model=AskResponse)
async def handle_ask(req: AskRequest):
if _chain is None:
raise HTTPException(503, "Not ready. POST /rebuild first.")
try:
result = ask(_chain, req.question)
sources = [Source(title=s.get("title"), venue=s.get("venue"),
date=s.get("date"), excerpt=s.get("text","")[:200])
for s in result["sources"]]
return AskResponse(question=req.question, answer=result["answer"],
sources=sources, num_sources=len(sources))
except Exception as e:
raise HTTPException(500, f"Error: {e}")
@app.get("/health", response_model=HealthResponse)
async def health():
if _store is None:
return HealthResponse(status="degraded", index_size=0,
message="No index.")
return HealthResponse(status="healthy",
index_size=_store.index.ntotal, message="Operational.")
@app.post("/rebuild")
async def rebuild(agenda_uids: list[int]=[82837550], max_events: int=200):
global _store, _chain
try:
raw = fetch_all_events(agenda_uids, max_per_agenda=max_events)
processed = process_all_events(raw)
chunks = create_chunks(processed)
_store = build_index(chunks)
_chain = build_chain(_store)
return {"status":"success", "events": len(processed),
"chunks": len(chunks), "vectors": _store.index.ntotal}
except Exception as e:
raise HTTPException(500, f"Rebuild failed: {e}")
@app.get("/metadata")
async def metadata():
return {"status": "loaded" if _store else "empty",
"vectors": _store.index.ntotal if _store else 0}
src/evaluation.py
"""RAGAS evaluation of the RAG system."""
import os, json
from datetime import datetime
from dotenv import load_dotenv
from ragas import evaluate
from ragas.metrics import (Faithfulness, ResponseRelevancy,
LLMContextPrecisionWithoutReference)
from ragas.dataset_schema import SingleTurnSample, EvaluationDataset
from langchain_mistralai import ChatMistralAI
from ragas.llms import LangchainLLMWrapper
load_dotenv()
TEST_SET = [
{"question": "Blues shows this Friday?",
"ground_truth": "Muddy Waters Tribute at Kantine, Friday 21:00, 12€."},
{"question": "Free outdoor concerts?",
"ground_truth": "Free acoustic session Mauerpark Saturday 15:00."},
{"question": "Family-friendly Sunday shows?",
"ground_truth": "Sunday Matinee SO36, all-ages folk, 14:00, 5€."},
{"question": "Electronic music tonight?",
"ground_truth": "Tresor techno 23:00; Berghain midnight."},
{"question": "Jazz in Kreuzberg this month?",
"ground_truth": "Kreuzberg Jazz Festival 15th-18th, Lido & Bi Nuu."},
]
def run_evaluation(rag_chain, test_data=None):
test_data = test_data or TEST_SET
samples = []
for i, item in enumerate(test_data):
print(f" [{i+1}/{len(test_data)}] {item['question'][:40]}...")
result = rag_chain.invoke({"input": item["question"]})
samples.append(SingleTurnSample(
user_input=item["question"],
response=result.get("answer",""),
retrieved_contexts=[d.page_content for d in result.get("context",[])],
reference=item["ground_truth"],
))
eval_llm = LangchainLLMWrapper(ChatMistralAI(
model="mistral-small-latest",
api_key=os.getenv("MISTRAL_API_KEY"), temperature=0.1))
metrics = [Faithfulness(llm=eval_llm), ResponseRelevancy(llm=eval_llm),
LLMContextPrecisionWithoutReference(llm=eval_llm)]
results = evaluate(dataset=EvaluationDataset(samples=samples),
metrics=metrics)
report = {
"timestamp": datetime.now().isoformat(),
"aggregate": {
"faithfulness": float(results["faithfulness"]),
"answer_relevancy": float(results["response_relevancy"]),
"context_precision": float(
results["llm_context_precision_without_reference"]),
},
"per_question": [],
}
df = results.to_pandas()
for idx, row in df.iterrows():
report["per_question"].append({
"question": test_data[idx]["question"],
"faithfulness": float(row.get("faithfulness",0)),
"relevancy": float(row.get("response_relevancy",0)),
})
return report
def print_report(report):
m = report["aggregate"]
print(f"\nFaithfulness: {m['faithfulness']:.3f}")
print(f"Relevancy: {m['answer_relevancy']:.3f}")
print(f"Precision: {m['context_precision']:.3f}")
def save_report(report, path="data/eval_report.json"):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
json.dump(report, f, indent=2)
if __name__ == "__main__":
from vectorstore import load_index
from rag_chain import build_chain
store = load_index()
chain = build_chain(store)
report = run_evaluation(chain)
print_report(report)
save_report(report)
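requirements.txt
One file the project tree lists but the recap omits: the dependency manifest. This unpinned sketch covers every import used in the files above — pin the versions you actually tested against before shipping:

```text
# requirements.txt (unpinned sketch — pin versions for reproducible builds)
fastapi
uvicorn[standard]
langchain
langchain-community
langchain-mistralai
faiss-cpu
ragas
requests
beautifulsoup4
python-dotenv
pydantic
pytest
```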
Wrapping Up
That Sunday evening, I demoed the system over a video call. I typed "Any blues shows this Friday night?" into Swagger UI, hit Execute, and a well-formatted JSON response came back with the exact event, venue, time, and ticket price — all sourced from real data, zero hallucination.
It's not magic. It's a clear pipeline: fetch → clean → chunk → embed → index → retrieve → generate → test → serve → containerize. Each piece is understandable, debuggable, and improvable on its own.
Now go build something cool.
If you found this useful, give it a clap and follow for more hands-on AI tutorials. Questions? Drop them in the comments.