Video processing jobs take 30-90 seconds. Leaving users staring at a spinner for that duration without feedback is a quick way to drive churn. Real-time progress updates via WebSocket transform "is this thing working?" anxiety into engaged "I can see it happening" confidence.
This post covers the complete WebSocket implementation for video job progress — server setup with ws, integration with BullMQ queue events, client-side hook in React, and graceful reconnection. This is the progress system running on ClipSpeedAI.
Architecture Overview
Client (React) ←WebSocket→ Node.js WS Server ←Bull Events→ Worker Process
The worker updates job progress via BullMQ's progress API. The API server listens for Bull queue events and relays them to the appropriate WebSocket connection (matched by job ID).
Server Setup
// ws/server.js
import { WebSocketServer } from 'ws';
import { QueueEvents } from 'bullmq';
import { verifyJobOwnership } from '../lib/auth.js';
// Shared Redis connection options for BullMQ, sourced from the environment.
const redisConnection = {
  host: process.env.REDIS_HOST,
  // Fix: always pass a radix to parseInt — without it, exotic prefixes can be
  // misinterpreted. Falls back to the default Redis port when unset.
  port: Number.parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD
};
// Map of jobId → Set of WebSocket connections watching that job.
const jobConnections = new Map();
/**
 * Attach a WebSocket server to an existing HTTP server and relay BullMQ job
 * lifecycle events (progress / completed / failed) to the clients watching
 * each job, matched by job ID.
 *
 * @param {import('http').Server} httpServer - HTTP server to attach to.
 * @returns {WebSocketServer} the configured WebSocket server.
 */
export function setupWebSocketServer(httpServer) {
  const wss = new WebSocketServer({
    server: httpServer,
    path: '/ws'
  });

  // Subscribe to the queue's event stream emitted by the worker process.
  const queueEvents = new QueueEvents('video:encode', { connection: redisConnection });

  queueEvents.on('progress', ({ jobId, data }) => {
    const connections = jobConnections.get(jobId);
    if (!connections) return;
    const message = JSON.stringify({
      type: 'progress',
      jobId,
      stage: data.stage,
      pct: data.pct,
      message: data.message
    });
    for (const ws of connections) {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
      }
    }
  });

  queueEvents.on('completed', ({ jobId, returnvalue }) => {
    const connections = jobConnections.get(jobId);
    if (!connections) return;
    // Fix: guard the parse — QueueEvents delivers the worker's return value
    // as a string, and a malformed payload must not crash the relay.
    let result = {};
    try {
      result = JSON.parse(returnvalue || '{}');
    } catch {
      result = {};
    }
    const message = JSON.stringify({ type: 'completed', jobId, result });
    for (const ws of connections) {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
        ws.close(1000, 'Job complete'); // normal close: client will not reconnect
      }
    }
    jobConnections.delete(jobId);
  });

  queueEvents.on('failed', ({ jobId, failedReason }) => {
    const connections = jobConnections.get(jobId);
    if (!connections) return;
    const message = JSON.stringify({
      type: 'failed',
      jobId,
      error: failedReason
    });
    for (const ws of connections) {
      if (ws.readyState === ws.OPEN) ws.send(message);
    }
    jobConnections.delete(jobId);
  });

  // Handle new WebSocket connections.
  wss.on('connection', async (ws, req) => {
    // Parse jobId from URL: /ws?jobId=xxx&token=yyy. The base URL only
    // exists to satisfy the URL constructor; req.url is a path.
    // NOTE(review): a token in the query string can leak into access logs —
    // confirm logs scrub it, or consider a first-message auth handshake.
    const params = new URL(req.url, 'http://localhost').searchParams;
    const jobId = params.get('jobId');
    const token = params.get('token');
    if (!jobId || !token) {
      ws.close(4001, 'Missing jobId or token');
      return;
    }

    // Verify the user owns this job before registering the connection.
    const authorized = await verifyJobOwnership(token, jobId);
    if (!authorized) {
      ws.close(4003, 'Unauthorized');
      return;
    }

    // Register the connection under its job ID.
    if (!jobConnections.has(jobId)) {
      jobConnections.set(jobId, new Set());
    }
    jobConnections.get(jobId).add(ws);

    // Send the current job state immediately so late joiners don't sit at 0%.
    // Fix: wrapped in try/catch — a rejection here would otherwise surface as
    // an unhandled rejection inside the async connection handler.
    // NOTE(review): getCurrentJobState is not imported in this file — confirm
    // it is defined or imported elsewhere.
    try {
      const currentState = await getCurrentJobState(jobId);
      if (currentState) {
        ws.send(JSON.stringify({ type: 'state', ...currentState }));
      }
    } catch {
      // Best effort: the client still receives live events from here on.
    }

    ws.on('close', () => {
      const connections = jobConnections.get(jobId);
      if (connections) {
        connections.delete(ws);
        if (connections.size === 0) jobConnections.delete(jobId);
      }
    });

    // Heartbeat bookkeeping: mark alive on every pong.
    ws.isAlive = true;
    ws.on('pong', () => { ws.isAlive = true; });
  });

  // Heartbeat interval: terminate connections that missed the previous ping.
  const heartbeat = setInterval(() => {
    wss.clients.forEach(ws => {
      if (!ws.isAlive) return ws.terminate();
      ws.isAlive = false;
      ws.ping();
    });
  }, 30_000);

  wss.on('close', () => {
    clearInterval(heartbeat);
    // Fix: also tear down the QueueEvents subscription — the original leaked
    // its Redis connection whenever the WS server shut down.
    void queueEvents.close().catch(() => {});
  });

  return wss;
}
Worker: Emitting Progress
// workers/processors/encode.js
/**
 * BullMQ processor: encodes clip segments from a source video, reporting
 * stage/percentage progress via job.updateProgress at each step.
 *
 * @param {import('bullmq').Job} job - job.data carries { videoPath, clipSegments }.
 * @returns {Promise<{clipUrls: string[]}>} public URLs of the uploaded clips.
 */
