DEV Community

Kyle White

WebSockets for Real-Time Video Processing Progress Updates

Video processing jobs take 30-90 seconds. Leaving users staring at a spinner for that duration without feedback is a quick way to drive churn. Real-time progress updates via WebSocket transform "is this thing working?" anxiety into engaged "I can see it happening" confidence.

This post covers the complete WebSocket implementation for video job progress: server setup with ws, integration with BullMQ queue events, a client-side React hook, and graceful reconnection. This is the progress system running on ClipSpeedAI.

Architecture Overview

Client (React) ←WebSocket→ Node.js WS Server ←BullMQ events→ Worker Process

The worker reports progress through BullMQ's job.updateProgress() API. The API server subscribes to the queue's events with BullMQ's QueueEvents class and relays each event to every WebSocket connection registered for that job ID.
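
To make the "matched by job ID" part concrete before the full server code, here is a minimal sketch of the lookup structure on its own. The createRegistry name and its methods are illustrative, not part of the actual server; a "socket" is assumed to be anything with readyState, OPEN, and send(), which is the shape of a ws connection.

```javascript
// Minimal sketch of a jobId → connections registry (names are illustrative).
function createRegistry() {
  const jobConnections = new Map(); // jobId → Set of sockets

  return {
    add(jobId, socket) {
      if (!jobConnections.has(jobId)) jobConnections.set(jobId, new Set());
      jobConnections.get(jobId).add(socket);
    },
    remove(jobId, socket) {
      const sockets = jobConnections.get(jobId);
      if (!sockets) return;
      sockets.delete(socket);
      if (sockets.size === 0) jobConnections.delete(jobId); // do not keep empty sets around
    },
    // Send one payload to every open socket watching this job; returns how many were sent.
    broadcast(jobId, payload) {
      const sockets = jobConnections.get(jobId);
      if (!sockets) return 0;
      const message = JSON.stringify(payload); // serialize once, reuse for every socket
      let sent = 0;
      for (const socket of sockets) {
        if (socket.readyState === socket.OPEN) {
          socket.send(message);
          sent++;
        }
      }
      return sent;
    },
    size() {
      return jobConnections.size;
    }
  };
}
```

Using a Set per job means the same job can be watched from several tabs or devices, and each one receives every update.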

Server Setup

// ws/server.js
import { WebSocketServer } from 'ws';
import { QueueEvents } from 'bullmq';
import { verifyJobOwnership } from '../lib/auth.js';

const redisConnection = {
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  password: process.env.REDIS_PASSWORD
};

// Map of jobId → Set of WebSocket connections
const jobConnections = new Map();

export function setupWebSocketServer(httpServer) {
  const wss = new WebSocketServer({
    server: httpServer,
    path: '/ws'
  });

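  // Listen for BullMQ queue events.
  // Note: newer BullMQ releases reject queue names containing ':'. If the
  // constructor throws, rename the queue (in the worker too), e.g. 'video-encode'.
  const queueEvents = new QueueEvents('video:encode', { connection: redisConnection });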

  queueEvents.on('progress', ({ jobId, data }) => {
    const connections = jobConnections.get(jobId);
    if (!connections) return;

    const message = JSON.stringify({
      type: 'progress',
      jobId,
      stage: data.stage,
      pct: data.pct,
      message: data.message
    });

    for (const ws of connections) {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
      }
    }
  });

  queueEvents.on('completed', ({ jobId, returnvalue }) => {
    const connections = jobConnections.get(jobId);
    if (!connections) return;

    const message = JSON.stringify({
      type: 'completed',
      jobId,
      result: JSON.parse(returnvalue || '{}')
    });

    for (const ws of connections) {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
        ws.close(1000, 'Job complete');
      }
    }

    jobConnections.delete(jobId);
  });

  queueEvents.on('failed', ({ jobId, failedReason }) => {
    const connections = jobConnections.get(jobId);
    if (!connections) return;

    const message = JSON.stringify({
      type: 'failed',
      jobId,
      error: failedReason
    });

    for (const ws of connections) {
      if (ws.readyState === ws.OPEN) {
        ws.send(message);
        ws.close(1000, 'Job failed'); // terminal state, nothing more to send
      }
    }

    jobConnections.delete(jobId);
  });

  // Handle new WebSocket connections
  wss.on('connection', async (ws, req) => {
    // Heartbeat state must be set before any await. Otherwise the interval
    // below can fire first, see isAlive as undefined, and terminate the socket.
    ws.isAlive = true;
    ws.on('pong', () => { ws.isAlive = true; });

    // Parse jobId from URL: /ws?jobId=xxx&token=yyy
    const params = new URL(req.url, 'http://localhost').searchParams;
    const jobId = params.get('jobId');
    const token = params.get('token');

    if (!jobId || !token) {
      ws.close(4001, 'Missing jobId or token');
      return;
    }

    // Verify the user owns this job
    const authorized = await verifyJobOwnership(token, jobId);
    if (!authorized) {
      ws.close(4003, 'Unauthorized');
      return;
    }

    // The client may have disconnected while ownership was being checked
    if (ws.readyState !== ws.OPEN) return;

    // Register connection
    if (!jobConnections.has(jobId)) {
      jobConnections.set(jobId, new Set());
    }
    jobConnections.get(jobId).add(ws);

    // Register cleanup before the next await so an early close is not missed
    ws.on('close', () => {
      const connections = jobConnections.get(jobId);
      if (connections) {
        connections.delete(ws);
        if (connections.size === 0) jobConnections.delete(jobId);
      }
    });

    // Send current job state immediately.
    // getCurrentJobState must be defined elsewhere; it is assumed to resolve to
    // an object such as { status, pct, stage }, or null for an unknown job.
    const currentState = await getCurrentJobState(jobId);
    if (currentState && ws.readyState === ws.OPEN) {
      ws.send(JSON.stringify({ type: 'state', ...currentState }));
    }
  });

  // Heartbeat interval to clean up dead connections
  const heartbeat = setInterval(() => {
    wss.clients.forEach(ws => {
      if (!ws.isAlive) return ws.terminate();
      ws.isAlive = false;
      ws.ping();
    });
  }, 30_000);

  wss.on('close', () => clearInterval(heartbeat));

  return wss;
}
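
The server code calls getCurrentJobState(jobId) without defining it. Below is one possible sketch, not the original implementation. It assumes a BullMQ Queue instance for the same queue is passed in (for example new Queue('video:encode', { connection: redisConnection })); createJobStateReader is a hypothetical name. It relies only on Queue.getJob(), job.getState(), and job.progress.

```javascript
// Hypothetical helper: builds a getCurrentJobState(jobId) function around a queue.
// `queue` is assumed to be a BullMQ Queue for the queue the worker consumes.
function createJobStateReader(queue) {
  return async function getCurrentJobState(jobId) {
    const job = await queue.getJob(jobId); // undefined when the job ID is unknown
    if (!job) return null;

    const status = await job.getState(); // e.g. 'waiting', 'active', 'completed', 'failed'

    // job.progress holds whatever the worker last passed to updateProgress().
    // Before the first update it is not an object, so fall back to empty fields.
    const progress =
      typeof job.progress === 'object' && job.progress !== null ? job.progress : {};

    return {
      jobId,
      status,
      stage: progress.stage || '',
      pct: progress.pct || 0,
      message: progress.message || ''
    };
  };
}
```

With this in place, `const getCurrentJobState = createJobStateReader(queue);` near the top of ws/server.js would satisfy the call in the connection handler. Passing the queue in, rather than constructing it inside the helper, keeps the function easy to test with a stub.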

Worker: Emitting Progress

// workers/processors/encode.js
export async function encodeProcessor(job) {
  const { videoPath, clipSegments } = job.data;

  await job.updateProgress({ stage: 'starting', pct: 5, message: 'Preparing video...' });

  // Face detection
  let cropX;
  try {
    await job.updateProgress({ stage: 'analyzing', pct: 15, message: 'Detecting faces...' });
    cropX = await detectAndComputeCrop(videoPath);
  } catch {
    cropX = null; // fallback
  }

  await job.updateProgress({ stage: 'encoding', pct: 30, message: 'Encoding clips...' });

  const outputPaths = [];
  for (let i = 0; i < clipSegments.length; i++) {
    // Assumes encodeClip resolves to the path of the file it wrote
    const outputPath = await encodeClip(videoPath, clipSegments[i], cropX);
    outputPaths.push(outputPath);

    // Encoding owns the 30-80% band of the progress bar
    const pct = 30 + Math.floor(((i + 1) / clipSegments.length) * 50);
    await job.updateProgress({
      stage: 'encoding',
      pct,
      message: `Encoded clip ${i + 1} of ${clipSegments.length}`
    });
  }

  await job.updateProgress({ stage: 'uploading', pct: 85, message: 'Uploading clips...' });
  const urls = await uploadClips(outputPaths);

  await job.updateProgress({ stage: 'complete', pct: 100, message: 'Done!' });
  return { clipUrls: urls };
}
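
The percentage arithmetic in the encoding loop is easier to follow in isolation. This small sketch pulls it into a function (encodingPct is an illustrative name, not something from the worker) and runs it for a job with four clips.

```javascript
// Maps "clips finished so far" onto the 30-80% band used by the encoding stage.
// The default band edges mirror the worker above.
function encodingPct(clipsDone, totalClips, start = 30, span = 50) {
  if (totalClips <= 0) return start + span; // nothing to encode, so the stage is already done
  return start + Math.floor((clipsDone / totalClips) * span);
}

// For a job with 4 clips:
const steps = [1, 2, 3, 4].map(done => encodingPct(done, 4));
console.log(steps); // [ 42, 55, 67, 80 ]
```

The bar always ends the encoding stage at 80%, which leaves the last stretch for the upload step regardless of how many clips a job has.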

React Client Hook

// hooks/useJobProgress.js
import { useEffect, useRef, useState, useCallback } from 'react';

const RECONNECT_DELAY = 2000;
const MAX_RECONNECTS = 5;

export function useJobProgress(jobId, authToken) {
  const [state, setState] = useState({ status: 'connecting', pct: 0, stage: '', message: '' });
  const wsRef = useRef(null);
  const reconnectCount = useRef(0);
  const reconnectTimer = useRef(null);

  const connect = useCallback(() => {
    if (!jobId || !authToken) return;

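    // Encode both values so special characters cannot break the query string
    const url = `${process.env.NEXT_PUBLIC_WS_URL}/ws?jobId=${encodeURIComponent(jobId)}&token=${encodeURIComponent(authToken)}`;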
    const ws = new WebSocket(url);
    wsRef.current = ws;

    ws.onopen = () => {
      reconnectCount.current = 0;
      setState(prev => ({ ...prev, status: 'connected' }));
    };

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case 'progress':
          setState({ status: 'processing', pct: data.pct, stage: data.stage, message: data.message });
          break;
        case 'completed':
          setState({ status: 'completed', pct: 100, stage: 'done', clips: data.result.clipUrls });
          break;
        case 'failed':
          setState({ status: 'failed', error: data.error, pct: 0, stage: '' });
          break;
        case 'state':
          setState({ status: data.status, pct: data.pct || 0, stage: data.stage || '' });
          break;
      }
    };

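    ws.onclose = (event) => {
      if (event.code === 1000) return; // Normal close

      // 4001 and 4003 are the server's "missing params" and "unauthorized" codes;
      // retrying with the same jobId and token cannot succeed
      if (event.code === 4001 || event.code === 4003) {
        setState(prev => ({ ...prev, status: 'failed', error: event.reason || 'Connection rejected' }));
        return;
      }

      if (reconnectCount.current < MAX_RECONNECTS) {
        reconnectCount.current++;
        reconnectTimer.current = setTimeout(connect, RECONNECT_DELAY);
      } else {
        setState(prev => ({ ...prev, status: 'disconnected' }));
      }
    };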

    ws.onerror = () => {
      ws.close();
    };
  }, [jobId, authToken]);

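  useEffect(() => {
    connect();
    return () => {
      clearTimeout(reconnectTimer.current);
      const ws = wsRef.current;
      if (ws) {
        // Detach handlers first: closing a socket that is still connecting
        // reports code 1006, which would otherwise schedule a reconnect
        // after the component has unmounted
        ws.onclose = null;
        ws.onerror = null;
        ws.close(1000, 'Component unmounted');
      }
    };
  }, [connect]);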

  return state;
}

React Component

// components/JobProgress.jsx
import { useJobProgress } from '../hooks/useJobProgress';

export function JobProgress({ jobId, token }) {
  const { status, pct, stage, message, clips, error } = useJobProgress(jobId, token);

  if (status === 'completed') {
    return (
      <div>
        <p>Processing complete! {clips?.length} clips ready.</p>
        {clips?.map((url, i) => (
          <a key={i} href={url} download>Download clip {i + 1}</a>
        ))}
      </div>
    );
  }

  if (status === 'failed') {
    return <p>Processing failed: {error}</p>;
  }

  return (
    <div>
      <div style={{ width: `${pct}%`, height: 4, background: '#4ade80', transition: 'width 0.3s' }} />
      <p>{message || stage} ({pct}%)</p>
    </div>
  );
}

Connection Management at Scale

One pitfall: if thousands of users have open WebSocket connections and your server restarts, they all reconnect simultaneously. Rate-limit reconnections at the load balancer level, or use jittered reconnection delays on the client:

const delay = RECONNECT_DELAY * (1 + Math.random() * 0.5); // base delay plus 0-50% random jitter
reconnectTimer.current = setTimeout(connect, delay);
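
A common extension of the jittered delay, which the hook above does not use, is to also grow the delay with each failed attempt. The sketch below combines exponential backoff with jitter centered on the target delay. The MAX_DELAY cap, the reconnectDelay name, and the injectable random parameter are assumptions added for illustration and testing.

```javascript
// Sketch: exponential backoff with jitter (an extension, not what the hook does today).
const RECONNECT_DELAY = 2000; // base delay in ms, same value as in the hook
const MAX_DELAY = 30000;      // assumed upper bound so retries never wait too long

function reconnectDelay(attempt, random = Math.random) {
  // attempt is 1 for the first retry, 2 for the second, and so on
  const exponential = Math.min(RECONNECT_DELAY * 2 ** (attempt - 1), MAX_DELAY);
  // Scale by a factor in [0.75, 1.25) so clients spread out around the target delay
  return Math.round(exponential * (0.75 + random() * 0.5));
}
```

In the hook this would replace the fixed delay, for example `setTimeout(connect, reconnectDelay(reconnectCount.current))`. After a server restart, early retries stay fast while later ones back off, and the jitter keeps clients from retrying in lockstep.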

ClipSpeedAI uses this exact pattern. The progress bar updating in real-time during processing is one of the features users mention most in feedback — it converts a passive wait into an engaged experience.

The full implementation above handles connection cleanup, dead connection detection via heartbeats, job ownership verification, and graceful reconnection. It's the production-ready version of what most tutorials cover only superficially. The complete video processing pipeline it connects to — including AI clip selection and face tracking — is available as a hosted service at ClipSpeedAI.
