DEV Community

ahmet gedik
ahmet gedik

Posted on

WebSocket Real-Time Notifications for Video Platforms with Python

The Case for Real-Time Viral Alerts

When a video is going viral across European regions, every minute counts. At ViralVidVault, our cron pipeline detects virality spikes across Poland, the Netherlands, Sweden, Norway, Austria, and the UK -- but without real-time push, users only see updates on their next page refresh. WebSockets fix that.

Architecture Overview

The data flows through three stages:

  1. Cron detects a virality spike and sends an HTTP POST to the WebSocket server
  2. WebSocket server broadcasts a viral alert to the browsers subscribed to that region (or to all connected browsers when nobody is subscribed)
  3. Clients render a real-time notification with video details

The WebSocket Server

import asyncio
import json
import websockets
from datetime import datetime
from typing import Set, Dict

# Every currently connected client socket; broadcast_viral falls back to this
# set when an alert has no region subscribers.
CONNECTIONS: Set[websockets.WebSocketServerProtocol] = set()
# Upper-cased region code -> sockets that sent a "subscribe_region" message
# for it. The key "ALL" is also consulted for every alert.
REGION_SUBS: Dict[str, Set[websockets.WebSocketServerProtocol]] = {}

async def handler(websocket: websockets.WebSocketServerProtocol):
    """Serve one client connection: region subscriptions and ping/pong.

    The socket is registered in ``CONNECTIONS`` for its whole lifetime and in
    ``REGION_SUBS`` for every region it subscribes to; both registrations are
    undone when the connection ends.

    Recognised client messages (JSON objects):
      * ``{"type": "subscribe_region", "region": "<code>"}`` -- subscribe; the
        region is upper-cased and acknowledged with a ``"subscribed"`` message.
      * ``{"type": "ping"}`` -- answered with ``{"type": "pong"}``.

    Malformed messages are answered with an ``"error"`` message instead of
    tearing the connection down; unknown message types are ignored.
    """
    CONNECTIONS.add(websocket)
    subscribed_regions = set()

    async def reject(reason: str) -> None:
        # Tell the client what was wrong but keep the connection alive.
        await websocket.send(json.dumps({"type": "error", "message": reason}))

    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers JSONDecodeError and undecodable binary frames.
                await reject("invalid JSON")
                continue
            if not isinstance(data, dict):
                await reject("message must be a JSON object")
                continue

            message_type = data.get("type")

            if message_type == "subscribe_region":
                region = data.get("region")
                if not isinstance(region, str) or not region:
                    await reject("region must be a non-empty string")
                    continue
                region = region.upper()
                subscribed_regions.add(region)
                REGION_SUBS.setdefault(region, set()).add(websocket)
                await websocket.send(json.dumps({
                    "type": "subscribed",
                    "region": region,
                }))

            elif message_type == "ping":
                await websocket.send(json.dumps({"type": "pong"}))

    except websockets.ConnectionClosed:
        # Normal end of a session; cleanup happens in ``finally``.
        pass
    finally:
        CONNECTIONS.discard(websocket)
        for region in subscribed_regions:
            subscribers = REGION_SUBS.get(region)
            if subscribers is None:
                continue
            subscribers.discard(websocket)
            # Drop empty buckets so REGION_SUBS does not grow without bound
            # as clients subscribe to arbitrary region strings and leave.
            if not subscribers:
                del REGION_SUBS[region]

async def broadcast_viral(video: dict, region: str):
    """Send a ``viral_alert`` message to the clients interested in *region*.

    Recipients are the sockets subscribed to *region* plus those subscribed to
    ``"ALL"``. If that yields nobody, the alert falls back to every connected
    client. Send failures on individual sockets are collected by ``gather``
    and ignored, so one broken connection cannot block the others.

    Args:
        video: JSON-serialisable video details forwarded verbatim to clients.
        region: Region code of the alert; matched case-insensitively.
    """
    # Subscriptions are stored upper-cased by the connection handler, so the
    # lookup key must be normalised the same way or "pl" would never match "PL".
    region_key = region.upper() if isinstance(region, str) else region

    message = json.dumps({
        "type": "viral_alert",
        "region": region_key,
        "video": video,
        # Timezone-aware, so the ISO string carries its UTC offset and is
        # unambiguous for clients in other timezones.
        "timestamp": datetime.now().astimezone().isoformat(),
    })

    # Send to region subscribers (the union builds a fresh set, so handlers
    # mutating REGION_SUBS while we await cannot disturb the iteration).
    targets = REGION_SUBS.get(region_key, set()) | REGION_SUBS.get("ALL", set())

    # Fallback: if no subscribers, broadcast to everyone
    if not targets:
        targets = CONNECTIONS.copy()

    await asyncio.gather(
        *(ws.send(message) for ws in targets),
        return_exceptions=True,
    )

_WEBHOOK_MAX_BODY = 65536  # same size cap the original single read() applied
_WEBHOOK_READ_TIMEOUT = 5.0  # seconds to wait for a complete request


async def _read_webhook_body(reader: asyncio.StreamReader) -> bytes:
    """Return the request body, honouring Content-Length when headers exist."""
    try:
        head = await reader.readuntil(b"\r\n\r\n")
    except asyncio.IncompleteReadError as exc:
        # EOF before any blank line: a header-less payload. Treat everything
        # received as the body, as a plain split on the separator would.
        return exc.partial

    length = None
    for line in head.split(b"\r\n")[1:]:  # [0] is the request line
        name, sep, value = line.partition(b":")
        if sep and name.strip().lower() == b"content-length":
            length = int(value.strip())  # ValueError -> 400 in the caller

    if length is None:
        # No declared length: take whatever follows the headers.
        return await reader.read(_WEBHOOK_MAX_BODY)
    if not 0 <= length <= _WEBHOOK_MAX_BODY:
        raise ValueError("unsupported Content-Length")
    # Headers and body may arrive in separate TCP segments; wait for all of it.
    return await reader.readexactly(length)


async def http_webhook(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """Accept a webhook POST from the cron pipeline and broadcast it.

    The body must be a JSON object with a ``"video"`` key and an optional
    ``"region"`` key (default ``"EU"``). Replies ``200 OK`` after broadcasting,
    or ``400 Bad Request`` when the request is incomplete, too large, times
    out, or is not the expected JSON. The connection is always closed.
    """
    try:
        try:
            body = await asyncio.wait_for(
                _read_webhook_body(reader), _WEBHOOK_READ_TIMEOUT
            )
            payload = json.loads(body)
            if not isinstance(payload, dict):
                raise ValueError("payload must be a JSON object")
            video = payload["video"]
            region = payload.get("region", "EU")
        except (
            ValueError,  # bad JSON, bad encoding, bad Content-Length
            KeyError,  # no "video" key
            asyncio.IncompleteReadError,  # client hung up mid-body
            asyncio.LimitOverrunError,  # header block larger than the stream limit
            asyncio.TimeoutError,  # request never completed
        ):
            response = b"HTTP/1.1 400 Bad Request\r\n\r\nBad JSON"
        else:
            await broadcast_viral(video=video, region=region)
            response = b"HTTP/1.1 200 OK\r\n\r\nOK"

        writer.write(response)
        await writer.drain()
    except ConnectionError:
        # The peer vanished before the reply could be delivered; nothing
        # useful is left to do for this request.
        pass
    finally:
        writer.close()

async def main(
    ws_host: str = "0.0.0.0",
    ws_port: int = 8765,
    webhook_host: str = "127.0.0.1",
    webhook_port: int = 8766,
):
    """Start the WebSocket server and the webhook listener, then run forever.

    Args:
        ws_host: Interface for the public WebSocket endpoint.
        ws_port: Port for the WebSocket endpoint.
        webhook_host: Interface for the webhook listener. Defaults to loopback
            because the endpoint is unauthenticated and whatever it receives
            is pushed to every browser; the scoring pipeline posts to
            ``localhost``. Pass another address only behind a firewall or an
            authenticating proxy.
        webhook_port: Port for the webhook listener.
    """
    ws = await websockets.serve(handler, ws_host, ws_port)
    http = await asyncio.start_server(http_webhook, webhook_host, webhook_port)
    print(f"WS on :{ws_port}, webhook on :{webhook_port}")
    await asyncio.gather(ws.wait_closed(), http.serve_forever())


if __name__ == "__main__":
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

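The key enhancement here over a basic broadcast is region-based subscriptions: users browsing Polish content only get alerts about Polish viral videos. Note the fallback, though: when nobody is subscribed to an alert's region (or to "ALL"), the alert is sent to every connected client.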

Client-Side Implementation

/**
 * Browser client for viral alerts: keeps a WebSocket open, subscribes to the
 * given regions, sends heartbeat pings, reconnects with exponential backoff,
 * and shows a toast for every `viral_alert` message.
 */
class ViralAlertClient {
  /**
   * @param {string} wsUrl - WebSocket endpoint, e.g. `wss://host/ws`.
   * @param {string[]} [regions=[]] - Region codes to subscribe to on connect.
   */
  constructor(wsUrl, regions = []) {
    this.wsUrl = wsUrl;
    this.regions = regions;
    this.reconnectMs = 1000;
    this.connect();
  }

  /** Open the socket and wire up subscription, heartbeat, and reconnect. */
  connect() {
    this.ws = new WebSocket(this.wsUrl);

    this.ws.onopen = () => {
      // A successful connection resets the backoff delay.
      this.reconnectMs = 1000;
      // Subscribe to specific regions
      this.regions.forEach(region => {
        this.ws.send(JSON.stringify({ type: 'subscribe_region', region }));
      });
      // Never leave a previous timer running alongside the new one.
      clearInterval(this.heartbeat);
      this.heartbeat = setInterval(() => {
        // send() throws while the socket is still connecting.
        if (this.ws.readyState === WebSocket.OPEN) {
          this.ws.send(JSON.stringify({ type: 'ping' }));
        }
      }, 25000);
    };

    this.ws.onmessage = (event) => {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch (err) {
        return; // Ignore frames that are not valid JSON.
      }
      if (data && data.type === 'viral_alert' && data.video) {
        this.showViralToast(data);
      }
    };

    this.ws.onclose = () => {
      clearInterval(this.heartbeat);
      setTimeout(() => this.connect(), this.reconnectMs);
      // Exponential backoff, capped at 30 s.
      this.reconnectMs = Math.min(this.reconnectMs * 2, 30000);
    };
  }

  /**
   * Render a toast for one alert and remove it after six seconds.
   *
   * The toast is built with DOM nodes and `textContent` rather than
   * `innerHTML`: titles, thumbnails and region names come from the server
   * (and ultimately from third-party video metadata), so interpolating them
   * into markup would allow script injection.
   *
   * @param {{video: {thumbnail: string, title: string}, region: string}} data
   */
  showViralToast(data) {
    const { video, region } = data;

    const toast = document.createElement('div');
    toast.className = 'viral-toast';

    const thumbnail = document.createElement('img');
    thumbnail.src = video.thumbnail;
    thumbnail.alt = '';
    thumbnail.width = 80;

    const heading = document.createElement('strong');
    heading.textContent = `Viral in ${region}!`;

    const title = document.createElement('p');
    title.textContent = video.title;

    const details = document.createElement('div');
    details.append(heading, title);
    toast.append(thumbnail, details);

    document.body.appendChild(toast);
    // Add the class on a later tick so the CSS transition runs.
    setTimeout(() => toast.classList.add('show'), 10);
    setTimeout(() => { toast.classList.remove('show'); setTimeout(() => toast.remove(), 300); }, 6000);
  }
}

// Subscribe to European regions. The constructor connects immediately and
// sends one subscribe_region message per code once the socket opens.
const alerts = new ViralAlertClient('wss://viralvidvault.com/ws', ['PL', 'NL', 'SE', 'NO', 'AT', 'GB']);
Enter fullscreen mode Exit fullscreen mode

Triggering from the Virality Scorer

Integrate with the existing ViralVidVault scoring pipeline:

import requests

def on_viral_detected(
    video: dict,
    region: str,
    score: float,
    *,
    threshold: float = 85.0,
    webhook_url: str = "http://localhost:8766",
):
    """Notify the WebSocket server about a viral video (best effort).

    Nothing is sent unless *score* reaches *threshold*. Delivery problems
    (connection errors, timeouts, non-2xx replies) are logged as warnings and
    otherwise ignored, so a broken notifier never fails the scoring pipeline.

    Args:
        video: Scorer record; must contain ``video_id``, ``title`` and
            ``thumbnail_url``.
        region: Region code the spike was detected in.
        score: Virality score of the video.
        threshold: Minimum score that triggers an alert.
        webhook_url: Address of the WebSocket server's webhook listener.

    Raises:
        KeyError: If *video* lacks one of the required keys.
    """
    import logging

    # Written as ``not score >= threshold`` rather than ``score < threshold``
    # so that a NaN score is rejected too: NaN compares False both ways and
    # would otherwise be posted and serialised as invalid JSON for browsers.
    if not score >= threshold:
        return  # Only alert on truly viral content

    payload = {
        "region": region,
        "video": {
            "id": video["video_id"],
            "title": video["title"],
            "thumbnail": video["thumbnail_url"],
            "score": score,
        },
    }
    try:
        response = requests.post(webhook_url, json=payload, timeout=3)
        # Surface a 400 from the webhook instead of silently dropping it.
        response.raise_for_status()
    except requests.RequestException as exc:
        # Non-critical, don't fail the pipeline -- but leave a trace.
        logging.getLogger(__name__).warning(
            "viral alert webhook failed: %s", exc
        )
Enter fullscreen mode Exit fullscreen mode

Region-aware WebSocket notifications bring the real-time experience that makes ViralVidVault feel alive. Users browsing Swedish content get Swedish viral alerts instantly, without polling.


This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.

Top comments (0)