DEV Community

agenthustler

Real-Time Data Scraping: WebSockets, SSE, and API Polling

Traditional web scraping fetches static snapshots of data. But what about live stock prices, sports scores, chat messages, or cryptocurrency trades? These require real-time data collection using WebSockets, Server-Sent Events (SSE), or API polling.

In this guide, I'll show you how to capture streaming data with Python.

Three Approaches to Real-Time Data

| Method | Direction | Use Case | Complexity |
|---|---|---|---|
| WebSockets | Bidirectional | Live trades, chat, gaming | Medium |
| SSE | Server to client | News feeds, notifications | Low |
| API Polling | Client to server | Any REST API | Low |

WebSocket Scraping

WebSockets maintain a persistent connection for real-time bidirectional data. Many trading platforms and live data feeds use them:

import asyncio
import websockets
import json
from datetime import datetime

async def scrape_websocket(url, duration_seconds=60):
    """Connect to a WebSocket and collect messages."""
    messages = []

    async with websockets.connect(url) as ws:
        # Some WebSockets require a subscription message
        subscribe_msg = json.dumps({
            "type": "subscribe",
            "channels": ["ticker"],
            "product_ids": ["BTC-USD"]
        })
        await ws.send(subscribe_msg)

        start_time = datetime.now()
        # total_seconds() counts the whole elapsed time; .seconds wraps around after 24 hours
        while (datetime.now() - start_time).total_seconds() < duration_seconds:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=10)
                data = json.loads(message)
                data["received_at"] = datetime.now().isoformat()
                messages.append(data)
                print(f"Received: {data.get('type', 'unknown')} - {data.get('price', 'N/A')}")
            except asyncio.TimeoutError:
                continue

    return messages

# Usage
data = asyncio.run(scrape_websocket("wss://ws-feed.exchange.example.com", duration_seconds=30))
print(f"Collected {len(data)} messages")
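
The collector above stores every message the feed sends, which usually includes subscription acknowledgements and heartbeats as well as price updates. Here is a minimal sketch of filtering those out before storage. The message shape (`type`, `product_id`, `price`) is an assumption modeled on the subscription example above, and `build_subscribe` and `extract_tick` are hypothetical helper names, so adapt them to the feed you are actually reading.

```python
import json
from decimal import Decimal, InvalidOperation

def build_subscribe(product_ids, channels=("ticker",)):
    """Build the subscription message as a JSON string (assumed schema)."""
    return json.dumps({
        "type": "subscribe",
        "channels": list(channels),
        "product_ids": list(product_ids),
    })

def extract_tick(raw_message):
    """Return (product_id, price) for ticker messages, or None for anything else."""
    try:
        data = json.loads(raw_message)
    except (ValueError, TypeError):
        return None  # Not JSON, for example a binary frame
    if not isinstance(data, dict) or data.get("type") != "ticker":
        return None  # Acknowledgements, heartbeats, errors
    try:
        # Decimal avoids float rounding on prices sent as strings
        price = Decimal(str(data["price"]))
    except (KeyError, InvalidOperation):
        return None
    return data.get("product_id"), price
```

Inside the receive loop you could call `extract_tick(message)` and append only the results that are not `None`.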

Finding WebSocket Endpoints

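WebSocket URLs aren't always visible in the page source. One way to discover them is to load the page in a headless browser with Playwright and log every WebSocket connection it opens: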

from playwright.sync_api import sync_playwright

def discover_websockets(page_url):
    """Open a page and log all WebSocket connections."""
    ws_urls = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        # Listen for WebSocket connections
        page.on("websocket", lambda ws: ws_urls.append(ws.url))

        page.goto(page_url, wait_until="networkidle")
        # Wait for WebSockets to connect
        page.wait_for_timeout(5000)

        browser.close()

    return ws_urls

urls = discover_websockets("https://example-trading-platform.com")
for url in urls:
    print(f"WebSocket found: {url}")
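
Discovered URLs often repeat, and some carry session-specific values in the query string. Here is a minimal standard-library sketch of tidying the list before reuse. `summarize_ws_urls` is a hypothetical helper, and dropping the query string is an assumption that will not suit feeds that require a token there.

```python
from urllib.parse import urlsplit, urlunsplit

def summarize_ws_urls(urls):
    """Deduplicate WebSocket URLs and drop query strings and fragments."""
    seen = []
    for url in urls:
        parts = urlsplit(url)
        if parts.scheme not in ("ws", "wss"):
            continue  # Ignore anything that is not a WebSocket URL
        cleaned = urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))
        if cleaned not in seen:
            seen.append(cleaned)  # Keep first-seen order
    return seen
```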

Server-Sent Events (SSE)

SSE is simpler than WebSockets: it is one-directional streaming over plain HTTP, so an ordinary streaming GET request can consume it:

import requests
import json
from datetime import datetime

def scrape_sse(url, max_events=100):
    """Consume a Server-Sent Events stream."""
    events = []

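    response = requests.get(url, stream=True, headers={
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
    })
    response.raise_for_status()
    # SSE streams are UTF-8; without this, requests assumes ISO-8859-1 for text/* responses
    response.encoding = "utf-8"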

    event_data = ""
    event_type = "message"

    for line in response.iter_lines(decode_unicode=True):
        if line.startswith("event:"):
            event_type = line[6:].strip()
        elif line.startswith("data:"):
            # Several data: lines in one event are joined with newlines
            event_data += ("\n" if event_data else "") + line[5:].strip()
        elif line == "" and event_data:
            # Empty line = end of event
            try:
                parsed = json.loads(event_data)
            except json.JSONDecodeError:
                parsed = event_data

            events.append({
                "type": event_type,
                "data": parsed,
                "received_at": datetime.now().isoformat()
            })
            print(f"Event #{len(events)}: {event_type}")

            event_data = ""
            event_type = "message"

            if len(events) >= max_events:
                break

    return events
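
The event-stream format has a few rules the loop above handles only loosely. Lines starting with a colon are comments (often used as keep-alives), several `data:` lines in one event are joined with newlines, and only a single leading space after the colon is stripped. Here is a minimal sketch of a parser that follows those rules and works on any iterable of decoded lines, so it can be tested without a network connection. `parse_sse` is a hypothetical name, and the `retry:` field is ignored for brevity.

```python
def parse_sse(lines):
    """Yield one dict per event from an iterable of already-decoded lines."""
    event_type, data_lines, event_id = "message", [], None
    for line in lines:
        if line == "":
            # A blank line dispatches the event, if it carried any data
            if data_lines:
                yield {"type": event_type, "data": "\n".join(data_lines), "id": event_id}
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # Comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # Only a single leading space is stripped
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        elif field == "id":
            event_id = value  # The last seen id persists across events
```

With `requests`, one way to feed it is `parse_sse(response.iter_lines(decode_unicode=True))`, assuming the response encoding has been set so that the lines arrive as `str`.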

API Polling with Smart Intervals

When no streaming option exists, poll the API with adaptive intervals:

import requests
import time
import hashlib
from datetime import datetime

class SmartPoller:
    def __init__(self, url, min_interval=1, max_interval=60):
        self.url = url
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.current_interval = min_interval
        self.last_hash = None

    def poll(self, duration_seconds=300):
        """Poll with adaptive intervals — faster when data changes."""
        results = []
        start = time.time()

        while time.time() - start < duration_seconds:
            try:
                response = requests.get(self.url, timeout=10)
                response.raise_for_status()  # Treat HTTP error statuses like network errors
                data = response.json()
                data_hash = hashlib.md5(response.text.encode()).hexdigest()

                if data_hash != self.last_hash:
                    # Data changed — collect it and speed up polling
                    results.append({
                        "data": data,
                        "timestamp": datetime.now().isoformat(),
                        "changed": True
                    })
                    self.current_interval = self.min_interval
                    self.last_hash = data_hash
                    print(f"Change detected! Interval: {self.current_interval}s")
                else:
                    # No change — slow down polling
                    self.current_interval = min(
                        self.current_interval * 1.5,
                        self.max_interval
                    )

                time.sleep(self.current_interval)

            except requests.RequestException as e:
                print(f"Error: {e}")
                time.sleep(self.max_interval)

        return results

# Usage
poller = SmartPoller(
    url="https://api.example.com/v1/prices",
    min_interval=1,
    max_interval=30
)
changes = poller.poll(duration_seconds=120)
print(f"Detected {len(changes)} price changes")
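
To see how the interval behaves, it helps to separate the arithmetic from the network code. The sketch below replays the same rule as `SmartPoller` (reset to the minimum on a change, otherwise multiply by 1.5 up to the maximum) over a sequence of changed and unchanged polls. `next_interval` and `simulate` are hypothetical names used only for illustration.

```python
def next_interval(current, changed, min_interval=1, max_interval=30, factor=1.5):
    """Return the next sleep interval: reset on change, otherwise back off."""
    if changed:
        return min_interval
    return min(current * factor, max_interval)

def simulate(changes, min_interval=1, max_interval=30):
    """Replay a sequence of changed/unchanged polls and list the sleep intervals."""
    interval, schedule = min_interval, []
    for changed in changes:
        interval = next_interval(interval, changed, min_interval, max_interval)
        schedule.append(interval)
    return schedule

print(simulate([True, False, False, False, True]))
# [1, 1.5, 2.25, 3.375, 1]
```

Starting from 1 second with a 30 second cap, the ninth consecutive unchanged poll is the first to hit the cap, so a quiet endpoint is polled at most twice a minute after roughly a minute and a half of inactivity.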

Storing Streaming Data

For high-frequency data, use append-only storage, such as one JSON Lines (.jsonl) file per day:

import json
from pathlib import Path
from datetime import datetime

class StreamStorage:
    def __init__(self, base_dir="stream_data"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(exist_ok=True)
        self.current_file = None
        self.current_date = None

    def _get_file(self):
        today = datetime.now().strftime("%Y-%m-%d")
        if today != self.current_date:
            if self.current_file:
                self.current_file.close()
            filepath = self.base_dir / f"stream_{today}.jsonl"
            self.current_file = open(filepath, "a")
            self.current_date = today
        return self.current_file

    def append(self, record):
        f = self._get_file()
        f.write(json.dumps(record) + "\n")
        f.flush()

    def read_day(self, date_str):
        filepath = self.base_dir / f"stream_{date_str}.jsonl"
        if filepath.exists():
            with open(filepath) as f:
                return [json.loads(line) for line in f]
        return []

storage = StreamStorage()
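
One thing to plan for with append-only files is a process that dies mid-write and leaves a truncated final line. Here is a minimal sketch of a reader that streams records lazily and skips lines it cannot parse, instead of loading the whole day into memory. `iter_jsonl` is a hypothetical helper, not part of the class above.

```python
import json

def iter_jsonl(path):
    """Yield records from a JSONL file, skipping blank or unparseable lines."""
    with open(path, encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # A crash mid-write can leave a truncated final line
                print(f"Skipping malformed line {line_number} in {path}")
```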

Combining Multiple Streams

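import asyncio
import json
import websockets
from datetime import datetime

# Assumes the StreamStorage class from the previous section is defined in the same module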

async def multi_stream_collector(sources):
    """Collect from multiple WebSocket sources simultaneously."""
    storage = StreamStorage("multi_stream")

    async def collect_from(name, url, subscribe_msg=None):
        async with websockets.connect(url) as ws:
            if subscribe_msg:
                await ws.send(json.dumps(subscribe_msg))

            while True:
                message = await ws.recv()
                data = json.loads(message)
                storage.append({
                    "source": name,
                    "data": data,
                    "timestamp": datetime.now().isoformat()
                })

    tasks = [
        collect_from(name, url, msg)
        for name, url, msg in sources
    ]
    await asyncio.gather(*tasks)
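
In the collector above, every connection writes to storage directly. An alternative is to have each connection push into a shared `asyncio.Queue` and let a single consumer do the writing, which keeps file access in one place. The sketch below shows that pattern under stated assumptions: `fan_in` and `fake_feed` are hypothetical names, and `fake_feed` is an async generator standing in for a WebSocket connection so the example runs without a network.

```python
import asyncio

async def fan_in(sources, handle):
    """Merge several async message sources through one queue and one consumer.

    sources: dict of name -> async iterator (stand-ins for WebSocket connections).
    handle: called with (name, message) for each item, in arrival order.
    """
    queue = asyncio.Queue(maxsize=1000)  # Bounded, so a slow consumer applies backpressure
    done = object()  # Sentinel marking the end of one source

    async def produce(name, source):
        try:
            async for message in source:
                await queue.put((name, message))
        finally:
            await queue.put((name, done))

    producers = [asyncio.create_task(produce(n, s)) for n, s in sources.items()]
    remaining = len(producers)
    while remaining:
        name, message = await queue.get()
        if message is done:
            remaining -= 1
        else:
            handle(name, message)
    await asyncio.gather(*producers)  # Surface any producer exception

async def fake_feed(prefix, count, delay):
    """Hypothetical stand-in for a WebSocket: yields `count` messages."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{prefix}-{i}"
```

For example, `asyncio.run(fan_in({"a": fake_feed("a", 3, 0.01), "b": fake_feed("b", 2, 0.015)}, print))` interleaves the two feeds and returns once both are exhausted. A real feed never ends, so in practice the loop runs until the connections close or the task is cancelled.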

Handling Connection Issues

Real-time connections drop. Always implement reconnection:

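import asyncio
import json
import websockets

async def resilient_websocket(url, on_message, max_retries=10):
    """Reconnect with exponential backoff when the connection drops."""
    retries = 0
    while retries < max_retries:
        try:
            async with websockets.connect(url) as ws:
                retries = 0  # Reset on successful connection
                async for message in ws:
                    await on_message(json.loads(message))
        # OSError also covers DNS failures and timeouts, not just ConnectionError
        except (websockets.ConnectionClosed, OSError) as e: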
            retries += 1
            wait = min(2 ** retries, 60)  # Exponential backoff
            print(f"Connection lost. Retry {retries}/{max_retries} in {wait}s")
            await asyncio.sleep(wait)
    print("Max retries reached. Stopping.")
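
The `min(2 ** retries, 60)` schedule waits 2, 4, 8, 16, and 32 seconds, then 60 seconds for every later retry. When many clients lose the same connection at once, they all retry on the same schedule. A common refinement is to add random jitter so the reconnects spread out. Here is a minimal sketch of the "full jitter" variant, with hypothetical function names:

```python
import random

def backoff_ceilings(max_retries, base=2, cap=60):
    """Upper bounds of the backoff schedule for retries 1..max_retries."""
    return [min(cap, base ** retry) for retry in range(1, max_retries + 1)]

def backoff_delay(retry, base=2, cap=60, rng=random):
    """Full jitter: a random delay between 0 and min(cap, base ** retry)."""
    ceiling = min(cap, base ** retry)
    return rng.uniform(0, ceiling)

print(backoff_ceilings(10))
# [2, 4, 8, 16, 32, 60, 60, 60, 60, 60]
```

In the reconnect loop, `wait = backoff_delay(retries)` would replace the fixed `min(2 ** retries, 60)`.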

Scaling with a Scraping API

For production real-time scraping where you need to poll multiple endpoints reliably, ScraperAPI handles proxy rotation and rate limiting so your polling doesn't get blocked.

Conclusion

Real-time scraping captures data that static snapshots miss: live market data, instant notifications, and continuous monitoring. Choose WebSockets for bidirectional streams, SSE for server-push data, and smart polling when no streaming option exists. Always implement reconnection logic, and use append-only storage for high-frequency data.

Happy scraping!
