Why Real-Time Notifications Matter
When your video platform fetches new trending content every 2 hours across 8 regions, users shouldn't have to refresh to see what's new. At DailyWatch, we built a WebSocket notification layer that pushes "new trending video" alerts to every connected browser tab the moment our cron pipeline finishes.
In this article, I'll walk through the full implementation: a server built on the Python websockets library, a JavaScript client with automatic reconnection, and the cron-side trigger.
The Architecture
The flow is straightforward:
- Cron job fetches new videos and writes them to SQLite
- Cron job sends a POST with the new videos (ID, title, thumbnail) to an HTTP endpoint running alongside our WebSocket server
- WebSocket server broadcasts a notification to all connected clients
- Browser clients display a toast notification
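The two payloads that move through this flow can be sketched as plain dictionaries. The field names match the server and cron code in this article; the helper names `build_cron_payload` and `build_notification` are hypothetical and exist only for illustration.

```python
from datetime import datetime


def build_cron_payload(region: str, videos: list[dict]) -> dict:
    """Body the cron job POSTs to the notification endpoint."""
    return {"region": region, "videos": videos}


def build_notification(payload: dict) -> dict:
    """Message the server broadcasts to every connected browser client."""
    videos = payload.get("videos", [])
    return {
        "type": "new_trending",
        "region": payload.get("region", "unknown"),
        "count": len(videos),          # total number of new videos
        "videos": videos[:5],          # clients only receive the top 5
        "timestamp": datetime.now().isoformat(),
    }
```

Note that `count` reflects everything the cron job sent, while `videos` is truncated, so a toast can say "8 new trending videos" even though only five entries travel over the socket.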
Setting Up the WebSocket Server
Install the dependency:
pip install websockets
Here's the core server with connection management and broadcast:
import asyncio
import json
import websockets
from datetime import datetime
from typing import Set

# Track all connected clients
CONNECTIONS: Set[websockets.WebSocketServerProtocol] = set()


async def register(websocket: websockets.WebSocketServerProtocol):
    """Add a new client connection."""
    CONNECTIONS.add(websocket)
    remote = websocket.remote_address
    print(f"[{datetime.now():%H:%M:%S}] Client connected: {remote} (total: {len(CONNECTIONS)})")
    try:
        # Keep connection alive, listen for pings
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers invalid JSON and undecodable bytes; ignore the message
                continue
            if isinstance(data, dict) and data.get("type") == "ping":
                await websocket.send(json.dumps({"type": "pong"}))
    except websockets.ConnectionClosed:
        pass
    finally:
        CONNECTIONS.discard(websocket)
        print(f"[{datetime.now():%H:%M:%S}] Client disconnected: {remote} (total: {len(CONNECTIONS)})")


async def broadcast(message: dict):
    """Send a message to all connected clients."""
    if not CONNECTIONS:
        return
    payload = json.dumps(message)
    # Use gather to send concurrently, ignore individual failures
    await asyncio.gather(
        *(connection.send(payload) for connection in CONNECTIONS.copy()),
        return_exceptions=True,
    )
    print(f"[{datetime.now():%H:%M:%S}] Broadcast to {len(CONNECTIONS)} clients: {message.get('type')}")
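

async def notify_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """HTTP endpoint that cron jobs POST to when new videos arrive."""
    try:
        # Read the headers first, then exactly Content-Length bytes of body.
        # A single read() can return before the body has arrived.
        head = await reader.readuntil(b"\r\n\r\n")
        length = 0
        for line in head.split(b"\r\n")[1:]:
            name, _, value = line.partition(b":")
            if name.strip().lower() == b"content-length":
                length = int(value.strip())
        body = await reader.readexactly(length)
        payload = json.loads(body)
        videos = payload.get("videos", [])
        region = payload.get("region", "unknown")
        notification = {
            "type": "new_trending",
            "region": region,
            "count": len(videos),
            "videos": videos[:5],  # Send top 5 to clients
            "timestamp": datetime.now().isoformat(),
        }
        await broadcast(notification)
        response = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nOK"
    except (ValueError, TypeError, AttributeError,
            asyncio.IncompleteReadError, asyncio.LimitOverrunError):
        # Truncated request, invalid JSON, or a JSON body that is not an object
        response = b"HTTP/1.1 400 Bad Request\r\nContent-Type: text/plain\r\n\r\nInvalid request"
    writer.write(response)
    await writer.drain()
    writer.close()
    await writer.wait_closed()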


async def main():
    # WebSocket server for browser clients
    ws_server = await websockets.serve(register, "0.0.0.0", 8765)
    # HTTP server for cron job notifications.
    # If the cron job runs on this host, binding to "127.0.0.1" instead
    # keeps the endpoint from being reachable by outside clients.
    http_server = await asyncio.start_server(notify_handler, "0.0.0.0", 8766)
    print("WebSocket server running on ws://0.0.0.0:8765")
    print("Notification endpoint on http://0.0.0.0:8766")
    await asyncio.gather(ws_server.wait_closed(), http_server.serve_forever())


if __name__ == "__main__":
    asyncio.run(main())
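The broadcast function above discards send failures through `return_exceptions=True`. If you would rather act on them, one option is to pair each result with its connection and stop tracking the ones that failed. The sketch below is stdlib-only and uses a hypothetical `FakeConnection` class as a stand-in for a real WebSocket connection, so it shows the pattern rather than the server's actual code.

```python
import asyncio
import json


class FakeConnection:
    """Hypothetical stand-in for a WebSocket connection (illustration only)."""

    def __init__(self, name: str, broken: bool = False):
        self.name = name
        self.broken = broken
        self.sent: list[str] = []

    async def send(self, payload: str) -> None:
        if self.broken:
            raise ConnectionError(f"{self.name} is gone")
        self.sent.append(payload)


async def broadcast_and_prune(connections: set, message: dict) -> int:
    """Send to every connection, drop the ones that fail, return the success count."""
    targets = list(connections)  # snapshot: the set may change while we await
    payload = json.dumps(message)
    results = await asyncio.gather(
        *(conn.send(payload) for conn in targets),
        return_exceptions=True,  # exceptions come back as values, in order
    )
    delivered = 0
    for conn, result in zip(targets, results):
        if isinstance(result, Exception):
            connections.discard(conn)  # failed send: stop tracking this client
        else:
            delivered += 1
    return delivered
```

Because `asyncio.gather` returns results in the same order as its inputs, zipping them against the snapshot is enough to tell which client failed. Returning the delivered count also makes the log line more honest than printing the size of the set.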
Client-Side JavaScript
On the DailyWatch frontend, we connect with automatic reconnection:
class VideoNotifier {
  constructor(wsUrl) {
    this.wsUrl = wsUrl;
    this.reconnectDelay = 1000;
    this.maxReconnectDelay = 30000;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.wsUrl);

    this.ws.onopen = () => {
      console.log('Notification channel connected');
      this.reconnectDelay = 1000; // Reset on success
      this.startHeartbeat();
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === 'new_trending') {
        this.showNotification(data);
      }
    };

    this.ws.onclose = () => {
      clearInterval(this.heartbeat);
      console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
      setTimeout(() => this.connect(), this.reconnectDelay);
      // Double the delay for the next attempt, up to the cap
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    };
  }

  startHeartbeat() {
    this.heartbeat = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping' }));
      }
    }, 30000);
  }

  showNotification(data) {
    const toast = document.createElement('div');
    toast.className = 'vw-toast';
    // Use textContent so server-supplied values are never parsed as HTML
    const title = document.createElement('strong');
    title.textContent = `${data.count} new trending videos`;
    const region = document.createElement('span');
    region.textContent = `from ${data.region}`;
    toast.append(title, ' ', region);
    document.body.appendChild(toast);
    setTimeout(() => toast.remove(), 5000);
  }
}

// Initialize
const notifier = new VideoNotifier('wss://dailywatch.video/ws');
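To see the reconnect schedule this client follows, the doubling-with-cap rule from `onclose` can be reproduced in a few lines of Python. The function name `reconnect_delays` is hypothetical; the 1000 ms starting delay and 30000 ms cap come from the constructor above.

```python
def reconnect_delays(attempts: int, initial_ms: int = 1000, max_ms: int = 30000) -> list[int]:
    """Delay (in ms) waited before each consecutive reconnect attempt."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, max_ms)  # same doubling-with-cap rule as onclose
    return delays


print(reconnect_delays(7))
# [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

From the sixth attempt onward the client retries every 30 seconds, and a successful `onopen` resets the delay to one second.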
Triggering from Cron
At the end of the fetch pipeline, a single HTTP POST pushes the notification:
import requests


def notify_new_videos(region: str, videos: list[dict]):
    payload = {
        "region": region,
        "videos": [
            {"id": v["video_id"], "title": v["title"], "thumb": v["thumbnail_url"]}
            for v in videos[:5]
        ],
    }
    try:
        requests.post("http://localhost:8766", json=payload, timeout=5)
    except requests.RequestException as e:
        print(f"Notification failed: {e}")
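If the cron environment does not have `requests` installed, the same POST can be sent with the standard library. This is a sketch under that assumption: `build_notify_request` and `send_notification` are hypothetical names, and the URL is the local endpoint used above.

```python
import http.client
import json
import urllib.error
import urllib.request


def build_notify_request(region: str, videos: list[dict],
                         url: str = "http://localhost:8766") -> urllib.request.Request:
    """Build the JSON POST without sending it."""
    body = json.dumps({"region": region, "videos": videos[:5]}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_notification(region: str, videos: list[dict]) -> bool:
    """Return True on a 2xx response; swallow network errors so the pipeline keeps going."""
    request = build_notify_request(region, videos)
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError, http.client.HTTPException) as exc:
        print(f"Notification failed: {exc}")
        return False
```

Splitting request construction from sending keeps the payload logic easy to check without a running server.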
Connection Management at Scale
The finally block in register removes each client from CONNECTIONS when its connection closes, so stale entries are cleaned up without extra bookkeeping. For production deployments with thousands of connections, consider adding:
- Connection limits: reject new connections above a threshold
- Per-IP rate limiting: prevent a single client from flooding
- Message queuing: buffer broadcasts if a client is slow
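The first two items can be sketched with a small bookkeeping class. This is a minimal illustration, not part of the DailyWatch server: the class name `ConnectionGate` and the default limits are placeholder assumptions, and it caps concurrent connections per IP rather than implementing full message-rate limiting.

```python
from collections import Counter


class ConnectionGate:
    """Tracks open connections and enforces a global cap and a per-IP cap."""

    def __init__(self, max_total: int = 5000, max_per_ip: int = 10):
        # Placeholder limits; tune them for your own deployment.
        self.max_total = max_total
        self.max_per_ip = max_per_ip
        self._per_ip: Counter = Counter()

    @property
    def total(self) -> int:
        return sum(self._per_ip.values())

    def try_acquire(self, ip: str) -> bool:
        """Reserve a slot for this IP; return False if either cap is reached."""
        if self.total >= self.max_total or self._per_ip[ip] >= self.max_per_ip:
            return False
        self._per_ip[ip] += 1
        return True

    def release(self, ip: str) -> None:
        """Free a slot when the connection closes (call from a finally block)."""
        if self._per_ip[ip] > 0:
            self._per_ip[ip] -= 1
        if self._per_ip[ip] == 0:
            del self._per_ip[ip]  # keep the counter from growing without bound
```

In the handler, one way to use it would be to call `try_acquire(websocket.remote_address[0])` before adding the socket to CONNECTIONS, close the socket if it returns False, and call `release` in the existing finally block. Behind a reverse proxy, `remote_address` is the proxy's address, so the client IP would have to come from a forwarded header instead.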
This system handles the real-time layer for DailyWatch, keeping users informed about fresh trending content across all 8 regions without polling.
This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.