DEV Community

Building a High-Throughput Async Scraper with aiohttp and Automatic CAPTCHA Handling

You need to scrape 10,000 product pages. With requests, that's ~14 hours (5s per page). With aiohttp and proper concurrency control, it's under 30 minutes.

But here's the catch: at scale, you will hit CAPTCHAs. Let's build an async scraper from scratch that handles them transparently.

Why aiohttp Over requests/httpx?

Quick comparison for I/O-bound scraping:

| Library | 1,000 pages | Async | Memory |
| --- | --- | --- | --- |
| requests | ~83 min | No | Low |
| httpx (async) | ~8 min | Yes | Medium |
| aiohttp | ~5 min | Yes | Low |

aiohttp is purpose-built for async HTTP. It reuses connections aggressively and has lower overhead per request than httpx.
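The speedup comes entirely from overlapping I/O waits. Here's a dependency-free sketch of the pattern, using `asyncio.sleep` as a stand-in for network latency (URLs are placeholders, nothing is actually fetched):

```python
import asyncio
import time

async def fake_fetch(url: str, latency: float = 0.1) -> str:
    # Stand-in for session.get(); with aiohttp the wait would be real network I/O
    await asyncio.sleep(latency)
    return f"<html>{url}</html>"

async def crawl(n: int = 50) -> float:
    urls = [f"https://example.com/page/{i}" for i in range(n)]
    start = time.monotonic()
    # All 50 "requests" are in flight at once; their waits overlap
    await asyncio.gather(*(fake_fetch(u) for u in urls))
    return time.monotonic() - start

elapsed = asyncio.run(crawl())
print(f"{elapsed:.2f}s")  # roughly 0.1s total, not 50 * 0.1 = 5s
```

Sequentially this would take ~5 seconds; concurrently it takes about as long as the single slowest request.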

Project Structure

async_scraper/
├── scraper/
│   ├── __init__.py
│   ├── client.py        # aiohttp session management
│   ├── captcha.py       # CAPTCHA detection + solving
│   ├── pipeline.py      # Data processing pipeline
│   └── rate_limiter.py  # Concurrency + rate control
├── config.py
├── main.py
└── requirements.txt

Step 1: The Rate Limiter

Don't blast servers with 1000 concurrent requests. Use a token bucket:

# scraper/rate_limiter.py
import asyncio
import time

class TokenBucket:
    """Rate limiter using token bucket algorithm."""

    def __init__(
        self, 
        rate: float,        # tokens per second
        max_tokens: int = 10
    ):
        self.rate = rate
        self.max_tokens = max_tokens
        self._tokens = max_tokens
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(
                self.max_tokens,
                self._tokens + elapsed * self.rate
            )
            self._last_refill = now

            if self._tokens < 1:
                wait_time = (1 - self._tokens) / self.rate
                await asyncio.sleep(wait_time)
                # Consume the token that accrues during the sleep and reset
                # the refill clock so it isn't counted a second time on the
                # next acquire (otherwise the effective rate doubles under load)
                self._tokens = 0
                self._last_refill = time.monotonic()
            else:
                self._tokens -= 1


class ConcurrencyLimiter:
    """Combines semaphore + rate limiting."""

    def __init__(
        self, 
        max_concurrent: int = 20,
        requests_per_second: float = 10.0
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.bucket = TokenBucket(
            rate=requests_per_second
        )

    async def __aenter__(self):
        await self.semaphore.acquire()
        await self.bucket.acquire()
        return self

    async def __aexit__(self, *args):
        self.semaphore.release()
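A quick way to convince yourself the pacing works: acquire more tokens than the bucket holds and time it. This snippet inlines a compact copy of the bucket (with the refill clock reset after sleeping) so it runs standalone:

```python
import asyncio
import time

class TokenBucket:
    """Compact copy of the rate limiter above, for a standalone timing demo."""
    def __init__(self, rate: float, max_tokens: int = 10):
        self.rate = rate
        self.max_tokens = max_tokens
        self._tokens = max_tokens
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            self._tokens = min(self.max_tokens,
                               self._tokens + (now - self._last_refill) * self.rate)
            self._last_refill = now
            if self._tokens < 1:
                await asyncio.sleep((1 - self._tokens) / self.rate)
                # Consume the token accrued during the sleep, reset the clock
                self._tokens = 0
                self._last_refill = time.monotonic()
            else:
                self._tokens -= 1

async def demo() -> float:
    # 5 tokens/s, bucket starts full with 2: the first 2 acquires are
    # free, the remaining 3 each wait ~0.2s
    bucket = TokenBucket(rate=5.0, max_tokens=2)
    start = time.monotonic()
    for _ in range(5):
        await bucket.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(demo())
print(f"{elapsed:.2f}s")  # roughly 0.6s
```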

Step 2: CAPTCHA Detection and Solving

Build a detector that works across CAPTCHA types:

# scraper/captcha.py
import re
from dataclasses import dataclass
from enum import Enum

class CaptchaType(Enum):
    RECAPTCHA_V2 = "recaptcha_v2"
    RECAPTCHA_V3 = "recaptcha_v3"
    HCAPTCHA = "hcaptcha"
    TURNSTILE = "turnstile"
    NONE = "none"

@dataclass
class CaptchaInfo:
    type: CaptchaType
    sitekey: str
    page_url: str
    extra: dict | None = None

CAPTCHA_PATTERNS = [
    {
        "type": CaptchaType.RECAPTCHA_V2,
        "selector": r'class="g-recaptcha"[^>]*data-sitekey="([^"]+)"',
    },
    {
        "type": CaptchaType.RECAPTCHA_V3,
        "selector": r'grecaptcha\.execute\([\'"]([^\'"]+)',
    },
    {
        "type": CaptchaType.HCAPTCHA,
        "selector": r'class="h-captcha"[^>]*data-sitekey="([^"]+)"',
    },
    {
        "type": CaptchaType.TURNSTILE,
        "selector": r'class="cf-turnstile"[^>]*data-sitekey="([^"]+)"',
    },
]

def detect_captcha(html: str, url: str) -> CaptchaInfo | None:
    for pattern in CAPTCHA_PATTERNS:
        match = re.search(pattern["selector"], html)
        if match:
            return CaptchaInfo(
                type=pattern["type"],
                sitekey=match.group(1),
                page_url=url
            )
    return None
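Since detection is plain regex matching, it's easy to sanity-check against a sample widget. The sitekey below is Google's widely published reCAPTCHA v2 test key; the HTML fragment is hypothetical:

```python
import re

# Hypothetical page fragment containing a reCAPTCHA v2 widget
html = (
    '<form><div class="g-recaptcha" '
    'data-sitekey="6LeIxAcTAAAAAJcZVRqyHh71UMIEGNQ_MXjiZKhI">'
    '</div></form>'
)

# Same pattern as the RECAPTCHA_V2 entry in CAPTCHA_PATTERNS
pattern = r'class="g-recaptcha"[^>]*data-sitekey="([^"]+)"'
match = re.search(pattern, html)
print(match.group(1))  # 6LeIxAcTAAAAAJcZVRqyHh71UMIEGNQ_MXjiZKhI
```

Note that regex-based detection assumes the attribute order shown here (`class` before `data-sitekey`); a real-world detector may need looser patterns or an HTML parser.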

Now the async solver:

# scraper/captcha.py (continued)
import aiohttp
import asyncio

class AsyncCaptchaSolver:
    def __init__(
        self, 
        api_base: str = "https://www.passxapi.com",
        max_concurrent_solves: int = 5
    ):
        self.api_base = api_base
        self.semaphore = asyncio.Semaphore(
            max_concurrent_solves
        )
        self.stats = {"solved": 0, "failed": 0}

    async def solve(
        self, 
        info: CaptchaInfo,
        session: aiohttp.ClientSession
    ) -> str | None:
        async with self.semaphore:
            try:
                # Submit task
                async with session.post(
                    f"{self.api_base}/api/v1/task",
                    json={
                        "type": info.type.value,
                        "sitekey": info.sitekey,
                        "pageurl": info.page_url,
                        **(info.extra or {})
                    }
                ) as resp:
                    data = await resp.json()
                    task_id = data["task_id"]

                # Poll for result
                for _ in range(60):
                    async with session.get(
                        f"{self.api_base}/api/v1/task/{task_id}"
                    ) as resp:
                        result = await resp.json()
                        if result["status"] == "completed":
                            self.stats["solved"] += 1
                            return result["token"]
                        if result["status"] == "failed":
                            raise Exception(result.get("error"))
                    await asyncio.sleep(2)

                raise TimeoutError("Solve timed out")

            except Exception as e:
                self.stats["failed"] += 1
                print(f"CAPTCHA solve failed: {e}")
                return None

Step 3: The Main Scraper Client

# scraper/client.py
import aiohttp
import asyncio
from .captcha import detect_captcha, AsyncCaptchaSolver
from .rate_limiter import ConcurrencyLimiter

class AsyncScraper:
    def __init__(
        self,
        max_concurrent: int = 20,
        requests_per_second: float = 10.0,
        max_retries: int = 3
    ):
        self.limiter = ConcurrencyLimiter(
            max_concurrent, requests_per_second
        )
        self.solver = AsyncCaptchaSolver()
        self.max_retries = max_retries
        self.session = None
        self._results = []

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=100,           # Connection pool size
            ttl_dns_cache=300,   # DNS cache 5 min
            enable_cleanup_closed=True
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30),
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36"
                )
            }
        )
        return self

    async def __aexit__(self, *args):
        if self.session:
            await self.session.close()

    async def fetch(self, url: str) -> dict:
        """Fetch a URL with CAPTCHA handling."""
        for attempt in range(self.max_retries):
            async with self.limiter:
                try:
                    async with self.session.get(url) as resp:
                        html = await resp.text()

                        # Check for CAPTCHA
                        captcha = detect_captcha(html, url)
                        solved = False
                        if captcha:
                            token = await self.solver.solve(
                                captcha, self.session
                            )
                            if token:
                                # Resubmit with token
                                html = await self._submit_with_token(
                                    url, token, captcha
                                )
                                solved = True

                        return {
                            "url": url,
                            "status": resp.status,
                            "html": html,
                            "captcha_solved": solved
                        }

                except (
                    aiohttp.ClientError,
                    asyncio.TimeoutError
                ) as e:
                    if attempt == self.max_retries - 1:
                        return {
                            "url": url, "error": str(e)
                        }
                    # Exponential backoff
                    await asyncio.sleep(2 ** attempt)

    async def _submit_with_token(
        self, url, token, captcha_info
    ):
        """POST the solved token back to the page."""
        field_names = {
            "recaptcha_v2": "g-recaptcha-response",
            "recaptcha_v3": "g-recaptcha-response",
            "hcaptcha": "h-captcha-response",
            "turnstile": "cf-turnstile-response",
        }
        field = field_names.get(
            captcha_info.type.value, 
            "captcha_response"
        )

        async with self.session.post(
            url, data={field: token}
        ) as resp:
            return await resp.text()

    async def scrape_many(
        self, urls: list[str]
    ) -> list[dict]:
        """Scrape multiple URLs concurrently."""
        tasks = [self.fetch(url) for url in urls]
        results = await asyncio.gather(
            *tasks, return_exceptions=True
        )

        # Convert exceptions to error dicts
        processed = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                processed.append({
                    "url": url, "error": str(result)
                })
            else:
                processed.append(result)

        return processed

Step 4: Data Processing Pipeline

Process results as they come in instead of waiting for every page to finish:

# scraper/pipeline.py
import asyncio
import json
from pathlib import Path

class DataPipeline:
    def __init__(self, output_dir: str = "output"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.queue = asyncio.Queue()
        self.processed = 0

    async def process(self, result: dict):
        """Extract and save data from a scraped page."""
        if "error" in result:
            print(f"{result['url']}: {result['error']}")
            return

        # Your extraction logic here
        data = self.extract(result["html"], result["url"])

        if data:
            await self.queue.put(data)
            self.processed += 1

    def extract(self, html: str, url: str) -> dict | None:
        """Override this with your parsing logic."""
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")

        title = soup.find("h1")
        price = soup.find(class_="price")

        if not title:
            return None

        return {
            "url": url,
            "title": title.text.strip(),
            "price": price.text.strip() if price else None,
        }

    async def writer(self):
        """Background task that writes results to disk."""
        output_file = self.output_dir / "results.jsonl"

        with open(output_file, "a") as f:
            while True:
                try:
                    data = await asyncio.wait_for(
                        self.queue.get(), timeout=5.0
                    )
                    f.write(json.dumps(data) + "\n")
                    f.flush()
                except asyncio.TimeoutError:
                    continue
                except asyncio.CancelledError:
                    # Drain remaining items
                    while not self.queue.empty():
                        data = self.queue.get_nowait()
                        f.write(json.dumps(data) + "\n")
                    break
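Because the writer appends JSONL, downstream loading is one `json.loads` per line and never needs the whole file in memory. A round-trip sketch, with a temporary file standing in for `output/results.jsonl` and made-up product rows:

```python
import json
import os
import tempfile

rows = [
    {"url": "https://example-shop.com/product/1", "title": "Widget", "price": "$9.99"},
    {"url": "https://example-shop.com/product/2", "title": "Gadget", "price": None},
]

# Append one JSON object per line, the same way the writer task does
path = os.path.join(tempfile.mkdtemp(), "results.jsonl")
with open(path, "a") as f:
    for row in rows:
        f.write(json.dumps(row) + "\n")

# Stream the records back line by line
with open(path) as f:
    loaded = [json.loads(line) for line in f if line.strip()]

print(loaded[1]["title"])  # Gadget
```

Appending line-delimited JSON also means a crash mid-run loses at most the record being written, not the whole output file.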

Step 5: Putting It All Together

# main.py
import asyncio
import time
from scraper.client import AsyncScraper
from scraper.pipeline import DataPipeline

async def main():
    urls = [
        f"https://example-shop.com/product/{i}"
        for i in range(1, 10001)
    ]

    pipeline = DataPipeline(output_dir="output")
    writer_task = asyncio.create_task(pipeline.writer())

    print(f"Scraping {len(urls)} URLs...")
    start = time.monotonic()

    async with AsyncScraper(
        max_concurrent=20,
        requests_per_second=10
    ) as scraper:
        # Process in batches of 100
        batch_size = 100
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]
            results = await scraper.scrape_many(batch)

            for result in results:
                await pipeline.process(result)

            elapsed = time.monotonic() - start
            rate = (i + len(batch)) / elapsed * 60  # pages per minute
            print(
                f"  Batch {i // batch_size + 1}: "
                f"{rate:.0f} pages/min | "
                f"CAPTCHAs solved: "
                f"{scraper.solver.stats['solved']}"
            )

    # Stop the writer
    writer_task.cancel()
    try:
        await writer_task
    except asyncio.CancelledError:
        pass

    elapsed = time.monotonic() - start
    print(f"\nDone! {len(urls)} pages in {elapsed:.0f}s")
    print(f"Extracted: {pipeline.processed} items")
    print(f"CAPTCHA stats: {scraper.solver.stats}")


if __name__ == "__main__":
    asyncio.run(main())

Performance Tuning Tips

1. Connection Pool Size Matters

# Too low: connections wait in queue
connector = aiohttp.TCPConnector(limit=10)   # Slow

# Sweet spot: matches your concurrency
connector = aiohttp.TCPConnector(limit=100)  # Good

# Too high: target server may block you
connector = aiohttp.TCPConnector(limit=1000) # Risky

2. DNS Caching Saves Milliseconds Per Request

# Without cache: DNS lookup every request (~50ms each)
# With cache: DNS lookup once per domain
connector = aiohttp.TCPConnector(
    ttl_dns_cache=300,   # Cache for 5 minutes
    use_dns_cache=True   # Enabled by default
)

3. Separate CAPTCHA Solve Concurrency

Don't let CAPTCHA solves eat all your concurrent slots:

# Main scraping: 20 concurrent
scraper_sem = asyncio.Semaphore(20)

# CAPTCHA solving: 5 concurrent (separate pool)
solver_sem = asyncio.Semaphore(5)

4. Monitor and Adjust in Real-Time

async def adaptive_rate(scraper, initial_rate=10):
    """Adjust rate based on error ratio."""
    rate = initial_rate
    while True:
        await asyncio.sleep(60)
        stats = scraper.solver.stats
        total = stats["solved"] + stats["failed"]
        if total > 0:
            error_rate = stats["failed"] / total
            if error_rate > 0.3:
                rate *= 0.7  # Back off
            elif error_rate < 0.05:
                rate *= 1.2  # Speed up
        scraper.limiter.bucket.rate = rate
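The adjustment rule is easier to reason about (and unit-test) as a pure function. This is the same back-off/speed-up logic as `adaptive_rate`, factored out of the loop:

```python
def adjust(rate: float, solved: int, failed: int) -> float:
    """Back off at >30% solve failures, speed up below 5%, else hold."""
    total = solved + failed
    if total == 0:
        return rate  # no data yet, keep the current rate
    error_rate = failed / total
    if error_rate > 0.3:
        return rate * 0.7
    if error_rate < 0.05:
        return rate * 1.2
    return rate

print(adjust(10.0, solved=5, failed=5))   # 7.0  -> back off (50% errors)
print(adjust(10.0, solved=99, failed=1))  # 12.0 -> speed up (1% errors)
print(adjust(10.0, solved=9, failed=1))   # 10.0 -> hold steady (10% errors)
```

Multiplicative adjustment (×0.7 down, ×1.2 up) backs off faster than it speeds up, which is what you want when the failure signal means the target is actively blocking you.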

Expected Performance

With 20 concurrent connections and 10 req/s:

| Pages | No CAPTCHAs | 10% CAPTCHA rate | 50% CAPTCHA rate |
| --- | --- | --- | --- |
| 100 | ~10s | ~15s | ~45s |
| 1,000 | ~100s | ~150s | ~450s |
| 10,000 | ~17 min | ~25 min | ~75 min |

The CAPTCHA overhead scales with hit rate, not total pages — because solves happen concurrently with other fetches.

Wrapping Up

The key patterns for high-throughput async scraping:

  1. Token bucket for rate limiting (not just semaphore)
  2. Separate concurrency pools for fetching vs CAPTCHA solving
  3. Streaming pipeline — process data as it arrives
  4. Adaptive rate control — respond to errors dynamically

For the CAPTCHA-solving integration used in these examples, check out passxapi-python — it provides both sync and async clients that plug into aiohttp workflows.


What throughput are you getting with your scrapers? Drop your numbers in the comments.
