DEV Community

ahmet gedik
ahmet gedik

Posted on

WebSocket Real-Time Notifications for Video Platforms with Python

Why Real-Time Notifications Matter for Video Platforms

When a K-pop music video suddenly goes viral in South Korea, or a Japanese gaming stream breaks into the trending charts, users of TopVideoHub should know about it within seconds — not the next time they refresh. WebSockets make that possible.

This article walks through building a WebSocket notification server that broadcasts "new trending video" events to connected clients as soon as the cron fetcher discovers them.

Architecture Overview

Cron Fetcher (PHP) ──→ Redis Pub/Sub ──→ Python WS Server ──→ Browser clients
Enter fullscreen mode Exit fullscreen mode

The PHP cron fetcher publishes events to Redis. A lightweight Python WebSocket server subscribes to Redis and fans those events out to all connected browsers. This decoupling means the fetcher never blocks waiting for browser connections.

Python WebSocket Server

Install dependencies:

pip install websockets redis asyncio
Enter fullscreen mode Exit fullscreen mode

The server:

import asyncio
import json
import logging
import redis.asyncio as aioredis
import websockets
from websockets.server import WebSocketServerProtocol

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

connected: set[WebSocketServerProtocol] = set()

async def broadcast(message: str) -> None:
    if not connected:
        return
    dead = set()
    results = await asyncio.gather(
        *(ws.send(message) for ws in connected),
        return_exceptions=True,
    )
    for ws, result in zip(list(connected), results):
        if isinstance(result, Exception):
            dead.add(ws)
    connected.difference_update(dead)

async def handler(websocket: WebSocketServerProtocol) -> None:
    connected.add(websocket)
    region = websocket.request_headers.get('X-Region', 'US')
    logger.info(f'Client connected (region={region}), total={len(connected)}')
    try:
        await websocket.wait_closed()
    finally:
        connected.discard(websocket)

async def redis_subscriber() -> None:
    client = aioredis.from_url('redis://localhost:6379', decode_responses=True)
    pubsub = client.pubsub()
    await pubsub.subscribe('topvideohub:trending')

    async for message in pubsub.listen():
        if message['type'] != 'message':
            continue
        try:
            payload = json.loads(message['data'])
            payload['title'] = payload.get('title', '')
            await broadcast(json.dumps(payload, ensure_ascii=False))
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning(f'Bad message: {e}')

async def main() -> None:
    server = await websockets.serve(handler, '0.0.0.0', 8765)
    logger.info('WebSocket server listening on :8765')
    await asyncio.gather(server.wait_closed(), redis_subscriber())

if __name__ == '__main__':
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

Publishing Events from PHP

After inserting a new trending video, the PHP fetcher publishes to Redis:

<?php
declare(strict_types=1);

class NotificationPublisher
{
    private \Redis $redis;

    public function __construct()
    {
        $this->redis = new \Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    public function publishTrending(array $video, string $region): void
    {
        $payload = json_encode([
            'event'     => 'new_trending',
            'region'    => $region,
            'video_id'  => $video['video_id'],
            'title'     => $video['title'],
            'thumbnail' => $video['thumbnail_url'],
            'channel'   => $video['channel_title'],
            'ts'        => time(),
        ], JSON_UNESCAPED_UNICODE);

        $this->redis->publish('topvideohub:trending', $payload);
    }
}

// In the fetch loop:
$publisher = new NotificationPublisher();
foreach ($newVideos as $video) {
    $db->insertVideo($video, $region);
    $publisher->publishTrending($video, $region);
}
Enter fullscreen mode Exit fullscreen mode

Browser Client

class TrendingNotifier {
  constructor(region = 'JP') {
    this.region = region;
    this.ws = null;
    this.reconnectDelay = 1000;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket('wss://ws.topvideohub.com/');
    this.ws.onopen = () => { this.reconnectDelay = 1000; };
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.event === 'new_trending') this.showToast(data);
    };
    this.ws.onclose = () => {
      setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    };
  }

  showToast(video) {
    const toast = document.createElement('div');
    toast.className = 'trending-toast';
    toast.innerHTML = `<img src="${video.thumbnail}" alt=""><div><strong>New in ${video.region}</strong><span>${video.title}</span></div>`;
    document.body.appendChild(toast);
    setTimeout(() => toast.remove(), 5000);
  }
}

const notifier = new TrendingNotifier(userRegion);
Enter fullscreen mode Exit fullscreen mode

CJK Considerations

Asian video titles contain multi-byte characters. Two pitfalls to avoid:

  1. PHP: always use JSON_UNESCAPED_UNICODE so "title": "新しい動画" stays readable.
  2. Python: pass ensure_ascii=False to json.dumps() for the same reason.
  3. WebSocket frame size: CJK strings are longer in bytes (3 bytes/char UTF-8). Keep notification payloads small.

Connection Lifecycle and Scaling

For TopVideoHub, which serves 9 regions, a single Python process handles hundreds of concurrent WebSocket connections comfortably. If you need to scale horizontally, replace the in-process connected set with a Redis-backed pub/sub fan-out.


This article is part of the Building TopVideoHub series. Check out TopVideoHub to see these techniques in action.

Top comments (0)