Traditional web scraping fetches static snapshots of data. But what about live stock prices, sports scores, chat messages, or cryptocurrency trades? These require real-time data collection using WebSockets, Server-Sent Events (SSE), or API polling.
In this guide, I'll show you how to capture streaming data with Python.
Three Approaches to Real-Time Data
| Method | Direction | Use Case | Complexity |
|---|---|---|---|
| WebSockets | Bidirectional | Live trades, chat, gaming | Medium |
| SSE | Server to client | News feeds, notifications | Low |
| API Polling | Client to server | Any REST API | Low |
WebSocket Scraping
WebSockets maintain a persistent connection for real-time bidirectional data. Many trading platforms and live data feeds use them:
import asyncio
import websockets
import json
from datetime import datetime
async def scrape_websocket(url, duration_seconds=60):
    """Connect to a WebSocket feed and collect messages for a fixed duration.

    Args:
        url: WebSocket endpoint (``ws://`` or ``wss://``).
        duration_seconds: How long to keep collecting before returning.

    Returns:
        List of decoded JSON messages, each stamped with a ``received_at``
        ISO-8601 timestamp.
    """
    messages = []
    async with websockets.connect(url) as ws:
        # Many feeds stay silent until you subscribe to a channel.
        # NOTE(review): channel/product names are feed-specific — adjust
        # for the target API.
        subscribe_msg = json.dumps({
            "type": "subscribe",
            "channels": ["ticker"],
            "product_ids": ["BTC-USD"]
        })
        await ws.send(subscribe_msg)

        start_time = datetime.now()
        # total_seconds(), not .seconds: .seconds is only the sub-day
        # component of a timedelta, so durations >= 1 day would wrap to 0.
        while (datetime.now() - start_time).total_seconds() < duration_seconds:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=10)
            except asyncio.TimeoutError:
                # Quiet feed — re-check the deadline and keep waiting.
                continue
            except websockets.ConnectionClosed:
                # Server hung up: return what we collected instead of crashing.
                break
            data = json.loads(message)
            data["received_at"] = datetime.now().isoformat()
            messages.append(data)
            print(f"Received: {data.get('type', 'unknown')} - {data.get('price', 'N/A')}")
    return messages
# Usage: collect 30 seconds of ticker messages from the feed.
collected = asyncio.run(
    scrape_websocket("wss://ws-feed.exchange.example.com", duration_seconds=30)
)
print(f"Collected {len(collected)} messages")
Finding WebSocket Endpoints
WebSocket URLs aren't always obvious. Here's how to discover them:
from playwright.sync_api import sync_playwright
def discover_websockets(page_url):
    """Open *page_url* in headless Chromium and return every WebSocket URL
    the page opens while loading."""
    found = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()

        def _record(ws):
            # Fires once for each WebSocket the page opens.
            found.append(ws.url)

        page.on("websocket", _record)
        page.goto(page_url, wait_until="networkidle")
        # Give late-connecting sockets a chance to appear.
        page.wait_for_timeout(5000)
        browser.close()
    return found
# List every WebSocket endpoint the page opens.
discovered = discover_websockets("https://example-trading-platform.com")
for endpoint in discovered:
    print(f"WebSocket found: {endpoint}")
Server-Sent Events (SSE)
SSE is simpler than WebSockets — it's one-directional streaming over HTTP:
import requests
import json
def scrape_sse(url, max_events=100):
    """Consume a Server-Sent Events stream and return parsed events.

    Args:
        url: SSE endpoint serving ``text/event-stream``.
        max_events: Stop after collecting this many events.

    Returns:
        List of dicts with ``type``, ``data`` (parsed JSON when possible,
        raw string otherwise) and a ``received_at`` ISO-8601 timestamp.
    """
    # Local import so this snippet is self-contained (also imported at
    # file top elsewhere in the article).
    from datetime import datetime

    events = []
    response = requests.get(url, stream=True, headers={
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
    })
    try:
        data_lines = []
        event_type = "message"
        for line in response.iter_lines(decode_unicode=True):
            if line.startswith(":"):
                # Comment / keep-alive line per the SSE spec — ignore.
                continue
            if line.startswith("event:"):
                # Spec: strip at most ONE leading space after the colon;
                # .strip() would also destroy meaningful whitespace.
                value = line[6:]
                event_type = value[1:] if value.startswith(" ") else value
            elif line.startswith("data:"):
                value = line[5:]
                data_lines.append(value[1:] if value.startswith(" ") else value)
            elif line == "" and data_lines:
                # Empty line = end of event. Multiple data: lines form one
                # payload joined by newlines (per spec), not concatenated.
                event_data = "\n".join(data_lines)
                try:
                    parsed = json.loads(event_data)
                except json.JSONDecodeError:
                    parsed = event_data
                events.append({
                    "type": event_type,
                    "data": parsed,
                    "received_at": datetime.now().isoformat()
                })
                print(f"Event #{len(events)}: {event_type}")
                data_lines = []
                event_type = "message"
                if len(events) >= max_events:
                    break
    finally:
        # Close the streaming connection even if parsing raises.
        response.close()
    return events
