DEV Community

ahmet gedik
ahmet gedik

Posted on

WebSocket Real-Time Notifications for Video Platforms with Python

Why Real-Time Notifications Matter

When your video platform fetches new trending content every 2 hours across 8 regions, users shouldn't have to refresh to see what's new. At DailyWatch, we built a WebSocket notification layer that pushes "new trending video" alerts to every connected browser tab the moment our cron pipeline finishes.

In this article, I'll walk through the full implementation using Python's websockets library.

The Architecture

The flow is straightforward:

  1. Cron job fetches new videos and writes them to SQLite
  2. Cron job sends a POST to our WebSocket server with the new video IDs
  3. WebSocket server broadcasts a notification to all connected clients
  4. Browser clients display a toast notification

Setting Up the WebSocket Server

Install the dependency:

pip install websockets
Enter fullscreen mode Exit fullscreen mode

Here's the core server with connection management and broadcast:

import asyncio
import json
import websockets
from datetime import datetime
from typing import Set

# Track all connected clients
CONNECTIONS: Set[websockets.WebSocketServerProtocol] = set()

async def register(websocket: websockets.WebSocketServerProtocol):
    """Add a new client connection."""
    CONNECTIONS.add(websocket)
    remote = websocket.remote_address
    print(f"[{datetime.now():%H:%M:%S}] Client connected: {remote} (total: {len(CONNECTIONS)})")
    try:
        # Keep connection alive, listen for pings
        async for message in websocket:
            data = json.loads(message)
            if data.get("type") == "ping":
                await websocket.send(json.dumps({"type": "pong"}))
    except websockets.ConnectionClosed:
        pass
    finally:
        CONNECTIONS.discard(websocket)
        print(f"[{datetime.now():%H:%M:%S}] Client disconnected: {remote} (total: {len(CONNECTIONS)})")

async def broadcast(message: dict):
    """Send a message to all connected clients."""
    if not CONNECTIONS:
        return
    payload = json.dumps(message)
    # Use gather to send concurrently, ignore individual failures
    await asyncio.gather(
        *(connection.send(payload) for connection in CONNECTIONS.copy()),
        return_exceptions=True,
    )
    print(f"[{datetime.now():%H:%M:%S}] Broadcast to {len(CONNECTIONS)} clients: {message.get('type')}")

async def notify_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """HTTP endpoint that cron jobs POST to when new videos arrive."""
    data = await reader.read(65536)
    # Parse the HTTP body (skip headers)
    body = data.split(b"\r\n\r\n", 1)[-1]
    try:
        payload = json.loads(body)
        videos = payload.get("videos", [])
        region = payload.get("region", "unknown")

        notification = {
            "type": "new_trending",
            "region": region,
            "count": len(videos),
            "videos": videos[:5],  # Send top 5 to clients
            "timestamp": datetime.now().isoformat(),
        }
        await broadcast(notification)

        response = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nOK"
    except json.JSONDecodeError:
        response = b"HTTP/1.1 400 Bad Request\r\nContent-Type: text/plain\r\n\r\nInvalid JSON"

    writer.write(response)
    await writer.drain()
    writer.close()

async def main():
    # WebSocket server for browser clients
    ws_server = await websockets.serve(register, "0.0.0.0", 8765)
    # HTTP server for cron job notifications
    http_server = await asyncio.start_server(notify_handler, "0.0.0.0", 8766)

    print("WebSocket server running on ws://0.0.0.0:8765")
    print("Notification endpoint on http://0.0.0.0:8766")

    await asyncio.gather(ws_server.wait_closed(), http_server.serve_forever())

if __name__ == "__main__":
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

Client-Side JavaScript

On the DailyWatch frontend, we connect with automatic reconnection:

class VideoNotifier {
  constructor(wsUrl) {
    this.wsUrl = wsUrl;
    this.reconnectDelay = 1000;
    this.maxReconnectDelay = 30000;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.wsUrl);

    this.ws.onopen = () => {
      console.log('Notification channel connected');
      this.reconnectDelay = 1000; // Reset on success
      this.startHeartbeat();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'new_trending') {
        this.showNotification(data);
      }
    };

    this.ws.onclose = () => {
      clearInterval(this.heartbeat);
      console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
      setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    };
  }

  startHeartbeat() {
    this.heartbeat = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }

  showNotification(data) {
    const toast = document.createElement('div');
    toast.className = 'vw-toast';
    toast.innerHTML = `
      <strong>${data.count} new trending videos</strong>
      <span>from ${data.region}</span>
    `;
    document.body.appendChild(toast);
    setTimeout(() => toast.remove(), 5000);
  }
}

// Initialize
const notifier = new VideoNotifier('wss://dailywatch.video/ws');
Enter fullscreen mode Exit fullscreen mode

Triggering from Cron

At the end of the fetch pipeline, one curl call pushes the notification:

import requests

def notify_new_videos(region: str, videos: list[dict]):
    payload = {
        "region": region,
        "videos": [
            {"id": v["video_id"], "title": v["title"], "thumb": v["thumbnail_url"]}
            for v in videos[:5]
        ],
    }
    try:
        requests.post("http://localhost:8766", json=payload, timeout=5)
    except requests.RequestException as e:
        print(f"Notification failed: {e}")
Enter fullscreen mode Exit fullscreen mode

Connection Management at Scale

The CONNECTIONS set handles cleanup automatically via the finally block. For production with thousands of connections, consider adding:

  • Connection limits: reject new connections above a threshold
  • Per-IP rate limiting: prevent a single client from flooding
  • Message queuing: buffer broadcasts if a client is slow

This system handles the real-time layer for DailyWatch, keeping users informed about fresh trending content across all 8 regions without polling.


This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.

Top comments (0)