DEV Community

agenthustler
agenthustler

Posted on

Data Quality in Web Scraping: Validation, Cleaning, and Deduplication

Scraping data is only half the battle. Raw scraped data is messy — missing fields, inconsistent formats, duplicates, and encoding issues are the norm. Without proper validation and cleaning, your scraped data is unreliable.

In this guide, I'll show you practical techniques for ensuring data quality in your scraping pipelines.

The Data Quality Problem

Typical issues in scraped data:

  • Missing fields: Product has no price, article has no author
  • Inconsistent formats: Dates as "Mar 9, 2026" vs "2026-03-09" vs "09/03/2026"
  • Duplicates: Same product scraped from multiple pages
  • Encoding issues: Mojibake characters, HTML entities in text
  • Type mismatches: Price as "$1,299.00" (string) instead of 1299.00 (float)
  • Stale data: Old cached pages mixed with fresh data

Step 1: Schema Validation with Pydantic

Define your data schema upfront and validate every record:

from pydantic import BaseModel, field_validator, HttpUrl
from datetime import datetime
from typing import Optional

class ScrapedProduct(BaseModel):
    name: str
    price: float
    currency: str = "USD"
    url: HttpUrl
    category: Optional[str] = None
    in_stock: bool = True
    scraped_at: datetime

    @field_validator("name")
    @classmethod
    def clean_name(cls, v):
        # Remove extra whitespace and newlines
        return " ".join(v.split()).strip()

    @field_validator("price")
    @classmethod
    def validate_price(cls, v):
        if v <= 0:
            raise ValueError("Price must be positive")
        if v > 1_000_000:
            raise ValueError("Price suspiciously high — likely a parsing error")
        return round(v, 2)

# Usage
try:
    product = ScrapedProduct(
        name="  Samsung Galaxy S25   Ultra  ",
        price=1299.99,
        url="https://example.com/products/galaxy-s25",
        scraped_at=datetime.now()
    )
    print(product.name)  # "Samsung Galaxy S25 Ultra" — cleaned!
except Exception as e:
    print(f"Validation failed: {e}")
Enter fullscreen mode Exit fullscreen mode

Step 2: Data Cleaning Functions

Build reusable cleaning utilities:

import re
import html
import unicodedata

def clean_text(text):
    """Clean scraped text: decode entities, normalize unicode, strip whitespace."""
    if not text:
        return ""

    # Decode HTML entities
    text = html.unescape(text)

    # Normalize unicode (fix mojibake)
    text = unicodedata.normalize("NFKD", text)

    # Remove zero-width characters
    text = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)

    # Normalize whitespace
    text = " ".join(text.split())

    return text.strip()

def parse_price(price_str):
    """Extract numeric price from various formats."""
    if not price_str:
        return None

    # Remove currency symbols and whitespace
    cleaned = re.sub(r"[^\d.,]", "", price_str)

    # Handle European format (1.299,99 -> 1299.99)
    if "," in cleaned and "." in cleaned:
        if cleaned.rindex(",") > cleaned.rindex("."):
            cleaned = cleaned.replace(".", "").replace(",", ".")
        else:
            cleaned = cleaned.replace(",", "")
    elif "," in cleaned:
        # Could be decimal separator (9,99) or thousands (1,299)
        parts = cleaned.split(",")
        if len(parts[-1]) == 2:
            cleaned = cleaned.replace(",", ".")
        else:
            cleaned = cleaned.replace(",", "")

    try:
        return float(cleaned)
    except ValueError:
        return None

def normalize_date(date_str):
    """Parse dates from common web formats."""
    from dateutil import parser
    try:
        return parser.parse(date_str)
    except (ValueError, TypeError):
        return None

# Tests
print(parse_price("$1,299.00"))    # 1299.0
print(parse_price("1.299,99 EUR")) # 1299.99
print(parse_price("9,99"))          # 9.99
print(clean_text("  Hello&amp;  World\u200b  "))  # "Hello& World"
Enter fullscreen mode Exit fullscreen mode

Step 3: Deduplication

Duplicates waste storage and skew analysis. Use multiple strategies:

import hashlib
from collections import defaultdict

