Deploying a Production Scraper with Docker, GitHub Actions, and CAPTCHA Solving

You've built a scraper that handles CAPTCHAs. It works on your laptop. Now what?

Getting a scraper from "works locally" to "runs reliably in production" is a different kind of challenge. Here's the full deployment pipeline I use: Docker for packaging, GitHub Actions for CI/CD, and structured monitoring to know when things break.

Project Structure

my-scraper/
  scraper/
    __init__.py
    core.py          # Main scraping logic
    captcha.py       # CAPTCHA detection and solving
    config.py        # Settings from env vars
  tests/
    test_captcha.py
    test_core.py
  Dockerfile
  docker-compose.yml
  requirements.txt
  .github/
    workflows/
      test.yml       # Run tests on PR
      deploy.yml     # Build and deploy on merge
      scrape.yml     # Scheduled scraping runs

The Scraper Module

Keep CAPTCHA solving separate from scraping logic:

# scraper/config.py
import os
from dataclasses import dataclass

@dataclass
class Config:
    passxapi_key: str = os.getenv("PASSXAPI_KEY", "")
    target_urls: list[str] | None = None
    max_concurrency: int = int(os.getenv("MAX_CONCURRENCY", "10"))
    output_path: str = os.getenv("OUTPUT_PATH", "/data/results.json")
    proxy_url: str = os.getenv("PROXY_URL", "")

    def __post_init__(self):
        if not self.passxapi_key:
            raise ValueError("PASSXAPI_KEY is required")
        if self.target_urls is None:
            self.target_urls = []
# scraper/captcha.py
import httpx
import time
import logging
import re

logger = logging.getLogger(__name__)

class CaptchaSolver:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.stats = {"solved": 0, "failed": 0, "total_time": 0}

    async def detect(self, html: str, url: str) -> dict | None:
        html_lower = html.lower()

        types = {
            "recaptcha_v2": ["g-recaptcha", "recaptcha/api.js"],
            "hcaptcha": ["hcaptcha", "h-captcha"],
            "turnstile": ["cf-turnstile", "challenges.cloudflare.com/turnstile"],
        }

        for captcha_type, indicators in types.items():
            if any(ind in html_lower for ind in indicators):
                sitekey = self._extract_sitekey(html, captcha_type)
                if sitekey:
                    return {"type": captcha_type, "sitekey": sitekey, "url": url}

        return None

    def _extract_sitekey(self, html: str, captcha_type: str) -> str | None:
        # data-sitekey works for all types
        match = re.search(r'data-sitekey="([^"]+)"', html)
        if match:
            return match.group(1)

        # No data-sitekey? For reCAPTCHA, fall back to the render= parameter,
        # which is how v3 embeds the sitekey in the api.js script URL
        if captcha_type == "recaptcha_v2":
            match = re.search(r'render=([0-9A-Za-z_-]{20,})', html)
            if match:
                return match.group(1)

        return None

    async def solve(self, captcha_info: dict) -> str | None:
        start = time.time()

        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    "https://api.passxapi.com/solve",
                    json={
                        "type": captcha_info["type"],
                        "sitekey": captcha_info["sitekey"],
                        "url": captcha_info["url"],
                    },
                    headers={"x-api-key": self.api_key},
                    timeout=60,
                )
                resp.raise_for_status()
                token = resp.json()["token"]

                duration = time.time() - start
                self.stats["solved"] += 1
                self.stats["total_time"] += duration
                logger.info(
                    f"CAPTCHA solved: type={captcha_info['type']}, "
                    f"time={duration:.1f}s"
                )
                return token

        except Exception as e:
            self.stats["failed"] += 1
            logger.error(f"CAPTCHA solve failed: {e}")
            return None

    def get_stats(self) -> dict:
        total = self.stats["solved"] + self.stats["failed"]
        return {
            "total": total,
            "solved": self.stats["solved"],
            "failed": self.stats["failed"],
            "success_rate": (
                self.stats["solved"] / total if total > 0 else 0
            ),
            "avg_time": (
                self.stats["total_time"] / self.stats["solved"]
                if self.stats["solved"] > 0 else 0
            ),
        }
# scraper/core.py
import asyncio
import httpx
import json
import logging
from .config import Config
from .captcha import CaptchaSolver

logger = logging.getLogger(__name__)

class Scraper:
    def __init__(self, config: Config):
        self.config = config
        self.solver = CaptchaSolver(config.passxapi_key)
        self.results = []

    async def scrape_url(self, client: httpx.AsyncClient, 
                         url: str, semaphore: asyncio.Semaphore):
        async with semaphore:
            try:
                resp = await client.get(url, timeout=30)

                captcha = await self.solver.detect(resp.text, url)
                if captcha:
                    token = await self.solver.solve(captcha)
                    if token:
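                        # Which hidden form field the site reads depends on the
                        # CAPTCHA type; submitting all three at once is a blunt
                        # shortcut that relies on the server ignoring the unused fields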
                        resp = await client.post(url, data={
                            "g-recaptcha-response": token,
                            "h-captcha-response": token,
                            "cf-turnstile-response": token,
                        })

                data = self.parse(resp.text, url)
                self.results.append(data)
                return data

            except Exception as e:
                logger.error(f"Failed to scrape {url}: {e}")
                return {"url": url, "error": str(e)}

    def parse(self, html: str, url: str) -> dict:
        # Override this for your specific site
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")
        return {
            "url": url,
            "title": soup.title.string if soup.title else None,
            "status": "ok",
        }

    async def run(self):
        semaphore = asyncio.Semaphore(self.config.max_concurrency)

        proxy = self.config.proxy_url or None
        async with httpx.AsyncClient(
            proxy=proxy,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0 (compatible)"},
        ) as client:
            tasks = [
                self.scrape_url(client, url, semaphore)
                for url in self.config.target_urls
            ]
            await asyncio.gather(*tasks)

        # Save results
        with open(self.config.output_path, "w") as f:
            json.dump(self.results, f, indent=2)

        # Log stats
        stats = self.solver.get_stats()
        logger.info(
            f"Scraping complete: "
            f"{len(self.results)} pages, "
            f"{stats['solved']} CAPTCHAs solved, "
            f"{stats['success_rate']:.1%} success rate"
        )

        return self.results
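
The default parse() only grabs the page title. For a real target, subclass the scraper and override it. Here's a hypothetical sketch for a product page, with made-up selectors you'd swap for your own:

# Hypothetical subclass: the selectors are illustrative, not from a real site
from bs4 import BeautifulSoup
from scraper.core import Scraper

class ProductScraper(Scraper):
    def parse(self, html: str, url: str) -> dict:
        soup = BeautifulSoup(html, "html.parser")
        name = soup.select_one("h1.product-name")
        price = soup.select_one("span.price")
        return {
            "url": url,
            "name": name.get_text(strip=True) if name else None,
            "price": price.get_text(strip=True) if price else None,
        }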

Dockerfile

FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy source
COPY scraper/ scraper/
COPY main.py .

# Run as non-root; give the scraper user ownership of /data
# so it can actually write results there
RUN useradd -m scraper \
    && mkdir -p /data \
    && chown scraper:scraper /data
USER scraper

CMD ["python", "main.py"]
# main.py
import asyncio
import logging
from scraper.config import Config
from scraper.core import Scraper

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

async def main():
    config = Config(
        target_urls=[
            "https://example.com/page/1",
            "https://example.com/page/2",
            # Load from file or API in production
        ]
    )

    scraper = Scraper(config)
    results = await scraper.run()
    print(f"Done. {len(results)} results saved.")

if __name__ == "__main__":
    asyncio.run(main())
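
Hardcoded URLs are fine for a demo. In production I'd load them from a file; a minimal sketch, assuming a urls.txt next to main.py with one URL per line:

# Hypothetical helper: read one URL per line, skipping blanks and comments
from pathlib import Path

def load_urls(path: str = "urls.txt") -> list[str]:
    urls = []
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            urls.append(line)
    return urls

# Then in main(): config = Config(target_urls=load_urls())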
# requirements.txt
httpx>=0.27.0
beautifulsoup4>=4.12.0
passxapi>=1.0.0

Docker Compose for Local Development

# docker-compose.yml

services:
  scraper:
    build: .
    environment:
      - PASSXAPI_KEY=${PASSXAPI_KEY}
      - MAX_CONCURRENCY=5
      - OUTPUT_PATH=/data/results.json
    volumes:
      - ./data:/data
    restart: "no"
# Run locally
docker compose up --build

# Check results
cat data/results.json | python -m json.tool | head -20

GitHub Actions: Testing

Run tests on every PR to make sure your CAPTCHA handling works:

# .github/workflows/test.yml
name: Tests

on:
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install -r requirements.txt pytest pytest-asyncio

      - name: Run tests
        run: pytest tests/ -v
        env:
          PASSXAPI_KEY: ${{ secrets.PASSXAPI_KEY }}
# tests/test_captcha.py
import pytest
from scraper.captcha import CaptchaSolver

@pytest.fixture
def solver():
    return CaptchaSolver(api_key="test-key")

@pytest.mark.asyncio
async def test_detect_recaptcha(solver):
    html = '<div class="g-recaptcha" data-sitekey="6Lc..."></div>'
    result = await solver.detect(html, "https://example.com")
    assert result is not None
    assert result["type"] == "recaptcha_v2"
    assert result["sitekey"] == "6Lc..."

@pytest.mark.asyncio
async def test_detect_hcaptcha(solver):
    html = '<div class="h-captcha" data-sitekey="abc..."></div>'
    result = await solver.detect(html, "https://example.com")
    assert result["type"] == "hcaptcha"

@pytest.mark.asyncio
async def test_detect_no_captcha(solver):
    html = "<html><body>Normal page</body></html>"
    result = await solver.detect(html, "https://example.com")
    assert result is None
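
The solver's stats tracking is just as testable offline. A couple of extra checks against get_stats(), reusing the same fixture:

def test_stats_empty(solver):
    stats = solver.get_stats()
    assert stats["total"] == 0
    assert stats["success_rate"] == 0

def test_stats_math(solver):
    # Simulate three solves and one failure totalling 9 seconds
    solver.stats = {"solved": 3, "failed": 1, "total_time": 9.0}
    stats = solver.get_stats()
    assert stats["total"] == 4
    assert stats["success_rate"] == 0.75
    assert stats["avg_time"] == 3.0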

GitHub Actions: Scheduled Scraping

Run your scraper on a schedule without any server:

# .github/workflows/scrape.yml
name: Scheduled Scrape

on:
  schedule:
    - cron: "0 */6 * * *"  # Every 6 hours
  workflow_dispatch:        # Manual trigger

jobs:
  scrape:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run scraper
        run: python main.py
        env:
          PASSXAPI_KEY: ${{ secrets.PASSXAPI_KEY }}
          MAX_CONCURRENCY: "10"
          OUTPUT_PATH: "results.json"

      - name: Upload results
        uses: actions/upload-artifact@v4
        with:
          name: scrape-results-${{ github.run_id }}
          path: results.json
          retention-days: 30

      - name: Notify on failure
        if: failure()
        run: |
          curl -X POST "${{ secrets.SLACK_WEBHOOK }}" \
            -H 'Content-Type: application/json' \
            -d '{"text":"Scraper failed! Check: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"}'

GitHub Actions: Build and Deploy

When you push to main, build the Docker image and deploy:

# .github/workflows/deploy.yml
name: Deploy

on:
  push:
    branches: [main]

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Build Docker image
        run: docker build -t my-scraper:${{ github.sha }} .

      - name: Run smoke test
        run: |
          docker run --rm \
            -e PASSXAPI_KEY=${{ secrets.PASSXAPI_KEY }} \
            -e MAX_CONCURRENCY=2 \
            my-scraper:${{ github.sha }} \
            python -c "from scraper.config import Config; print('OK')"

      # Push to your registry of choice
      # ghcr.io, Docker Hub, AWS ECR, etc.
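      #
      # For example, pushing to ghcr.io (an assumption, not this repo's setup;
      # the job also needs `permissions: packages: write`, and ghcr image
      # names must be lowercase):
      #
      #   - uses: docker/login-action@v3
      #     with:
      #       registry: ghcr.io
      #       username: ${{ github.actor }}
      #       password: ${{ secrets.GITHUB_TOKEN }}
      #   - run: |
      #       docker tag my-scraper:${{ github.sha }} ghcr.io/${{ github.repository }}:${{ github.sha }}
      #       docker push ghcr.io/${{ github.repository }}:${{ github.sha }}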

Monitoring in Production

Add structured logging so you can track CAPTCHA metrics:

import json
import logging

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
        }
        if hasattr(record, "captcha_type"):
            log_data["captcha_type"] = record.captcha_type
        if hasattr(record, "solve_time"):
            log_data["solve_time"] = record.solve_time
        return json.dumps(log_data)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.root.addHandler(handler)
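
Those hasattr checks pick up fields attached through logging's standard extra parameter. A sketch of how solve() could pass them (not wired up in the code above):

# Inside CaptchaSolver.solve(), after a successful solve:
logger.info(
    "CAPTCHA solved",
    extra={"captcha_type": captcha_info["type"], "solve_time": duration},
)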

These structured logs work with any log aggregator — Datadog, Grafana Loki, CloudWatch, etc.

Key Takeaways

  1. Separate concerns — CAPTCHA solving, scraping logic, and config in different modules
  2. Test CAPTCHA detection — unit tests for sitekey extraction don't need real API calls
  3. Docker for consistency — same environment locally and in production
  4. GitHub Actions for free compute — scheduled runs, CI/CD, artifact storage
  5. Structured logging — JSON logs that your monitoring stack can parse

Resources

Full CAPTCHA solving SDK: passxapi-python on GitHub


How do you deploy your scrapers? VPS, serverless, or something else? Let me know in the comments.
