The Invisible War: Why Your Automation Fails (and How I Build Systems That Don't)

Adilet Akmatov

The Invisible War: A Complete Engineering Reference for Production Web Automation

⚖️ Legal note before anything else: Web scraping exists in a legally complex space. Before building anything like this, review the target site's Terms of Service and robots.txt, understand applicable law in your jurisdiction (CFAA in the US, Computer Misuse Act in the UK, GDPR in the EU), and — ideally — have explicit permission or a legal opinion. The techniques below are documented for educational purposes. "I found it on the internet" is not a legal defence.


I spent 8 years in retail operations before moving into development. That background shapes how I think about automation: the business problem comes first, the code is just the means. This post is about what I learned building a pricing-intelligence system that actually ran in production — including the bugs I shipped, the incidents that broke it, and the architectural gaps I only recognised after the second outage.

It's not a "look at my dashboard" post. It's an engineering breakdown, including the parts that are embarrassing to admit.


Why "Standard" Scraping Fails Immediately

If you're reaching for requests + BeautifulSoup for anything modern, the failure isn't that your selectors are wrong. The failure happens before you send a single HTTP header.

Modern anti-bot systems analyse behavioural signals and hardware fingerprints across multiple layers simultaneously. Miss any one of them and you're blocked: often silently, sometimes with poisoned data returned instead of a 403.

Here's what those layers actually look like.


Layer 0: TLS Fingerprinting — You're Dead Before HTTP

import requests

# This dies before reaching the application layer.
# The TLS handshake itself identifies you as Python requests.
response = requests.get("https://target-site.com/products")

The problem isn't your User-Agent header. It's not even your IP. Standard Python HTTP clients produce distinctive TLS fingerprints — specifically JA3, JA4, and JA4+ hashes — that Cloudflare, Akamai, and DataDome identify at the socket level, before they parse a single HTTP header.

JA3 vs JA4: Why the Generation Gap Matters

JA3 hashes the ClientHello message by concatenating: TLS version + cipher suites + extensions + elliptic curves + elliptic curve formats. It's a 32-character MD5 hash. Blocking by JA3 is table-stakes — every major WAF does it.
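The construction is simple enough to sketch: join the five ClientHello fields with commas, with the values inside each field dash-separated, then MD5 the result. The numeric values below are illustrative only, not a real browser's handshake:

```python
import hashlib


def ja3_hash(version, ciphers, extensions, curves, curve_formats):
    """Build the JA3 string (comma-separated fields, dash-separated values)
    and return its 32-character MD5 hex digest."""
    fields = [
        str(version),
        "-".join(map(str, ciphers)),
        "-".join(map(str, extensions)),
        "-".join(map(str, curves)),
        "-".join(map(str, curve_formats)),
    ]
    ja3_string = ",".join(fields)  # e.g. "771,4865-4866,0-11-10,29-23,0"
    return hashlib.md5(ja3_string.encode()).hexdigest()


# Example values only -- a real ClientHello has far more entries.
print(ja3_hash(771, [4865, 4866], [0, 11, 10], [29, 23], [0]))
```

The same inputs always produce the same digest, which is exactly why a WAF can block on it: two clients with identical TLS stacks collide on one hash.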

JA4 is a different beast. Its format is human-readable by design:

{transport}{tls_version}{sni_present}{cipher_count}{extension_count}{alpn}
_{cipher_suite_hash}_{extension_hash}

A concrete example from Chrome 122 on Windows:

t13d1516h2_acb858a92679_14cb8fd47a25
││ ││ │ │  │            └── JA4_c: truncated SHA-256 of extensions + signature algorithms
││ ││ │ │  └── JA4_b: truncated SHA-256 of cipher suites
││ ││ │ └── ALPN: h2 (HTTP/2 negotiated)
││ ││ └── 16 TLS extensions
││ │└── 15 cipher suites
││ └── SNI present (d = domain; i = no SNI)
│└── TLS 1.3
└── t = TCP (q = QUIC)

Now compare what Python's urllib3 (used by requests) produces:

t13d1912h2_..._...
    │ │
    │ └── 12 extensions (vs 16 in Chrome)
    └─── 19 cipher suites (vs 15 in Chrome)

The cipher count and extension count are different before you even look at the hashes. This is why changing your User-Agent to Chrome while using requests doesn't help — the JA4 prefix t13d1912 is a hard signal that you're not Chrome, regardless of what headers you send.

What JA4 adds beyond JA3:

  • Extension ordering — Chrome loads extensions in a specific order that reflects its internal TLS stack. Python's ssl module uses a different order.
  • ALPN protocol preference — Chrome negotiates HTTP/2 by default. Some HTTP clients don't include ALPN at all, or order h2 and http/1.1 differently.
  • Signature algorithm list — Chrome includes specific signature algorithms (ed25519, ecdsa-secp256r1-sha256) that urllib3's default SSL context omits.
  • JA4+ additionally fingerprints the server's response: which cipher the server selected, certificate details, extension responses. This catches clients that successfully mimic the ClientHello but can't mimic how they respond to the ServerHello.

If your evasion strategy is JA3-era — rotating User-Agents, tweaking headers — you're a generation behind. The handshake has already identified you.
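The human-readable JA4_a prefix is mechanical to assemble from the field layout above. A toy builder (the hash portions are omitted since they require the full cipher and extension lists):

```python
def ja4_a(transport: str, tls_version: str, sni: bool,
          cipher_count: int, extension_count: int, alpn: str) -> str:
    """Assemble the JA4_a prefix from its components.
    Counts are zero-padded to two digits, matching the format above."""
    sni_flag = "d" if sni else "i"  # d = SNI/domain present, i = no SNI
    return (f"{transport}{tls_version}{sni_flag}"
            f"{cipher_count:02d}{extension_count:02d}{alpn}")


# Chrome 122 example from above:
print(ja4_a("t", "13", True, 15, 16, "h2"))   # t13d1516h2
# Python urllib3-style client:
print(ja4_a("t", "13", True, 19, 12, "h2"))   # t13d1912h2
```

Running both makes the point concrete: the two prefixes differ in plain sight, before any hash comparison is needed.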

The Fix: Start Clean, Don't Patch Dirty

# ❌ Vanilla Playwright — navigator.webdriver = true, Mesa OffScreen GPU exposed,
#    and the Chromium binary produces a recognisable TLS fingerprint.
browser = p.chromium.launch(headless=True)

# ✅ Option A: Camoufox — a hardened Firefox fork built for stealth automation.
#    Ships with patched fingerprints, a realistic GPU identity, and correct
#    navigator properties out of the box. Best integrity for Firefox-compatible targets.
#    pip install camoufox
from camoufox.async_api import AsyncCamoufox
async with AsyncCamoufox(headless=True) as browser:
    page = await browser.new_page()

# ✅ Option B: rebrowser-patches — patches applied at Chrome source level,
#    not via JS injection at runtime. A source-level patch is harder to detect
#    than monkey-patching because it can't be caught by checking whether native
#    function .toString() returns "[native code]".
#    https://github.com/rebrowser/rebrowser-patches

# ✅ Option C: curl_cffi — if you need an HTTP client rather than a full browser.
#    Impersonates real browser TLS profiles at the socket level via libcurl.
#    pip install curl_cffi
from curl_cffi.requests import AsyncSession
async with AsyncSession() as session:
    # impersonate= sets the TLS fingerprint + header order to match the target browser
    response = await session.get(url, impersonate="chrome122")

Choose based on your threat model:

  • Pure HTTP endpoints without JS rendering → curl_cffi
  • Firefox-compatible targets where maximum fingerprint integrity matters → Camoufox
  • Chromium targets where site JS compatibility matters → rebrowser-patches or playwright-stealth (Layer 1)

The key principle: don't add stealth patches on top of a flagged baseline. Start with the cleanest baseline available, then add targeted patches for surfaces that baseline doesn't cover.


Layer 1: Stealth Patching — Necessary but Not Sufficient

playwright-stealth handles the obvious signals. Understanding which signals it patches — and which it doesn't — determines whether you need to go further.

import asyncio
from playwright.async_api import async_playwright
from playwright_stealth import stealth_async


async def create_stealth_page(browser, proxy_location: dict):
    context = await browser.new_context(
        viewport={"width": 1920, "height": 1080},
        user_agent=(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/122.0.0.0 Safari/537.36"
        ),
        locale=proxy_location.get("locale", "en-US"),
        timezone_id=proxy_location.get("timezone", "America/New_York"),
    )

    page = await context.new_page()
    await stealth_async(page)

    # Never use `assert` for runtime validation in production code.
    # Python's -O optimisation flag silently strips all assert statements,
    # turning this into a no-op in deployed environments.
    webdriver_check = await page.evaluate("navigator.webdriver")
    if webdriver_check is not None:
        raise RuntimeError(
            f"Stealth patch failed: navigator.webdriver = {webdriver_check!r}. "
            "Check playwright-stealth version compatibility with your Chromium build."
        )

    return page

What playwright-stealth patches:

  • navigator.webdriver — the most obvious automation flag
  • chrome.runtime — mimics a real extension environment
  • permissions.query — normalises responses that headless browsers answer differently
  • window.screen — fixes dimension inconsistencies in headless mode
  • navigator.plugins — populates the plugins array (empty in headless by default)

What it doesn't patch (you need Layer 3 for these):

  • WebGL renderer string
  • Canvas fingerprint
  • AudioContext output
  • Font enumeration results

Layer 2: Behavioural Simulation — Sessions, Not Clicks

Real users don't move the mouse in straight lines or click at pixel-perfect coordinates. Behavioural analysis engines like PerimeterX score entire sessions — mouse velocity profiles, click precision, scroll patterns, time-on-page distributions. A single unnatural movement can flag a session that was otherwise clean.

The Bug in the Original Implementation

The original HumanPage had three problems:

  1. It wasn't a Bézier curve. The docstring said "Bezier-like" but the implementation was linear interpolation with a sinusoidal noise term. A real cubic Bézier requires four control points and a specific interpolation formula.

  2. The physics were backwards relative to Fitts's Law. The noise was multiplied by sin(progress * π) — maximum scatter in the middle of the movement, zero at start and end. Real human motor control (Fitts's Law) works the opposite way: a fast ballistic phase with minimal correction makes up the first ~70% of movement, followed by a slow corrective phase with fine Gaussian adjustments near the target. Scorers that model velocity profiles will catch the sinusoidal pattern.

  3. Step count was distance-independent. random.randint(25, 45) regardless of whether you're moving 10px or 800px. Real mouse movement time scales logarithmically with distance (Fitts's Law: MT = a + b * log₂(2D/W)). A fixed step count produces unnatural velocity at extreme distances.

import asyncio
import random
import math
from dataclasses import dataclass


@dataclass
class Point:
    x: float
    y: float

    def __add__(self, other: "Point") -> "Point":
        return Point(self.x + other.x, self.y + other.y)

    def __mul__(self, scalar: float) -> "Point":
        return Point(self.x * scalar, self.y * scalar)

    def distance_to(self, other: "Point") -> float:
        return math.hypot(other.x - self.x, other.y - self.y)


def cubic_bezier(t: float, p0: Point, p1: Point, p2: Point, p3: Point) -> Point:
    """
    Cubic Bézier: B(t) = (1-t)³P0 + 3(1-t)²tP1 + 3(1-t)t²P2 + t³P3

    Four control points:
        P0: start position
        P1: first control point (influences departure angle and speed)
        P2: second control point (influences arrival angle and speed)
        P3: end position (target)
    """
    mt = 1.0 - t
    return (
        p0 * (mt ** 3) +
        p1 * (3 * mt ** 2 * t) +
        p2 * (3 * mt * t ** 2) +
        p3 * (t ** 3)
    )


def fitts_steps(distance: float, target_width: float = 10.0) -> int:
    """
    Estimate step count from Fitts's Law movement time model.

    MT = a + b * log2(2D / W)

    Constants (a=0.1, b=0.15) are approximate empirical values for mouse input.
    The result is clamped: very short movements need at least 15 steps for
    smoothness; very long ones are capped at 80 to avoid excessive runtime.
    """
    if distance < 1:
        return 15
    index_of_difficulty = math.log2(2 * distance / target_width)
    mt = 0.1 + 0.15 * index_of_difficulty  # seconds (approximate)
    steps = int(mt * 60)  # 60 "steps" per second of movement time
    return max(15, min(80, steps))


class HumanPage:
    """
    Playwright page wrapper with physically-motivated mouse movement.

    Movement model:
      - Cubic Bézier curve for the overall path shape
      - Ballistic phase (t=0.0–0.7): fast movement, minimal correction
      - Corrective phase (t=0.7–1.0): slow approach, Gaussian scatter near target
      - Step count scales with distance via Fitts's Law

    Why per-instance state, not a module-level dict:
      In an asyncio worker pool with multiple pages running concurrently,
      a shared global is a race condition: worker A updates position mid-move
      while worker B reads it for its own calculation. Each HumanPage owns
      position state for its page only.
    """

    def __init__(self, page):
        self._page = page
        self._pos = Point(0.0, 0.0)

    def _build_control_points(self, start: Point, end: Point) -> tuple[Point, Point]:
        """
        Generate P1 and P2 control points that produce a natural arc.

        P1 is offset perpendicular to the movement direction, biased toward
        the first third of the path — this creates the curved departure.

        P2 is placed near the target with a slight perpendicular offset in
        the opposite direction — this creates the converging approach.

        The perpendicular magnitude scales with distance so that short
        movements don't produce implausible arcs.
        """
        dx = end.x - start.x
        dy = end.y - start.y
        distance = math.hypot(dx, dy)

        # Unit perpendicular vector
        if distance > 0:
            perp_x = -dy / distance
            perp_y = dx / distance
        else:
            perp_x, perp_y = 0.0, 1.0

        arc_magnitude = distance * random.uniform(0.1, 0.25)
        arc_direction = random.choice([-1, 1])

        # P1: near start, pushed perpendicular
        p1 = Point(
            start.x + dx * 0.3 + perp_x * arc_magnitude * arc_direction,
            start.y + dy * 0.3 + perp_y * arc_magnitude * arc_direction,
        )
        # P2: near end, slightly pushed back toward centre
        p2 = Point(
            start.x + dx * 0.8 + perp_x * arc_magnitude * arc_direction * 0.3,
            start.y + dy * 0.8 + perp_y * arc_magnitude * arc_direction * 0.3,
        )
        return p1, p2

    async def move(self, target_x: float, target_y: float) -> None:
        """
        Move mouse from current position to (target_x, target_y).

        Phase split at t=0.7:
          Ballistic (t < 0.7): velocity is high, Gaussian jitter is minimal.
            Real ballistic movement is essentially open-loop: the motor command
            is issued and not corrected until the corrective phase begins.
          Corrective (t >= 0.7): velocity decreases, Gaussian scatter increases
            as the hand homes in on the target. Sigma scales with distance to
            target — wide early in the corrective phase, tight at the end.
        """
        start = self._pos
        end = Point(target_x, target_y)
        distance = start.distance_to(end)

        if distance < 2:
            self._pos = end
            return

        p1, p2 = self._build_control_points(start, end)
        steps = fitts_steps(distance)

        BALLISTIC_SPLIT = 0.7

        for i in range(steps + 1):
            t = i / steps
            pt = cubic_bezier(t, start, p1, p2, end)

            if t < BALLISTIC_SPLIT:
                # Ballistic: small fixed jitter
                sigma = 1.5
            else:
                # Corrective: jitter shrinks in proportion to remaining distance
                remaining_dist = pt.distance_to(end)
                sigma = max(0.5, remaining_dist * 0.05)

            jitter_x = random.gauss(0, sigma)
            jitter_y = random.gauss(0, sigma)

            await self._page.mouse.move(pt.x + jitter_x, pt.y + jitter_y)

            # Velocity profile: fast in ballistic, slow in corrective
            if t < BALLISTIC_SPLIT:
                await asyncio.sleep(random.uniform(0.003, 0.010))
            else:
                await asyncio.sleep(random.uniform(0.015, 0.040))

        self._pos = end

    async def scroll(self, delta_y: int) -> None:
        """
        Scroll with variable speed — humans don't scroll at constant velocity.
        """
        ticks = random.randint(3, 7)
        per_tick = delta_y // ticks
        remainder = delta_y - per_tick * ticks
        for i in range(ticks):
            # Fold the integer-division remainder into the last tick so the
            # total distance matches delta_y (modulo the ±5px jitter).
            extra = remainder if i == ticks - 1 else 0
            await self._page.mouse.wheel(0, per_tick + extra + random.randint(-5, 5))
            await asyncio.sleep(random.uniform(0.08, 0.25))

    async def click(self, selector: str) -> None:
        element = await self._page.wait_for_selector(selector)
        box = await element.bounding_box()

        if box is None:
            raise ValueError(
                f"Element '{selector}' has no bounding box — "
                "it may be display:contents, detached, or inside a collapsed container."
            )

        target_x = box["x"] + box["width"] * random.uniform(0.2, 0.8)
        target_y = box["y"] + box["height"] * random.uniform(0.2, 0.8)

        await self.move(target_x, target_y)
        await asyncio.sleep(random.uniform(0.08, 0.35))
        await self._page.mouse.click(target_x, target_y)
        await asyncio.sleep(random.uniform(0.15, 0.45))

A Note on GIL and Event Loop Blocking

A reasonable question when looking at Bézier calculations running across dozens of parallel Playwright pages: does the CPU work cause event loop lag via the GIL?

The short answer is no, and the reasoning matters because the confusion is common.

The GIL prevents parallel execution of Python bytecode across OS threads. asyncio is single-threaded cooperative concurrency; the GIL is essentially irrelevant here unless you explicitly mix asyncio with threading. The actual concern in an asyncio context is event loop blocking: synchronous CPU-bound code that runs long enough without yielding to starve other coroutines.

The numbers don't support that concern here. One complete Bézier step — four Point multiplications and three additions — takes roughly 2–5 microseconds on a modern CPU. Between every step there is an await asyncio.sleep(), which yields control back to the event loop. The CPU portion is three orders of magnitude faster than a single await page.mouse.move() call, which involves IPC to the browser process at ~1–5ms round-trip latency.

If the implementation had 200+ steps with no yield points between them, loop.run_in_executor() would be appropriate. At the current structure it would be premature optimisation with added complexity for no measurable gain. The real bottleneck is always the browser IPC, not the arithmetic.
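The yield-per-step structure can be demonstrated without a browser: two coroutines that await between small chunks of arithmetic interleave freely on a single loop. A minimal sketch, where the arithmetic stands in for the Bézier step and `asyncio.sleep(0)` stands in for the inter-step sleep:

```python
import asyncio


async def mover(label: str, steps: int, log: list) -> None:
    """Simulate the per-step pattern: microseconds of arithmetic,
    then an await that yields control back to the event loop."""
    for i in range(steps):
        _ = (i * 3.0 + 1.0) ** 2   # stand-in for the Bézier arithmetic
        log.append(label)
        await asyncio.sleep(0)     # yield point between steps


async def main() -> list:
    log: list = []
    # Two "pages" moving concurrently on one event loop.
    await asyncio.gather(mover("A", 5, log), mover("B", 5, log))
    return log


log = asyncio.run(main())
print("".join(log))  # the two labels interleave; neither starves the other
```

Remove the `await` and the log would show all of one label before any of the other: that is what event loop blocking looks like, and it is a property of yield points, not of the GIL.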


Layer 3: Fingerprint Consistency — Coherence Matters More Than Spoofing

This is where most stealth implementations break down. A real browser has a coherent hardware identity across Canvas, WebGL, AudioContext, and font enumeration simultaneously. Patching one surface while leaving others inconsistent creates detectable contradictions that are arguably more suspicious than no patching.

Bug: The AudioContext Patch Mutated Shared Memory

The original implementation had a critical flaw:

// ❌ ORIGINAL — has a memory mutation bug
const origGetChannelData = AudioBuffer.prototype.getChannelData;
AudioBuffer.prototype.getChannelData = function(channel) {
    const data = origGetChannelData.call(this, channel);
    for (let i = 0; i < data.length; i++) {
        data[i] += (Math.random() - 0.5) * 1e-7;  // BUG
    }
    return data;
};

AudioBuffer.prototype.getChannelData returns a Float32Array that is a direct view into the AudioBuffer's internal ArrayBuffer — not a copy. Mutating data[i] modifies the AudioBuffer's underlying data in-place. Every subsequent call to getChannelData on the same buffer returns the already-mutated data, compounding the noise. More critically, any code that relies on the AudioBuffer's data after fingerprinting reads (Web Audio API nodes, audio worklets, Web Audio visualisers) will receive corrupted samples.

The original implementation got this right for Canvas, working on offscreen copies to avoid mutating page state. AudioContext needs the same treatment.

import json


async def patch_fingerprints(page, device_profile: dict):
    """
    Patch all hardware surfaces to a single consistent device profile.

    Consistency rule: every fingerprint surface must agree. A Windows/Chrome
    User-Agent with a Mesa OffScreen WebGL renderer is an instant flag. A
    Canvas patch that covers getImageData but not toDataURL creates a
    detectable contradiction. Patch all surfaces or patch none.

    Injection safety: never interpolate device_profile values directly into
    JavaScript via f-string. json.dumps() serialises Python values to valid,
    safely-escaped JS literals. float() enforces numeric type for noise,
    preventing injection there too. If device_profile comes from an external
    source (config file, API response), values containing quotes or JS syntax
    would become a code injection vulnerability without this.
    """
    vendor_js   = json.dumps(str(device_profile["webgl_vendor"]))
    renderer_js = json.dumps(str(device_profile["webgl_renderer"]))
    noise_js    = float(device_profile["canvas_noise"])

    await page.add_init_script(f"""
    (() => {{
        // ── WebGL ─────────────────────────────────────────────────────────
        // Vendor and renderer must match the claimed User-Agent OS/hardware.
        // Parameter 37445 = UNMASKED_VENDOR_WEBGL
        // Parameter 37446 = UNMASKED_RENDERER_WEBGL
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {{
            if (parameter === 37445) return {vendor_js};
            if (parameter === 37446) return {renderer_js};
            return getParameter.call(this, parameter);
        }};
        // Apply the same patch to WebGL2 — fingerprinters check both contexts.
        const getParameter2 = WebGL2RenderingContext.prototype.getParameter;
        WebGL2RenderingContext.prototype.getParameter = function(parameter) {{
            if (parameter === 37445) return {vendor_js};
            if (parameter === 37446) return {renderer_js};
            return getParameter2.call(this, parameter);
        }};

        // ── Canvas ────────────────────────────────────────────────────────
        // Patch all three read surfaces consistently. Patching only one is
        // worse than patching none — it creates a detectable contradiction.
        //   getImageData: pixel-level read
        //   toDataURL: base64 PNG/JPEG export
        //   toBlob: binary export (same pixel data, different delivery)
        const noise = {noise_js};

        function addNoise(imageData) {{
            for (let i = 0; i < imageData.data.length; i += 4) {{
                imageData.data[i]   += Math.floor(noise * Math.random() * 5);
                imageData.data[i+1] += Math.floor(noise * Math.random() * 5);
                imageData.data[i+2] += Math.floor(noise * Math.random() * 5);
                // Alpha channel untouched — modifying it breaks transparency.
            }}
            return imageData;
        }}

        const origGetImageData = CanvasRenderingContext2D.prototype.getImageData;
        CanvasRenderingContext2D.prototype.getImageData = function(x, y, w, h) {{
            return addNoise(origGetImageData.call(this, x, y, w, h));
        }};

        // For toDataURL and toBlob: work on an offscreen copy — never mutate
        // the original canvas. The original approach (putImageData back onto
        // the live canvas) would corrupt page state for anything that reads
        // its own canvas after calling toDataURL: game renderers, image
        // editors, captcha widgets.
        function withNoisedCopy(originalCanvas, fn) {{
            const offscreen = document.createElement('canvas');
            offscreen.width  = originalCanvas.width;
            offscreen.height = originalCanvas.height;
            const offCtx = offscreen.getContext('2d');
            offCtx.drawImage(originalCanvas, 0, 0);
            const d = offCtx.getImageData(0, 0, offscreen.width, offscreen.height);
            addNoise(d);
            offCtx.putImageData(d, 0, 0);
            return fn(offscreen);
        }}

        const origToDataURL = HTMLCanvasElement.prototype.toDataURL;
        HTMLCanvasElement.prototype.toDataURL = function(...args) {{
            if (this.getContext('2d')) {{
                return withNoisedCopy(this, c => origToDataURL.apply(c, args));
            }}
            return origToDataURL.apply(this, args);
        }};

        const origToBlob = HTMLCanvasElement.prototype.toBlob;
        HTMLCanvasElement.prototype.toBlob = function(callback, ...args) {{
            if (this.getContext('2d')) {{
                withNoisedCopy(this, c => origToBlob.call(c, callback, ...args));
                return;
            }}
            origToBlob.call(this, callback, ...args);
        }};

        // ── AudioContext ──────────────────────────────────────────────────
        // AudioContext fingerprinting renders a short buffer through the
        // browser's DSP pipeline and hashes the output. Different hardware
        // and OS audio stacks produce subtly different float results.
        //
        // FIX vs original: getChannelData returns a Float32Array that is a
        // *direct view* into the AudioBuffer's internal ArrayBuffer — not a
        // copy. Mutating data[i] in-place modifies the buffer permanently.
        // Every subsequent read gets double-noised, compounding the error.
        // Any downstream Web Audio nodes reading the same buffer get
        // corrupted samples.
        //
        // The fix: copy into a new Float32Array before modifying.
        // The copy is what gets returned; the original buffer is untouched.
        const origGetChannelData = AudioBuffer.prototype.getChannelData;
        AudioBuffer.prototype.getChannelData = function(channel) {{
            const original = origGetChannelData.call(this, channel);
            const copy = new Float32Array(original); // ← copy, not a view
            for (let i = 0; i < copy.length; i++) {{
                copy[i] += (Math.random() - 0.5) * 1e-7;
            }}
            return copy; // ← return the copy, original buffer is untouched
        }};
    }})();
    """)

Device Profile Consistency

The patches above are only effective if the profile is internally consistent. A "Windows 10 / Chrome 122" User-Agent must pair with:

DEVICE_PROFILES = {
    "windows_chrome_122": {
        "webgl_vendor":   "Google Inc. (NVIDIA)",
        "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0, D3D11)",
        "canvas_noise":   0.4,
        "user_agent":     "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "locale":         "en-US",
        "timezone":       "America/New_York",
    },
    "macos_chrome_122": {
        "webgl_vendor":   "Google Inc. (Apple)",
        "webgl_renderer": "ANGLE (Apple, Apple M2 Pro, OpenGL 4.1)",
        "canvas_noise":   0.4,
        "user_agent":     "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "locale":         "en-US",
        "timezone":       "America/Los_Angeles",
    },
}

Mismatches between any of these fields and the browser's actual reported values are detectable contradictions.
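One cheap guard is a pre-flight validator that rejects internally contradictory profiles before a browser ever launches. A minimal sketch, assuming the profile dict shape above; the platform keywords checked here are illustrative, not an exhaustive rule set:

```python
def validate_profile(profile: dict) -> list:
    """Return a list of consistency violations (empty list = plausible).
    Checks only coarse cross-field agreement; real coverage needs more rules."""
    errors = []
    ua = profile["user_agent"]

    # OS claimed in the UA must match the GPU stack in the WebGL renderer:
    # Windows Chrome reports ANGLE-over-Direct3D, macOS reports an Apple GPU.
    if "Windows" in ua and "Direct3D" not in profile["webgl_renderer"]:
        errors.append("Windows UA without a Direct3D renderer string")
    if "Mac OS X" in ua and "Apple" not in profile["webgl_renderer"]:
        errors.append("macOS UA without an Apple GPU renderer string")

    # Vendor and renderer should name the same GPU vendor.
    if "NVIDIA" in profile["webgl_vendor"] and "NVIDIA" not in profile["webgl_renderer"]:
        errors.append("vendor claims NVIDIA but renderer does not")
    return errors


windows_ok = {
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "webgl_vendor": "Google Inc. (NVIDIA)",
    "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0, D3D11)",
}
print(validate_profile(windows_ok))  # []
```

Running the validator over every entry in `DEVICE_PROFILES` at startup turns a silent detection risk into a loud configuration error.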


Layer 4: Honeypot Detection — The Silent Ban

Anti-bot systems plant invisible traps: zero-opacity links, off-screen form fields, elements styled display: none. Automated scripts interact with them; real users don't. Triggering a honeypot typically results in an immediate, silent ban — no 403, just poisoned data or session termination that's hard to distinguish from normal operation.

async def is_honeypot(page, selector: str) -> bool:
    """
    Returns True if the element looks like a honeypot.

    Uses a single evaluate() call instead of chained Playwright method calls
    (is_visible(), bounding_box(), evaluate()) — three separate browser
    round-trips that add up at scale. All checks run in one JS execution context.
    """
    element = await page.query_selector(selector)
    if not element:
        return True

    result = await element.evaluate("""el => {
        const box = el.getBoundingClientRect();
        const st  = window.getComputedStyle(el);
        return {
            display:    st.display,
            visibility: st.visibility,
            opacity:    parseFloat(st.opacity),
            width:      box.width,
            height:     box.height,
            x:          box.x,
            y:          box.y,
            inert:      el.inert,
            ariaHidden: el.getAttribute('aria-hidden'),
        };
    }""")

    if result["display"] == "none":         return True
    if result["visibility"] == "hidden":    return True
    if result["opacity"] < 0.01:            return True
    if result["width"] < 1:                 return True
    if result["height"] < 1:                return True
    if result["x"] < -100:                  return True
    if result["y"] < -100:                  return True
    if result["inert"]:                     return True
    if result["ariaHidden"] == "true":      return True

    return False


async def safe_click(hp: HumanPage, selector: str) -> None:
    if await is_honeypot(hp._page, selector):
        raise ValueError(f"Refusing to interact with suspected honeypot: {selector}")
    await hp.click(selector)

Layer 5: Proxy Management — Infrastructure, Not an Afterthought

A flawless browser fingerprint still fails on a flagged datacenter IP. Proxy management has three decisions that matter:

import asyncio
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProxyConfig:
    host: str
    port: int
    username: str
    password: str
    country: str
    fail_count: int = 0
    total_requests: int = 0
    total_failures: int = 0

    @property
    def playwright_config(self) -> dict:
        return {
            "server":   f"http://{self.host}:{self.port}",
            "username": self.username,
            "password": self.password,
        }

    @property
    def subnet(self) -> str:
        """First three octets — useful for detecting provider-level issues."""
        return ".".join(self.host.split(".")[:3])


class ProxyRegistry:
    """
    Explicit proxy registry passed via dependency injection.

    Why not a global Singleton?

    1. Testability: a singleton bleeds state between test cases. With DI,
       each test instantiates its own registry with a known fixture set.

    2. asyncio event loop scope: asyncio.Lock() must be created inside the
       running event loop. A class-level lock created at import time binds to
       the wrong loop in pytest-asyncio and multiprocessing deployments,
       producing obscure "attached to a different loop" errors at runtime.

    3. Visibility: a hidden global is an invisible dependency. DI makes
       the dependency explicit in every function signature that uses it.
    """

    MAX_FAILS = 3

    def __init__(self, proxies: list[ProxyConfig]):
        self._proxies = proxies
        self._lock = asyncio.Lock()

    async def get(self, country: Optional[str] = None) -> ProxyConfig:
        async with self._lock:
            available = [
                p for p in self._proxies
                if p.fail_count < self.MAX_FAILS
                and (country is None or p.country == country)
            ]
            if not available:
                raise RuntimeError("Proxy pool exhausted.")

            # Weighted random selection — weight = 1 / (fail_count + 1).
            # Pure min(fail_count) routes 100% of traffic to one proxy until
            # it reaches fail_count=1, then the next, then the next. That's
            # sequential destruction, not load balancing.
            # Weighted random spreads load while still preferring healthier proxies.
            weights    = [1.0 / (p.fail_count + 1) for p in available]
            total      = sum(weights)
            r          = random.uniform(0, total)
            cumulative = 0.0
            chosen     = available[-1]
            for proxy, w in zip(available, weights):
                cumulative += w
                if r <= cumulative:
                    chosen = proxy
                    break

            # Increment at selection time, not at failure report time.
            # Without this, concurrent workers can all receive the same proxy
            # (fail_count=0 at get() time), then all report failure
            # simultaneously, jumping fail_count from 0 to N and quarantining
            # a healthy proxy.
            chosen.fail_count += 1
            chosen.total_requests += 1
            return chosen

    async def report(self, proxy: ProxyConfig, success: bool) -> None:
        async with self._lock:
            if success:
                # Undo the optimistic increment applied at get() time.
                proxy.fail_count = max(0, proxy.fail_count - 1)
            else:
                # fail_count stays incremented (it was bumped at get() time);
                # once it reaches MAX_FAILS, get() quarantines the proxy.
                proxy.total_failures += 1

    def subnet_failure_rates(self) -> dict[str, float]:
        """
        Returns failure rate per /24 subnet.

        Used to detect when a proxy provider silently routes a batch of IPs
        through a flagged datacenter range — a failure mode that looks like
        general degradation until you correlate by subnet.
        """
        subnets: dict[str, list[ProxyConfig]] = {}
        for p in self._proxies:
            subnets.setdefault(p.subnet, []).append(p)

        return {
            subnet: sum(p.total_failures for p in proxies) / max(1, sum(p.total_requests for p in proxies))
            for subnet, proxies in subnets.items()
        }

Three decisions that matter in production:

  • Sticky sessions — same exit IP for the full logical session, not per-request rotation. Per-request rotation trips geo-consistency checks.
  • Geo-matching — proxy country must match locale and timezone_id in the browser context. Mismatches are trivial signals.
  • Subnet-level monitoring — per-proxy logging lets you detect when a provider silently routes a subset of IPs through a flagged datacenter. Without it, the failure looks like general degradation for days.
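
The subnet aggregation behind that third point is simple enough to sketch standalone. This mirrors the `subnet_failure_rates()` method above, using a hypothetical list of `(host, total_requests, total_failures)` tuples instead of `ProxyConfig` objects:

```python
from collections import defaultdict


def subnet_failure_rates(stats: list[tuple[str, int, int]]) -> dict[str, float]:
    """stats: (host, total_requests, total_failures) per proxy.
    Groups proxies by /24 subnet and returns failure rate per subnet."""
    totals: dict[str, list[int]] = defaultdict(lambda: [0, 0])
    for host, requests, failures in stats:
        subnet = ".".join(host.split(".")[:3])  # first three octets
        totals[subnet][0] += requests
        totals[subnet][1] += failures
    return {s: f / max(1, r) for s, (r, f) in totals.items()}


# Hypothetical fixture: one healthy subnet, one flagged subnet.
stats = [
    ("10.0.1.5", 100, 2), ("10.0.1.9", 100, 4),    # 10.0.1.x — healthy
    ("10.0.2.3", 100, 61), ("10.0.2.7", 100, 58),  # 10.0.2.x — flagged
]
rates = subnet_failure_rates(stats)
# rates["10.0.1"] ≈ 0.03, rates["10.0.2"] ≈ 0.595 — the flagged range stands out.
```

With per-host stats already in hand, the flagged range is one dictionary lookup away instead of days of manual log correlation.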

Layer 6: Retry Logic — Transient Failures Are Not Failures

No production system runs without transient failures: proxy timeouts, soft rate-limits, transient blocks. The response is structured retry with backoff, not except: pass.

import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retry(
    fn: Callable[..., Awaitable[T]],
    *args,
    max_attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple = (OSError, asyncio.TimeoutError),
    **kwargs,
) -> T:
    """
    Exponential backoff with full jitter.

    Pure exponential backoff causes thundering herd: all workers that failed
    at the same moment retry at the same moment. Full jitter
    (random.uniform(0, cap)) spreads retries evenly across the window.

    Why not retryable_exceptions=(Exception,) as the default?
    Exception catches KeyboardInterrupt, SystemExit, and MemoryError —
    none of which should be silently retried. (OSError, asyncio.TimeoutError)
    covers transient network failures. Opt into broader coverage explicitly.

    Note: this function handles *per-request* transients — a proxy timeout,
    a momentary rate-limit response. It does not handle "the entire target
    domain is currently blocking us." That's Layer 7: circuit breaker.
    """
    for attempt in range(max_attempts):
        try:
            return await fn(*args, **kwargs)
        except retryable_exceptions as e:
            if attempt == max_attempts - 1:
                # Log before re-raising — without this the caller receives an
                # exception with no indication that N retries preceded it.
                print(
                    f"[Retry] All {max_attempts} attempts failed. "
                    f"Final error: {type(e).__name__}: {e}"
                )
                raise
            cap   = min(max_delay, base_delay * (2 ** attempt))
            delay = random.uniform(0, cap)
            print(f"[Retry] Attempt {attempt + 1}/{max_attempts} failed: {e}. "
                  f"Waiting {delay:.1f}s.")
            await asyncio.sleep(delay)
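
To see the jitter window concretely, here is the cap sequence produced by the defaults above (`base_delay=2`, `max_delay=60`) — each retry draws its actual delay uniformly from `[0, cap]`:

```python
import random


def backoff_caps(max_attempts: int, base_delay: float, max_delay: float) -> list[float]:
    """Upper bound of the full-jitter window for each retry attempt."""
    return [min(max_delay, base_delay * (2 ** a)) for a in range(max_attempts)]


caps = backoff_caps(max_attempts=6, base_delay=2.0, max_delay=60.0)
# caps == [2.0, 4.0, 8.0, 16.0, 32.0, 60.0] — the final cap is clamped at max_delay.

# The actual sleep for the first retry is drawn from [0, 2.0], not a fixed 2.0.
delay = random.uniform(0, caps[0])
```

The clamp matters: without `max_delay`, attempt 10 would wait up to 2048 seconds, which is no longer a retry, it's an outage.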

Layer 7: Circuit Breaker — The Layer the Original Skipped

The original post acknowledged the gap: "with_retry handles per-request transients; it doesn't handle 'this entire target site is currently blocking us.'" This section fills it — including an architectural constraint that matters the moment you scale beyond a single process.

A retry loop and a circuit breaker are different abstractions solving different failure modes:

               Retry                        Circuit Breaker
Scope          Single request               Entire domain
Failure mode   Transient (proxy timeout)    Sustained (site-wide block)
Response       Wait and retry               Stop draining the proxy pool
Recovery       Automatic                    Automatic, via HALF_OPEN probe

The In-Process Version (Single Container)

import asyncio
import time
from collections import deque
from enum import Enum

# `log` used below is the structlog logger configured in Layer 8.


class CircuitState(Enum):
    CLOSED    = "closed"     # Normal operation — all requests flow through
    OPEN      = "open"       # Sustained failure — all requests rejected
    HALF_OPEN = "half_open"  # Recovery probe — limited requests allowed


class CircuitBreaker:
    """
    Domain-level circuit breaker with sliding window block-rate tracking.

    State machine:
        CLOSED → OPEN:      block_rate > threshold over window_seconds
        OPEN → HALF_OPEN:   recovery_timeout seconds have elapsed
        HALF_OPEN → CLOSED: probe_limit consecutive successes
        HALF_OPEN → OPEN:   any failure during probe

    ⚠️  SCOPE LIMITATION: this implementation lives in process memory.
    It works correctly for a single container or a single long-running script.
    If you run multiple workers — Docker Compose, K8s pods, parallel EC2
    instances — each pod maintains its own independent breaker state.
    Pod A opens its breaker; pods B, C, D know nothing about it and continue
    burning through the proxy pool against a domain that's blocking all of
    them. See DistributedCircuitBreaker below for the multi-pod solution.
    """

    def __init__(
        self,
        domain: str,
        threshold: float = 0.20,     # open if block rate exceeds 20%
        window_seconds: int = 600,   # 10-minute sliding window
        recovery_timeout: int = 300, # wait 5 minutes before probing
        probe_limit: int = 3,        # consecutive successes to close
    ):
        self.domain = domain
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.recovery_timeout = recovery_timeout
        self.probe_limit = probe_limit

        self._state = CircuitState.CLOSED
        self._lock = asyncio.Lock()
        self._events: deque[tuple[float, bool]] = deque()
        self._opened_at: float = 0.0
        self._probe_successes: int = 0

    def _prune_window(self, now: float) -> None:
        cutoff = now - self.window_seconds
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()

    def _block_rate(self, now: float) -> float:
        self._prune_window(now)
        if not self._events:
            return 0.0
        failures = sum(1 for _, ok in self._events if not ok)
        return failures / len(self._events)

    async def allow(self) -> bool:
        async with self._lock:
            now = time.monotonic()
            if self._state == CircuitState.CLOSED:
                return True
            if self._state == CircuitState.OPEN:
                if now - self._opened_at >= self.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                    self._probe_successes = 0
                    return True
                return False
            return True  # HALF_OPEN: let probe through

    async def record(self, success: bool) -> None:
        async with self._lock:
            now = time.monotonic()
            self._events.append((now, success))

            if self._state == CircuitState.HALF_OPEN:
                if success:
                    self._probe_successes += 1
                    if self._probe_successes >= self.probe_limit:
                        self._state = CircuitState.CLOSED
                        log.info("circuit.closed", domain=self.domain)
                else:
                    self._state = CircuitState.OPEN
                    self._opened_at = now
                    log.warning("circuit.reopened", domain=self.domain)
                return

            if self._state == CircuitState.CLOSED and not success:
                rate = self._block_rate(now)
                if rate > self.threshold:
                    self._state = CircuitState.OPEN
                    self._opened_at = now
                    log.warning(
                        "circuit.opened",
                        domain=self.domain,
                        block_rate=round(rate, 3),
                        threshold=self.threshold,
                    )

    def status(self) -> dict:
        now = time.monotonic()
        return {
            "domain":      self.domain,
            "state":       self._state.value,
            "block_rate":  round(self._block_rate(now), 3),
            "window_size": len(self._events),
            "opened_at":   self._opened_at if self._state != CircuitState.CLOSED else None,
        }


class CircuitOpenError(Exception):
    pass


class CircuitBreakerRegistry:
    """One breaker per domain, created on first access."""

    def __init__(self, **breaker_defaults):
        self._breakers: dict[str, CircuitBreaker] = {}
        self._defaults = breaker_defaults

    def get(self, domain: str) -> CircuitBreaker:
        if domain not in self._breakers:
            self._breakers[domain] = CircuitBreaker(domain=domain, **self._defaults)
        return self._breakers[domain]

    def all_statuses(self) -> list[dict]:
        return [b.status() for b in self._breakers.values()]
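
The sliding-window bookkeeping inside the breaker can be exercised in isolation. This standalone sketch reproduces the `_prune_window` + `_block_rate` logic on a plain deque, with a hypothetical event timeline:

```python
from collections import deque


def block_rate(events: deque, now: float, window_seconds: int) -> float:
    """Prune events older than the window, then return the failure fraction.
    events: deque of (timestamp, success) tuples, oldest first."""
    cutoff = now - window_seconds
    while events and events[0][0] < cutoff:
        events.popleft()
    if not events:
        return 0.0
    failures = sum(1 for _, ok in events if not ok)
    return failures / len(events)


# Hypothetical timeline: 4 events; the oldest falls outside a 600s window.
events = deque([(0.0, False), (500.0, True), (550.0, False), (590.0, True)])
rate = block_rate(events, now=650.0, window_seconds=600)
# The event at t=0.0 is pruned; 1 failure out of 3 remaining → rate == 1/3.
```

Pruning at read time is what keeps the rate honest: a burst of failures an hour ago should not hold the breaker open now.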

The Distributed Version (Multi-Pod / K8s)

The in-process breaker has a hard architectural constraint: state lives in one process's memory. In a horizontal deployment — K8s pods, Docker Compose workers, parallel EC2 instances — each pod observes only its own traffic. Pod A trips its breaker; pods B through D keep hammering the same blocked domain, burning proxy IPs until their individual breakers catch up. By that point you may have lost the entire proxy subnet.

The fix is to centralise state. Redis is the standard tool for this: it's fast enough for per-request writes (sub-millisecond), supports atomic operations, and has a TTL mechanism that handles cleanup automatically.

# pip install redis[hiredis]
import redis.asyncio as aioredis
import time


class DistributedCircuitBreaker:
    """
    Circuit breaker backed by Redis sorted sets. Shares state across all pods.

    Key schema (all keys scoped to domain):
        cb:events:{domain}    — sorted set, score=timestamp, member="ok:{ts}" or "fail:{ts}"
        cb:state:{domain}     — string: "closed" | "open" | "half_open"
        cb:opened_at:{domain} — float timestamp of last OPEN transition
        cb:probe_ok:{domain}  — integer: consecutive probe successes in HALF_OPEN

    Why sorted set instead of a simple counter:
        A counter gives you total failures — you lose the sliding window.
        ZREMRANGEBYSCORE on a sorted set prunes stale events atomically,
        giving you an accurate block rate over the last N seconds without
        a separate cleanup process.

    Redis as SPOF:
        If Redis becomes unavailable, allow() returns True (fail-open).
        The alternative — fail-closed — stops all scraping across all pods
        on a Redis hiccup. In this workload, that's worse than a brief period
        of unguarded requests. If you need fail-closed, invert the except clause.

    HA:
        Use Redis Sentinel (3-node) or Redis Cluster for production.
        A standalone Redis instance is itself a single point of failure.
    """

    def __init__(
        self,
        redis_client: aioredis.Redis,
        domain: str,
        threshold: float = 0.20,
        window_seconds: int = 600,
        recovery_timeout: int = 300,
        probe_limit: int = 3,
    ):
        self.r = redis_client
        self.domain = domain
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.recovery_timeout = recovery_timeout
        self.probe_limit = probe_limit

        self._key_events   = f"cb:events:{domain}"
        self._key_state    = f"cb:state:{domain}"
        self._key_opened   = f"cb:opened_at:{domain}"
        self._key_probe_ok = f"cb:probe_ok:{domain}"

    async def allow(self) -> bool:
        try:
            state = await self.r.get(self._key_state)
            state = (state or b"closed").decode()

            if state == "closed":
                return True

            if state == "open":
                opened_at = float(await self.r.get(self._key_opened) or 0)
                if time.time() - opened_at >= self.recovery_timeout:
                    # Transition to half_open. Two pods may race here, but the
                    # write is idempotent — both set the same state. For a
                    # strictly single probe window, wrap this in a Lua script.
                    await self.r.set(self._key_state, "half_open")
                    await self.r.set(self._key_probe_ok, 0)
                    log.info("circuit.half_open", domain=self.domain, source="redis")
                    return True
                return False

            # half_open: let probe requests through
            return True

        except aioredis.RedisError as e:
            # Redis unavailable — fail open rather than stop all pods.
            log.error("circuit.redis_error", domain=self.domain, error=str(e))
            return True

    async def record(self, success: bool) -> None:
        try:
            now = time.time()
            pipe = self.r.pipeline()

            # Maintain sliding window: prune old events, add new one.
            # Pipeline batches both commands into a single round-trip.
            member = f"{'ok' if success else 'fail'}:{now}"
            pipe.zremrangebyscore(self._key_events, 0, now - self.window_seconds)
            pipe.zadd(self._key_events, {member: now})
            pipe.expire(self._key_events, self.window_seconds * 2)
            await pipe.execute()

            state = (await self.r.get(self._key_state) or b"closed").decode()

            if state == "half_open":
                if success:
                    count = await self.r.incr(self._key_probe_ok)
                    if count >= self.probe_limit:
                        await self.r.set(self._key_state, "closed")
                        log.info("circuit.closed", domain=self.domain, source="redis")
                else:
                    await self.r.set(self._key_state, "open")
                    await self.r.set(self._key_opened, now)
                    log.warning("circuit.reopened", domain=self.domain, source="redis")
                return

            if state == "closed" and not success:
                # Compute block rate from the sliding window we just updated.
                all_members = await self.r.zrange(self._key_events, 0, -1)
                if all_members:
                    failures = sum(
                        1 for m in all_members if m.decode().startswith("fail")
                    )
                    rate = failures / len(all_members)
                    if rate > self.threshold:
                        await self.r.set(self._key_state, "open")
                        await self.r.set(self._key_opened, now)
                        log.warning(
                            "circuit.opened",
                            domain=self.domain,
                            block_rate=round(rate, 3),
                            source="redis",
                        )

        except aioredis.RedisError as e:
            log.error("circuit.redis_record_error", domain=self.domain, error=str(e))

    async def status(self) -> dict:
        try:
            state = (await self.r.get(self._key_state) or b"closed").decode()
            all_members = await self.r.zrange(self._key_events, 0, -1)
            failures = sum(1 for m in all_members if m.decode().startswith("fail"))
            block_rate = failures / len(all_members) if all_members else 0.0
            return {
                "domain":      self.domain,
                "state":       state,
                "block_rate":  round(block_rate, 3),
                "window_size": len(all_members),
                "backend":     "redis",
            }
        except aioredis.RedisError:
            return {"domain": self.domain, "state": "unknown", "backend": "redis"}

Choosing between the two implementations:

Deployment                             Use
Single process / single container      CircuitBreaker (in-memory)
Docker Compose with multiple workers   DistributedCircuitBreaker
K8s pods, any horizontal scaling       DistributedCircuitBreaker

Both share the same allow() / record() interface, so the calling code at the request site is identical. The only difference is construction: one takes no external dependency, the other takes a redis.asyncio.Redis client.

Usage at the request site (works with both implementations):

async def fetch_price(
    url: str,
    hp: HumanPage,
    proxy: ProxyConfig,
    proxy_registry: ProxyRegistry,
    breaker: CircuitBreaker | DistributedCircuitBreaker,
) -> dict:
    if not await breaker.allow():
        raise CircuitOpenError(
            f"Circuit OPEN for {breaker.domain}; "
            "skipping to avoid draining proxy pool."
        )

    success = False
    try:
        await hp._page.goto(url, wait_until="domcontentloaded", timeout=30_000)
        data = await hp._page.evaluate("() => window.__PRICE_DATA__")
        success = True
        await proxy_registry.report(proxy, success=True)
        return data
    except Exception:
        await proxy_registry.report(proxy, success=False)
        raise
    finally:
        await breaker.record(success)

Layer 8: Observability — print() Is Not a Monitoring Strategy

The original post noted that "print() to stdout is not observability" — and then didn't show what observability actually looks like. Here it is.

The difference matters operationally. print() tells you something happened. Structured logging tells you what happened, to which resource, in which context, with which outcome, in a format that's queryable. The incident in Month 11 (proxy subnet flagged) took three days to diagnose partly because correlating proxy host to block rate required manual log parsing. With structured logging it's a one-line query.

Structured Logging with structlog

pip install structlog
import structlog
import logging
import sys


def configure_logging(level: str = "INFO") -> None:
    """
    Configure structlog for structured JSON output in production,
    human-readable output in development.

    structlog processors run in order. Each processor receives the event
    dict and returns a modified version. The final processor (JSONRenderer
    or ConsoleRenderer) serialises the dict to a string.
    """
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,          # context-local (safe across asyncio tasks)
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.ExceptionRenderer(),
            structlog.processors.JSONRenderer(),              # → JSON in prod
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, level.upper(), logging.INFO)
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(sys.stdout),
    )


# Module-level logger — bind context progressively
log = structlog.get_logger()

Binding session context:

async def run_session(
    url: str,
    proxy: ProxyConfig,
    circuit_registry: CircuitBreakerRegistry,
    session_id: str,
) -> dict:
    from urllib.parse import urlparse
    domain = urlparse(url).netloc

    # Bind fields that apply to every log call in this session
    session_log = log.bind(
        session_id=session_id,
        domain=domain,
        proxy_host=proxy.host,
        proxy_subnet=proxy.subnet,
        proxy_country=proxy.country,
    )

    session_log.info("session.start", url=url)

    start = time.monotonic()
    try:
        result = await fetch_price(url, ...)
        duration_ms = int((time.monotonic() - start) * 1000)
        session_log.info(
            "session.success",
            duration_ms=duration_ms,
            price=result.get("price"),
        )
        return result
    except CircuitOpenError as e:
        session_log.warning("session.circuit_open", reason=str(e))
        raise
    except Exception as e:
        duration_ms = int((time.monotonic() - start) * 1000)
        session_log.error(
            "session.failure",
            duration_ms=duration_ms,
            error_type=type(e).__name__,
            error=str(e),
        )
        raise

What the output looks like:

{"event": "session.start", "session_id": "s_1a2b3c", "domain": "example.com", "proxy_host": "192.168.1.10", "proxy_subnet": "192.168.1", "proxy_country": "US", "url": "https://example.com/product/123", "timestamp": "2024-03-15T14:23:01.123Z", "level": "info"}
{"event": "session.success", "session_id": "s_1a2b3c", "domain": "example.com", "proxy_host": "192.168.1.10", "proxy_subnet": "192.168.1", "duration_ms": 3241, "price": 49.99, "timestamp": "2024-03-15T14:23:04.364Z", "level": "info"}

Querying for the Month 11 incident in seconds:

# Block rate by subnet (what took 3 days to find manually)
cat session.log | jq -s '
  group_by(.proxy_subnet) |
  map({
    subnet: .[0].proxy_subnet,
    total: length,
    failures: map(select(.event == "session.failure")) | length,
    block_rate: (map(select(.event == "session.failure")) | length) / length
  })
' | jq 'sort_by(-.block_rate)'
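
The same aggregation in Python, for environments without jq — parse the JSON Lines log, group by `proxy_subnet`, compute block rate per subnet (log lines here are hypothetical, trimmed to the fields the query needs):

```python
import json
from collections import defaultdict

log_lines = [
    '{"event": "session.success", "proxy_subnet": "10.0.1"}',
    '{"event": "session.failure", "proxy_subnet": "10.0.2"}',
    '{"event": "session.failure", "proxy_subnet": "10.0.2"}',
    '{"event": "session.success", "proxy_subnet": "10.0.2"}',
]

stats: dict[str, list[int]] = defaultdict(lambda: [0, 0])  # subnet -> [total, failures]
for line in log_lines:
    entry = json.loads(line)
    stats[entry["proxy_subnet"]][0] += 1
    if entry["event"] == "session.failure":
        stats[entry["proxy_subnet"]][1] += 1

block_rates = {s: fails / total for s, (total, fails) in stats.items()}
sorted_rates = sorted(block_rates.items(), key=lambda kv: -kv[1])
# sorted_rates puts the worst subnet first — here 10.0.2 at 2/3.
```

Either tool works; the point is that the answer is a query, not an archaeology project.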

Circuit Breaker Status Logging

async def log_circuit_status(circuit_registry: CircuitBreakerRegistry) -> None:
    """Log circuit breaker states periodically."""
    for status in circuit_registry.all_statuses():
        if status["state"] != "closed" or status["block_rate"] > 0.05:
            log.warning(
                "circuit.status",
                **status,
            )

Layer 9: Storage — SQLite, Postgres, and When the Answer Changes

The original post mentioned SQLite and WAL mode briefly. This section makes the decision criteria explicit.

SQLite in WAL Mode: What It Actually Handles

import aiosqlite
import asyncio
from contextlib import asynccontextmanager


async def init_db(path: str) -> aiosqlite.Connection:
    db = await aiosqlite.connect(path)
    # WAL mode: readers don't block writers, writers don't block readers.
    # Critical for a pattern where one writer task feeds many reader tasks.
    await db.execute("PRAGMA journal_mode=WAL")
    # synchronous=NORMAL: flush on checkpoint, not every write.
    # Faster than FULL; safe for crash recovery with WAL.
    await db.execute("PRAGMA synchronous=NORMAL")
    # Larger page cache reduces I/O for read-heavy workloads.
    await db.execute("PRAGMA cache_size=-65536")  # 64MB
    await db.execute("PRAGMA foreign_keys=ON")
    await db.execute("""
        CREATE TABLE IF NOT EXISTS price_snapshots (
            id           INTEGER PRIMARY KEY,
            sku          TEXT    NOT NULL,
            domain       TEXT    NOT NULL,
            price        REAL,
            currency     TEXT,
            session_id   TEXT    NOT NULL,
            proxy_host   TEXT    NOT NULL,
            status_code  INTEGER NOT NULL,
            duration_ms  INTEGER,
            captured_at  TEXT    NOT NULL DEFAULT (datetime('now'))
        )
    """)
    await db.execute("""
        CREATE INDEX IF NOT EXISTS idx_sku_captured
        ON price_snapshots(sku, captured_at DESC)
    """)
    await db.commit()
    return db


class WriteQueue:
    """
    Funnel all writes through a single asyncio task.

    SQLite's WAL mode handles concurrent reads cleanly. Concurrent writes
    still serialise at the SQLite level — but if multiple coroutines call
    db.execute() concurrently, they also contend on the aiosqlite connection
    object itself. A single writer task eliminates that contention entirely
    and makes write ordering predictable.

    maxsize is not optional:
        asyncio.Queue() with no maxsize is an unbounded queue. If disk I/O
        stalls — a slow fsync, a cloud VM with noisy-neighbour I/O — the
        worker falls behind while producers keep enqueuing. The queue grows
        without limit until the container is OOM-killed. There is no warning;
        the process just disappears.

        maxsize=500 creates backpressure: write() will await when the queue
        is full, slowing producers down rather than accumulating memory.
        The right maxsize depends on your write latency and worker count —
        measure under realistic load, not in dev.

    write_timeout controls how long write() will wait for a queue slot before
    giving up. A timeout is preferable to blocking forever: if the writer
    task dies (unhandled exception in _worker), an unbounded wait would
    silently hang all producer coroutines.
    """

    def __init__(
        self,
        db: aiosqlite.Connection,
        maxsize: int = 500,
        write_timeout: float = 10.0,
    ):
        self._db            = db
        self._write_timeout = write_timeout
        self._dropped       = 0
        # maxsize enforces backpressure — producers block instead of OOM-ing.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._task = asyncio.create_task(self._worker())

    async def _worker(self):
        while True:
            item = await self._queue.get()
            if item is None:
                break
            sql, params, fut = item
            try:
                await self._db.execute(sql, params)
                await self._db.commit()
                fut.set_result(None)
            except Exception as e:
                fut.set_exception(e)
            finally:
                self._queue.task_done()

    async def write(self, sql: str, params: tuple) -> None:
        loop = asyncio.get_running_loop()
        fut  = loop.create_future()
        try:
            # wait_for gives the worker time to drain a slot before we give up.
            # put_nowait would raise QueueFull immediately with no grace period.
            await asyncio.wait_for(
                self._queue.put((sql, params, fut)),
                timeout=self._write_timeout,
            )
        except asyncio.TimeoutError:
            self._dropped += 1
            log.error(
                "write_queue.backpressure_timeout",
                queue_size=self._queue.qsize(),
                dropped_total=self._dropped,
                write_timeout=self._write_timeout,
            )
            raise  # propagate — the caller decides whether to retry or skip

        await fut

    @property
    def queue_size(self) -> int:
        return self._queue.qsize()

    async def close(self):
        await self._queue.put(None)
        await self._task
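
The backpressure behaviour is easy to demonstrate without SQLite at all — a full bounded queue plus `wait_for` turns an unbounded hang into a bounded, observable failure (standalone sketch, no consumer running on purpose):

```python
import asyncio


async def demo() -> str:
    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    q.put_nowait("a")
    q.put_nowait("b")  # queue is now full

    try:
        # No consumer is draining the queue, so this put can never succeed.
        # wait_for bounds the wait instead of hanging the producer forever.
        await asyncio.wait_for(q.put("c"), timeout=0.05)
        return "accepted"
    except asyncio.TimeoutError:
        return "backpressure_timeout"


result = asyncio.run(demo())
# result == "backpressure_timeout" — the producer learns about the stall
# instead of silently growing memory or blocking indefinitely.
```

This is exactly the failure mode `write_timeout` guards against in `WriteQueue`: a dead worker surfaces as a logged timeout, not a silent deadlock.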

When SQLite Stops Being the Right Answer

SQLite with WAL handles the pricing-intelligence workload comfortably: ~4,000 SKUs, one write per SKU per run cycle, many concurrent readers. At what point does the answer change?

Benchmark: synthetic write throughput (single writer task, WAL mode)

Batch size      Writes/sec   p99 latency
1 (unbatched)   ~800         ~1.8ms
10              ~4,200       ~3.1ms
100             ~12,000      ~11ms

These numbers are from a local NVMe SSD. On network-attached storage or a cloud VM with shared I/O, divide by 3–5.
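
The batch-size effect comes from amortising the commit (and its fsync) across rows. A minimal sketch with stdlib sqlite3 and a hypothetical table — `executemany` plus one commit per batch is the whole mechanism:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA journal_mode=WAL")  # no effect on :memory:, shown for parity
conn.execute("CREATE TABLE snapshots (sku TEXT, price REAL)")

rows = [(f"SKU-{i}", 9.99 + i) for i in range(100)]

# Batched: one transaction, one commit for all 100 rows.
# Unbatched would be 100 execute() + commit() pairs — 100 fsyncs on disk.
conn.executemany("INSERT INTO snapshots (sku, price) VALUES (?, ?)", rows)
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM snapshots").fetchone()[0]
# count == 100
```

Adapting `WriteQueue._worker` to drain several queued items per commit gets you the batched numbers above at the cost of a slightly larger loss window on crash.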

Switch to Postgres when any of these are true:

  1. Write throughput sustained above ~5,000/sec. SQLite's single-writer model becomes a bottleneck. Postgres's MVCC handles concurrent writes without serialising them.

  2. Multiple machines need to write simultaneously. SQLite is a single-file database — not usable across a network without a proxy (Litestream, rqlite). Postgres is native client-server.

  3. You need row-level locking for concurrent updates. SQLite locks at the database level for writes. If you're updating price records in place rather than inserting new snapshots, concurrent updates will serialise hard.

  4. Your team's incident tooling expects SQL-compatible infrastructure. EXPLAIN ANALYZE, pg_stat_statements, and integration with tools like Datadog APM are Postgres-native. Adding this observability to SQLite requires custom instrumentation.

  5. Audit trail requirements. If regulators or clients need immutable write history, Postgres + logical replication is the standard answer.

At the 4,000 SKU scale in this post: SQLite is correct. Don't optimise prematurely. Measure write throughput under real load before deciding.


What Actually Happened in Production

I'll be direct about the metrics, because "0 incidents, 99.x% uptime" framing is useless.

18 months, ~4,000 SKUs. The real picture:


Incident 1 — Month 4: Partial Canvas Patch Detected

Cloudflare pushed an update to their Managed Challenge. Our canvas noise patch covered getImageData but not toDataURL or toBlob. Detection rate jumped from near-zero to ~40% of sessions within 48 hours.

That figure comes from HTTP response code logging in the SQLite session table — we tracked status_code per run and could query block rate as COUNT(*) WHERE status=403 / COUNT(*) per hour. Diagnosis took a day. The fix (patching all three canvas surfaces) took two hours.
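
That query is runnable against the `price_snapshots` schema from Layer 9 (trimmed here to the two relevant columns, with synthetic rows — hour 14 healthy, hour 15 at 40% blocked):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE price_snapshots (
        status_code INTEGER NOT NULL,
        captured_at TEXT    NOT NULL
    )
""")

# Synthetic data in datetime('now') format: 10 requests per hour.
rows = (
    [(200, "2024-03-15 14:05:00")] * 9 + [(403, "2024-03-15 14:30:00")]
    + [(200, "2024-03-15 15:10:00")] * 6 + [(403, "2024-03-15 15:20:00")] * 4
)
conn.executemany("INSERT INTO price_snapshots VALUES (?, ?)", rows)

# SQLite treats (status_code = 403) as 0/1, so AVG gives the block rate directly.
block_rate_by_hour = conn.execute("""
    SELECT substr(captured_at, 1, 13) AS hour,
           AVG(status_code = 403)     AS block_rate
    FROM price_snapshots
    GROUP BY hour
    ORDER BY hour
""").fetchall()
# → [("2024-03-15 14", 0.1), ("2024-03-15 15", 0.4)]
```

A detection-rate jump from near-zero to ~40% shows up as one row in this output — the hard part was having `status_code` logged per run in the first place.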

Lesson: partial fingerprint patching is worse than none — it creates a detectable contradiction. Patch all surfaces or patch none.


Incident 2 — Month 11: Silent Proxy Subnet Compromise

One residential proxy provider silently began routing a subset of IPs through a datacenter range. Cloudflare's IP reputation scoring flagged those exit nodes. We saw a gradual uptick in 403s over three days before correlating it to the proxy subnet.

We found it because we were logging proxy_host per request and could GROUP BY subnet in the session table. Without per-proxy logging, this would have looked like general degradation indefinitely.

Lesson: instrument at the proxy level, not just the request level. The subnet_failure_rates() method in the ProxyRegistry and the proxy_subnet field in structured logs address this directly.

What the incidents exposed architecturally:

Both incidents were recoverable because the failures were observable. What I didn't have — and should have — was the circuit breaker (now Layer 7). Both incidents involved the system continuing to burn through proxy pool capacity against a domain that was blocking us at scale. with_retry handles per-request transients; it doesn't stop draining the pool when the entire target is blocking you. Those are different failure modes and need different responses.

On the "0 manual interventions" claim in the original draft: true if "intervention" means a human sitting down to debug a mid-run block. The retry and rotation logic handled transient failures automatically. Not true if you count the two incidents above, which both required code changes and redeployments. Write your metrics definitions before you write your metrics.


The Full Stack

┌─────────────────────────────────────────────────────────────┐
│                    Orchestrator (asyncio)                     │
├──────────────┬──────────────────┬───────────────────────────┤
│  TLS Clean   │  Stealth         │  Fingerprint              │
│  Baseline    │  Patching        │  Consistency              │
│  (Camoufox / │  (playwright-    │  (Canvas/WebGL/Audio,     │
│  rebrowser)  │  stealth)        │  all surfaces, no leaks)  │
├──────────────┴──────────────────┴───────────────────────────┤
│  Behavioural Simulation (HumanPage)                          │
│  Cubic Bézier · Fitts's Law step count · Phase-split jitter  │
├──────────────────────────────────────────────────────────────┤
│  Honeypot Detection · single evaluate() · 7 visibility checks│
├──────────────────────────────────────────────────────────────┤
│  Retry (per-request, full jitter) │ Circuit Breaker (domain) │
│                                   │ In-process OR Redis-backed│
├──────────────────────────────────────────────────────────────┤
│  ProxyRegistry (DI) · Weighted · Geo-matched · Subnet audit  │
├──────────────────────────────────────────────────────────────┤
│  Observability: structlog JSON · session_id · proxy_subnet   │
│  Circuit status · block_rate queryable in < 1 minute         │
├──────────────────────────────────────────────────────────────┤
│  Storage: SQLite WAL + WriteQueue (bounded, backpressure)    │
│  Switch-to-Postgres criteria defined                         │
└──────────────────────────────────────────────────────────────┘

Key Takeaways

  1. TLS fingerprinting kills you before HTTP. JA4 captures extension structure, ALPN ordering, and signature algorithms — not just cipher suites. Rotating User-Agents while using requests or vanilla Playwright doesn't help. Start with a clean baseline: Camoufox, rebrowser-patches, or curl_cffi.

  2. Partial fingerprint patching is actively harmful. All three canvas surfaces (getImageData, toDataURL, toBlob) plus AudioContext must be patched consistently. WebGL2 needs the same patch as WebGL. Partial patches create detectable contradictions.

  3. The AudioContext patch must copy the buffer. getChannelData returns a view into the AudioBuffer's internal ArrayBuffer. Mutating it in-place corrupts the buffer permanently. new Float32Array(original) creates a copy; return the copy.

  4. Mouse movement physics need to match the model you're claiming. Real cubic Bézier requires four control points. Fitts's Law means step count should scale with distance. The ballistic phase (0–70%) has minimal correction; the corrective phase (70–100%) has increasing Gaussian scatter as the cursor homes in.

  5. Behavioural simulation is session-level. Per-page state, not a global dict. Global state in a concurrent worker pool is a race condition.

  6. Bézier arithmetic doesn't block the event loop. The GIL is irrelevant in single-threaded asyncio. One Bézier step takes ~2–5µs; a single browser IPC call takes ~1–5ms. The real bottleneck is always the browser. run_in_executor() here is premature optimisation.

  7. Honeypots cause silent bans. Consolidate visibility checks into a single evaluate() call. Add inert to the check list.

  8. Proxy selection needs weighted random, not always-min. Increment fail_count at selection time, not failure time. Log at the subnet level — subnet-level failure rates diagnose provider issues that look like general degradation.

  9. Retry and circuit breakers are different abstractions. with_retry handles per-request transients. A circuit breaker handles sustained domain-level blocks. Build both.

  10. In-process circuit breakers don't work in horizontal deployments. In K8s or Docker Compose with multiple workers, each pod's breaker is invisible to the others. Use DistributedCircuitBreaker backed by Redis whenever you scale beyond a single container. Use Redis Sentinel or Cluster — standalone Redis is itself a SPOF.

  11. Unbounded queues are silent OOM bombs. asyncio.Queue(maxsize=500) with asyncio.wait_for() creates backpressure: producers slow down instead of accumulating memory until the container is killed.

  12. assert is not runtime validation, Exception is not a retry target. Use explicit raise, narrow your retryable exception set, log before the final re-raise.

  13. print() is not observability. Structured logging (structlog) with bound context makes incidents diagnosable in minutes. The Month 11 subnet incident took three days with ad-hoc logging; with subnet_failure_rates() and structured logs it's a one-command query.

  14. SQLite is right at 4,000 SKUs. Define the switch criteria before you need them. WAL + single writer task handles the workload. Postgres becomes correct at sustained ~5,000 writes/sec, multiple concurrent writers across machines, or when your team's incident tooling expects it.
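Takeaway 4 is the most concrete of these, so here is an illustrative sketch — not the HumanPage code. The control-point offsets, step constants, and noise schedule are all made-up parameters, and I taper the corrective-phase scatter to zero at the end so the path still lands on the target:

```python
import math
import random

def fitts_steps(distance: float, width: float = 30.0, base: int = 8) -> int:
    # Step count scales with Fitts's index of difficulty, log2(D/W + 1).
    return base + int(10 * math.log2(distance / width + 1))

def cubic_bezier_path(start, end, rng=None):
    rng = rng or random.Random()
    (x0, y0), (x3, y3) = start, end
    dx, dy = x3 - x0, y3 - y0
    # Four control points: start, two randomized interior points, end.
    x1, y1 = x0 + dx * 0.3 + rng.uniform(-40, 40), y0 + dy * 0.3 + rng.uniform(-40, 40)
    x2, y2 = x0 + dx * 0.7 + rng.uniform(-40, 40), y0 + dy * 0.7 + rng.uniform(-40, 40)
    steps = fitts_steps(math.hypot(dx, dy))
    points = []
    for i in range(steps + 1):
        t = i / steps
        mt = 1 - t
        # Cubic Bézier: B(t) = (1-t)^3 P0 + 3(1-t)^2 t P1 + 3(1-t) t^2 P2 + t^3 P3
        x = mt**3 * x0 + 3 * mt**2 * t * x1 + 3 * mt * t**2 * x2 + t**3 * x3
        y = mt**3 * y0 + 3 * mt**2 * t * y1 + 3 * mt * t**2 * y2 + t**3 * y3
        # Ballistic phase (t <= 0.7): no noise. Corrective phase: Gaussian
        # scatter, tapering to zero so the final point hits the target.
        if t > 0.7:
            sigma = 2.0 * (1 - t)
            x += rng.gauss(0, sigma)
            y += rng.gauss(0, sigma)
        points.append((x, y))
    return points
```

A 500-pixel move produces ~50 intermediate points; a 50-pixel move produces ~20 — distance-dependent step counts being the property takeaway 4 is actually about.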


The detection systems are good and getting better. The gap you're maintaining is operational: how fast you can diagnose a new detection vector, measure its scope, and ship a fix. Everything in this post exists to make that loop shorter.


Working through a specific anti-bot system or detection layer? Drop it in the comments.


Tags: #python #automation #playwright #webdev #scraping #architecture #devops #observability