API Polling with Smart Intervals
When no streaming option exists, poll the API with adaptive intervals:
import requests
import time
import hashlib
from datetime import datetime
class SmartPoller:
    """Poll a URL with an adaptive interval: poll fast while the payload is
    changing, back off exponentially while it is static."""

    def __init__(self, url, min_interval=1, max_interval=60):
        """
        Args:
            url: Endpoint to poll with GET requests.
            min_interval: Seconds between polls right after a change.
            max_interval: Ceiling for the back-off interval (also the wait
                after a request error).
        """
        self.url = url
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.current_interval = min_interval
        self.last_hash = None  # MD5 of the last response body seen

    def poll(self, duration_seconds=300):
        """Poll for *duration_seconds*, returning only changed snapshots.

        Returns:
            List of dicts with ``data``, an ISO ``timestamp`` and ``changed``.
        """
        results = []
        start = time.time()
        while time.time() - start < duration_seconds:
            try:
                response = requests.get(self.url, timeout=10)
                # Hash the raw body so any byte-level change is detected;
                # doing this before .json() avoids re-parsing unchanged data.
                data_hash = hashlib.md5(response.text.encode()).hexdigest()
                if data_hash != self.last_hash:
                    # Data changed — collect it and speed polling back up.
                    data = response.json()
                    results.append({
                        "data": data,
                        "timestamp": datetime.now().isoformat(),
                        "changed": True
                    })
                    self.current_interval = self.min_interval
                    self.last_hash = data_hash
                    print(f"Change detected! Interval: {self.current_interval}s")
                else:
                    # No change — back off by 1.5x, capped at max_interval.
                    self.current_interval = min(
                        self.current_interval * 1.5,
                        self.max_interval
                    )
                time.sleep(self.current_interval)
            except (requests.RequestException, ValueError) as e:
                # ValueError covers response.json() on a non-JSON body
                # (JSONDecodeError subclasses it) — previously this
                # crashed the poller instead of being retried.
                print(f"Error: {e}")
                time.sleep(self.max_interval)
        return results
# Usage: watch a price endpoint for two minutes.
price_poller = SmartPoller(
    url="https://api.example.com/v1/prices",
    min_interval=1,
    max_interval=30,
)
price_changes = price_poller.poll(duration_seconds=120)
print(f"Detected {len(price_changes)} price changes")
Storing Streaming Data
For high-frequency data, use append-only storage:
import json
from pathlib import Path
from datetime import datetime
class StreamStorage:
    """Append-only JSONL storage, rotated into one file per calendar day."""

    def __init__(self, base_dir="stream_data"):
        """
        Args:
            base_dir: Directory for ``stream_YYYY-MM-DD.jsonl`` files;
                created (including missing parents) if absent.
        """
        self.base_dir = Path(base_dir)
        # parents=True: a nested base_dir previously raised FileNotFoundError.
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.current_file = None   # open handle for the current day's file
        self.current_date = None   # "YYYY-MM-DD" the open handle belongs to

    def _get_file(self):
        """Return today's open file handle, rotating files at midnight."""
        today = datetime.now().strftime("%Y-%m-%d")
        if today != self.current_date:
            if self.current_file:
                self.current_file.close()
            filepath = self.base_dir / f"stream_{today}.jsonl"
            # Explicit encoding so records round-trip on any platform.
            self.current_file = open(filepath, "a", encoding="utf-8")
            self.current_date = today
        return self.current_file

    def append(self, record):
        """Append *record* as one JSON line, flushing immediately so data
        survives a crash."""
        f = self._get_file()
        f.write(json.dumps(record) + "\n")
        f.flush()

    def read_day(self, date_str):
        """Return all records stored for *date_str* ("YYYY-MM-DD"), or []."""
        filepath = self.base_dir / f"stream_{date_str}.jsonl"
        if filepath.exists():
            with open(filepath, encoding="utf-8") as f:
                return [json.loads(line) for line in f]
        return []
storage = StreamStorage()
Combining Multiple Streams
import asyncio
import websockets
async def multi_stream_collector(sources):
    """Collect from multiple WebSocket sources simultaneously.

    Each element of *sources* is a ``(name, url, subscribe_msg)`` tuple;
    ``subscribe_msg`` may be None when the feed needs no subscription.
    """
    storage = StreamStorage("multi_stream")

    async def pump(name, url, subscribe_msg=None):
        # One long-lived connection per source; each record is tagged
        # with the source name so streams can be separated later.
        async with websockets.connect(url) as ws:
            if subscribe_msg:
                await ws.send(json.dumps(subscribe_msg))
            while True:
                raw = await ws.recv()
                storage.append({
                    "source": name,
                    "data": json.loads(raw),
                    "timestamp": datetime.now().isoformat()
                })

    await asyncio.gather(*(pump(n, u, m) for n, u, m in sources))
Handling Connection Issues
Real-time connections drop. Always implement reconnection:
async def resilient_websocket(url, on_message, max_retries=10):
    """Consume *url* indefinitely, reconnecting with exponential backoff.

    Args:
        url: WebSocket endpoint.
        on_message: Async callback awaited with each decoded JSON message.
        max_retries: Consecutive failed attempts allowed before giving up.
    """
    retries = 0
    while retries < max_retries:
        try:
            async with websockets.connect(url) as ws:
                retries = 0  # reset the retry budget once connected again
                async for message in ws:
                    await on_message(json.loads(message))
        except (websockets.ConnectionClosed, ConnectionError, OSError):
            # OSError covers DNS/TCP failures raised by connect() itself,
            # which the original handler missed — an unreachable host
            # crashed the "resilient" wrapper instead of being retried.
            retries += 1
            wait = min(2 ** retries, 60)  # Exponential backoff: 2s, 4s, ... capped at 60s
            print(f"Connection lost. Retry {retries}/{max_retries} in {wait}s")
            await asyncio.sleep(wait)
    print("Max retries reached. Stopping.")
Scaling with a Scraping API
For production real-time scraping where you need to poll multiple endpoints reliably, ScraperAPI handles proxy rotation and rate limiting so your polling doesn't get blocked.
Conclusion
Real-time data scraping opens up possibilities that static scraping can't match — live market data, instant notifications, and continuous monitoring. Choose WebSockets for bidirectional streams, SSE for server-push data, and smart polling when no streaming option exists. Always implement reconnection logic and use append-only storage for high-frequency data.
Happy scraping!
Top comments (0)