DEV Community

agenthustler
agenthustler

Posted on

Python AsyncIO for Web Scraping: 10x Faster Data Collection

Why AsyncIO Changes Everything

Traditional synchronous scraping wastes 90% of its time waiting for HTTP responses. While one request waits, your CPU sits idle. AsyncIO lets you fire hundreds of requests concurrently, turning a 10-minute scrape into a 60-second one.

Let's build an async scraper that is 10x faster than the synchronous version.

Synchronous vs Async: The Numbers

# Synchronous: 100 pages = 100 * 2 seconds = 200 seconds
import requests
import time

page_urls = [f"https://example.com/page/{page}" for page in range(100)]

started_at = time.time()
for page_url in page_urls:
    response = requests.get(page_url)  # Blocks here for ~2 seconds
print(f"Sync: {time.time() - started_at:.1f}s")  # ~200 seconds
Enter fullscreen mode Exit fullscreen mode
# Async: 100 pages = ~4 seconds (50 concurrent)
import aiohttp
import asyncio
import time

async def fetch_all(urls, concurrency=50):
    """Download every URL concurrently, with at most `concurrency` in flight."""
    limiter = asyncio.Semaphore(concurrency)

    async def grab(session, target):
        # The semaphore caps simultaneous requests; the rest queue up here.
        async with limiter:
            async with session.get(target) as response:
                return await response.text()

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(grab(session, target) for target in urls))

urls = [f"https://example.com/page/{i}" for i in range(100)]

start = time.time()
results = asyncio.run(fetch_all(urls))
print(f"Async: {time.time() - start:.1f}s")  # ~4 seconds
Enter fullscreen mode Exit fullscreen mode

Building a Production Async Scraper

Here is a complete async scraper with error handling, retries, and rate limiting:

import aiohttp
import asyncio
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import Optional, List
import logging
import random

# Module-level logger so retry warnings and the final report are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ScrapedPage:
    """Outcome of fetching and parsing one URL (success or failure)."""
    url: str
    title: Optional[str]  # <title> text, or None when the page had none
    content: Optional[str]  # first 500 chars of main/article/.content text, or None
    status: int  # HTTP status; 0 when the request itself failed
    error: Optional[str] = None  # exception message on failure, None on success

class AsyncScraper:
    """Concurrent scraper with bounded concurrency, retries, and polite delays.

    Usage: build one instance, then `await scrape_all(urls)`; per-URL
    outcomes (including failures) come back as ScrapedPage records.
    """

    def __init__(self, concurrency=20, max_retries=3, delay=0.1):
        # Caps the number of requests in flight across all tasks.
        self.semaphore = asyncio.Semaphore(concurrency)
        self.max_retries = max_retries
        self.delay = delay  # polite pause after every request
        self.results = []
        # Rotated per request so traffic looks less like one automated client.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]

    async def fetch(self, session, url, retry=0):
        """Fetch *url* with exponential-backoff retries on errors and 429s.

        Never raises: after `max_retries` failed attempts it returns a
        ScrapedPage with status=0 and the error message.

        NOTE: retries use a loop, not recursion. The recursive version
        re-entered `async with self.semaphore` while still holding a permit,
        so each retrying task consumed TWO permits and a burst of retries
        could deadlock the whole pool.
        """
        attempt = retry
        while True:
            backoff = None  # set when we must sleep and retry
            try:
                async with self.semaphore:
                    try:
                        headers = {"User-Agent": random.choice(self.user_agents)}
                        async with session.get(
                            url, headers=headers,
                            timeout=aiohttp.ClientTimeout(total=30),
                        ) as resp:
                            if resp.status == 429 and attempt < self.max_retries:
                                # Rate limited: jittered exponential backoff.
                                backoff = 2 ** attempt + random.random()
                                logger.warning(f"Rate limited on {url}, waiting {backoff:.1f}s")
                            else:
                                # Last-attempt 429s fall through and are parsed
                                # as-is, so the caller sees status=429.
                                html = await resp.text()
                                return self.parse(url, html, resp.status)
                    finally:
                        # Polite delay while still holding the permit, so it
                        # actually throttles the pool's overall request rate.
                        await asyncio.sleep(self.delay)
            except Exception as e:
                if attempt >= self.max_retries:
                    return ScrapedPage(url=url, title=None, content=None, status=0, error=str(e))
                backoff = 2 ** attempt
            # Back off OUTSIDE the semaphore so a waiting task does not
            # block a permit another task could be using.
            await asyncio.sleep(backoff)
            attempt += 1

    def parse(self, url, html, status):
        """Extract title and a 500-char content preview into a ScrapedPage."""
        soup = BeautifulSoup(html, "html.parser")
        title = soup.title.string if soup.title else None
        # First match wins: prefer <main>, then <article>, then .content.
        content = soup.select_one("main, article, .content")
        return ScrapedPage(
            url=url,
            title=title,
            content=content.get_text(strip=True)[:500] if content else None,
            status=status
        )

    async def scrape_all(self, urls):
        """Fetch every URL concurrently; stores and returns the results list."""
        # Pooled connector: keep-alive connections + cached DNS lookups.
        connector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
        return self.results

    def report(self):
        """Log a one-line success/failure summary of the last scrape_all run."""
        success = sum(1 for r in self.results if r.status == 200)
        failed = sum(1 for r in self.results if r.error)
        logger.info(f"Results: {success} success, {failed} failed, {len(self.results)} total")

