You need to scrape 10,000 product pages. With requests, that's ~14 hours (5s per page). With aiohttp and proper concurrency control, it's under 30 minutes.
But here's the catch: at scale, you will hit CAPTCHAs. Let's build an async scraper from scratch that handles them transparently.
Why aiohttp Over requests/httpx?
Quick comparison for I/O-bound scraping:
| Library | 1000 pages | Async | Memory |
|---|---|---|---|
| requests | ~83 min | No | Low |
| httpx (async) | ~8 min | Yes | Medium |
| aiohttp | ~5 min | Yes | Low |
aiohttp is purpose-built for async HTTP. It reuses connections aggressively and has lower overhead per request than httpx.
Project Structure
async_scraper/
├── scraper/
│ ├── __init__.py
│ ├── client.py # aiohttp session management
│ ├── captcha.py # CAPTCHA detection + solving
│ ├── pipeline.py # Data processing pipeline
│ └── rate_limiter.py # Concurrency + rate control
├── config.py
├── main.py
└── requirements.txt
Step 1: The Rate Limiter
Don't blast servers with 1000 concurrent requests. Use a token bucket:
# scraper/rate_limiter.py
import asyncio
import time
class TokenBucket:
    """Rate limiter using the token bucket algorithm.

    Tokens accrue continuously at ``rate`` per second, capped at
    ``max_tokens`` (the burst budget). Each ``acquire()`` consumes one
    token, sleeping until one is available.
    """

    def __init__(
        self,
        rate: float,  # tokens per second
        max_tokens: int = 10  # burst capacity
    ):
        self.rate = rate
        self.max_tokens = max_tokens
        self._tokens = max_tokens
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Consume one token, sleeping if the bucket is empty.

        The lock is held across the sleep deliberately: it serializes
        waiters so they are released one token-interval apart instead of
        stampeding when the bucket refills.
        """
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(
                self.max_tokens,
                self._tokens + elapsed * self.rate
            )
            self._last_refill = now
            if self._tokens < 1:
                wait_time = (1 - self._tokens) / self.rate
                await asyncio.sleep(wait_time)
                # BUGFIX: advance the refill clock past the sleep. The
                # original left _last_refill at the pre-sleep timestamp
                # while zeroing the tokens, so the next caller re-credited
                # the tokens that accrued (and were consumed) during the
                # sleep — the bucket effectively refilled at ~2x rate
                # under contention.
                self._last_refill = time.monotonic()
                self._tokens = 0
            else:
                self._tokens -= 1
class ConcurrencyLimiter:
    """Combines a concurrency cap (semaphore) with rate limiting.

    Use as an async context manager around each request: entering takes
    a concurrency slot and one rate token; exiting frees the slot.
    """

    def __init__(
        self,
        max_concurrent: int = 20,
        requests_per_second: float = 10.0
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.bucket = TokenBucket(
            rate=requests_per_second
        )

    async def __aenter__(self):
        await self.semaphore.acquire()
        try:
            await self.bucket.acquire()
        except BaseException:
            # BUGFIX: bucket.acquire() sleeps and can be cancelled; the
            # original leaked the semaphore slot in that case, slowly
            # strangling overall concurrency.
            self.semaphore.release()
            raise
        return self

    async def __aexit__(self, *args):
        self.semaphore.release()
Step 2: CAPTCHA Detection and Solving
Build a detector that works across CAPTCHA types:
# scraper/captcha.py
import re
from dataclasses import dataclass
from enum import Enum
class CaptchaType(Enum):
    """CAPTCHA variants the detector can recognize.

    Values double as the wire identifiers sent to the solving API;
    NONE marks a page with no recognizable CAPTCHA.
    """
    RECAPTCHA_V2 = "recaptcha_v2"
    RECAPTCHA_V3 = "recaptcha_v3"
    HCAPTCHA = "hcaptcha"
    TURNSTILE = "turnstile"
    NONE = "none"
@dataclass
class CaptchaInfo:
    """Everything a solver needs to submit a solve task for one page."""
    type: CaptchaType  # which CAPTCHA product was detected
    sitekey: str  # site key extracted from the page markup
    page_url: str  # URL of the page the CAPTCHA protects
    # Optional provider-specific parameters merged into the solve request.
    # BUGFIX: the original read `@dataclassclass CaptchaInfo:` — a syntax
    # error (missing newline between decorator and class statement); the
    # annotation also claimed `dict` while defaulting to None.
    extra: dict | None = None
# Regex fingerprints for each CAPTCHA widget; group(1) captures the sitekey.
# NOTE(review): the v2/hCaptcha/Turnstile patterns assume the class
# attribute appears before data-sitekey inside the tag — true for the
# vendors' standard embed snippets, but a different attribute order would
# defeat the match. Confirm against the actual target markup.
CAPTCHA_PATTERNS = [
    {
        "type": CaptchaType.RECAPTCHA_V2,
        "selector": r'class="g-recaptcha"[^>]*data-sitekey="([^"]+)"',
    },
    {
        # v3 has no widget markup; the sitekey shows up as the first
        # argument of the grecaptcha.execute(...) call.
        "type": CaptchaType.RECAPTCHA_V3,
        "selector": r'grecaptcha\.execute\([\'"]([^\'"]+)',
    },
    {
        "type": CaptchaType.HCAPTCHA,
        "selector": r'class="h-captcha"[^>]*data-sitekey="([^"]+)"',
    },
    {
        "type": CaptchaType.TURNSTILE,
        "selector": r'class="cf-turnstile"[^>]*data-sitekey="([^"]+)"',
    },
]
def detect_captcha(html: str, url: str) -> CaptchaInfo | None:
    """Scan raw page HTML for a known CAPTCHA widget.

    Patterns are tried in CAPTCHA_PATTERNS order; the first hit wins.
    Returns None when the page shows no recognizable CAPTCHA.
    """
    candidates = (
        (entry["type"], re.search(entry["selector"], html))
        for entry in CAPTCHA_PATTERNS
    )
    for captcha_type, found in candidates:
        if found is None:
            continue
        return CaptchaInfo(
            type=captcha_type,
            sitekey=found.group(1),
            page_url=url,
        )
    return None
Now the async solver:
# scraper/captcha.py (continued)
import aiohttp
import asyncio
class AsyncCaptchaSolver:
    """Submits detected CAPTCHAs to a solving API and polls for tokens.

    Solves run under their own semaphore so slow CAPTCHA round-trips
    never consume the scraper's fetch concurrency.
    """

    def __init__(
        self,
        api_base: str = "https://www.passxapi.com",
        max_concurrent_solves: int = 5
    ):
        self.api_base = api_base
        self.semaphore = asyncio.Semaphore(
            max_concurrent_solves
        )
        # Running counters surfaced to callers for progress reporting.
        self.stats = {"solved": 0, "failed": 0}

    async def solve(
        self,
        info: CaptchaInfo,
        session: aiohttp.ClientSession
    ) -> str | None:
        """Solve one CAPTCHA; return the token, or None on any failure.

        Best-effort by design: every error path (HTTP failure, an
        API-reported "failed" status, the ~2-minute poll timeout) bumps
        stats["failed"] and is swallowed, so a bad solve never kills the
        calling fetch.
        """
        async with self.semaphore:
            try:
                # Submit task
                async with session.post(
                    f"{self.api_base}/api/v1/task",
                    json={
                        "type": info.type.value,
                        "sitekey": info.sitekey,
                        "pageurl": info.page_url,
                        # Provider-specific extras (e.g. v3 action) merged in.
                        **(info.extra or {})
                    }
                ) as resp:
                    data = await resp.json()
                    task_id = data["task_id"]
                # Poll for result: up to 60 polls, 2 s apart (~2 min cap).
                for _ in range(60):
                    async with session.get(
                        f"{self.api_base}/api/v1/task/{task_id}"
                    ) as resp:
                        result = await resp.json()
                        if result["status"] == "completed":
                            self.stats["solved"] += 1
                            return result["token"]
                        if result["status"] == "failed":
                            # Routed to the except below so failure
                            # accounting lives in one place.
                            raise Exception(result.get("error"))
                    await asyncio.sleep(2)
                raise TimeoutError("Solve timed out")
            except Exception as e:
                self.stats["failed"] += 1
                print(f"CAPTCHA solve failed: {e}")
                return None
Step 3: The Main Scraper Client
# scraper/client.py
import aiohttp
import asyncio
from .captcha import detect_captcha, AsyncCaptchaSolver
from .rate_limiter import ConcurrencyLimiter
class AsyncScraper:
    """High-throughput aiohttp scraper with built-in CAPTCHA handling.

    Must be used as an async context manager: the pooled ClientSession
    is created in __aenter__ and closed in __aexit__; fetch() assumes
    self.session exists.
    """

    def __init__(
        self,
        max_concurrent: int = 20,
        requests_per_second: float = 10.0,
        max_retries: int = 3
    ):
        self.limiter = ConcurrencyLimiter(
            max_concurrent, requests_per_second
        )
        self.solver = AsyncCaptchaSolver()
        self.max_retries = max_retries
        # Created lazily in __aenter__.
        self.session = None
        self._results = []

    async def __aenter__(self):
        """Open the pooled session shared by all fetches."""
        connector = aiohttp.TCPConnector(
            limit=100,  # Connection pool size
            ttl_dns_cache=300,  # DNS cache 5 min
            enable_cleanup_closed=True
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36"
                )
            }
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    async def fetch(self, url: str) -> dict:
        """Fetch a URL with CAPTCHA handling.

        Returns {"url", "status", "html", "captcha_solved"} on success,
        or {"url", "error"} after max_retries failed attempts.
        NOTE(review): if max_retries is 0 the loop never runs and this
        returns None implicitly.
        """
        for attempt in range(self.max_retries):
            # Limiter is held for the whole attempt, including any
            # CAPTCHA solve — slots are scarce while solving.
            async with self.limiter:
                try:
                    async with self.session.get(url) as resp:
                        html = await resp.text()
                        # Check for CAPTCHA
                        captcha = detect_captcha(html, url)
                        if captcha:
                            token = await self.solver.solve(
                                captcha, self.session
                            )
                            if token:
                                # Resubmit with token
                                html = await self._submit_with_token(
                                    url, token, captcha
                                )
                        # NOTE(review): "captcha_solved" is set whenever a
                        # CAPTCHA was *detected*, even when the solve
                        # failed and html still contains the challenge.
                        return {
                            "url": url,
                            "status": resp.status,
                            "html": html,
                            "captcha_solved": captcha is not None
                        }
                except (
                    aiohttp.ClientError,
                    asyncio.TimeoutError
                ) as e:
                    if attempt == self.max_retries - 1:
                        return {
                            "url": url, "error": str(e)
                        }
                    # Exponential backoff
                    await asyncio.sleep(2 ** attempt)

    async def _submit_with_token(
        self, url, token, captcha_info
    ):
        """POST the solved token back to the page."""
        # Each CAPTCHA vendor expects its token in a specific form field.
        field_names = {
            "recaptcha_v2": "g-recaptcha-response",
            "recaptcha_v3": "g-recaptcha-response",
            "hcaptcha": "h-captcha-response",
            "turnstile": "cf-turnstile-response",
        }
        field = field_names.get(
            captcha_info.type.value,
            "captcha_response"
        )
        async with self.session.post(
            url, data={field: token}
        ) as resp:
            return await resp.text()

    async def scrape_many(
        self, urls: list[str]
    ) -> list[dict]:
        """Scrape multiple URLs concurrently.

        Results are returned in the same order as urls; any exception
        that escaped fetch() is converted to an error dict rather than
        propagated.
        """
        tasks = [self.fetch(url) for url in urls]
        results = await asyncio.gather(
            *tasks, return_exceptions=True
        )
        # Convert exceptions to error dicts
        processed = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                processed.append({
                    "url": url, "error": str(result)
                })
            else:
                processed.append(result)
        return processed
Step 4: Data Processing Pipeline
Process results as they come in, don't wait for all pages:
# scraper/pipeline.py
import asyncio
import json
from pathlib import Path
class DataPipeline:
    """Streams extracted records through an asyncio queue into a JSONL file.

    process() is the producer side; writer() is a long-running consumer
    task that should be cancelled when scraping finishes.
    """

    def __init__(self, output_dir: str = "output"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        # Hand-off buffer between process() producers and writer().
        self.queue = asyncio.Queue()
        # Count of pages that yielded a record (not pages fetched).
        self.processed = 0

    async def process(self, result: dict):
        """Extract and save data from a scraped page."""
        if "error" in result:
            print(f" ✗ {result['url']}: {result['error']}")
            return
        # Your extraction logic here
        data = self.extract(result["html"], result["url"])
        if data:
            await self.queue.put(data)
            self.processed += 1

    def extract(self, html: str, url: str) -> dict | None:
        """Override this with your parsing logic."""
        # Imported here so bs4 is only required when the default
        # extractor is actually used.
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")
        title = soup.find("h1")
        price = soup.find(class_="price")
        # A page without an <h1> is treated as having no product data.
        if not title:
            return None
        return {
            "url": url,
            "title": title.text.strip(),
            "price": price.text.strip() if price else None,
        }

    async def writer(self):
        """Background task that writes results to disk.

        Appends one JSON line per record. The 5 s wait_for timeout keeps
        the loop waking up so cancellation is observed even when the
        queue is idle; on cancellation, any still-queued items are
        drained to the file before exiting.
        """
        output_file = self.output_dir / "results.jsonl"
        with open(output_file, "a") as f:
            while True:
                try:
                    data = await asyncio.wait_for(
                        self.queue.get(), timeout=5.0
                    )
                    f.write(json.dumps(data) + "\n")
                    # Flush per record so a crash loses at most one line.
                    f.flush()
                except asyncio.TimeoutError:
                    continue
                except asyncio.CancelledError:
                    # Drain remaining items
                    while not self.queue.empty():
                        data = self.queue.get_nowait()
                        f.write(json.dumps(data) + "\n")
                    break
Step 5: Putting It All Together
# main.py
import asyncio
import time
from scraper.client import AsyncScraper
from scraper.pipeline import DataPipeline
async def main():
    """Scrape 10k product pages in batches, streaming results to disk."""
    urls = [
        f"https://example-shop.com/product/{i}"
        for i in range(1, 10001)
    ]
    pipeline = DataPipeline(output_dir="output")
    # Writer runs in the background, draining pipeline.queue as we go.
    writer_task = asyncio.create_task(pipeline.writer())
    print(f"Scraping {len(urls)} URLs...")
    start = time.monotonic()
    async with AsyncScraper(
        max_concurrent=20,
        requests_per_second=10
    ) as scraper:
        # Process in batches of 100 so progress can be reported and the
        # in-flight result set stays bounded.
        batch_size = 100
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]
            results = await scraper.scrape_many(batch)
            for result in results:
                await pipeline.process(result)
            elapsed = time.monotonic() - start
            # BUGFIX: the original computed pages/second here but printed
            # it under a "pages/min" label; scale by 60 to match.
            rate = (i + len(batch)) / elapsed * 60
            print(
                f" Batch {i // batch_size + 1}: "
                f"{rate:.0f} pages/min | "
                f"CAPTCHAs solved: "
                f"{scraper.solver.stats['solved']}"
            )
    # Stop the writer; it drains any queued items on cancellation.
    writer_task.cancel()
    try:
        await writer_task
    except asyncio.CancelledError:
        pass
    elapsed = time.monotonic() - start
    print(f"\nDone! {len(urls)} pages in {elapsed:.0f}s")
    print(f"Extracted: {pipeline.processed} items")
    print(f"CAPTCHA stats: {scraper.solver.stats}")


if __name__ == "__main__":
    asyncio.run(main())
Performance Tuning Tips
1. Connection Pool Size Matters
# Too low: connections wait in queue
connector = aiohttp.TCPConnector(limit=10) # Slow
# Sweet spot: matches your concurrency
connector = aiohttp.TCPConnector(limit=100) # Good
# Too high: target server may block you
connector = aiohttp.TCPConnector(limit=1000) # Risky
2. DNS Caching Saves Milliseconds Per Request
# Without cache: DNS lookup every request (~50ms each)
# With cache: DNS lookup once per domain
connector = aiohttp.TCPConnector(
ttl_dns_cache=300, # Cache for 5 minutes
use_dns_cache=True # Enabled by default
)
3. Separate CAPTCHA Solve Concurrency
Don't let CAPTCHA solves eat all your concurrent slots:
# Main scraping: 20 concurrent
scraper_sem = asyncio.Semaphore(20)
# CAPTCHA solving: 5 concurrent (separate pool)
solver_sem = asyncio.Semaphore(5)
4. Monitor and Adjust in Real-Time
async def adaptive_rate(scraper, initial_rate=10):
    """Adjust rate based on error ratio.

    Runs forever as a background task: once a minute it inspects the
    solver's solved/failed counters, backs the token-bucket rate off by
    30% when more than 30% of solves fail, and speeds it up by 20% when
    under 5% fail.
    """
    current = initial_rate
    while True:
        await asyncio.sleep(60)
        solved = scraper.solver.stats["solved"]
        failed = scraper.solver.stats["failed"]
        attempts = solved + failed
        if attempts == 0:
            continue
        failure_ratio = failed / attempts
        if failure_ratio > 0.3:
            current *= 0.7  # Back off
        elif failure_ratio < 0.05:
            current *= 1.2  # Speed up
        scraper.limiter.bucket.rate = current
Expected Performance
With 20 concurrent connections and 10 req/s:
| Pages | No CAPTCHAs | 10% CAPTCHA rate | 50% CAPTCHA rate |
|---|---|---|---|
| 100 | ~10s | ~15s | ~45s |
| 1,000 | ~100s | ~150s | ~450s |
| 10,000 | ~17min | ~25min | ~75min |
The CAPTCHA overhead scales with hit rate, not total pages — because solves happen concurrently with other fetches.
Wrapping Up
The key patterns for high-throughput async scraping:
- Token bucket for rate limiting (not just semaphore)
- Separate concurrency pools for fetching vs CAPTCHA solving
- Streaming pipeline — process data as it arrives
- Adaptive rate control — respond to errors dynamically
For the CAPTCHA-solving integration used in these examples, check out passxapi-python — it provides both sync and async clients that plug into aiohttp workflows.
What throughput are you getting with your scrapers? Drop your numbers in the comments.
Top comments (0)