DEV Community

11111

Building a Reliable CAPTCHA-Solving Pipeline: Error Handling, Retries, and Circuit Breakers

Your scraper solves CAPTCHAs fine in dev. Then you deploy it, scale to 1000 pages/hour, and everything falls apart — timeouts, expired tokens, wrong CAPTCHA types, rate limits.

The difference between a hobby scraper and a production one isn't the happy path. It's how you handle failures. Let's build a resilient CAPTCHA-solving pipeline step by step.

The Problem: Naive Solve-and-Submit

Most tutorials show you this:

token = solve_captcha(sitekey, url)
submit_form(token)

This works until it doesn't. In production, you'll hit:

  • Timeouts — solver takes too long
  • Expired tokens — you solved it but submitted too late
  • Wrong type detection — you sent "recaptcha_v2" but it's actually "recaptcha_v3"
  • Rate limits — too many concurrent solves
  • Service outages — the solving API is temporarily down

Step 1: Classify Your Errors

Not all errors deserve the same response. Group them:

from enum import Enum

class CaptchaErrorType(Enum):
    TRANSIENT = "transient"      # Retry after a short delay
    RATE_LIMITED = "rate_limited" # Back off, then retry
    BAD_INPUT = "bad_input"      # Fix parameters, then retry
    EXPIRED = "expired"          # Re-solve from scratch
    FATAL = "fatal"              # Stop and alert

def classify_error(error: Exception) -> CaptchaErrorType:
    msg = str(error).lower()

    # "timed out" also catches the TimeoutError raised by the polling loop
    if "timeout" in msg or "timed out" in msg or "503" in msg:
        return CaptchaErrorType.TRANSIENT
    elif "rate limit" in msg or "429" in msg:
        return CaptchaErrorType.RATE_LIMITED
    elif "invalid type" in msg or "wrong sitekey" in msg:
        return CaptchaErrorType.BAD_INPUT
    elif "expired" in msg or "token too old" in msg:
        return CaptchaErrorType.EXPIRED
    elif "banned" in msg or "invalid key" in msg:
        return CaptchaErrorType.FATAL
    else:
        return CaptchaErrorType.TRANSIENT
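
Matching on message text is brittle if the solver changes its wording. Where the API reports failures through HTTP status codes, classification could key on the code instead. The sketch below is one way to do that; the function name `classify_status` and the code-to-category mapping are assumptions for illustration, not taken from any solver's documentation.

```python
from enum import Enum


class CaptchaErrorType(Enum):
    TRANSIENT = "transient"
    RATE_LIMITED = "rate_limited"
    BAD_INPUT = "bad_input"
    FATAL = "fatal"


def classify_status(status_code: int) -> CaptchaErrorType:
    """Map an HTTP status code to an error category (hypothetical mapping)."""
    if status_code == 429:
        return CaptchaErrorType.RATE_LIMITED
    if status_code in (401, 403):
        # Bad or banned API key: retrying will not help
        return CaptchaErrorType.FATAL
    if status_code in (400, 422):
        # The request itself is malformed
        return CaptchaErrorType.BAD_INPUT
    # 5xx and anything unrecognized: assume a temporary problem
    return CaptchaErrorType.TRANSIENT


print(classify_status(429).value)
print(classify_status(503).value)
```

Checking the status code first and falling back to message matching only for errors without one keeps both approaches available.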

Step 2: Smart Retry with Exponential Backoff

Different error types need different retry strategies:

import asyncio
import random
from dataclasses import dataclass

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    jitter: bool = True

RETRY_STRATEGIES = {
    CaptchaErrorType.TRANSIENT: RetryConfig(
        max_attempts=3, base_delay=1.0
    ),
    CaptchaErrorType.RATE_LIMITED: RetryConfig(
        max_attempts=5, base_delay=5.0, max_delay=60.0
    ),
    CaptchaErrorType.BAD_INPUT: RetryConfig(
        max_attempts=2, base_delay=0.5
    ),
    CaptchaErrorType.EXPIRED: RetryConfig(
        max_attempts=3, base_delay=0.1
    ),
}

async def retry_solve(
    solve_fn, *args, 
    config: RetryConfig = None,
    **kwargs
) -> str:
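    config = config or RetryConfig()
    last_error = None

    # Loop up to the largest per-type limit so that, e.g., RATE_LIMITED
    # really gets its 5 attempts; the per-type check below decides when
    # to give up for the error actually raised.
    max_loops = max(
        [config.max_attempts]
        + [s.max_attempts for s in RETRY_STRATEGIES.values()]
    )
    for attempt in range(max_loops):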
        try:
            return await solve_fn(*args, **kwargs)
        except Exception as e:
            last_error = e
            error_type = classify_error(e)

            if error_type == CaptchaErrorType.FATAL:
                raise  # Don't retry fatal errors

            strategy = RETRY_STRATEGIES.get(
                error_type, config
            )

            if attempt >= strategy.max_attempts - 1:
                raise

            # Calculate delay with exponential backoff
            delay = min(
                strategy.base_delay * (2 ** attempt),
                strategy.max_delay
            )

            # Add jitter to prevent thundering herd
            if strategy.jitter:
                delay *= (0.5 + random.random())

            print(
                f"Attempt {attempt + 1} failed "
                f"({error_type.value}), "
                f"retrying in {delay:.1f}s..."
            )
            await asyncio.sleep(delay)

    raise last_error
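
To see what these settings mean in wall-clock terms, it can help to print the delay schedule without jitter. The helper names `backoff_delays` and `jitter_bounds` below are made up for this illustration; the arithmetic mirrors the `min(base_delay * 2 ** attempt, max_delay)` formula above and assumes every configured attempt is used.

```python
def backoff_delays(base_delay: float, max_delay: float, attempts: int) -> list[float]:
    """Delays slept between attempts, without jitter."""
    # One fewer sleep than attempts: there is no sleep after the final failure
    return [min(base_delay * (2 ** n), max_delay) for n in range(attempts - 1)]


def jitter_bounds(delay: float) -> tuple[float, float]:
    """Range covered by `delay * (0.5 + random.random())` (upper end exclusive)."""
    return (0.5 * delay, 1.5 * delay)


print(backoff_delays(5.0, 60.0, 5))  # RATE_LIMITED settings -> [5.0, 10.0, 20.0, 40.0]
print(backoff_delays(1.0, 30.0, 3))  # TRANSIENT settings    -> [1.0, 2.0]
print(jitter_bounds(40.0))           # -> (20.0, 60.0)
```

So a rate-limited solve can spend up to about 75 seconds sleeping before jitter, which matters when the token you eventually get must still be fresh at submit time.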

Step 3: Token Freshness Guard

CAPTCHA tokens expire, typically after about 120 seconds. If your pipeline is slow, the token can expire before you submit it:

import time

class TokenExpiredError(Exception):
    """Raised by submit_fn when the site rejects a stale token."""

class CaptchaToken:
    def __init__(self, value: str, ttl: int = 110):
        self.value = value
        self.created_at = time.time()
        self.ttl = ttl  # 110s leaves a 10s safety margin (actual: 120s)

    @property
    def is_valid(self) -> bool:
        return time.time() - self.created_at < self.ttl

    @property
    def remaining_seconds(self) -> float:
        return max(
            0, 
            self.ttl - (time.time() - self.created_at)
        )

async def solve_and_use(
    sitekey: str, 
    url: str, 
    submit_fn
):
    """Solve CAPTCHA and submit immediately."""
    for attempt in range(3):
        token = CaptchaToken(
            await solve_captcha(sitekey, url)
        )

        if not token.is_valid:
            print("Token already expired, re-solving...")
            continue

        try:
            # Submit while token is still fresh
            result = await submit_fn(token.value)
            return result
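        except TokenExpiredError:
            # TokenExpiredError is assumed to be raised by submit_fn
            # when the site rejects a stale token.
            age = time.time() - token.created_at
            print(f"Token expired after {age:.0f}s")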
            continue

    raise Exception("Failed after 3 solve attempts")
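
Two refinements are worth considering here. `time.monotonic()` is not affected by system clock adjustments, so it is a safer basis for measuring age than `time.time()`. And checking that enough time remains for the submit itself is stricter than checking that the token has not expired yet. The sketch below shows both; the class name `FreshnessGuard`, the `has_time_for` method, and the injectable `clock` parameter are hypothetical, added so the behavior can be demonstrated without real waiting.

```python
import time
from typing import Callable


class FreshnessGuard:
    """Tracks token age with an injectable clock (hypothetical helper)."""

    def __init__(self, value: str, ttl: float = 110.0,
                 clock: Callable[[], float] = time.monotonic):
        self.value = value
        self.ttl = ttl
        self._clock = clock
        self._created_at = clock()

    @property
    def remaining_seconds(self) -> float:
        return max(0.0, self.ttl - (self._clock() - self._created_at))

    def has_time_for(self, expected_submit_seconds: float) -> bool:
        """True if the token should still be valid when the submit lands."""
        return self.remaining_seconds > expected_submit_seconds


# Demo with a fake clock so no real waiting is needed
now = [0.0]
token = FreshnessGuard("tok", ttl=110.0, clock=lambda: now[0])
now[0] = 100.0
print(token.remaining_seconds)   # 10.0
print(token.has_time_for(15.0))  # False: a 15s submit would outlive the token
print(token.has_time_for(5.0))   # True
```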

Step 4: Circuit Breaker Pattern

If the CAPTCHA-solving service is down, don't keep hammering it. Use a circuit breaker:

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing, reject requests
    HALF_OPEN = "half_open" # Testing recovery

class CaptchaCircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            elapsed = time.time() - self.last_failure_time
            if elapsed >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False

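        # HALF_OPEN: let trial requests through. Note that this simple
        # version does not cap them at one; the next recorded success
        # closes the circuit and the next failure reopens it.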
        return True

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(
                f"Circuit OPEN — pausing solves "
                f"for {self.recovery_timeout}s"
            )

# Usage
breaker = CaptchaCircuitBreaker(
    failure_threshold=5, 
    recovery_timeout=60
)

async def protected_solve(sitekey, url):
    if not breaker.can_execute():
        raise Exception(
            "Circuit open — CAPTCHA service unavailable"
        )

    try:
        token = await solve_captcha(sitekey, url)
        breaker.record_success()
        return token
    except Exception:
        breaker.record_failure()
        raise
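
Under concurrency, a half-open breaker that admits every caller can send a burst of requests at a service that is still recovering. A stricter variant admits exactly one probe and rejects the rest until that probe reports back. The following is a sketch of that idea, not a drop-in replacement: the class name `SingleProbeBreaker`, the `_probe_in_flight` flag, and the injectable `clock` are assumptions introduced here.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SingleProbeBreaker:
    """Circuit breaker that lets exactly one probe through when half-open."""

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self.failure_count = 0
        self.opened_at = 0.0
        self.state = State.CLOSED
        self._probe_in_flight = False

    def can_execute(self) -> bool:
        if self.state is State.CLOSED:
            return True
        if self.state is State.OPEN:
            if self._clock() - self.opened_at < self.recovery_timeout:
                return False
            self.state = State.HALF_OPEN
            self._probe_in_flight = False
        # HALF_OPEN: admit one probe, reject others until it reports back
        if self._probe_in_flight:
            return False
        self._probe_in_flight = True
        return True

    def record_success(self) -> None:
        self.failure_count = 0
        self._probe_in_flight = False
        self.state = State.CLOSED

    def record_failure(self) -> None:
        self.failure_count += 1
        self._probe_in_flight = False
        # A failed probe reopens the circuit immediately
        if (self.state is State.HALF_OPEN
                or self.failure_count >= self.failure_threshold):
            self.state = State.OPEN
            self.opened_at = self._clock()


# Walk through the transitions with a fake clock
now = [0.0]
b = SingleProbeBreaker(failure_threshold=2, recovery_timeout=60.0,
                       clock=lambda: now[0])
b.record_failure()
b.record_failure()
print(b.state.value, b.can_execute())     # open False
now[0] = 61.0
print(b.can_execute(), b.can_execute())   # True False (only one probe)
b.record_success()
print(b.state.value)                      # closed
```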

Step 5: Concurrency Control with Semaphore

Don't blast the solving API with 100 concurrent requests. Use a semaphore:

import asyncio

class CaptchaSolver:
    def __init__(
        self, 
        max_concurrent: int = 10,
        api_base: str = "https://www.passxapi.com"
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.breaker = CaptchaCircuitBreaker()
        self.api_base = api_base
        self._stats = {
            "solved": 0, "failed": 0, "expired": 0
        }

    async def solve(
        self, 
        captcha_type: str,
        sitekey: str, 
        url: str
    ) -> CaptchaToken:
        async with self.semaphore:
            if not self.breaker.can_execute():
                raise Exception("Service circuit open")

            try:
                token_str = await retry_solve(
                    self._do_solve,
                    captcha_type, sitekey, url
                )
                self.breaker.record_success()
                self._stats["solved"] += 1
                return CaptchaToken(token_str)
            except Exception:
                self.breaker.record_failure()
                self._stats["failed"] += 1
                raise

    async def _do_solve(
        self, captcha_type, sitekey, url
    ):
        import httpx
        async with httpx.AsyncClient() as client:
            # Submit task
            resp = await client.post(
                f"{self.api_base}/api/v1/task",
                json={
                    "type": captcha_type,
                    "sitekey": sitekey,
                    "pageurl": url
                }
            )
            # Surface 429/503 responses as exceptions so they get classified
            resp.raise_for_status()
            task_id = resp.json()["task_id"]

            # Poll with timeout
            for _ in range(60):
                result = await client.get(
                    f"{self.api_base}/api/v1/task/"
                    f"{task_id}"
                )
                data = result.json()
                if data["status"] == "completed":
                    return data["token"]
                if data["status"] == "failed":
                    raise Exception(data.get("error"))
                await asyncio.sleep(2)

            raise TimeoutError("Solve timed out")

    @property
    def stats(self):
        total = self._stats["solved"] + self._stats["failed"]
        rate = (
            self._stats["solved"] / total * 100 
            if total > 0 else 0
        )
        return {
            **self._stats,
            "success_rate": f"{rate:.1f}%"
        }
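
To convince yourself that the semaphore really caps the number of solves in flight, you can measure the peak with a stand-in for the solver call. This is a minimal sketch using only the standard library; `run_limited` and `fake_solve` are hypothetical names, and the 10 ms sleep stands in for the network round trip.

```python
import asyncio


async def run_limited(n_tasks: int, max_concurrent: int) -> int:
    """Run n_tasks fake solves and return the peak number in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_solve() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the solver round trip
            in_flight -= 1

    await asyncio.gather(*(fake_solve() for _ in range(n_tasks)))
    return peak


peak = asyncio.run(run_limited(n_tasks=50, max_concurrent=10))
print(peak)  # 10: never more than max_concurrent at once
```

All 50 coroutines are created up front, but only 10 get past `async with semaphore` at any moment; the rest wait without consuming solver capacity.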

Step 6: Putting It All Together

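import asyncio

import httpx  # third-party: pip install httpx

# detect_captcha() and urls_to_scrape are assumed to be defined elsewhere:
# detect_captcha(html) returns {"type": ..., "sitekey": ...} or None.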

async def scrape_with_captchas(urls: list[str]):
    solver = CaptchaSolver(max_concurrent=10)
    results = []

    async def process_url(url):
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)

            # Detect CAPTCHA
            captcha = detect_captcha(resp.text)
            if not captcha:
                return resp.text

            # Solve with full resilience
            token = await solver.solve(
                captcha_type=captcha["type"],
                sitekey=captcha["sitekey"],
                url=url
            )

            if not token.is_valid:
                raise Exception("Token expired pre-submit")

            # Submit with token
            # Return text in both branches so callers get one type
            submit_resp = await client.post(url, data={
                "captcha_response": token.value
            })
            return submit_resp.text

    tasks = [process_url(u) for u in urls]
    results = await asyncio.gather(
        *tasks, return_exceptions=True
    )

    # Report stats
    print(f"Solver stats: {solver.stats}")

    successes = [
        r for r in results 
        if not isinstance(r, Exception)
    ]
    print(
        f"Scraped {len(successes)}/{len(urls)} "
        f"pages successfully"
    )

    return results

# Run it
asyncio.run(
    scrape_with_captchas(urls_to_scrape)
)
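
With `return_exceptions=True`, `asyncio.gather` returns failures in place of results instead of raising, and it keeps the input order. That makes it easy to pair each failure with the URL that caused it, for example to re-queue it later. The sketch below illustrates this with a stand-in `fetch` function; both `fetch` and `run_all` are hypothetical names.

```python
import asyncio


async def fetch(n: int) -> str:
    """Stand-in for process_url: fails on multiples of 3."""
    if n % 3 == 0:
        raise RuntimeError(f"page {n} failed")
    return f"page {n} ok"


async def run_all(ids: list[int]) -> tuple[list[str], list[tuple[int, Exception]]]:
    results = await asyncio.gather(
        *(fetch(i) for i in ids), return_exceptions=True
    )
    successes, failures = [], []
    # gather preserves input order, so zip pairs each result with its id
    for i, r in zip(ids, results):
        if isinstance(r, Exception):
            failures.append((i, r))
        else:
            successes.append(r)
    return successes, failures


ok, failed = asyncio.run(run_all([1, 2, 3, 4, 5, 6]))
print(ok)                      # ['page 1 ok', 'page 2 ok', 'page 4 ok', 'page 5 ok']
print([i for i, _ in failed])  # [3, 6]
```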

Key Takeaways

  1. Classify errors — don't treat all failures the same
  2. Exponential backoff + jitter — prevents thundering herd on retries
  3. Token freshness tracking — solve late, submit immediately
  4. Circuit breaker — stop hammering a failing service
  5. Semaphore — control concurrency to avoid rate limits
  6. Stats tracking — know your success rate in production

These patterns work with any CAPTCHA-solving service. For a Python client that handles the API communication, check out passxapi-python — you can wrap it with these resilience patterns for production use.


What error handling patterns do you use in your scrapers? Share in the comments.
