Most teams know when their LLM is wrong after a customer complains. Production-grade retail AI requires knowing before that — with metrics, traces, and automated eval pipelines that catch drift, hallucination, and degradation continuously. This article shows you how to build that system on GCP.
🧭 Why LLM Observability in Retail Is Different
Traditional ML observability tracks distribution drift on structured features and monitors a single scalar metric — accuracy, RMSE, AUC. LLMs break this model in three ways:
- Outputs are unstructured. There is no ground-truth label for "did the agent give a good answer?" arriving in real time.
- Failure modes are silent. A hallucinated return policy answer looks identical to a correct one in your latency dashboard.
- Context windows change behavior. The same model behaves differently depending on what is in the prompt — retrieved chunks, session history, tool results.
In retail specifically, the stakes are asymmetric. A mis-personalized recommendation costs a click. A hallucinated return policy answer costs a customer, a refund, and potentially a chargeback.
Design principle: Treat every LLM inference as a structured event with inputs, outputs, retrieved context, tool calls, and a confidence signal — not just a latency measurement.
🏗️ The Observability Stack
The system is built on four GCP components working in concert:
| Component | Role |
|---|---|
| Cloud Logging + Log Router | Capture structured inference events from Cloud Run |
| BigQuery | Central eval store — every inference logged with full context |
| Vertex AI Evaluation Service | Automated metric computation (rouge, BLEU, coherence, groundedness) |
| Looker Studio | Real-time eval dashboard over BigQuery eval tables |
And one additional layer that makes it production-grade:
| Component | Role |
|---|---|
| Vertex AI Pipelines | Scheduled eval pipeline — nightly batch scoring of sampled inferences |
| Cloud Monitoring + Alerting | Threshold-based alerts when eval metrics degrade |
📐 The Inference Event Schema
Everything starts with logging the right data. Every LLM inference — whether personalization re-ranking, agent orchestration, or RAG answer generation — emits a structured event to Cloud Logging, which Log Router sinks to BigQuery.
```python
# Cloud Run inference handler — structured logging
from datetime import datetime, timezone

from google.cloud import logging as gcp_logging

client = gcp_logging.Client()
logger = client.logger("llm_inference_events")


def log_inference_event(
    request_id: str,
    layer: str,  # "personalization" | "multi_agent" | "agentic_rag"
    agent_id: str,
    model_id: str,
    prompt_tokens: int,
    completion_tokens: int,
    retrieved_chunks: list[dict],
    tool_calls: list[dict],
    output: str,
    latency_ms: int,
    self_confidence: float,  # model's own logprob-derived confidence
):
    logger.log_struct({
        "request_id": request_id,
        "ts": datetime.now(timezone.utc).isoformat(),
        "layer": layer,
        "agent_id": agent_id,
        "model_id": model_id,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "retrieved_chunks": retrieved_chunks,  # [{chunk_id, relevance_score, source}]
        "tool_calls": tool_calls,  # [{tool_name, input_payload, output_payload, latency_ms}]
        "output": output,
        "latency_ms": latency_ms,
        "self_confidence": self_confidence,
    }, severity="INFO")
```
This single schema powers everything downstream — dashboards, evals, fine-tuning, and alerting.
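The `self_confidence` field is described above as logprob-derived. One simple way to compute it — a sketch, not part of the original schema; the geometric-mean choice is our assumption — is to exponentiate the average per-token log probability:

```python
import math


def logprob_confidence(token_logprobs: list[float]) -> float:
    """Geometric mean of per-token probabilities, derived from logprobs.

    Returns a value in (0, 1]; higher means the model was, on average,
    more certain about each generated token. Returns 0.0 when no
    logprobs are available.
    """
    if not token_logprobs:
        return 0.0
    avg_logprob = sum(token_logprobs) / len(token_logprobs)
    return math.exp(avg_logprob)
```

A response generated with near-zero logprobs scores close to 1.0; long stretches of uncertain tokens pull the score down quickly, which is exactly the behavior the `self_confidence < 0.6` oversampling rule below relies on.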
🗄️ BigQuery Eval Schema
Log Router sinks the inference events into a partitioned BigQuery table:
```sql
CREATE TABLE retail_ai.llm_inference_log (
  request_id STRING NOT NULL,
  ts TIMESTAMP NOT NULL,
  layer STRING,                 -- personalization | multi_agent | agentic_rag
  agent_id STRING,
  model_id STRING,
  prompt_tokens INT64,
  completion_tokens INT64,
  retrieved_chunks ARRAY<STRUCT<
    chunk_id STRING,
    relevance_score FLOAT64,
    source STRING
  >>,
  tool_calls ARRAY<STRUCT<
    tool_name STRING,
    input_payload JSON,
    output_payload JSON,
    latency_ms INT64
  >>,
  output STRING,
  latency_ms INT64,
  self_confidence FLOAT64,
  -- Populated by the nightly eval pipeline:
  groundedness_score FLOAT64,
  coherence_score FLOAT64,
  rouge_l FLOAT64,
  hallucination_flag BOOL,
  human_label STRING            -- populated for sampled reviews
)
PARTITION BY DATE(ts)
CLUSTER BY layer, agent_id;
```
Partitioned by date, clustered by layer and agent — eval queries over 30-day windows run in seconds, not minutes.
⚙️ The Nightly Eval Pipeline (Vertex AI Pipelines)
Automated evaluation runs nightly over a stratified sample of the previous day's inferences. The pipeline has four stages:
[Sample Inferences] → [Compute Automatic Metrics] → [Flag Anomalies] → [Write Eval Results]
Stage 1 — Stratified Sampling
```python
# Assumes KFP v2 (`from kfp.dsl import component`); the BigQuery client is
# created inside the component so the function serializes cleanly.
from kfp.dsl import component


@component(packages_to_install=["google-cloud-bigquery", "pandas", "db-dtypes"])
def sample_inferences(
    eval_date: str,
    sample_per_layer: int = 200,
) -> list:
    """
    Stratified sample: 200 inferences per layer per day.
    Oversample low-confidence inferences (self_confidence < 0.6).
    """
    from google.cloud import bigquery

    bq_client = bigquery.Client()
    query = f"""
    WITH ranked AS (
      SELECT *,
        ROW_NUMBER() OVER (
          PARTITION BY layer
          ORDER BY
            -- oversample low-confidence
            CASE WHEN self_confidence < 0.6 THEN 0 ELSE 1 END,
            RAND()
        ) AS rn
      FROM retail_ai.llm_inference_log
      WHERE DATE(ts) = '{eval_date}'
    )
    SELECT * FROM ranked WHERE rn <= {sample_per_layer}
    """
    return bq_client.query(query).to_dataframe().to_dict("records")
```
Oversampling low-confidence inferences ensures the eval pipeline focuses attention where failures are most likely.
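The same sampling logic, sketched in plain Python for clarity (the event dicts mirror the logged schema; the function name and threshold default are illustrative):

```python
import random


def stratified_sample(events: list[dict], per_layer: int = 200,
                      low_conf_threshold: float = 0.6) -> list[dict]:
    """Take up to per_layer events per layer, preferring low-confidence ones.

    Shuffle first, then do a stable sort on the confidence bucket:
    low-confidence events sort first, and ties keep their random order —
    mirroring the SQL's CASE + RAND() ordering.
    """
    by_layer: dict[str, list[dict]] = {}
    for event in events:
        by_layer.setdefault(event["layer"], []).append(event)

    sampled = []
    for layer_events in by_layer.values():
        random.shuffle(layer_events)
        # False (< threshold) sorts before True, so low-confidence comes first
        layer_events.sort(key=lambda e: e["self_confidence"] >= low_conf_threshold)
        sampled.extend(layer_events[:per_layer])
    return sampled
```

Note one consequence of this ordering: on a day with more than `per_layer` low-confidence events, the sample is entirely low-confidence — acceptable for a failure-hunting eval, but worth knowing when reading aggregate scores.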
Stage 2 — Automatic Metric Computation
Three metric families are computed for every sampled inference:
2a. Groundedness (Retrieval Faithfulness)
For RAG inferences — does the output make claims supported by the retrieved chunks?
```python
@component
def score_groundedness(inferences: list[dict]) -> list[dict]:
    results = []
    for inf in inferences:
        if not inf["retrieved_chunks"]:
            continue  # skip non-RAG inferences
        # chunk_text is rehydrated from the document store by chunk_id;
        # only {chunk_id, relevance_score, source} is in the logged event.
        context = "\n\n".join(
            chunk["chunk_text"] for chunk in inf["retrieved_chunks"]
        )
        prompt = f"""
You are an evaluator. Given the retrieved context and the model output below,
score the output's groundedness on a scale of 0.0 to 1.0.
Groundedness = every factual claim in the output is directly supported by the context.
Penalise any claim that is not traceable to the context.

Context:
{context}

Output:
{inf["output"]}

Return ONLY a JSON object: {{"groundedness": <float>}}
"""
        response = gemini.generate_content(prompt)  # gemini: a GenerativeModel client
        score = json.loads(response.text)["groundedness"]
        results.append({**inf, "groundedness_score": score})
    return results
```
2b. Coherence
Does the output make logical sense as a response to the inferred query?
```python
coherence_prompt = """
Rate the coherence of the following retail AI response on a scale of 0.0 to 1.0.
Coherence = the response is logically consistent, fluent, and directly addresses
the implied user need. Penalise contradictions, non-sequiturs, or incomplete answers.

Response: {output}

Return ONLY: {{"coherence": <float>}}
"""
```
2c. Hallucination Detection
A binary flag for responses that assert specific facts (prices, stock counts, policy terms) not present in the retrieved context or tool outputs:
```python
@component
def flag_hallucinations(inferences: list[dict]) -> list[dict]:
    results = []
    for inf in inferences:
        # extract_* helpers return sets of fact strings, so they union with `|`
        tool_facts = extract_tool_facts(inf["tool_calls"])
        chunk_facts = extract_chunk_facts(inf["retrieved_chunks"])
        all_grounded_facts = tool_facts | chunk_facts
        hallucination_prompt = f"""
You are a fact-checker for a retail AI system.

Grounded facts available to the model:
{json.dumps(list(all_grounded_facts), indent=2)}

Model output:
{inf["output"]}

Does the output assert any specific factual claim (price, stock count,
policy term, date, SKU) that is NOT present in the grounded facts above?

Return ONLY: {{"hallucination_detected": true|false, "reason": "<string>"}}
"""
        response = gemini.generate_content(hallucination_prompt)
        result = json.loads(response.text)
        results.append({
            **inf,
            "hallucination_flag": result["hallucination_detected"],
            "hallucination_reason": result.get("reason"),
        })
    return results
```
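`extract_tool_facts` and `extract_chunk_facts` are left undefined above. A minimal sketch — the flattening scheme is our assumption, with field names matched to the logged schema — returns sets so the two unions compose with `|`:

```python
def extract_tool_facts(tool_calls: list[dict]) -> set[str]:
    """Flatten tool output payloads into 'tool.key=value' fact strings."""
    facts = set()
    for call in tool_calls or []:
        output = call.get("output_payload") or {}
        if isinstance(output, dict):
            for key, value in output.items():
                facts.add(f"{call.get('tool_name', 'tool')}.{key}={value}")
    return facts


def extract_chunk_facts(retrieved_chunks: list[dict]) -> set[str]:
    """One fact string per retrieved chunk: source plus its (rehydrated) text."""
    return {
        f"{chunk.get('source', 'unknown')}: {chunk.get('chunk_text', '')}"
        for chunk in retrieved_chunks or []
    }
```

The exact granularity of a "fact" matters less than consistency — the judge prompt only needs a faithful inventory of what the model could legitimately have known.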
Stage 3 — Anomaly Detection
After scoring, the pipeline computes rolling 7-day baselines and flags sessions where today's metrics fall more than 2σ below baseline:
```python
@component(packages_to_install=["google-cloud-bigquery", "pandas", "db-dtypes"])
def detect_anomalies(eval_date: str) -> dict:
    from google.cloud import bigquery

    bq_client = bigquery.Client()
    baseline_query = f"""
    SELECT
      layer,
      AVG(groundedness_score) AS baseline_groundedness,
      STDDEV(groundedness_score) AS stddev_groundedness,
      AVG(coherence_score) AS baseline_coherence,
      AVG(CAST(hallucination_flag AS INT64)) AS baseline_hallucination_rate
    FROM retail_ai.llm_inference_log
    WHERE DATE(ts) BETWEEN DATE_SUB('{eval_date}', INTERVAL 7 DAY)
                       AND DATE_SUB('{eval_date}', INTERVAL 1 DAY)
    GROUP BY layer
    """
    baseline = bq_client.query(baseline_query).to_dataframe()

    today_query = f"""
    SELECT
      layer,
      AVG(groundedness_score) AS today_groundedness,
      AVG(coherence_score) AS today_coherence,
      AVG(CAST(hallucination_flag AS INT64)) AS today_hallucination_rate
    FROM retail_ai.llm_inference_log
    WHERE DATE(ts) = '{eval_date}'
    GROUP BY layer
    """
    today = bq_client.query(today_query).to_dataframe()

    anomalies = []
    for _, row in today.merge(baseline, on="layer").iterrows():
        if row["today_groundedness"] < (
            row["baseline_groundedness"] - 2 * row["stddev_groundedness"]
        ):
            anomalies.append({
                "layer": row["layer"],
                "metric": "groundedness",
                "today": row["today_groundedness"],
                "baseline": row["baseline_groundedness"],
            })
    return {"anomalies": anomalies}
```
📊 The Eval Dashboard (Looker Studio over BigQuery)
Five views cover the full platform — one per layer, plus cross-layer summary and drill-down views. Three representative queries:
Daily Metric Trends (all layers)
```sql
SELECT
  DATE(ts) AS eval_date,
  layer,
  AVG(groundedness_score) AS avg_groundedness,
  AVG(coherence_score) AS avg_coherence,
  AVG(CAST(hallucination_flag AS INT64)) AS hallucination_rate,
  APPROX_QUANTILES(latency_ms, 100)[OFFSET(99)] AS p99_latency_ms,
  COUNT(*) AS inference_count
FROM retail_ai.llm_inference_log
WHERE DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY 1, 2
ORDER BY 1 DESC, 2;
```
Low-Confidence Inference Drill-Down
```sql
SELECT
  request_id,
  ts,
  agent_id,
  self_confidence,
  groundedness_score,
  hallucination_flag,
  output
FROM retail_ai.llm_inference_log
WHERE DATE(ts) = CURRENT_DATE() - 1
  AND (self_confidence < 0.6 OR hallucination_flag = TRUE)
ORDER BY self_confidence ASC
LIMIT 100;
```
Retrieval Quality (RAG Layer)
```sql
SELECT
  chunk.source AS source_doc,
  AVG(chunk.relevance_score) AS avg_retrieval_score,
  AVG(groundedness_score) AS avg_groundedness,
  COUNT(*) AS times_retrieved
FROM retail_ai.llm_inference_log,
  UNNEST(retrieved_chunks) AS chunk
WHERE DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY 1
ORDER BY times_retrieved DESC;
```
This query surfaces which source documents are being retrieved most frequently and whether they actually contribute to grounded answers — your index quality scorecard.
🚨 Alerting — Cloud Monitoring Policies
Three alert policies cover the most critical failure modes:
Policy 1 — Hallucination Rate Spike
```yaml
# Cloud Monitoring alert policy
displayName: "Retail AI — Hallucination Rate Spike"
conditions:
  - displayName: "hallucination_rate > 5% (7-day rolling)"
    conditionThreshold:
      filter: >
        resource.type="global"
        metric.type="custom.googleapis.com/retail_ai/hallucination_rate"
      comparison: COMPARISON_GT
      thresholdValue: 0.05
      duration: "3600s"  # sustained for 1 hour before the alert fires
alertStrategy:
  notificationRateLimit:
    period: "3600s"
notificationChannels:
  - projects/PROJECT_ID/notificationChannels/PAGERDUTY_CHANNEL
```

The filter uses `resource.type="global"` because that is the monitored resource the custom metric below is written against.
Policy 2 — Groundedness Drop
Fires when the 24-hour average groundedness score for the RAG layer falls below 0.75 — typically indicating index drift (stale documents) or a retrieval pipeline failure.
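The core check behind Policy 2 reduces to a single comparison. A pure-Python sketch (the threshold matches the policy above; the function name is ours) that the eval pipeline could run before publishing the custom metric:

```python
def groundedness_breached(scores: list[float], threshold: float = 0.75) -> bool:
    """True when the rolling average groundedness falls below the alert threshold.

    An empty window is treated as a separate "no data" condition rather
    than a groundedness breach.
    """
    if not scores:
        return False
    return sum(scores) / len(scores) < threshold
```

Keeping the check as a pure function makes the alert threshold unit-testable independently of Cloud Monitoring.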
Policy 3 — p99 Latency Breach
```python
# Metric written by the serving layer after every inference.
# monitoring_client is a google.cloud.monitoring_v3.MetricServiceClient.
monitoring_client.create_time_series(
    name=f"projects/{PROJECT_ID}",
    time_series=[{
        "metric": {
            "type": "custom.googleapis.com/retail_ai/inference_latency_ms",
            "labels": {"layer": layer, "agent_id": agent_id},
        },
        "resource": {"type": "global", "labels": {"project_id": PROJECT_ID}},
        "points": [{
            "interval": {"end_time": now},
            "value": {"int64_value": latency_ms},
        }],
    }],
)
```
🔁 Closing the Loop — Eval-Driven Fine-Tuning
The eval pipeline does more than surface problems — it produces the training data for the next model version.
```python
# Weekly: export flagged inferences for human review + supervised fine-tuning
export_query = """
SELECT
  request_id,
  output AS model_output,
  groundedness_score,
  hallucination_flag,
  human_label,            -- populated by reviewer workflow
  retrieved_chunks,
  tool_calls
FROM retail_ai.llm_inference_log
WHERE DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
  AND (hallucination_flag = TRUE OR groundedness_score < 0.70)
  AND human_label IS NOT NULL
ORDER BY groundedness_score ASC
"""
flagged = bq_client.query(export_query).to_dataframe()

# Write to GCS in JSONL format for Vertex AI supervised fine-tuning
# (writing to gs:// paths via pandas requires gcsfs; eval_date is set by the weekly job)
flagged.to_json(
    f"gs://retail-ai-tuning/weekly/{eval_date}/flagged_inferences.jsonl",
    orient="records",
    lines=True,
)
```
The human review workflow — a lightweight Cloud Run app backed by the same BigQuery table — lets domain experts label outputs as correct, incorrect, or needs-revision. Those labels feed directly into Vertex AI supervised fine-tuning jobs, closing the eval-to-improvement loop.
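A sketch of converting one reviewed row into a tuning example. The chat-style `contents` layout shown here is the shape Gemini supervised tuning on Vertex AI generally expects, but verify against the current docs before shipping; the field mapping from the review table is our assumption:

```python
import json


def to_tuning_record(user_prompt: str, corrected_output: str) -> str:
    """One JSONL line pairing the original prompt with the human-corrected answer."""
    record = {
        "contents": [
            {"role": "user", "parts": [{"text": user_prompt}]},
            {"role": "model", "parts": [{"text": corrected_output}]},
        ]
    }
    return json.dumps(record)
```

One line per reviewed inference, appended to the weekly JSONL export, gives the tuning job exactly the failure cases the eval pipeline surfaced.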
🔗 How This Connects to the Unified Platform
This observability layer is not bolt-on — it is wired into the three-layer retail AI platform:
| Platform Layer | What Eval Monitors | Key Metric |
|---|---|---|
| Personalization | Re-ranking model output quality, cold-start recommendation coherence | Coherence score by user segment |
| Multi-Agent Ops | Agent decision accuracy, tool call success rate, orchestrator reasoning quality | Tool call success rate, hallucination rate per agent |
| Agentic RAG | Retrieval groundedness, index freshness, self-correction trigger rate | Groundedness score, re-query rate |
| Cross-layer | End-to-end latency, overall hallucination rate, human escalation rate | p99 latency, hallucination rate, escalation rate |
The llm_inference_log table is the single source of truth for all of these — one schema, one dashboard, one alert policy set.
💡 Key Takeaways
- Log the full context, not just the output. Retrieved chunks, tool call inputs/outputs, and self-confidence scores are what make downstream eval possible.
- Use LLM-as-judge for groundedness and coherence. Gemini evaluating Gemini outputs at scale is practical and cost-effective for production eval at retail volumes.
- Oversample low-confidence inferences. Your eval budget is limited — concentrate it where failures are most likely.
- Retrieval quality IS answer quality. The `source_doc` query above will tell you which documents are dragging your groundedness scores down. Fix the index before fine-tuning the model.
- The eval pipeline is your fine-tuning dataset. Every flagged, human-reviewed inference is a supervised training example. Treat it that way from day one.
🚀 Where to Start
- Week 1: Instrument your Cloud Run inference handlers with structured logging using the schema above. Sink to BigQuery via Log Router.
- Week 2: Write the five dashboard queries. Stand up Looker Studio. You now have observability.
- Week 3: Add the nightly Vertex AI Pipeline for groundedness and hallucination scoring on a 200-inference sample.
- Week 4: Set up the three Cloud Monitoring alert policies. Add a human review queue for flagged inferences.
- Ongoing: Feed human-labeled inferences back into Vertex AI fine-tuning jobs monthly.
The schema is the hardest part. Get it right in Week 1 and everything else is additive.