Your scraper solves CAPTCHAs fine in dev. Then you deploy it, scale to 1000 pages/hour, and everything falls apart — timeouts, expired tokens, wrong CAPTCHA types, rate limits.
The difference between a hobby scraper and a production one isn't the happy path. It's how you handle failures. Let's build a resilient CAPTCHA-solving pipeline step by step.
The Problem: Naive Solve-and-Submit
Most tutorials show you this:
token = solve_captcha(sitekey, url)
submit_form(token)
This works until it doesn't. In production, you'll hit:
- Timeouts — solver takes too long
- Expired tokens — you solved it but submitted too late
- Wrong type detection — you sent "recaptcha_v2" but it's actually "recaptcha_v3"
- Rate limits — too many concurrent solves
- Service outages — the solving API is temporarily down
Step 1: Classify Your Errors
Not all errors deserve the same response. Group them:
from enum import Enum


class CaptchaErrorType(Enum):
    """Buckets for CAPTCHA-solving failures, each mapped to a recovery strategy."""
    TRANSIENT = "transient"        # Retry immediately
    RATE_LIMITED = "rate_limited"  # Back off, then retry
    BAD_INPUT = "bad_input"        # Fix parameters, then retry
    EXPIRED = "expired"            # Re-solve from scratch
    FATAL = "fatal"                # Stop and alert


# Ordered (keywords, bucket) pairs — first match wins, mirroring the
# original if/elif chain. Matching is substring-based on the lowercased
# exception message, so keep keywords lowercase.
_ERROR_SIGNATURES = (
    (("timeout", "503"), CaptchaErrorType.TRANSIENT),
    (("rate limit", "429"), CaptchaErrorType.RATE_LIMITED),
    (("invalid type", "wrong sitekey"), CaptchaErrorType.BAD_INPUT),
    (("expired", "token too old"), CaptchaErrorType.EXPIRED),
    (("banned", "invalid key"), CaptchaErrorType.FATAL),
)


def classify_error(error: Exception) -> CaptchaErrorType:
    """Map an exception to a CaptchaErrorType by scanning its message.

    Unknown errors default to TRANSIENT so they get at least one retry
    rather than aborting the pipeline.
    """
    message = str(error).lower()
    for keywords, bucket in _ERROR_SIGNATURES:
        if any(keyword in message for keyword in keywords):
            return bucket
    return CaptchaErrorType.TRANSIENT
Step 2: Smart Retry with Exponential Backoff
Different error types need different retry strategies:
import asyncio
import random
from dataclasses import dataclass


# Fixed from the garbled "@dataclassclass": the decorator must sit on
# its own line above the class statement.
@dataclass
class RetryConfig:
    """Retry policy: attempt budget plus exponential-backoff parameters.

    Delay for attempt k is base_delay * 2**k, capped at max_delay; when
    jitter is on, each delay is scaled by a random factor in [0.5, 1.5)
    to avoid synchronized retries.
    """
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    jitter: bool = True
# Per-bucket retry policies. Buckets without an entry fall back to the
# caller-supplied (or default) RetryConfig. FATAL is deliberately
# absent: fatal errors are never retried.
RETRY_STRATEGIES = {
    CaptchaErrorType.TRANSIENT: RetryConfig(max_attempts=3, base_delay=1.0),
    CaptchaErrorType.RATE_LIMITED: RetryConfig(
        max_attempts=5, base_delay=5.0, max_delay=60.0
    ),
    CaptchaErrorType.BAD_INPUT: RetryConfig(max_attempts=2, base_delay=0.5),
    CaptchaErrorType.EXPIRED: RetryConfig(max_attempts=3, base_delay=0.1),
}
async def retry_solve(
    solve_fn, *args,
    config: "RetryConfig" = None,
    **kwargs,
) -> str:
    """Call an async solver with per-error-type retries and backoff.

    Args:
        solve_fn: async callable returning the solved token string.
        *args/**kwargs: forwarded to solve_fn on every attempt.
        config: fallback RetryConfig for error types that have no entry
            in RETRY_STRATEGIES; defaults to RetryConfig().

    Raises:
        The last solver exception once the matching strategy's attempt
        budget is exhausted; FATAL errors are re-raised immediately.
    """
    config = config or RetryConfig()
    attempt = 0
    while True:
        try:
            return await solve_fn(*args, **kwargs)
        except Exception as e:
            error_type = classify_error(e)
            if error_type == CaptchaErrorType.FATAL:
                raise  # Don't retry fatal errors
            strategy = RETRY_STRATEGIES.get(error_type, config)
            # BUG FIX: the original looped `range(config.max_attempts)`,
            # so a strategy with a larger budget (e.g. RATE_LIMITED's 5)
            # was silently capped at config.max_attempts (3). Loop until
            # the *strategy's* budget is spent instead.
            if attempt >= strategy.max_attempts - 1:
                raise
            # Exponential backoff, capped at the strategy's ceiling.
            delay = min(
                strategy.base_delay * (2 ** attempt),
                strategy.max_delay,
            )
            # Add jitter to prevent thundering herd.
            if strategy.jitter:
                delay *= (0.5 + random.random())
            print(
                f"Attempt {attempt + 1} failed "
                f"({error_type.value}), "
                f"retrying in {delay:.1f}s..."
            )
            await asyncio.sleep(delay)
            attempt += 1
