
LLM Evaluation & Observability in Production Retail Systems on GCP

Most teams know when their LLM is wrong after a customer complains. Production-grade retail AI requires knowing before that — with metrics, traces, and automated eval pipelines that catch drift, hallucination, and degradation continuously. This article shows you how to build that system on GCP.

🧭 Why LLM Observability in Retail Is Different

Traditional ML observability tracks distribution drift on structured features and monitors a single scalar metric — accuracy, RMSE, AUC. LLMs break this model in three ways:

  • Outputs are unstructured. There is no ground-truth label for "did the agent give a good answer?" arriving in real time.
  • Failure modes are silent. A hallucinated return policy answer looks identical to a correct one in your latency dashboard.
  • Context windows change behavior. The same model behaves differently depending on what is in the prompt — retrieved chunks, session history, tool results.

In retail specifically, the stakes are asymmetric. A mis-personalized recommendation costs a click. A hallucinated return policy answer costs a customer, a refund, and potentially a chargeback.

Design principle: Treat every LLM inference as a structured event with inputs, outputs, retrieved context, tool calls, and a confidence signal — not just a latency measurement.


🏗️ The Observability Stack

The system is built on four GCP components working in concert:

| Component | Role |
| --- | --- |
| Cloud Logging + Log Router | Capture structured inference events from Cloud Run |
| BigQuery | Central eval store — every inference logged with full context |
| Vertex AI Evaluation Service | Automated metric computation (ROUGE, BLEU, coherence, groundedness) |
| Looker Studio | Real-time eval dashboard over BigQuery eval tables |

And one additional layer that makes it production-grade:

| Component | Role |
| --- | --- |
| Vertex AI Pipelines | Scheduled eval pipeline — nightly batch scoring of sampled inferences |
| Cloud Monitoring + Alerting | Threshold-based alerts when eval metrics degrade |

📐 The Inference Event Schema

Everything starts with logging the right data. Every LLM inference — whether personalization re-ranking, agent orchestration, or RAG answer generation — emits a structured event to Cloud Logging, which Log Router sinks to BigQuery.

# Cloud Run inference handler — structured logging
from datetime import datetime, timezone
from google.cloud import logging as gcp_logging

client = gcp_logging.Client()
logger = client.logger("llm_inference_events")

def log_inference_event(
    request_id: str,
    layer: str,              # "personalization" | "multi_agent" | "agentic_rag"
    agent_id: str,
    model_id: str,
    prompt_tokens: int,
    completion_tokens: int,
    retrieved_chunks: list[dict],
    tool_calls: list[dict],
    output: str,
    latency_ms: int,
    self_confidence: float,  # model's own logprob-derived confidence
):
    logger.log_struct({
        "request_id":        request_id,
        "ts":                datetime.now(timezone.utc).isoformat(),
        "layer":             layer,
        "agent_id":          agent_id,
        "model_id":          model_id,
        "prompt_tokens":     prompt_tokens,
        "completion_tokens": completion_tokens,
        "retrieved_chunks":  retrieved_chunks,   # [{chunk_id, score, source}]
        "tool_calls":        tool_calls,          # [{tool, input, output, latency_ms}]
        "output":            output,
        "latency_ms":        latency_ms,
        "self_confidence":   self_confidence,
    }, severity="INFO")

This single schema powers everything downstream — dashboards, evals, fine-tuning, and alerting.
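A note on the `self_confidence` field: it can be derived from the per-token logprobs most model APIs return alongside a completion. A minimal sketch of one common heuristic — the exponentiated mean logprob, i.e. the geometric mean of token probabilities (the function name and heuristic are illustrative, not part of any GCP API):

```python
import math

def confidence_from_logprobs(token_logprobs: list[float]) -> float:
    """Geometric mean of token probabilities as a rough self-confidence signal.

    exp(mean(logprobs)) maps the average per-token log-likelihood back into
    (0, 1]. Low values suggest the model was uncertain while generating.
    """
    if not token_logprobs:
        return 0.0
    return math.exp(sum(token_logprobs) / len(token_logprobs))

# A confident completion: per-token logprobs near 0 (probabilities near 1)
print(round(confidence_from_logprobs([-0.05, -0.1, -0.02]), 3))  # → 0.945
```

Whatever heuristic you choose, keep it consistent across layers — the sampling stage below compares `self_confidence` against a single 0.6 threshold.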


🗄️ BigQuery Eval Schema

Log Router sinks the inference events into a partitioned BigQuery table:

CREATE TABLE retail_ai.llm_inference_log (
  request_id        STRING        NOT NULL,
  ts                TIMESTAMP     NOT NULL,
  layer             STRING,       -- personalization | multi_agent | agentic_rag
  agent_id          STRING,
  model_id          STRING,
  prompt_tokens     INT64,
  completion_tokens INT64,
  retrieved_chunks  ARRAY<STRUCT<
    chunk_id        STRING,
    relevance_score FLOAT64,
    source          STRING
  >>,
  tool_calls        ARRAY<STRUCT<
    tool_name       STRING,
    input_payload   JSON,
    output_payload  JSON,
    latency_ms      INT64
  >>,
  output            STRING,
  latency_ms        INT64,
  self_confidence   FLOAT64,
  -- Populated by the nightly eval pipeline:
  groundedness_score  FLOAT64,
  coherence_score     FLOAT64,
  rouge_l             FLOAT64,
  hallucination_flag  BOOL,
  human_label         STRING        -- populated for sampled reviews
)
PARTITION BY DATE(ts)
CLUSTER BY layer, agent_id;

Partitioned by date, clustered by layer and agent — eval queries over 30-day windows run in seconds, not minutes.


⚙️ The Nightly Eval Pipeline (Vertex AI Pipelines)

Automated evaluation runs nightly over a stratified sample of the previous day's inferences. The pipeline has four stages:

[Sample Inferences] → [Compute Automatic Metrics] → [Flag Anomalies] → [Write Eval Results]

Stage 1 — Stratified Sampling

# kfp.dsl component; assumes a BigQuery client (`bq_client`) in the component image
@component
def sample_inferences(
    eval_date: str,
    sample_per_layer: int = 200,
) -> list[dict]:
    """
    Stratified sample: 200 inferences per layer per day.
    Oversample low-confidence inferences (self_confidence < 0.6).
    """
    query = f"""
        WITH ranked AS (
          SELECT *,
            ROW_NUMBER() OVER (
              PARTITION BY layer
              ORDER BY
                -- oversample low-confidence
                CASE WHEN self_confidence < 0.6 THEN 0 ELSE 1 END,
                RAND()
            ) AS rn
          FROM retail_ai.llm_inference_log
          WHERE DATE(ts) = '{eval_date}'
        )
        SELECT * FROM ranked WHERE rn <= {sample_per_layer}
    """
    return bq_client.query(query).to_dataframe().to_dict("records")

Oversampling low-confidence inferences ensures the eval pipeline focuses attention where failures are most likely.

Stage 2 — Automatic Metric Computation

Three metric families are computed for every sampled inference:

2a. Groundedness (Retrieval Faithfulness)

For RAG inferences — does the output make claims supported by the retrieved chunks?

@component
def score_groundedness(inferences: list[dict]) -> list[dict]:
    results = []
    for inf in inferences:
        if not inf["retrieved_chunks"]:
            continue  # skip non-RAG inferences

        # The logged event carries only {chunk_id, score, source}; assume the
        # chunk text has been re-joined from the chunk store by chunk_id.
        context = "\n\n".join(
            chunk["chunk_text"] for chunk in inf["retrieved_chunks"]
        )
        prompt = f"""
You are an evaluator. Given the retrieved context and the model output below,
score the output's groundedness on a scale of 0.0 to 1.0.
Groundedness = every factual claim in the output is directly supported by the context.
Penalise any claim that is not traceable to the context.

Context:
{context}

Output:
{inf["output"]}

Return ONLY a JSON object: {{"groundedness": <float>}}
"""
        response = gemini.generate_content(prompt)
        score = json.loads(response.text)["groundedness"]
        results.append({**inf, "groundedness_score": score})

    return results

2b. Coherence

Does the output make logical sense as a response to the inferred query?

coherence_prompt = """
Rate the coherence of the following retail AI response on a scale of 0.0 to 1.0.
Coherence = the response is logically consistent, fluent, and directly addresses
the implied user need. Penalise contradictions, non-sequiturs, or incomplete answers.

Response: {output}

Return ONLY: {{"coherence": <float>}}
"""

2c. Hallucination Detection

A binary flag for responses that assert specific facts (prices, stock counts, policy terms) not present in the retrieved context or tool outputs:

@component
def flag_hallucinations(inferences: list[dict]) -> list[dict]:
    results = []
    for inf in inferences:
        tool_facts = extract_tool_facts(inf["tool_calls"])
        chunk_facts = extract_chunk_facts(inf["retrieved_chunks"])
        all_grounded_facts = tool_facts | chunk_facts

        hallucination_prompt = f"""
You are a fact-checker for a retail AI system.
Grounded facts available to the model:
{json.dumps(list(all_grounded_facts), indent=2)}

Model output:
{inf["output"]}

Does the output assert any specific factual claim (price, stock count, 
policy term, date, SKU) that is NOT present in the grounded facts above?

Return ONLY: {{"hallucination_detected": true|false, "reason": "<string>"}}
"""
        response = gemini.generate_content(hallucination_prompt)
        result = json.loads(response.text)
        results.append({
            **inf,
            "hallucination_flag": result["hallucination_detected"],
            "hallucination_reason": result.get("reason"),
        })
    return results
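`extract_tool_facts` and `extract_chunk_facts` are left undefined above. A minimal sketch of what they might look like — regex extraction of retail-typed facts (prices, SKUs, percentages, counts) from the payloads in the logging schema; the patterns and the `chunk_text` field are illustrative assumptions:

```python
import re

# Patterns for retail-relevant fact types: prices, SKUs, percentages, counts
FACT_PATTERNS = [
    r"\$\d+(?:\.\d{2})?",                  # prices like $19.99
    r"\bSKU[-\s]?\w+\b",                   # SKU identifiers
    r"\b\d+%",                             # percentages (e.g. restocking fees)
    r"\b\d+\s+(?:days?|items?|units?)\b",  # counts and durations
]

def extract_facts(text: str) -> set[str]:
    """Pull specific factual claims out of free text via the patterns above."""
    facts: set[str] = set()
    for pattern in FACT_PATTERNS:
        facts.update(re.findall(pattern, text, re.IGNORECASE))
    return facts

def extract_tool_facts(tool_calls: list[dict]) -> set[str]:
    # assumes tool_calls shaped like [{tool, input, output, latency_ms}]
    return set().union(*(extract_facts(str(c.get("output", ""))) for c in tool_calls))

def extract_chunk_facts(chunks: list[dict]) -> set[str]:
    # assumes each chunk dict carries its text under "chunk_text" (hypothetical field)
    return set().union(*(extract_facts(c.get("chunk_text", "")) for c in chunks))
```

Regex extraction is deliberately conservative: it will miss paraphrased facts, but every fact it does extract is exact, which keeps the judge's grounded-fact list trustworthy.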

Stage 3 — Anomaly Detection

After scoring, the pipeline computes rolling 7-day baselines per layer and flags any layer whose metrics for the day fall more than 2σ below baseline:

@component
def detect_anomalies(eval_date: str) -> dict:
    baseline_query = f"""
        SELECT
          layer,
          AVG(groundedness_score)  AS baseline_groundedness,
          STDDEV(groundedness_score) AS stddev_groundedness,
          AVG(coherence_score)     AS baseline_coherence,
          AVG(CAST(hallucination_flag AS INT64)) AS baseline_hallucination_rate
        FROM retail_ai.llm_inference_log
        WHERE DATE(ts) BETWEEN DATE_SUB('{eval_date}', INTERVAL 7 DAY)
                           AND DATE_SUB('{eval_date}', INTERVAL 1 DAY)
        GROUP BY layer
    """
    baseline = bq_client.query(baseline_query).to_dataframe()

    today_query = f"""
        SELECT
          layer,
          AVG(groundedness_score)  AS today_groundedness,
          AVG(coherence_score)     AS today_coherence,
          AVG(CAST(hallucination_flag AS INT64)) AS today_hallucination_rate
        FROM retail_ai.llm_inference_log
        WHERE DATE(ts) = '{eval_date}'
        GROUP BY layer
    """
    today = bq_client.query(today_query).to_dataframe()

    anomalies = []
    for _, row in today.merge(baseline, on="layer").iterrows():
        if row["today_groundedness"] < (
            row["baseline_groundedness"] - 2 * row["stddev_groundedness"]
        ):
            anomalies.append({
                "layer": row["layer"],
                "metric": "groundedness",
                "today": row["today_groundedness"],
                "baseline": row["baseline_groundedness"],
            })
    return {"anomalies": anomalies}

📊 The Eval Dashboard (Looker Studio over BigQuery)

The dashboard covers the full platform with one view per layer plus a cross-layer summary. Three representative queries:

Daily Metric Trends (all layers)

SELECT
  DATE(ts)                              AS eval_date,
  layer,
  AVG(groundedness_score)               AS avg_groundedness,
  AVG(coherence_score)                  AS avg_coherence,
  AVG(CAST(hallucination_flag AS INT64)) AS hallucination_rate,
  APPROX_QUANTILES(latency_ms, 100)[OFFSET(99)] AS p99_latency_ms,
  COUNT(*)                              AS inference_count
FROM retail_ai.llm_inference_log
WHERE DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

Low-Confidence Inference Drill-Down

SELECT
  request_id,
  ts,
  agent_id,
  self_confidence,
  groundedness_score,
  hallucination_flag,
  output
FROM retail_ai.llm_inference_log
WHERE DATE(ts) = CURRENT_DATE() - 1
  AND (self_confidence < 0.6 OR hallucination_flag = TRUE)
ORDER BY self_confidence ASC
LIMIT 100;

Retrieval Quality (RAG Layer)

SELECT
  chunk.source                        AS source_doc,
  AVG(chunk.relevance_score)          AS avg_retrieval_score,
  AVG(groundedness_score)             AS avg_groundedness,
  COUNT(*)                            AS times_retrieved
FROM retail_ai.llm_inference_log,
UNNEST(retrieved_chunks) AS chunk
WHERE DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY 1
ORDER BY times_retrieved DESC;

This query surfaces which source documents are being retrieved most frequently and whether they actually contribute to grounded answers — your index quality scorecard.


🚨 Alerting — Cloud Monitoring Policies

Three alert policies cover the most critical failure modes:

Policy 1 — Hallucination Rate Spike

# Cloud Monitoring alert policy
displayName: "Retail AI Hallucination Rate Spike"
conditions:
  - displayName: "hallucination_rate > 5% (7-day rolling)"
    conditionThreshold:
      filter: >
        resource.type="bigquery_table"
        metric.type="custom.googleapis.com/retail_ai/hallucination_rate"
      comparison: COMPARISON_GT
      thresholdValue: 0.05
      duration: 3600s   # sustained for 1 hour before alert fires
alertStrategy:
  notificationRateLimit:
    period: 3600s
notificationChannels:
  - projects/PROJECT_ID/notificationChannels/PAGERDUTY_CHANNEL

Policy 2 — Groundedness Drop

Fires when the 24-hour average groundedness score for the RAG layer falls below 0.75 — typically indicating index drift (stale documents) or a retrieval pipeline failure.
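Because groundedness lives in BigQuery rather than in a built-in Cloud Monitoring metric, Policy 2 is easiest to implement as a scheduled job that evaluates the 24-hour aggregate and publishes a custom metric (or alerts directly). A hedged sketch of just the threshold logic, using the 0.75 floor from above — the data fetch is stubbed, in production it would be the BigQuery aggregate:

```python
GROUNDEDNESS_FLOOR = 0.75  # per Policy 2

def check_groundedness(scores: list[float], floor: float = GROUNDEDNESS_FLOOR) -> dict:
    """Return an alert decision for a window of RAG-layer groundedness scores."""
    if not scores:
        # No scored inferences in the window — worth its own alert in practice
        return {"alert": False, "reason": "no data in window"}
    avg = sum(scores) / len(scores)
    return {
        "alert": avg < floor,
        "avg_groundedness": round(avg, 3),
        "reason": f"24h avg {avg:.3f} below floor {floor}" if avg < floor else "ok",
    }

# Example: a degraded day — consistent with index drift or a retrieval failure
print(check_groundedness([0.9, 0.6, 0.55, 0.7]))
```

Keeping the decision logic separate from the notification plumbing makes the threshold trivially unit-testable.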

Policy 3 — p99 Latency Breach

# Metric written by the serving layer after every inference.
# Assumes monitoring_client = monitoring_v3.MetricServiceClient() and
# `now` as an RFC 3339 timestamp for the point's end time.
monitoring_client.create_time_series(
    name=f"projects/{PROJECT_ID}",
    time_series=[{
        "metric": {
            "type": "custom.googleapis.com/retail_ai/inference_latency_ms",
            "labels": {"layer": layer, "agent_id": agent_id}
        },
        "points": [{"interval": {"endTime": now}, "value": {"int64Value": latency_ms}}]
    }]
)

🔁 Closing the Loop — Eval-Driven Fine-Tuning

The eval pipeline does more than surface problems — it produces the training data for the next model version.

# Weekly: export flagged inferences for human review + supervised fine-tuning
export_query = """
    SELECT
      request_id,
      output                AS model_output,
      groundedness_score,
      hallucination_flag,
      human_label,           -- populated by reviewer workflow
      retrieved_chunks,
      tool_calls
    FROM retail_ai.llm_inference_log
    WHERE DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
      AND (hallucination_flag = TRUE OR groundedness_score < 0.70)
      AND human_label IS NOT NULL
    ORDER BY groundedness_score ASC
"""

flagged = bq_client.query(export_query).to_dataframe()

# Write to GCS in JSONL format for Vertex AI supervised fine-tuning
# (direct gs:// writes via pandas require the gcsfs package)
flagged.to_json(
    f"gs://retail-ai-tuning/weekly/{eval_date}/flagged_inferences.jsonl",
    orient="records",
    lines=True
)

The human review workflow — a lightweight Cloud Run app backed by the same BigQuery table — lets domain experts label outputs as correct, incorrect, or needs-revision. Those labels feed directly into Vertex AI supervised fine-tuning jobs, closing the eval-to-improvement loop.


🔗 How This Connects to the Unified Platform

This observability layer is not a bolt-on — it is wired into the three-layer retail AI platform:

| Platform Layer | What Eval Monitors | Key Metric |
| --- | --- | --- |
| Personalization | Re-ranking model output quality, cold-start recommendation coherence | Coherence score by user segment |
| Multi-Agent Ops | Agent decision accuracy, tool call success rate, orchestrator reasoning quality | Tool call success rate, hallucination rate per agent |
| Agentic RAG | Retrieval groundedness, index freshness, self-correction trigger rate | Groundedness score, re-query rate |
| Cross-layer | End-to-end latency, overall hallucination rate, human escalation rate | p99 latency, hallucination rate, escalation rate |

The llm_inference_log table is the single source of truth for all of these — one schema, one dashboard, one alert policy set.


💡 Key Takeaways

  • Log the full context, not just the output. Retrieved chunks, tool call inputs/outputs, and self-confidence scores are what make downstream eval possible.
  • Use LLM-as-judge for groundedness and coherence. Gemini evaluating Gemini outputs at scale is practical and cost-effective for production eval at retail volumes.
  • Oversample low-confidence inferences. Your eval budget is limited — concentrate it where failures are most likely.
  • Retrieval quality IS answer quality. The source_doc query above will tell you which documents are dragging your groundedness scores down. Fix the index before fine-tuning the model.
  • The eval pipeline is your fine-tuning dataset. Every flagged, human-reviewed inference is a supervised training example. Treat it that way from day one.

🚀 Where to Start

  1. Week 1: Instrument your Cloud Run inference handlers with structured logging using the schema above. Sink to BigQuery via Log Router.
  2. Week 2: Write the five dashboard queries. Stand up Looker Studio. You now have observability.
  3. Week 3: Add the nightly Vertex AI Pipeline for groundedness and hallucination scoring on a 200-inference sample.
  4. Week 4: Set up the three Cloud Monitoring alert policies. Add a human review queue for flagged inferences.
  5. Ongoing: Feed human-labeled inferences back into Vertex AI fine-tuning jobs monthly.

The schema is the hardest part. Get it right in Week 1 and everything else is additive.
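One cheap way to keep Week 1 honest is to validate events against the schema before they reach Cloud Logging — a missing field discovered in Week 3 means weeks of unusable rows. A minimal sketch (field names match the event schema above; the type map and helper are illustrative):

```python
REQUIRED_FIELDS = {
    "request_id": str, "ts": str, "layer": str, "agent_id": str,
    "model_id": str, "prompt_tokens": int, "completion_tokens": int,
    "retrieved_chunks": list, "tool_calls": list, "output": str,
    "latency_ms": int, "self_confidence": float,
}
VALID_LAYERS = {"personalization", "multi_agent", "agentic_rag"}

def validate_event(event: dict) -> list[str]:
    """Return a list of schema violations (an empty list means the event is valid)."""
    errors = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in event:
            errors.append(f"missing field: {field}")
        elif not isinstance(event[field], expected):
            errors.append(
                f"{field}: expected {expected.__name__}, "
                f"got {type(event[field]).__name__}"
            )
    if event.get("layer") not in VALID_LAYERS:
        errors.append(f"layer must be one of {sorted(VALID_LAYERS)}")
    return errors
```

Run this in the inference handler and reject (or at least count) invalid events — a spike in validation failures is itself an early observability signal.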