export async function encodeProcessor(job) {
  const { videoPath, clipSegments } = job.data;

  await job.updateProgress({ stage: 'starting', pct: 5, message: 'Preparing video...' });

  // Face detection — best effort: fall back to null (no face-based crop)
  // rather than failing the whole job.
  let cropX;
  try {
    await job.updateProgress({ stage: 'analyzing', pct: 15, message: 'Detecting faces...' });
    cropX = await detectAndComputeCrop(videoPath);
  } catch {
    cropX = null; // fallback
  }

  await job.updateProgress({ stage: 'encoding', pct: 30, message: 'Encoding clips...' });
  const outputPaths = [];
  for (let i = 0; i < clipSegments.length; i++) {
    // Fix: capture the encoded file's path — the original discarded the
    // return value and pushed nothing, so uploadClips received an empty array.
    // NOTE(review): assumes encodeClip resolves to the output path — confirm.
    const outputPath = await encodeClip(videoPath, clipSegments[i], cropX);
    outputPaths.push(outputPath);
    // Encoding occupies 30% → 80% of the bar, split evenly across clips.
    const pct = 30 + Math.floor(((i + 1) / clipSegments.length) * 50);
    await job.updateProgress({
      stage: 'encoding',
      pct,
      message: `Encoded clip ${i + 1} of ${clipSegments.length}`
    });
  }

  await job.updateProgress({ stage: 'uploading', pct: 85, message: 'Uploading clips...' });
  const urls = await uploadClips(outputPaths);
  await job.updateProgress({ stage: 'complete', pct: 100, message: 'Done!' });
  return { clipUrls: urls };
}
React Client Hook
// hooks/useJobProgress.js
import { useEffect, useRef, useState, useCallback } from 'react';
const RECONNECT_DELAY = 2000;
const MAX_RECONNECTS = 5;

/**
 * React hook that subscribes to live progress for one job over WebSocket.
 * Reconnects with jittered delay on abnormal closure, up to MAX_RECONNECTS.
 *
 * @param {string} jobId - the BullMQ job to watch.
 * @param {string} authToken - token proving ownership of the job.
 * @returns {{status: string, pct: number, stage: string, message?: string,
 *            clips?: string[], error?: string}} current progress state.
 */