class Deduplicator:
    def __init__(self):
        self.seen_exact = set()      # Exact URL matches
        self.seen_fuzzy = set()      # Content-based fingerprints
        self.stats = defaultdict(int)

    def _content_fingerprint(self, record):
        """Create a fingerprint from key fields."""
        key_fields = f"{record.get('name', '')}{record.get('price', '')}"
        return hashlib.md5(key_fields.lower().encode()).hexdigest()

    def is_duplicate(self, record):
        """Check if record is a duplicate."""
        # Check 1: Exact URL match
        url = record.get("url", "")
        if url in self.seen_exact:
            self.stats["url_duplicates"] += 1
            return True
        self.seen_exact.add(url)

        # Check 2: Content fingerprint
        fingerprint = self._content_fingerprint(record)
        if fingerprint in self.seen_fuzzy:
            self.stats["content_duplicates"] += 1
            return True
        self.seen_fuzzy.add(fingerprint)

        self.stats["unique"] += 1
        return False

    def deduplicate(self, records):
        """Filter out duplicates from a list of records."""
        unique = []
        for record in records:
            if not self.is_duplicate(record):
                unique.append(record)
        return unique

    def report(self):
        print(f"Unique records: {self.stats['unique']}")
        print(f"URL duplicates removed: {self.stats['url_duplicates']}")
        print(f"Content duplicates removed: {self.stats['content_duplicates']}")

# Usage
dedup = Deduplicator()
raw_data = [
    {"name": "Widget A", "price": 9.99, "url": "https://example.com/a"},
    {"name": "Widget A", "price": 9.99, "url": "https://example.com/a"},  # URL dup
    {"name": "widget a", "price": 9.99, "url": "https://example.com/a2"},  # Content dup
    {"name": "Widget B", "price": 19.99, "url": "https://example.com/b"},
]

clean_data = dedup.deduplicate(raw_data)
dedup.report()
Enter fullscreen mode Exit fullscreen mode

Step 4: Building a Validation Pipeline

Combine everything into a reusable pipeline:

from pydantic import ValidationError
import json
from pathlib import Path

class DataPipeline:
    def __init__(self, schema_class):
        self.schema = schema_class
        self.dedup = Deduplicator()
        self.valid_records = []
        self.errors = []

    def process(self, raw_records):
        for i, raw in enumerate(raw_records):
            # Step 1: Clean
            cleaned = self._clean(raw)

            # Step 2: Validate
            try:
                validated = self.schema(**cleaned)
                record = validated.model_dump()
            except ValidationError as e:
                self.errors.append({"index": i, "raw": raw, "error": str(e)})
                continue

            # Step 3: Deduplicate
            if not self.dedup.is_duplicate(record):
                self.valid_records.append(record)

        return self.valid_records

    def _clean(self, raw):
        cleaned = {}
        for key, value in raw.items():
            if isinstance(value, str):
                cleaned[key] = clean_text(value)
            else:
                cleaned[key] = value

        # Parse price if it's a string
        if "price" in cleaned and isinstance(cleaned["price"], str):
            cleaned["price"] = parse_price(cleaned["price"])

        return cleaned

    def save(self, filepath):
        # Convert datetime objects to strings for JSON
        serializable = []
        for r in self.valid_records:
            record = {}
            for k, v in r.items():
                if hasattr(v, "isoformat"):
                    record[k] = v.isoformat()
                elif hasattr(v, "__str__") and not isinstance(v, (str, int, float, bool)):
                    record[k] = str(v)
                else:
                    record[k] = v
            serializable.append(record)
        Path(filepath).write_text(json.dumps(serializable, indent=2))

    def report(self):
        total = len(self.valid_records) + len(self.errors)
        print(f"Total processed: {total}")
        print(f"Valid records: {len(self.valid_records)}")
        print(f"Validation errors: {len(self.errors)}")
        self.dedup.report()
        if self.errors:
            print(f"\nSample errors:")
            for err in self.errors[:3]:
                print(f"  Record {err['index']}: {err['error'][:100]}")

# Usage
pipeline = DataPipeline(ScrapedProduct)
clean_data = pipeline.process(raw_scraped_data)
pipeline.report()
pipeline.save("clean_products.json")
Enter fullscreen mode Exit fullscreen mode

Monitoring Data Quality Over Time

def quality_score(records):
    """Calculate a data quality score (0-100)."""
    if not records:
        return 0

    total_fields = 0
    filled_fields = 0

    for record in records:
        for key, value in record.items():
            total_fields += 1
            if value is not None and value != "" and value != 0:
                filled_fields += 1

    completeness = (filled_fields / total_fields) * 100 if total_fields else 0
    return round(completeness, 1)

score = quality_score(clean_data)
print(f"Data quality score: {score}%")
Enter fullscreen mode Exit fullscreen mode

Integrating with Scraping Infrastructure

For production scraping pipelines, ScrapeOps provides monitoring dashboards that track your scraper performance, success rates, and data quality metrics over time — essential for catching data quality regressions before they corrupt your dataset.

Conclusion

Data quality is what separates a hobby scraping script from a production pipeline. Use Pydantic for schema validation, build reusable cleaning functions, implement deduplication at the fingerprint level, and monitor quality scores over time. Your downstream analysis is only as good as your data.

Happy scraping!

Top comments (0)