Step 3: Token Freshness Guard
CAPTCHA tokens expire (typically 120 seconds). If your pipeline is slow, the token might expire before you use it:
import time


class CaptchaToken:
    """A solved CAPTCHA token plus the bookkeeping needed to detect expiry.

    Providers typically honor tokens for ~120s; the default ttl of 110s
    leaves a safety buffer for network latency on submit.
    """

    def __init__(self, value: str, ttl: int = 110):
        self.value = value             # raw token string from the solver
        self.created_at = time.time()  # wall-clock time the token was minted
        self.ttl = ttl                 # 110s buffer (actual: 120s)

    # Fixed from the garbled "@property def": the decorator must sit on
    # its own line above the def.
    @property
    def is_valid(self) -> bool:
        """True while the token's age is strictly under its ttl."""
        return time.time() - self.created_at < self.ttl

    @property
    def remaining_seconds(self) -> float:
        """Seconds of trusted lifetime left; clamped to never go negative."""
        return max(
            0,
            self.ttl - (time.time() - self.created_at),
        )
async def solve_and_use(
    sitekey: str,
    url: str,
    submit_fn,
):
    """Solve CAPTCHA and submit immediately, re-solving on expiry.

    Tries up to three solve/submit cycles and raises if all fail.
    NOTE(review): relies on `solve_captcha` and `TokenExpiredError`
    being defined elsewhere in the module.
    """
    for attempt in range(3):
        token = CaptchaToken(
            await solve_captcha(sitekey, url)
        )
        if not token.is_valid:
            # Solver took longer than the token's entire lifetime.
            print("Token already expired, re-solving...")
            continue
        try:
            # Submit while token is still fresh
            result = await submit_fn(token.value)
            return result
        except TokenExpiredError:
            # BUG FIX: the original printed "120 - remaining", which
            # overstates the token's age by the ttl safety buffer.
            # Report the actual elapsed time instead.
            elapsed = token.ttl - token.remaining_seconds
            print(f"Token expired after {elapsed:.0f}s")
            continue
    raise Exception("Failed after 3 solve attempts")
Step 4: Circuit Breaker Pattern
If the CAPTCHA-solving service is down, don't keep hammering it. Use a circuit breaker:
import time
from enum import Enum