export function useJobProgress(jobId, authToken) {
  const [state, setState] = useState({ status: 'connecting', pct: 0, stage: '', message: '' });
  const wsRef = useRef(null);
  const reconnectCount = useRef(0);
  const reconnectTimer = useRef(null);

  const connect = useCallback(() => {
    if (!jobId || !authToken) return;
    // Fix: encode the params — tokens can contain '+', '=', '&', which corrupt
    // the query string when interpolated raw.
    const query = new URLSearchParams({ jobId, token: authToken });
    const ws = new WebSocket(`${process.env.NEXT_PUBLIC_WS_URL}/ws?${query}`);
    wsRef.current = ws;

    ws.onopen = () => {
      reconnectCount.current = 0; // fresh connection resets the retry budget
      setState(prev => ({ ...prev, status: 'connected' }));
    };

    ws.onmessage = (event) => {
      // Fix: guard the parse — a malformed frame must not crash the handler.
      let data;
      try {
        data = JSON.parse(event.data);
      } catch {
        return;
      }
      switch (data.type) {
        case 'progress':
          setState({ status: 'processing', pct: data.pct, stage: data.stage, message: data.message });
          break;
        case 'completed':
          setState({ status: 'completed', pct: 100, stage: 'done', clips: data.result.clipUrls });
          break;
        case 'failed':
          setState({ status: 'failed', error: data.error, pct: 0, stage: '' });
          break;
        case 'state':
          setState({ status: data.status, pct: data.pct || 0, stage: data.stage || '' });
          break;
      }
    };

    ws.onclose = (event) => {
      if (event.code === 1000) return; // normal close: job done or unmount
      if (reconnectCount.current < MAX_RECONNECTS) {
        reconnectCount.current++;
        // Fix: jitter the delay (0–50% extra) so a server restart doesn't make
        // every client reconnect at the same instant.
        const delay = RECONNECT_DELAY * (1 + Math.random() * 0.5);
        reconnectTimer.current = setTimeout(connect, delay);
      } else {
        setState(prev => ({ ...prev, status: 'disconnected' }));
      }
    };

    ws.onerror = () => {
      ws.close(); // onclose fires next and drives the reconnect logic
    };
  }, [jobId, authToken]);

  useEffect(() => {
    connect();
    return () => {
      clearTimeout(reconnectTimer.current);
      wsRef.current?.close(1000, 'Component unmounted');
    };
  }, [connect]);

  return state;
}
React Component
// components/JobProgress.jsx
import { useJobProgress } from '../hooks/useJobProgress';
export function JobProgress({ jobId, token }) {
const { status, pct, stage, message, clips, error } = useJobProgress(jobId, token);
if (status === 'completed') {
return (
<div>
<p>Processing complete! {clips?.length} clips ready.</p>
{clips?.map((url, i) => (
<a key={i} href={url} download>Download clip {i + 1}</a>
))}
</div>
);
}
if (status === 'failed') {
return <p>Processing failed: {error}</p>;
}
return (
<div>
<div style={{ width: `${pct}%`, height: 4, background: '#4ade80', transition: 'width 0.3s' }} />
<p>{message || stage} — {pct}%</p>
</div>
);
}
Connection Management at Scale
One pitfall: if thousands of users have open WebSocket connections and your server restarts, they all reconnect simultaneously — a classic thundering-herd spike that can knock the freshly restarted server straight back over. Rate-limit reconnections at the load balancer level, or use jittered reconnection delays on the client:
// Fix: the original comment claimed "±25% jitter", but this formula scales the
// delay by a factor in [1.0, 1.5) — i.e. 0–50% extra, never shorter. For true
// ±25%, use RECONNECT_DELAY * (0.75 + Math.random() * 0.5).
const delay = RECONNECT_DELAY * (1 + Math.random() * 0.5); // 0–50% jitter
reconnectTimer.current = setTimeout(connect, delay);
ClipSpeedAI uses this exact pattern. The progress bar updating in real-time during processing is one of the features users mention most in feedback — it converts a passive wait into an engaged experience.
The full implementation above handles connection cleanup, dead connection detection via heartbeats, job ownership verification, and graceful reconnection. It's the production-ready version of what most tutorials cover only superficially. The complete video processing pipeline it connects to — including AI clip selection and face tracking — is available as a hosted service at ClipSpeedAI.
Top comments (0)