# Usage
async def main():
    scraper = AsyncScraper(concurrency=30, max_retries=3)
    urls = [f"https://example.com/product/{i}" for i in range(500)]
    results = await scraper.scrape_all(urls)
    scraper.report()
    return results

results = asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

Advanced Pattern: Async Pipeline

Process data as it arrives instead of waiting for all requests:

import asyncio
from collections import deque

import aiohttp
from bs4 import BeautifulSoup

async def scrape_pipeline(urls, concurrency=20):
    """Fetch *urls* concurrently and parse each page as soon as it arrives.

    Fetch tasks (capped at *concurrency* in flight) push (url, html) pairs
    onto a queue; a single consumer parses them incrementally, so parsing
    overlaps with the remaining downloads. Returns a list of
    {"url": ..., "title": ...} dicts.
    """
    queue = asyncio.Queue()
    results = deque()
    sem = asyncio.Semaphore(concurrency)

    async def fetch_one(session, url):
        # The semaphore caps how many requests are in flight at once.
        async with sem:
            async with session.get(url) as resp:
                html = await resp.text()
                await queue.put((url, html))

    # Producer: run the fetches CONCURRENTLY. (Awaiting each request inside
    # a plain for-loop would serialize every download and make the
    # semaphore pointless — no pipeline speedup at all.)
    async def producer(session):
        await asyncio.gather(*(fetch_one(session, url) for url in urls))
        await queue.put(None)  # Signal done

    # Consumer: parse pages as they arrive
    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                break
            url, html = item
            soup = BeautifulSoup(html, "html.parser")
            title = soup.title.string if soup.title else "No title"
            results.append({"url": url, "title": title})
            # Can write to DB here in real-time

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            producer(session),
            consumer()
        )

    return list(results)
Enter fullscreen mode Exit fullscreen mode

Using AsyncIO with Proxy Services

Integrate ScraperAPI with async requests for both speed and anti-bot bypass:

async def fetch_via_proxy(session, url, api_key):
    """Fetch *url* through the ScraperAPI proxy endpoint and return the body.

    The target URL is percent-encoded before being embedded: otherwise any
    "?", "&", or "=" in the target's own query string would be parsed as
    extra ScraperAPI parameters and the wrong page would be requested.
    """
    from urllib.parse import quote
    proxy_url = f"https://api.scraperapi.com?api_key={api_key}&url={quote(url, safe='')}"
    async with session.get(proxy_url) as resp:
        return await resp.text()
Enter fullscreen mode Exit fullscreen mode

For residential proxy rotation, ThorData supports async connections through their SOCKS5 proxy endpoints.

Monitoring Async Scrapers

Async scrapers are harder to debug. Use ScrapeOps to track per-URL success rates, response times, and error patterns across your concurrent requests.

Performance Tips

  1. Tune concurrency — start at 20, increase until you see 429s
  2. Reuse connections — TCPConnector(limit=100) keeps connections alive
  3. DNS caching — ttl_dns_cache=300 avoids repeated DNS lookups
  4. Use orjson — 10x faster JSON parsing than stdlib
  5. Stream responses — use resp.content.iter_chunked() for large files instead of loading the whole body with resp.text()
  6. Monitor memory — set limit_per_host to prevent connection explosions

Conclusion

AsyncIO transforms web scraping performance. The same hardware that scrapes 100 pages per minute synchronously can handle 1,000+ pages per minute with async. Combined with proper rate limiting and error handling, you get both speed and reliability.

Top comments (0)