class CircuitState(Enum):
    """Lifecycle states of the circuit breaker."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery


class CaptchaCircuitBreaker:
    """Circuit breaker around the CAPTCHA-solving service.

    After `failure_threshold` consecutive failures the circuit opens and
    rejects requests for `recovery_timeout` seconds, then lets a single
    probe through (HALF_OPEN). A successful probe closes the circuit; a
    failed probe re-opens it (failure_count is still at/above the
    threshold, so the next record_failure trips it again).
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED
        # True while the single HALF_OPEN test request is outstanding.
        self._probe_in_flight = False

    def can_execute(self) -> bool:
        """Return True if a solve request may be attempted right now."""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            elapsed = time.time() - self.last_failure_time
            if elapsed >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self._probe_in_flight = True
                return True
            return False
        # HALF_OPEN. BUG FIX: the original returned True unconditionally
        # here, so every concurrent caller hammered the recovering
        # service despite the "allow one request" intent. Gate it so
        # exactly one probe is outstanding at a time.
        if self._probe_in_flight:
            return False
        self._probe_in_flight = True
        return True

    def record_success(self):
        """Report a successful solve: reset counters and close the circuit."""
        self.failure_count = 0
        self._probe_in_flight = False
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """Report a failed solve; trip the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        self._probe_in_flight = False
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(
                f"Circuit OPEN — pausing solves "
                f"for {self.recovery_timeout}s"
            )
# Usage: one module-level breaker shared by every solve in this process.
breaker = CaptchaCircuitBreaker(failure_threshold=5, recovery_timeout=60)
async def protected_solve(sitekey, url):
    """Solve a CAPTCHA only if the shared circuit breaker permits it.

    Raises immediately while the circuit is open; otherwise reports the
    solve's outcome back to the breaker and re-raises any failure.
    """
    if not breaker.can_execute():
        raise Exception(
            "Circuit open — CAPTCHA service unavailable"
        )
    try:
        token = await solve_captcha(sitekey, url)
    except Exception:
        # Unused `as e` binding removed — only the fact of failure matters.
        breaker.record_failure()
        raise
    breaker.record_success()
    return token
Step 5: Concurrency Control with Semaphore
Don't blast the solving API with 100 concurrent requests. Use a semaphore:
import asyncio


class CaptchaSolver:
    """Production CAPTCHA solver: concurrency cap + circuit breaker + retries.

    Combines a semaphore (bounds concurrent API calls), a circuit
    breaker (stops hammering a failing service), retry_solve
    (per-error-type backoff) and simple success/failure counters.
    """

    def __init__(
        self,
        max_concurrent: int = 10,
        api_base: str = "https://www.passxapi.com",
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.breaker = CaptchaCircuitBreaker()
        self.api_base = api_base
        # Running counters, surfaced (plus a success rate) via `stats`.
        self._stats = {
            "solved": 0, "failed": 0, "expired": 0,
        }

    async def solve(
        self,
        captcha_type: str,
        sitekey: str,
        url: str,
    ) -> "CaptchaToken":
        """Solve one CAPTCHA with full resilience; return a fresh token."""
        async with self.semaphore:  # cap concurrent solves
            if not self.breaker.can_execute():
                raise Exception("Service circuit open")
            try:
                token_str = await retry_solve(
                    self._do_solve,
                    captcha_type, sitekey, url,
                )
            except Exception:
                self.breaker.record_failure()
                self._stats["failed"] += 1
                raise
            self.breaker.record_success()
            self._stats["solved"] += 1
            return CaptchaToken(token_str)

    async def _do_solve(
        self, captcha_type, sitekey, url,
    ):
        """Submit a solve task to the API and poll until it finishes.

        Raises TimeoutError after ~120s (60 polls x 2s) with no result.
        """
        import httpx  # local import so httpx is only required at solve time
        async with httpx.AsyncClient() as client:
            # Submit task
            resp = await client.post(
                f"{self.api_base}/api/v1/task",
                json={
                    "type": captcha_type,
                    "sitekey": sitekey,
                    "pageurl": url,
                },
            )
            task_id = resp.json()["task_id"]
            # Poll with timeout
            for _ in range(60):
                result = await client.get(
                    f"{self.api_base}/api/v1/task/"
                    f"{task_id}"
                )
                data = result.json()
                if data["status"] == "completed":
                    return data["token"]
                if data["status"] == "failed":
                    raise Exception(data.get("error"))
                await asyncio.sleep(2)
            raise TimeoutError("Solve timed out")

    # Fixed from the garbled "@property def stats": the decorator must
    # sit on its own line above the def.
    @property
    def stats(self):
        """Counters plus a formatted success rate (solved vs failed)."""
        total = self._stats["solved"] + self._stats["failed"]
        rate = (
            self._stats["solved"] / total * 100
            if total > 0 else 0
        )
        return {
            **self._stats,
            "success_rate": f"{rate:.1f}%",
        }
Step 6: Putting It All Together
import asyncio


async def scrape_with_captchas(urls: list[str]):
    """Scrape every URL, solving any CAPTCHA encountered along the way.

    Returns one entry per URL: the page text / submit response on
    success, or the raised Exception (gather(return_exceptions=True)).
    NOTE(review): relies on `detect_captcha` being defined elsewhere.
    """
    # BUG FIX: httpx was used below but never imported in this snippet.
    import httpx

    solver = CaptchaSolver(max_concurrent=10)

    async def process_url(url):
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            # Detect CAPTCHA
            captcha = detect_captcha(resp.text)
            if not captcha:
                return resp.text
            # Solve with full resilience
            token = await solver.solve(
                captcha_type=captcha["type"],
                sitekey=captcha["sitekey"],
                url=url,
            )
            if not token.is_valid:
                raise Exception("Token expired pre-submit")
            # Submit with token
            return await client.post(url, data={
                "captcha_response": token.value,
            })

    # (Dead `results = []` removed — it was immediately overwritten.)
    tasks = [process_url(u) for u in urls]
    results = await asyncio.gather(
        *tasks, return_exceptions=True
    )

    # Report stats
    print(f"Solver stats: {solver.stats}")
    successes = [
        r for r in results
        if not isinstance(r, Exception)
    ]
    print(
        f"Scraped {len(successes)}/{len(urls)} "
        f"pages successfully"
    )
    return results
# Run it — guarded so importing this module doesn't kick off a scrape.
# NOTE(review): `urls_to_scrape` must be defined before this point.
if __name__ == "__main__":
    asyncio.run(
        scrape_with_captchas(urls_to_scrape)
    )
Key Takeaways
- Classify errors — don't treat all failures the same
- Exponential backoff + jitter — prevents thundering herd on retries
- Token freshness tracking — solve late, submit immediately
- Circuit breaker — stop hammering a failing service
- Semaphore — control concurrency to avoid rate limits
- Stats tracking — know your success rate in production
These patterns work with any CAPTCHA-solving service. For a Python client that handles the API communication, check out passxapi-python — you can wrap it with these resilience patterns for production use.
What error handling patterns do you use in your scrapers? Share in the comments.
Top comments (0)