DEV Community

Prithwish Nath

Posted on • Originally published at Medium

How To Validate Any API Response with Great Expectations (GX)

A lot of my data analysis work is forensic (example here). I’ll pull repeated snapshots — content, traffic, SERPs, metadata — and look at patterns across hundreds of these ingestions. So naturally, a bad batch will skew results in ways that are just convincing enough that I don’t catch them. Disastrous.

You can add try-catches, but a 200 OK from your API with empty titles and duplicate rows is still considered a success on the wire unless you model validation as its own failure path. So is there a better way?

This is exactly the gap Great Expectations (GX Core) fills. It’s an open-source Python library for defining declarative quality rules on your data — “quality gates”, explicit rules your data must pass before it’s trusted, things like “this field must never be null”, “these IDs must be unique”, or “this value must fall within a known range”. You basically codify what “good” looks like, run it as a validation step in your pipeline, and get a clear pass/fail on every batch — before bad data ever touches your analysis.

Let’s turn that idea into code — we’ll get data at scale via Bright Data (a SERP API), run a GX suite over each batch, then store the results in DuckDB (a file-backed analytical SQL database that’s just a .duckdb file on disk): clean rows go to the main table, failed batches to quarantine with enough context to debug.

The Setup

Here’s what you need:

requests>=2.28.0  
python-dotenv>=1.0.0  
duckdb>=1.0.0  
pandas>=2.0.0  
psutil>=5.9.0  
great-expectations>=1.0.0

Note that Python 3.10+ is the floor for Great Expectations 1.x with current pandas / duckdb wheels.

The important ones:

  • great-expectations — the quality gates: expectations on each batch, with a clear pass/fail and reports. All we need is GX Core.
  • duckdb — the analytical store; a single file on disk, no server to run.
  • requests — for HTTP calls to Bright Data’s POST /request endpoint (your zone must be a SERP zone).

Before running pip install -r requirements.txt, create a .env file with:

BRIGHT_DATA_API_KEY=your_api_key  
BRIGHT_DATA_ZONE=serp # or your SERP zone name from the Bright Data dashboard  
BRIGHT_DATA_COUNTRY=us # optional

The Bright Data client reads these when you run the pipeline. You need a SERP-capable zone — without one, POST /request will not match what this code expects. Proxy rotation and unblocking stay on Bright Data’s side.

How Does the Pipeline Actually Work?

The project is just four small modules:


bright_data.py # Bright Data API client  
serp_expectations.py # Great Expectations validation suite  
duckdb_store.py # DuckDB schema + insert/quarantine logic  
ingest.py # Pipeline: fetch → validate → store or quarantine

The flow is straightforward: one query equals one batch. For each batch, we check things in a strict order before anything touches the database.

Step 1: Fetching Structured SERP Data with Bright Data

Our API client will just be a thin wrapper around Bright Data’s POST /request endpoint.

bright_data.py

import json  
import os  
from dataclasses import dataclass  
from pathlib import Path  

import requests  
from dotenv import load_dotenv  

_GX_ROOT = Path(__file__).resolve().parent  
load_dotenv(_GX_ROOT / ".env")  


def normalize_serp_payload(raw):  
    """Bright Data may wrap JSON in a string body."""  
    if isinstance(raw, dict) and "body" in raw:  
        body = raw["body"]  
        if isinstance(body, str):  
            return json.loads(body)  
        if isinstance(body, dict):  
            return body  
    return raw  


@dataclass  
class SerpApiResponse:  
    """Bright Data POST /request: HTTP status plus parsed JSON body (if any)."""  

    status_code: int  
    data: dict  


class BrightDataClient:  
    """SERP API zone client; uses https://api.brightdata.com/request"""  

    def __init__(self, api_key=None, zone=None, country=None):  
        self.api_key = api_key or os.getenv("BRIGHT_DATA_API_KEY")  
        self.zone = zone or os.getenv("BRIGHT_DATA_ZONE")  
        self.country = country or os.getenv("BRIGHT_DATA_COUNTRY")  
        self.api_endpoint = "https://api.brightdata.com/request"  

        if not self.api_key:  
            raise ValueError("BRIGHT_DATA_API_KEY must be set in the environment or constructor.")  
        if not self.zone:  
            raise ValueError("BRIGHT_DATA_ZONE must be set in the environment or constructor.")  

        self.session = requests.Session()  
        self.session.headers.update(  
            {  
                "Content-Type": "application/json",  
                "Authorization": f"Bearer {self.api_key}",  
            }  
        )  

    def search(self, query, num_results=10, language=None, country=None):  
        """Raises on non-200 or network error."""  
        serp = self.search_with_status(query, num_results, language, country)  
        if serp.status_code != 200:  
            raise RuntimeError(  
                f"Search request failed with HTTP {serp.status_code}: {serp.data!r}"[:500]  
            )  
        return normalize_serp_payload(serp.data)  

    def search_with_status(self, query, num_results=10, language=None, country=None):  
        """Does not raise on non-200 — for ingest + quarantine routing."""  
        search_url = (  
            f"https://www.google.com/search"  
            f"?q={requests.utils.quote(query)}"  
            f"&num={num_results}"  
            f"&brd_json=1"  
        )  
        if language:  
            search_url += f"&hl={language}&lr=lang_{language}"  

        target_country = country or self.country  
        payload = {"zone": self.zone, "url": search_url, "format": "json"}  
        if target_country:  
            payload["country"] = target_country  

        try:  
            response = self.session.post(self.api_endpoint, json=payload, timeout=30)  
        except requests.exceptions.RequestException as e:  
            return SerpApiResponse(status_code=0, data={"_error": str(e)})  

        data = _parse_response_body(response)  
        return SerpApiResponse(status_code=response.status_code, data=data)  


def _parse_response_body(response):  
    if not response.text:  
        return {}  
    try:  
        return response.json()  
    except ValueError:  
        return {"_non_json_body": response.text[:8000]}

Some things to note here:

  • search_with_status does not raise on non-200, so ingest can quarantine API failures.
  • Transport errors are caught, so you get status_code=0 and the same api_error path as HTTP failures.
  • search() raises on failure, for callers that want exceptions.
  • normalize_serp_payload unwraps responses, in case the API puts JSON under a string body.

Step 2: What Happens to Data That Doesn’t Pass?

Every batch in this pipeline has two possible destinations in DuckDB:

  • If it clears all our defined quality gates, it goes to serp_results.
  • If it doesn't — wrong HTTP status, empty organic list, or a failed GX expectation — it lands in serp_quarantine instead, with enough context to understand why.

Let's build that store before we wire up the gates.

duckdb_store.py

import hashlib  
import json  
import os  
from datetime import datetime  

import duckdb  
import pandas as pd  
import psutil  


class SerpStore:  
    """serp_results + serp_quarantine"""  

    def __init__(self, db_path, memory_limit=None):  
        parent = os.path.dirname(db_path)  
        if parent:  
            os.makedirs(parent, exist_ok=True)  

        self.db_path = db_path  
        self.conn = duckdb.connect(db_path)  

        if memory_limit:  
            self.conn.execute(f"SET memory_limit='{memory_limit}'")  
        else:  
            available_memory = psutil.virtual_memory().available  
            memory_gb = int(available_memory / (1024**3) * 0.8)  
            self.conn.execute(f"SET memory_limit='{memory_gb}GB'")  

        self._create_schema()  

    def _create_schema(self):  
        self.conn.execute("""  
            CREATE TABLE IF NOT EXISTS serp_results (  
                id BIGINT PRIMARY KEY,  
                query TEXT NOT NULL,  
                timestamp TIMESTAMP NOT NULL,  
                result_position INTEGER NOT NULL,  
                title TEXT,  
                url TEXT,  
                snippet TEXT,  
                domain TEXT,  
                rank INTEGER,  
                previous_rank INTEGER,  
                rank_delta INTEGER  
            )  
        """)  
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_query ON serp_results(query)")  
        self.conn.execute("CREATE INDEX IF NOT EXISTS idx_domain ON serp_results(domain)")  

        self.conn.execute("""  
            CREATE TABLE IF NOT EXISTS serp_quarantine (  
                query TEXT,  
                timestamp TIMESTAMP,  
                reason TEXT,  
                organic_count INTEGER,  
                payload_hash TEXT,  
                http_status INTEGER,  
                raw_json JSON  
            )  
        """)  

    @staticmethod  
    def _payload_hash(payload):  
        canonical = json.dumps(payload, sort_keys=True, default=str)  
        return hashlib.sha256(canonical.encode()).hexdigest()  

    def insert_quarantine(  
        self,  
        query,  
        reason,  
        organic_count,  
        raw_json,  
        http_status,  
        timestamp=None,  
    ):  
        if timestamp is None:  
            timestamp = datetime.now()  
        payload_hash = self._payload_hash(raw_json)  
        self.conn.execute(  
            """  
            INSERT INTO serp_quarantine  
                (query, timestamp, reason, organic_count, payload_hash, http_status, raw_json)  
            VALUES (?, ?, ?, ?, ?, ?, ?)  
            """,  
            [ 
                query,  
                timestamp,  
                reason,  
                organic_count,  
                payload_hash,  
                http_status,  
                json.dumps(raw_json, default=str),  
            ],  
        )  

    def insert_batch(self, vdf, query, timestamp=None):  
        """Insert from the GX validation frame (output of organic_to_validation_df).  

        Store what we validated: normalized url, derived domain, snippet, and positional rank —  
        not a second parse from raw organic dicts (avoids drift vs Great Expectations).  
        """  
        if timestamp is None:  
            timestamp = datetime.now()  
        if vdf.empty:  
            return  

        max_id_result = self.conn.execute("SELECT COALESCE(MAX(id), 0) FROM serp_results").fetchone()  
        next_id = (max_id_result[0] if max_id_result else 0) + 1  

        rows = []  
        for idx, (_, row) in enumerate(vdf.iterrows()):  
            title = "" if pd.isna(row["title"]) else str(row["title"])  
            url = "" if pd.isna(row["url"]) else str(row["url"])  
            snippet = "" if pd.isna(row["snippet"]) else str(row["snippet"])  
            domain = "" if pd.isna(row["domain"]) else str(row["domain"])  
            rank_val = int(row["rank"])  
            rows.append(  
                {  
                    "id": next_id + idx,  
                    "query": query,  
                    "timestamp": timestamp,  
                    "result_position": idx + 1,  
                    "title": title,  
                    "url": url,  
                    "snippet": snippet,  
                    "domain": domain,  
                    "rank": rank_val,  
                    "previous_rank": None,  
                    "rank_delta": None,  
                }  
            )  

        df = pd.DataFrame(rows)  
        self.conn.execute("""  
            INSERT INTO serp_results (id, query, timestamp, result_position, title, url, snippet, domain, rank, previous_rank, rank_delta)  
            SELECT id, query, timestamp, result_position, title, url, snippet, domain, rank, previous_rank, rank_delta FROM df  
        """)  

    def get_row_count(self):  
        result = self.conn.execute("SELECT COUNT(*) FROM serp_results").fetchone()  
        return result[0] if result else 0  

    def close(self):  
        self.conn.close()  

    def __enter__(self):  
        return self  

    def __exit__(self, *args):  
        self.close()

What’s happening here?

  • The reason field is an enum-like string: api_error, organic_empty, or validation_failed.
  • The payload_hash is a SHA-256 of the canonical JSON — so you can tell if the same bad payload keeps coming back (very useful!).
  • Each quarantine row stores a JSON blob in raw_json: for api_error and organic_empty it is the normalized SERP dict; for validation_failed it is {"serp":[normalized serp here], "gx_validation":[GX result dict here]}, so you still have both the SERP payload and the failing expectation details.

Every bad batch lands in serp_quarantine with a paper trail instead of silently disappearing or, worse, silently making it into serp_results. When a batch passes all gates, insert_batch writes one row per organic result from the validated DataFrame — the same normalized fields GX validated. previous_rank / rank_delta are reserved for later rank-tracking; they stay null on first ingest.
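The canonical-hash trick is easy to verify in isolation. Here's a standalone restatement of _payload_hash showing that key order doesn't change the fingerprint:

```python
import hashlib
import json

def payload_hash(payload):
    # Canonical form: sorted keys, stringified unknowns, same as the store's helper.
    canonical = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

a = payload_hash({"organic": [], "general": {"query": "x"}})
b = payload_hash({"general": {"query": "x"}, "organic": []})
assert a == b  # same payload, different key order, same fingerprint
```

So a repeat offender shows up as the same payload_hash in serp_quarantine, regardless of how the JSON was serialized on the way in.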

Step 3: The Gate Order — Why Sequence Matters

Bright Data's parsed SERP JSON path (request URL includes brd_json=1 or, in their docs, brd_json=json) returns the same kind of structured object you see in their examples and in real dumps — organic, general, input, and so on — not a page of HTML you parse yourself.

So when Google can't be reached or nothing usable is extracted, that tends to show up as a non-200 from POST /request, a network error (which we treat as api_error), or an empty organic array — not as a captcha HTML document mistaken for a normal organic list. So the gates to implement are exactly three:

  • non-200,
  • empty organic,
  • then GX on whatever is left.

ingest.py

import json  
from datetime import datetime, timezone  
from pathlib import Path  

_ROOT = Path(__file__).resolve().parent  

from bright_data import BrightDataClient, normalize_serp_payload  
from duckdb_store import SerpStore  
from serp_expectations import organic_to_validation_df, validate_organic_batch

def _write_validation_report(report_entries):  
    reports_dir = _ROOT / "data" / "reports"  
    reports_dir.mkdir(parents=True, exist_ok=True)  
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")  
    path = reports_dir / f"validation_{ts}.json"  
    doc = {  
        "generated_at_utc": ts,  
        "batches": report_entries,  
    }  
    path.write_text(json.dumps(doc, indent=2, default=str), encoding="utf-8")  
    return path  


def ingest_live(  
    queries,  
    *,  
    db_path=None,  
    num_results=10,  
    memory_limit="2GB",  
    serp_json_out=None,  
):  
    """  
    One batch = one query → one SERP response.  
    Order: api_error → organic_empty → GX validate → insert or validation_failed quarantine.  
    """  
    path = db_path or str(_ROOT / "data" / "serp_gx.duckdb")  
    client = BrightDataClient()  
    total = 0  
    batches_total = 0  
    empty_batches = 0  
    api_error_batches = 0  
    validation_failed_batches = 0  
    report_entries = []  
    serp_dump_batches = []  

    with SerpStore(path, memory_limit=memory_limit) as store:  
        for q in queries:  
            batches_total += 1  
            serp = client.search_with_status(q, num_results=num_results)  
            raw = normalize_serp_payload(serp.data)  
            if serp_json_out:  
                serp_dump_batches.append(  
                    {  
                        "query": q,  
                        "http_status": serp.status_code,  
                        "normalized_serp": raw,  
                    }  
                )  
            organic = raw.get("organic") or []  
            organic_count = len(organic)  

            # Optional (not implemented): best-effort catch-all for an explicit blocked API response  
            # (e.g. substring-match json.dumps(raw) for "captcha" / "unusual traffic") — just in case.  
            # Not needed here, only include if the API you're using can error out like that  

            if serp.status_code != 200:  
                api_error_batches += 1  
                store.insert_quarantine(q, "api_error", organic_count, raw, serp.status_code)  
                report_entries.append(  
                    {  
                        "query": q,  
                        "outcome": "api_error",  
                        "http_status": serp.status_code,  
                        "organic_count": organic_count,  
                        "gx": None,  
                    }  
                )  
                continue  
            if not organic:  
                empty_batches += 1  
                store.insert_quarantine(q, "organic_empty", 0, raw, serp.status_code)  
                report_entries.append(  
                    {  
                        "query": q,  
                        "outcome": "organic_empty",  
                        "http_status": serp.status_code,  
                        "organic_count": 0,  
                        "gx": None,  
                    }  
                )  
                continue  

            vdf = organic_to_validation_df(organic)  
            gx_ok, gx_payload = validate_organic_batch(vdf, num_results=num_results)  
            if not gx_ok:  
                validation_failed_batches += 1  
                store.insert_quarantine(  
                    q,  
                    "validation_failed",  
                    organic_count,  
                    {"serp": raw, "gx_validation": gx_payload},  
                    serp.status_code,  
                )  
                report_entries.append(  
                    {  
                        "query": q,  
                        "outcome": "validation_failed",  
                        "http_status": serp.status_code,  
                        "organic_count": organic_count,  
                        "gx": gx_payload,  
                    }  
                )  
                continue  

            # Persist the validated DataFrame so DuckDB gets the same normalized url/domain/rank GX checked.  
            store.insert_batch(vdf, q)  
            total += len(vdf)  
            report_entries.append(  
                {  
                    "query": q,  
                    "outcome": "inserted",  
                    "http_status": serp.status_code,  
                    "organic_count": organic_count,  
                    "gx": gx_payload,  
                }  
            )  

        row_count = store.get_row_count()  

    report_path = _write_validation_report(report_entries)  

    stats = {  
        "batches_total": batches_total,  
        "empty_batches": empty_batches,  
        "api_error_batches": api_error_batches,  
        "validation_failed_batches": validation_failed_batches,  
        "empty_rate": (empty_batches / batches_total) if batches_total else 0.0,  
        "api_error_rate": (api_error_batches / batches_total) if batches_total else 0.0,  
        "validation_failed_rate": (validation_failed_batches / batches_total)  
        if batches_total  
        else 0.0,  
        "rows_ingested": total,  
        "serp_results_row_count": row_count,  
        "db_path": path,  
        "validation_report_path": str(report_path),  
    }  
    if serp_json_out:  
        out_path = Path(serp_json_out)  
        out_path.parent.mkdir(parents=True, exist_ok=True)  
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")  
        doc = {  
            "generated_at_utc": ts,  
            "num_results": num_results,  
            "batches": serp_dump_batches,  
        }  
        out_path.write_text(json.dumps(doc, indent=2, default=str), encoding="utf-8")  
        stats["serp_json_path"] = str(out_path.resolve())  

    return stats  


def main():  
    import argparse  

    p = argparse.ArgumentParser(  
        description="gx: Bright Data SERP → Great Expectations → DuckDB (fail fast on GX failure)",  
    )  
    p.add_argument(  
        "queries",  
        nargs="*",  
        default=[ 
            "inference engineering",  
            "python duckdb",  
            "Great Expectations data validation",  
            "machine learning",  
            "opentelemetry how to",  
        ],  
        help="Search queries (default: five example queries if none given)",  
    )  
    p.add_argument("--num-results", type=int, default=10)  
    p.add_argument("--db", type=str, default=None, help="DuckDB file path")  
    p.add_argument(  
        "--save-serp-json",  
        type=str,  
        default=None,  
        metavar="PATH",  
        help="Write normalized SERP (+ query, http_status) per batch to this JSON file.",  
    )  
    args = p.parse_args()  
    out = ingest_live(  
        args.queries,  
        db_path=args.db,  
        num_results=args.num_results,  
        serp_json_out=args.save_serp_json,  
    )  
    print(json.dumps(out, indent=2))  


if __name__ == "__main__":  
    main()

The main loop enforces that order we talked about: api_error and organic_empty are cheap pre-checks that catch the most common failure modes without spinning up GX at all.

In fact, GX only runs when there's actual data to validate. store.insert_batch(vdf, q) passes the validated DataFrame, not the original raw organic list — what goes into DuckDB is exactly what GX checked, with the same URL normalization applied.

Step 4: What Quality Gates Should You Put on SERP Data?

To reiterate: by “quality gates” we mean explicit Great Expectations rules on the DataFrame we validate. Each “expectation” is simply one condition the batch must satisfy before you trust it for analytics.

For this pipeline we normalize the organic list in the API response into a small schema — url, title, snippet, domain, and rank — and run the suite on that frame only.

serp_expectations.py

import uuid  
from urllib.parse import urlparse  

import great_expectations as gx  
import pandas as pd  
from great_expectations.data_context.types.base import ProgressBarsConfig  


# helpers   
def _extract_domain(url):  
    if not url:  
        return ""  
    return urlparse(url).netloc.replace("www.", "")  


def _coerce_rank(raw_rank, fallback):  
    if raw_rank is None:  
        return fallback  
    try:  
        return int(float(raw_rank))  
    except (TypeError, ValueError):  
        return fallback  


def _normalize_url(url):  
    if not url:  
        return url  
    try:  
        parsed = urlparse(url)  
        normalised = parsed._replace(  
            scheme=parsed.scheme.lower(),  
            fragment="",  
        )  
        return normalised.geturl()  
    except Exception:  
        return url  

# before anything, normalize the data  
def organic_to_validation_df(organic):  
    rows = []  
    for i, r in enumerate(organic):  
        raw_url = (r.get("url") or r.get("link") or "").strip()  
        url = _normalize_url(raw_url)  
        title = (r.get("title") or "").strip()  
        snippet = (r.get("snippet") or r.get("description") or "").strip()  
        domain = _extract_domain(url)  
        raw_rank = r.get("rank")  
        rank = _coerce_rank(raw_rank, fallback=i + 1)  
        rows.append(  
            {  
                "url": url,  
                "title": title,  
                "snippet": snippet,  
                "domain": domain,  
                "rank": rank,  
            }  
        )  
    return pd.DataFrame(rows)  

# now add the actual expectations  
def build_serp_expectation_suite(num_results):  
    _meta = {"notes": "SERP organic batch gates for Bright Data brd_json=1 payloads."}  

    return gx.ExpectationSuite(  
        name="serp_organic_batch",  
        expectations=[ 
            gx.expectations.ExpectColumnValuesToNotBeNull(  
                column="url",  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValuesToNotBeNull(  
                column="title",  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValuesToMatchRegex(  
                column="url",  
                regex=r"^https?://\S+",  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValuesToBeUnique(  
                column="url",  
                meta=_meta,  
            ),  
            gx.expectations.ExpectTableRowCountToBeBetween(  
                min_value=1,  
                max_value=num_results,  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValueLengthsToBeBetween(  
                column="title",  
                min_value=1,  
                max_value=500,  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValuesToMatchRegex(  
                column="title",  
                regex=r"\S",  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValueLengthsToBeBetween(  
                column="domain",  
                min_value=1,  
                max_value=253,  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValuesToBeBetween(  
                column="rank",  
                min_value=1,  
                max_value=float(num_results),  
                meta=_meta,  
            ),  
            gx.expectations.ExpectColumnValuesToNotBeNull(  
                column="snippet",  
                mostly=0.8,  
                meta=_meta,  
            ),  
        ],  
    )  

# gx in ephemeral mode  
def validate_organic_batch(df, *, num_results):  
    if df.empty:  
        return False, {  
            "success": False,  
            "statistics": {  
                "evaluated_expectations": 0,  
                "successful_expectations": 0,  
                "unsuccessful_expectations": 0,  
            },  
            "results": [],  
            "exception_message": "validation_frame_empty",  
        }  

    context = gx.get_context(mode="ephemeral")  
    context.variables.progress_bars = ProgressBarsConfig(  
        globally=False,  
        metric_calculations=False,  
    )  

    data_source = context.data_sources.add_pandas(f"serp_{uuid.uuid4().hex[:12]}")  
    asset = data_source.add_dataframe_asset(name="organic_batch")  
    batch_definition = asset.add_batch_definition_whole_dataframe("whole")  
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})  
    suite = build_serp_expectation_suite(num_results=num_results)  
    result = batch.validate(suite)  
    payload = result.to_json_dict()  
    return bool(result.success), payload

Let’s walk through the decisions we made here, because they’re not arbitrary.

Normalize Before You Validate — Never Inside the Validation Layer

Before anything in GX runs, organic_to_validation_df normalizes the data:

def _normalize_url(url: str) -> str:  
    """Lowercase scheme, strip fragment."""  
    parsed = urlparse(url)  
    return parsed._replace(scheme=parsed.scheme.lower(), fragment="").geturl()  


def organic_to_validation_df(organic: List[Dict[str, Any]]) -> pd.DataFrame:  
    rows = []  
    for i, r in enumerate(organic):  
        raw_url = (r.get("url") or r.get("link") or "").strip()  
        url = _normalize_url(raw_url)  
        title = (r.get("title") or "").strip()  
        snippet = (r.get("snippet") or r.get("description") or "").strip()  
        domain = _extract_domain(url)  
        rank = _coerce_rank(r.get("rank"), fallback=i + 1)  
        rows.append({"url": url, "title": title, "snippet": snippet, "domain": domain, "rank": rank})  
    return pd.DataFrame(rows)

A few things happening here:

  • Bright Data’s brd_json=1 responses use link as the key, not url. We normalize this before GX sees it; you’ve probably done this kind of key remapping for plenty of API responses.
  • Fragments are stripped and scheme is lowercased before the uniqueness check — so https://example.com/page#section and https://example.com/page don't pass as different URLs.
  • _coerce_rank handles the fact that rank values from APIs can come back as int, float, numpy scalars, or even strings. Coerce to a consistent type before GX, not inside an expectation.
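
The dedupe effect of that normalization is easy to check on its own. This is a standalone restatement of _normalize_url with the fragment example from above:

```python
from urllib.parse import urlparse

def normalize_url(url):
    # Lowercase the scheme, drop the fragment: same policy as serp_expectations.py.
    parsed = urlparse(url)
    return parsed._replace(scheme=parsed.scheme.lower(), fragment="").geturl()

a = normalize_url("HTTPS://example.com/page#section")
b = normalize_url("https://example.com/page")
assert a == b  # these now collide, so the uniqueness gate can catch them
```

Without this step, the uniqueness expectation would happily treat those two strings as distinct results.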

Ephemeral mode

mode="ephemeral" simply means no config files, no persisted context directory, and no great_expectations.yml: a pure in-process validator you spin up per batch. Progress bars are turned off in code since we're batching many runs. This keeps GX lightweight and easy to get started with.

Null Checks Must Come Before Regex Gate

ExpectColumnValuesToNotBeNull on url and title must come before the regex and length gates. That's because GX's ExpectColumnValuesToMatchRegex silently skips null values by default — it only evaluates non-null rows. Without an explicit null check, a null URL would slip through the regex gate undetected.

Why the URL Regex Is Intentionally Loose

^https?://\S+ is deliberately not a strict RFC 3986 URL validator. Overly strict URL validation generates false positives on legitimately unusual URLs — CDN URLs, tracking URLs, URLs with encoded characters. The goal here is to catch the two actual failure modes: an empty string and a non-URL value like "N/A" or a relative path.
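A quick check of what that pattern does and doesn't catch (the sample URLs are invented):

```python
import re

url_pattern = re.compile(r"^https?://\S+")

# Unusual-but-legitimate URLs pass:
assert url_pattern.match("https://cdn.example.com/a%20b?utm_source=x")
# The actual failure modes don't:
assert not url_pattern.match("")                # empty string
assert not url_pattern.match("N/A")             # placeholder value
assert not url_pattern.match("/relative/path")  # relative path
```

Anything stricter than this tends to cost more in false quarantines than it saves.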

URL Uniqueness Catches Parse Artifacts

ExpectColumnValuesToBeUnique on url catches cases where the same URL appears twice in a single batch. That's a real thing that can happen with pagination artifacts or certain response formats. In a ranking pipeline, a duplicate URL means your rank counts are wrong.

Use a Fixed Ceiling for Rank and Row Count, Not a Self-Adjusting One

Notice that max_value=float(num_results) for the rank gate and max_value=num_results for the row count use the same value: the number we passed to the SERP API request. This is intentional, and it matters.

A common mistake is to derive the ceiling from the batch itself:

# Don't do this  
max_rank = max(num_results, int(float(vdf["rank"].max())))

Why? Well, if the API returns a result with rank=47 for a 10-result query, max_rank becomes 47 and the gate passes. You've made the gate self-adjust to whatever the data says, which means the gate can never actually fail on an out-of-range rank. Using a fixed num_results means an unexpected rank value will actually trigger a quarantine.
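You can see the difference with a toy frame containing one out-of-range rank:

```python
import pandas as pd

num_results = 10
vdf = pd.DataFrame({"rank": [1, 2, 47]})  # 47 should never appear in a 10-result query

# Self-adjusting ceiling: stretches to fit the bad data, so the check always passes.
max_rank = max(num_results, int(vdf["rank"].max()))
assert (vdf["rank"] <= max_rank).all()

# Fixed ceiling: the out-of-range rank is actually caught.
assert not (vdf["rank"] <= num_results).all()
```

The fixed ceiling is the only version that can ever fail, which is the whole point of a gate.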

Optional Fields Need Soft Gates, Not Hard Ones

snippet uses mostly=0.8 rather than a hard non-null gate. That's because Google legitimately suppresses snippets for some result types — videos, certain knowledge panel entries, sitelinks. A hard non-null gate on snippet would quarantine perfectly valid batches. The mostly parameter lets you say "80% of rows must pass this check" — if more than 20% of rows have no snippet, that's a signal the response structure has changed, not normal Google behaviour.
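The mostly semantics reduce to a simple fraction check. Here's a simplified model of how GX evaluates it (passes_mostly is a hypothetical helper, not a GX API):

```python
def passes_mostly(values, mostly):
    # GX's mostly: the fraction of non-null rows must be at least the threshold.
    share = sum(v is not None for v in values) / len(values)
    return share >= mostly

# 4 of 5 rows have a snippet: normal Google behaviour, the gate passes.
assert passes_mostly(["a", "b", "c", "d", None], mostly=0.8)
# 1 of 5: likely a response-structure change, the gate fails.
assert not passes_mostly(["a", None, None, None, None], mostly=0.8)
```

Hard gates for invariants, soft gates for fields the upstream source legitimately omits.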

What Does a Failed GX Validation Actually Look Like?

When a batch fails validation, the gx block in the report tells you exactly which expectation failed and what the unexpected values were. Here's a representative example — two organic rows end up with the same normalized URL (duplicate rows in the response, or two URLs that collapse to one after fragment stripping), so the uniqueness gate fires:

{  
  "query": "inference engineering",  
  "outcome": "validation_failed",  
  "http_status": 200,  
  "organic_count": 2,  
  "gx": {  
    "success": false,  
    "statistics": {  
      "evaluated_expectations": 10,  
      "successful_expectations": 9,  
      "unsuccessful_expectations": 1  
    },  
    "results": [ 
      {  
        "success": false,  
        "expectation_config": {  
          "type": "expect_column_values_to_be_unique",  
          "kwargs": { "column": "url" }  
        },  
        "result": {  
          "element_count": 2,  
          "unexpected_count": 2,  
          "unexpected_percent": 100.0,  
          "partial_unexpected_list": [ 
            "https://example.com/page?q=1",  
            "https://example.com/page?q=1"  
          ]  
        }  
      }  
    ]  
  }  
}

The http_status is 200. Without the quality gate, this batch would have been inserted. The downstream join on URL would have silently doubled a result's apparent frequency in your analytics.
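To make that failure mode concrete, here's a hypothetical pandas join (the table and column names are invented for illustration) where the duplicated URL fans one page row out into two:

```python
import pandas as pd

# The bad batch: the same normalized URL appears twice
serp = pd.DataFrame({
    "url": ["https://example.com/page?q=1", "https://example.com/page?q=1"],
    "rank": [3, 4],
})

# A downstream table keyed on URL
pages = pd.DataFrame({
    "url": ["https://example.com/page?q=1"],
    "visits": [120],
})

# One page row matches two SERP rows, so it appears twice after the join —
# any frequency count over the result now double-counts this page
joined = pages.merge(serp, on="url")
print(len(joined))  # 2
```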

The Validation Report

After every run, we emit a timestamped JSON report with one entry per query:

{  
  "generated_at_utc": "20260326T102005Z",  
  "batches": [ 
    {  
      "query": "inference engineering",  
      "outcome": "inserted",  
      "http_status": 200,  
      "organic_count": 8,  
      "gx": { "success": true, "statistics": { "evaluated_expectations": 10, ... } }  
    },  
    {  
      "query": "some broken query",  
      "outcome": "organic_empty",  
      "http_status": 200,  
      "organic_count": 0,  
      "gx": null  
    }  
  ]  
}

The possible outcome values are inserted, api_error, organic_empty, and validation_failed. Over time, watching the rate of each outcome per query is how you'd catch gradual degradation — a rising organic_empty rate is a signal before it becomes a crisis.
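One way to do that monitoring is a small stdlib helper that turns a report's batches list into per-outcome rates (the outcome_rates function is my own sketch, not part of the pipeline):

```python
from collections import Counter


def outcome_rates(report: dict) -> dict[str, float]:
    """Per-outcome share of batches in one validation report."""
    batches = report.get("batches", [])
    counts = Counter(b["outcome"] for b in batches)
    total = len(batches) or 1  # avoid division by zero on an empty report
    return {outcome: n / total for outcome, n in counts.items()}


# e.g. json.loads() a report file, then:
rates = outcome_rates({
    "batches": [
        {"outcome": "inserted"},
        {"outcome": "inserted"},
        {"outcome": "inserted"},
        {"outcome": "organic_empty"},
    ]
})
print(rates)  # {'inserted': 0.75, 'organic_empty': 0.25}
```

Plotting these rates per query over successive runs is where a rising organic_empty trend becomes visible.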

How to Run the Pipeline

Put the four modules in one package or folder on your PYTHONPATH, install dependencies (pip install -r requirements.txt), and set Bright Data credentials (BRIGHT_DATA_API_KEY, BRIGHT_DATA_ZONE, plus optional BRIGHT_DATA_COUNTRY) in a .env file next to the code or in the environment.

Our orchestrator uses argparse. Here are some flags worth including, mostly for quality-of-life:

  • --num-results (default 10),
  • --db (override the DuckDB file path),
  • and optionally --save-serp-json PATH — I’ve found that dumping normalized SERP payloads while tuning your rules really helps.
# One query  
python ingest.py "inference engineering"  

# Multiple queries (defaults to five example queries if you pass none)  
python ingest.py  

# Example: 20 results, custom DB path  
python ingest.py "python asyncio" --num-results 20 --db ./data/my_run.duckdb
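For reference, a minimal argparse sketch of that CLI — the flag names match the examples above, but the defaults and help text here are assumptions:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """CLI for the ingest orchestrator (defaults are illustrative)."""
    parser = argparse.ArgumentParser(
        description="SERP ingestion with GX quality gates"
    )
    parser.add_argument(
        "queries", nargs="*",
        help="queries to ingest (falls back to example queries if omitted)",
    )
    parser.add_argument("--num-results", type=int, default=10)
    parser.add_argument("--db", default="./data/serp_gx.duckdb",
                        help="override the DuckDB file path")
    parser.add_argument("--save-serp-json", metavar="PATH",
                        help="dump normalized SERP payloads for rule tuning")
    return parser


args = build_parser().parse_args(["python asyncio", "--num-results", "20"])
# args.queries == ["python asyncio"], args.num_results == 20
```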

On success, the script prints a small JSON summary — batch counts, rows written, paths to the DuckDB file and the validation report:

{  
  "batches_total": 1,  
  "empty_batches": 0,  
  "api_error_batches": 0,  
  "validation_failed_batches": 0,  
  "empty_rate": 0.0,  
  "api_error_rate": 0.0,  
  "validation_failed_rate": 0.0,  
  "rows_ingested": 8,  
  "serp_results_row_count": 8,  
  "db_path": "./data/serp_gx.duckdb",  
  "validation_report_path": "./data/reports/validation_20260326T102005Z.json"  
}

That’s pretty much everything. I’ve walked you through each file. Just know ingest.py is the entry point.

Frequently Asked Questions (FAQ)

Q: Why not just write the validation logic myself?

A: You can, and for one or two checks, you probably should! The case for GX is not that something like if not url or not url.startswith("http") is hard to write — it's that twenty of those checks scattered across your pipeline become hard to read, hard to audit, and easy to accidentally skip when you're in a hurry. GX gives you a single place where all your rules live, a consistent result structure across every check, and a report that tells you exactly which rule failed and on which values. It saves a ton of trouble in the long run.

Q: Does running GX on every batch slow the pipeline down?

A: In practice, no — not at the batch sizes typical of API calls (say, 10–50 rows per query). To keep overhead minimal, I also run GX with an ephemeral context (mode="ephemeral"), which avoids any file I/O or context-persistence overhead. The validation itself is pandas operations under the hood anyway.

Q: What happens to bad data in a pipeline without validation?

A: It gets inserted and you don’t find out until something downstream breaks — or worse, until it produces wrong answers that are just plausible enough to go unnoticed. A 200 OK with duplicate rows, empty titles, or out-of-range values looks like a success on the wire. Without an explicit quality gate, that batch goes straight into your database and silently skews every query that touches it.

Q: How do I know what rules to write for my own data?

A: Sample first, then write rules. Pull a handful of real responses from your source API, inspect the fields and edge cases, then encode what “good” looks like. Rules written speculatively against an imagined schema generate false positives; rules written against real samples catch actual failure modes. Start with the fields you’d join on or aggregate in your analytics, and only add gates for everything else if you’ve seen it break.

What GX + Bright Data Gives You

So why does this pattern work reliably?

  • Bright Data handles the upstream layer. Proxy rotation, bot detection, structured extraction — all abstracted behind a single API call that returns structured JSON.
  • Great Expectations handles downstream trust. It doesn’t fix bad data. It measures whether incoming data meets your rules before you trust it with your analytics.
  • The quarantine table (in DuckDB) gives you an audit trail. Not just “something went wrong,” but what the payload was, which expectations failed, and when. You can query the quarantine table to understand your pipeline’s health over time.

Web pipelines that don’t have explicit quality gates just gradually produce wrong answers. Catching that before it reaches your database is the whole point.
