DEV Community

Cover image for Building Your Own Scalable Video Streaming Server: Part 2 - Setup & Core Implementation
Awal Hossain
Awal Hossain

Posted on

2 2 2 2 1

Building Your Own Scalable Video Streaming Server: Part 2 - Setup & Core Implementation

Alright, in Part 1, we got the 30,000-foot view of our microservices architecture for video streaming. We talked about the why (handling uploads, conversion, smooth playback) and the what (API Gateway, API Server, Video Conversion Service, HLS).

If you missed Part 1, you can find the link here: Part-1

Now, it's time to roll up our sleeves and get our hands dirty with the how. In this part, we'll focus on:

  1. Setting up your development environment.
  2. Implementing the direct-to-cloud upload flow using pre-signed URLs.
  3. Wiring up the video conversion pipeline using FFmpeg, BullMQ, and RabbitMQ.

1. Setting Up Shop: Your Development Environment

First things first, let's make sure you have the necessary tools installed.

Prerequisites:

  • Node.js: v18.0.0 or later (check with node -v).
  • Yarn: (Or npm) for package management (check with yarn -v).
  • Docker & Docker Compose: To easily run our services and dependencies like Redis and RabbitMQ. Install Docker.
  • FFmpeg: The video processing powerhouse. If you missed it in Part 1, installation guides are often found online (e.g., brew install ffmpeg on Mac, or check the official FFmpeg site).
  • Code Editor: VS Code, WebStorm, Sublime Text - your choice!
  • Git: For cloning the project repository.

Get the Code:

git clone https://github.com/AwalHossain/video-streaming-server
cd video-streaming-server
Enter fullscreen mode Exit fullscreen mode

Environment Variables (.env): The Secret Sauce

Each of our services needs some configuration, like database connection strings, API keys, and secrets. We manage these using .env files. You'll find a .env.example file in each service directory (api-gateway, api-server, video-conversion).

Action: For each service directory:

  1. Copy .env.example to a new file named .env.
  2. Open the .env file and fill in the values specific to your setup.

Here are some key variables you'll need to configure:

  • PORT: The port each service will run on (e.g., 8000, 8001, 8002).
  • MONGO_URI: Your MongoDB connection string (for api-server).
  • RABBITMQ_ENDPOINT: Your RabbitMQ server address (e.g., amqp://user:password@localhost:5672).
  • REDIS_URL: Your Redis connection URL (for video-conversion, e.g., redis://localhost:6379).
  • JWT_SECRET: A long, random secret string for signing authentication tokens (used by api-gateway and api-server).
  • Cloud Storage Credentials (e.g., for Digital Ocean Spaces or AWS S3):
    • DO_SPACES_ACCESS_KEY / AWS_ACCESS_KEY_ID
    • DO_SPACES_SECRET_KEY / AWS_SECRET_ACCESS_KEY
    • DO_SPACES_ENDPOINT / AWS_ENDPOINT (if using non-AWS S3 compatible)
    • DO_SPACES_BUCKET_NAME / AWS_BUCKET_NAME
    • DO_SPACES_REGION / AWS_REGION
  • CLIENT_URL1, CLIENT_URL2: The URLs of your frontend application(s) for CORS configuration.

2. The Grand Upload: Pre-signed URLs in Action

Remember how we wanted to avoid bogging down our servers with large file uploads? Pre-signed URLs are the answer! The client gets a temporary, secure link to upload directly to cloud storage.

API Gateway - The Gatekeeper:

  1. Routing: We need an endpoint to request the pre-signed URL.

    // api-gateway/src/app/routes/index.ts
    import { Router } from 'express';
    import { AuthRoutes } from '../modules/auth/auth.routes';
    import { VideoRoutes } from '../modules/video/video.route';
    
    const router = Router();
    const ModuleRoutes = [
      { path: '/auth', route: AuthRoutes },
      { path: '/videos', route: VideoRoutes }, // Our video routes
    ];
    ModuleRoutes.forEach((route) => router.use(route.path, route.route));
    export default router;
    
    // api-gateway/src/app/modules/video/video.route.ts
    import express from 'express';
    import auth from '../../middleware/auth'; // <-- Authentication middleware
    import { VideoController } from './video.controller';
    
    const router = express.Router();
    // POST endpoint protected by auth middleware
    router.post(
        '/presigned-url',
         auth(), // Ensures only logged-in users can get a URL
         VideoController.getPresignedUrl
        );
    router.post(
        '/confirm-upload',
         auth(),
         VideoController.confirmUpload
         );
    // ... other video routes
    export const VideoRoutes = router;
    
  2. Controller (getPresignedUrl): This function handles the request.

    // api-gateway/src/app/modules/video/video.controller.ts
    import { Request, Response } from 'express';
    import { v4 as uuidv4 } from 'uuid';
    import catchAsync from '../../../shared/catchAsyncError';
    import sendResponse from '../../../shared/response';
    import doSpacesUpload from '../../../utils/doSpacesUpload'; // The utility function
    
    const getPresignedUrl = catchAsync(async (req: Request, res: Response) => {
      const { filename, contentType } = req.body; // Get info from client
    
      if (!filename || !contentType) {
        return res.status(400).json({ /* ... error */ });
      }
    
      const userId = req.user.id; // From auth middleware
    
      // Clean filename and make it unique
      const cleanFilename = filename
        .split('.')[0]
        .replace(/\s+/g, '-')
        // ... more cleaning ...
        + '-' + uuidv4() + '.' + filename.split('.').pop();
    
      // Call the utility to generate the URL
      const result = await doSpacesUpload.generatePresignedUrl({
        filename: cleanFilename,
        contentType: contentType,
        userId: userId,
        expirySeconds: 3600, // URL valid for 1 hour
      });
    
      sendResponse(res, {
        statusCode: 200,
        success: true,
        message: 'Presigned URL generated successfully',
        data: result, // Send URL and fileKey back to client
      });
    });
    
  3. Utility (doSpacesUpload.ts): This interacts with the cloud storage SDK.

    // api-gateway/src/utils/doSpacesUpload.ts
    import AWS from 'aws-sdk'; // Using AWS SDK for S3 compatible storage
    import config from '../config';
    
    const s3 = new AWS.S3({ // Configure S3 client
      endpoint: config.doSpaces.endpoint,
      accessKeyId: config.doSpaces.accessKey,
      secretAccessKey: config.doSpaces.secretKey,
      region: config.doSpaces.region,
      s3ForcePathStyle: true,
      signatureVersion: 'v4'
    });
    
    const generatePresignedUrl = async (options) => {
      const { filename, contentType, userId, expirySeconds = 3600 } = options;
      const timestamp = Date.now();
      // Define the path/key in the bucket
      const key = `uploads/${userId}/${timestamp}-${filename}`;
    
      const params = {
        Bucket: config.doSpaces.bucketName,
        Key: key,
        Expires: expirySeconds,
        // ACL: 'public-read', // Optional: If you want uploaded files to be public
        // ContentType: contentType // Often needed for direct browser uploads
      };
    
      try {
        // Generate the signed URL for PUT operation
        const url = await s3.getSignedUrlPromise('putObject', params);
        const fileUrl = `https://${config.doSpaces.bucketName}.${config.doSpaces.endpoint}/${key}`;
    
        return {
          uploadUrl: url, // The URL the client will use to PUT the file
          fileKey: key,   // The path/key of the file in the bucket
          bucketName: config.doSpaces.bucketName,
          fileUrl, // Direct link to the file (if public)
          fileName: key.split('/').pop(),
        };
      } catch (error) {
        console.error('Error generating presigned URL:', error);
        throw error;
      }
    };
    
    export default { generatePresignedUrl };
    

Client-Side Upload (Conceptual):

Once the client receives the uploadUrl, fileKey, etc., they perform the upload.

// Simple example using fetch
async function uploadFile(presignedData, file) {
  const { uploadUrl, contentType } = presignedData; // Assume contentType was passed back or known

  try {
    const response = await fetch(uploadUrl, {
      method: 'PUT',
      headers: {
        'Content-Type': file.type, // Use the actual file type
        // 'x-amz-acl': 'public-read' // If ACL was set during URL generation
      },
      body: file
    });

    if (!response.ok) {
      throw new Error(`Upload failed: ${response.statusText}`);
    }
    console.log('Upload successful!');
    // Now call the confirmUpload endpoint
  } catch (error) {
    console.error('Upload error:', error);
  }
}
Enter fullscreen mode Exit fullscreen mode

Confirming the Upload:

After the direct upload finishes, the client must notify our backend.

// api-gateway/src/app/modules/video/video.controller.ts (confirmUpload)
const confirmUpload = catchAsync(async (req: Request, res: Response) => {
  // Client sends back fileKey, originalName, etc. received earlier
  const { fileKey, originalName, fileName, bucketName } = req.body;
  const userId = req.user.id;

  // 1. Notify user upload is complete (via RabbitMQ -> WebSocket)
  RabbitMQ.sendToQueue(NOTIFY_EVENTS.NOTIFT_VIDEO_UPLOADING_BUCKET, {
     userId, status: 'completed', /* ... */
  });

  // 2. Prepare metadata payload for API Server
  const metadataPayload = {
    originalName,
    recordingDate: Date.now(),
    author: userId,
    fileName,
    title: originalName.split('.')[0].replace(/[_]/g, ''), // Basic title generation
    // ... other initial metadata
  };

  // 3. Send event to API Server to save metadata
  broadcastVideoEvent(API_SERVER_EVENTS.INSERT_VIDEO_METADATA_EVENT, metadataPayload);

  // 4. Prepare data for Video Conversion Service
  const conversionData = { userId, fileName, bucketName, fileKey };

  // 5. Send event to Video Conversion Service to start processing
  broadcastVideoEvent(
    VIDEO_CONVERSION_SERVER.SEND_VIDEO_METADATA_EVENT,
    conversionData
  );

  sendResponse(res, { /* ... success response */ });
});
Enter fullscreen mode Exit fullscreen mode

Phew! That's the upload flow. The key is the client uploads directly to storage, and then notifies our backend to kick off the next steps via RabbitMQ events.

3. Conversion Central: FFmpeg, BullMQ, and RabbitMQ

Now, the Video Conversion service takes over.

Receiving the Signal:

The service listens for the SEND_VIDEO_METADATA_EVENT on RabbitMQ. When it receives a message, it triggers the download process.

// video-conversion/src/server.ts (Conceptual listener setup)
import RabbitMQ from './shared/rabbitMQ';
import downloadBlob from './processor/downloadFile';
import { VIDEO_CONVERSION_SERVER } from './constant/events';

async function startServer() {
 // ... other setup ...

 await RabbitMQ.connect();

 // Listen for the event from API Gateway
 await RabbitMQ.consume(
   VIDEO_CONVERSION_SERVER.SEND_VIDEO_METADATA_EVENT,
   async (msg, ack) => {
     try {
       const data = JSON.parse(msg.content.toString());
       logger.info('Received video metadata event:', data);
       // Trigger the download process
       await downloadBlob(data.bucketName, data.fileKey, data.userId);
       ack(); // Acknowledge the message
     } catch (error) {
       errorLogger.error('Failed processing metadata event:', error);
       // Decide if you should ack or nack based on the error
       ack(); // Ack for now to prevent reprocessing loop on simple errors
     }
   }
 );

 // ... start express app ...
}
Enter fullscreen mode Exit fullscreen mode

Downloading the File (downloadFile.ts):

This function grabs the file from cloud storage.

// video-conversion/src/processor/downloadFile.ts
import fs from 'fs';
import path from 'path';
import { v4 as uuidv4 } from 'uuid';
import s3 from '../shared/s3Client'; // Configured S3 client
import initiateVideoProcessing from './initiateVideoProcessing'; // Next step
import RabbitMQ from '../shared/rabbitMQ';
import { API_GATEWAY_EVENTS } from '../constant/events';

async function downloadBlob(bucketName, fileKey, userId) {
  try {
    const fileName = path.basename(fileKey);
    const originalName = fileName.substring(fileName.indexOf('-') + 1); // Extract original name

    // Notify download start
    RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_VIDEO_DOWNLOADING, { /* ... */ });

    // Create a unique local directory for processing
    const uploadFolder = `container-${uuidv4()}`;
    const destination = path.normalize(path.join('uploads', uploadFolder, 'videos')).replace(/\\\\/g, '/');
    fs.mkdirSync(destination, { recursive: true });
    const videoPath = path.normalize(path.join(destination, fileName)).replace(/\\\\/g, '/');

    logger.info(`Downloading file from DO Spaces to ${videoPath}`);

    // Perform the download using the S3 SDK stream
    await new Promise<void>((resolve, reject) => {
      const writeStream = fs.createWriteStream(videoPath);
      s3.getObject({ Bucket: bucketName, Key: fileKey })
        .createReadStream()
        .pipe(writeStream)
        .on('error', reject)
        .on('finish', resolve);
    });

    // Notify download complete
    RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_VIDEO_DOWNLOADED, { /* ... */ }); // Typo in original? Should be NOTIFY_VIDEO_DOWNLOADED

    // --- Trigger the next step: Add to processing queue ---
    // (Note: Fetching metadata via API_SERVER_EVENTS.GET_VIDEO_METADATA_EVENT
    // might happen here or be passed in the initial event)
    const videoMetadata = { _id: 'some-id-from-db', /* ... other fields */ }; // Placeholder

    await initiateVideoProcessing({ videoPath, destination, userId, videoMetadata });

  } catch (error) {
    errorLogger.error('Error in downloadBlob:', error);
    RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_FAILED, { /* ... */ });
    throw error;
  }
}
Enter fullscreen mode Exit fullscreen mode

Kicking Off the Queue (initiateVideoProcessing.ts):

After downloading, we add the first job to our BullMQ processing queue.

// video-conversion/src/processor/initiateVideoProcessing.ts
import { QUEUE_EVENTS, API_SERVER_EVENTS } from '../constant/events';
import { addQueueItem } from '../queues/addJobToQueue'; // BullMQ helper
import { getVideoDurationAndResolution } from './videoProcessingHandler';
import RabbitMQ from '../shared/rabbitMQ';

const initiateVideoProcessing = async ({ videoPath, destination, userId, videoMetadata }) => {
  // Get basic video info like duration
  const { videoDuration } = await getVideoDurationAndResolution(videoPath);

  const payload = {
    ...videoMetadata,
    duration: videoDuration,
    id: videoMetadata._id, // Crucial: DB id for tracking
    path: videoPath,       // Local path after download
    destination,           // Local processing directory
    userId,
  };

  // Optional: Update DB immediately with duration/local path
  RabbitMQ.sendToQueue(API_SERVER_EVENTS.UPDATE_METADATA_EVENT, {
    id: videoMetadata._id,
    videoPath, // May not be needed in DB long-term
    duration: videoDuration,
  });

  // Add the first job to the queue!
  await addQueueItem(QUEUE_EVENTS.VIDEO_UPLOADED, payload); // Start the chain
};

export default initiateVideoProcessing;
Enter fullscreen mode Exit fullscreen mode

The BullMQ Workforce:

BullMQ manages our jobs.

  • Queue Setup (addJobToQueue.ts): Defines the queues based on event names.

    // video-conversion/src/queues/addJobToQueue.ts
    import { Queue } from 'bullmq';
    import { ALL_EVENTS as QUEUE_EVENTS } from '../constant/events';
    import { RedisClient } from '../shared/redis'; // Redis connection
    
    export const queues = Object.values(QUEUE_EVENTS).map((queueName) => {
      return {
        name: queueName,
        queueObj: new Queue(queueName, { connection: RedisClient.redisConnection }),
      };
    });
    
    export const addQueueItem = async (queueName, item) => {
      const queue = queues.find((q) => q.name === queueName);
      if (!queue) throw new Error(`Queue ${queueName} not found`);
      // Add job to the queue
      await queue.queueObj.add(queueName, item, {
        removeOnComplete: true, // Clean up successful jobs
        removeOnFail: false,    // Keep failed jobs for inspection
      });
    };
    
  • Worker Setup (jobWorker.ts): Listens for jobs and calls the right handler.

    // video-conversion/src/worker/jobWorker.ts
    import { Worker } from 'bullmq';
    import { QUEUE_EVENTS } from '../constant/events';
    import { QUEUE_EVENT_HANDLERS } from '../processor/videoEventHandler'; // Maps event name to function
    import { RedisClient } from '../shared/redis';
    import { logger } from '../shared/logger';
    
    export const listenQueueEvent = (queueName) => {
      const worker = new Worker(
        queueName,
        async (job) => { // Job processor function
          const handler = QUEUE_EVENT_HANDLERS[queueName]; // Find the right handler
          if (handler) {
            logger.info(`Processing job ${job.id} from queue ${queueName}`);
            return await handler(job); // Execute the handler
          }
          throw new Error('No handler found for queue: ' + queueName);
        },
        { connection: RedisClient.redisConnection, concurrency: 3 } // Process up to 3 jobs concurrently
      );
    
      worker.on('failed', (job, err) => {
        logger.error(`Job ${job?.id} failed in queue ${queueName}: ${err.message}`, err);
      });
    
      logger.info(`${queueName} worker started.`);
    };
    
    export const setupAllQueueEvent = () => { // Called at startup
      Object.values(QUEUE_EVENTS).forEach((queueName) => {
        listenQueueEvent(queueName);
      });
      // videoLifecycleHandler(); // Setup lifecycle listeners (e.g., for uploads)
      return true;
    };
    

The Processing Pipeline (videoEventHandler.ts):

This file contains the functions (handlers) that execute for each job type. They often add the next job in the sequence upon completion.

// video-conversion/src/processor/videoEventHandler.ts
import { Job } from 'bullmq';
import { QUEUE_EVENTS } from '../constant/events';
import { addQueueItem } from '../queues/addJobToQueue';
import processMp4ToHls from './hlsConvertProcessor';
import { generateThumbnail, processRawFileToMp4WithWatermark } from './mp4ConvertProcessor';
// ... other imports like fs, path, logger

const uploadedHandler = async (job: Job) => {
  logger.info('Handler: VIDEO_UPLOADED', job.data.id);
  // Simply trigger the next step
  await addQueueItem(QUEUE_EVENTS.VIDEO_PROCESSING, job.data);
};

const processingHandler = async (job: Job) => { // Convert to MP4 if needed
  logger.info('Handler: VIDEO_PROCESSING', job.data.id);
  const filePath = job.data.path; // Local path of downloaded file
  const uploadPath = `${job.data.destination}/processed`; // Output dir for MP4
  const thumbnailPath = `${job.data.destination}/thumbnails`; // Output dir for thumbs
  fs.mkdirSync(uploadPath, { recursive: true });
  fs.mkdirSync(thumbnailPath, { recursive: true });

  // Generate thumbnail (can happen in parallel or sequence)
  await generateThumbnail(filePath, thumbnailPath, job.data); // Sends RabbitMQ event on completion

  if (needsConversion(filePath)) { // Check if it's already mp4/webm etc.
     logger.info(`Converting ${filePath} to MP4...`);
     // Calls ffmpeg internally, adds VIDEO_PROCESSED job on completion
     await processRawFileToMp4WithWatermark(filePath, uploadPath, {
       ...job.data, next: QUEUE_EVENTS.VIDEO_PROCESSED
     });
  } else {
     logger.info(`Skipping MP4 conversion for ${filePath}.`);
     // File is okay, move to next step directly
     const destPath = `${uploadPath}/${path.basename(filePath)}`; // Define destination
     fs.copyFileSync(filePath, destPath); // Just copy it
     // Add the next job manually since we skipped the conversion function
     await addQueueItem(QUEUE_EVENTS.VIDEO_PROCESSED, { ...job.data, path: destPath });
  }
};

const processedHandler = async (job: Job) => { // MP4 ready, start HLS
   logger.info('Handler: VIDEO_PROCESSED', job.data.id);
   await addQueueItem(QUEUE_EVENTS.VIDEO_HLS_CONVERTING, job.data);
};

const hlsConvertingHandler = async (job: Job) => { // The core HLS conversion
  logger.info('Handler: VIDEO_HLS_CONVERTING', job.data.id);
  const hlsUploadPath = `${job.data.destination}/hls`; // Output dir for HLS files
  fs.mkdirSync(hlsUploadPath, { recursive: true });

  // Calls the main ffmpeg HLS logic, adds VIDEO_HLS_CONVERTED job on completion
  await processMp4ToHls(job.data.path, hlsUploadPath, { // job.data.path is now the MP4 path
      ...job.data, next: QUEUE_EVENTS.VIDEO_HLS_CONVERTED
  });
};

const hlsConvertedHandler = async (job: Job) => { // HLS done, trigger upload
    logger.info('Handler: VIDEO_HLS_CONVERTED', job.data.id);
    // This handler might trigger the upload of the HLS files to cloud storage
    // via processHLSFile called from videoLifecycleHandler.ts
    // It should also send a final notification event.
};

export const QUEUE_EVENT_HANDLERS = { // Mapping used by the worker
  [QUEUE_EVENTS.VIDEO_UPLOADED]: uploadedHandler,
  [QUEUE_EVENTS.VIDEO_PROCESSING]: processingHandler,
  [QUEUE_EVENTS.VIDEO_PROCESSED]: processedHandler,
  [QUEUE_EVENTS.VIDEO_HLS_CONVERTING]: hlsConvertingHandler,
  [QUEUE_EVENTS.VIDEO_HLS_CONVERTED]: hlsConvertedHandler,
};
Enter fullscreen mode Exit fullscreen mode

FFmpeg Magic (hlsConvertProcessor.ts):

This is where the actual HLS conversion command is built and executed.

// video-conversion/src/processor/hlsConvertProcessor.ts
import ffmpegPath from 'ffmpeg-static';
import ffmpeg from 'fluent-ffmpeg';
import path from 'path';
import RabbitMQ from '../shared/rabbitMQ';
import { API_GATEWAY_EVENTS } from '../constant/events';
// ... other imports

ffmpeg.setFfmpegPath(ffmpegPath); // Tell fluent-ffmpeg where ffmpeg is

const processMp4ToHls = async (filePath, outputFolder, jobData) => {
  const fileNameWithoutExt = path.basename(filePath, path.extname(filePath));

  // --- Define HLS Renditions (Quality Levels) ---
  // (Includes logic for vertical vs. landscape videos from your code)
  const renditions = [
      { resolution: '854x480', bitrate: '800k', name: '480p' },
      { resolution: '1920x1080', bitrate: '5000k', name: '1080p' },
      // Add more if needed
  ];

  const renditionProgress = {}; // Track progress per rendition
  renditions.forEach(r => renditionProgress[r.name] = 0);
  let lastReportedProgress = 0;

  const promises = renditions.map((rendition) => {
    return new Promise<void>((resolve, reject) => {
      const outputFileName = `${fileNameWithoutExt}_${rendition.name}.m3u8`;
      const segmentFileName = `${outputFolder}/${fileNameWithoutExt}_${rendition.name}_%03d.ts`;

      ffmpeg(filePath) // Input MP4 file
        .output(path.join(outputFolder, outputFileName)) // Output M3U8 playlist for this rendition
        .outputOptions([
          `-s ${rendition.resolution}`,    // Set resolution
          `-c:v libx264`,                 // Video codec
          `-crf 23`,                      // Quality (lower is better)
          `-preset fast`,                 // Encoding speed vs compression
          `-b:v ${rendition.bitrate}`,    // Target video bitrate
          `-g 48`,                        // Keyframe interval
          `-hls_time 10`,                 // Segment duration (seconds)
          `-hls_playlist_type vod`,       // Video on Demand playlist
          `-hls_flags independent_segments`,
          `-hls_segment_filename ${segmentFileName}`, // Naming pattern for TS files
        ])
        .on('start', (cmd) => logger.info(`FFmpeg started for ${rendition.name}: ${cmd}`))
        .on('progress', (progress) => { // --- Progress Reporting ---
          renditionProgress[rendition.name] = progress.percent || 0;
          const totalProgress = Object.values(renditionProgress).reduce((a, b) => a + b, 0);
          const overallProgress = Math.round(totalProgress / renditions.length);

          // Report progress in ~10% increments
          if (overallProgress > 0 && overallProgress - lastReportedProgress >= 10) {
            lastReportedProgress = overallProgress;
            RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSING, {
                userId: jobData.userId,
                status: 'processing',
                progress: overallProgress,
                fileName: fileNameWithoutExt,
                /* ... */
            });
          }
        })
        .on('end', () => {
          logger.info(`FFmpeg finished for ${rendition.name}`);
          resolve();
        })
        .on('error', (err) => {
          errorLogger.error(`FFmpeg error for ${rendition.name}:`, err);
          RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_FAILED, { /* ... failure data */ });
          reject(err);
        })
        .run();
    });
  });

  await Promise.all(promises); // Wait for all renditions

  // --- Create Master Playlist ---
  // (Code to generate the master manifest referencing all rendition playlists)
  // fs.writeFileSync(path.join(outputFolder, `${fileNameWithoutExt}_master.m3u8`), masterPlaylistContent);

  logger.info('HLS conversion completed for:', fileNameWithoutExt);

  // Send final success notification
  RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSED, {
       userId: jobData.userId, status: 'completed', /* ... */
  });

  // Add the next job (e.g., upload HLS files) or mark as complete
  // This is handled by videoLifecycleHandler in your code
  // which listens for QUEUE_EVENTS.VIDEO_HLS_CONVERTED and calls processHLSFile
};
Enter fullscreen mode Exit fullscreen mode

Uploading Processed Files:

The videoLifecycleHandler.ts listens for the QUEUE_EVENTS.VIDEO_HLS_CONVERTED event emitted by BullMQ (when the hlsConvertedHandler completes). It then calls processHLSFile.ts, which in turn likely uses uploadToCloud.ts to upload the generated .m3u8 playlists and .ts segment files back to your cloud storage.

4. Tying it Together: RabbitMQ & Real-time Updates

We saw RabbitMQ used everywhere!

  • API Gateway -> API Server (Save Metadata)
  • API Gateway -> Video Conversion (Start Processing)
  • Video Conversion -> API Gateway (Progress/Status Updates)
  • Video Conversion -> API Server (Update Metadata like duration, status)

The key for real-time updates is:

  1. Video Conversion sends progress events (e.g., NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSING) to RabbitMQ, including the userId.
  2. API Gateway consumes these events from RabbitMQ.
  3. Using Socket.io, the API Gateway finds the WebSocket connection associated with that userId and emits the progress data directly to the connected client's browser.
// api-gateway/src/socket/index.ts (Conceptual Example)
import { Server as SocketIOServer, Socket } from 'socket.io';
import http from 'http';
import RabbitMQ from '../shared/rabbitMQ';
import { API_GATEWAY_EVENTS } from '../constants/event'; // Or NOTIFY_EVENTS

let io: SocketIOServer;
const users: Map<string, string> = new Map(); // Map userId to socketId

export const initSocketIO = (server: http.Server) => {
  io = new SocketIOServer(server, { cors: { /* ... */ } });

  io.on('connection', (socket: Socket) => {
    console.log('A user connected:', socket.id);

    // Store user association (needs authentication, e.g., JWT verification on connect)
    const userId = getUserIdFromSocket(socket); // Implement this securely
    if (userId) {
        users.set(userId, socket.id);
        console.log(`User ${userId} mapped to socket ${socket.id}`);
    }


    socket.on('disconnect', () => {
      console.log('User disconnected:', socket.id);
      // Clean up user mapping
      users.forEach((socketId, uId) => {
          if (socketId === socket.id) {
              users.delete(uId);
          }
      });
    });
  });

  // Listen for RabbitMQ progress events
  listenForProgressUpdates();

  return io;
};

const listenForProgressUpdates = async () => {
    await RabbitMQ.consume(
        API_GATEWAY_EVENTS.NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSING, // Example event
        (msg, ack) => {
            try {
                const data = JSON.parse(msg.content.toString());
                const userId = data.userId;
                const socketId = users.get(userId); // Find the user's socket

                if (socketId && io) {
                     console.log(`Sending progress to user ${userId} (socket ${socketId})`);
                     // Emit event directly to the specific user's socket
                     io.to(socketId).emit('video_progress', data);
                } else {
                    console.log(`Socket not found for user ${userId}, unable to send progress.`);
                }
                ack();
            } catch (error) {
                console.error("Error processing progress update:", error);
                ack(); // Ack to avoid requeue loops
            }
        }
    );
    // ... listen for other notification events ...
};

// Helper to securely get userId from socket handshake/query
function getUserIdFromSocket(socket: Socket): string | null {
    // IMPORTANT: Implement proper authentication here!
    // E.g., verify JWT passed in handshake query or auth object
    return socket.handshake.query.userId as string || null;
}

export { io }; // Export for potential use elsewhere
Enter fullscreen mode Exit fullscreen mode

Docker Installation

The easiest way to run all services and their dependencies (MongoDB, RabbitMQ, Redis) is using Docker Compose. We have docker-compose.yml files defined. Let's look at the one in the root directory:

# docker-compose.yml (Root Directory - Simplified Example)
version: "3.7"
services:
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000" # Example port
    env_file:
      - ./api-gateway/.env
    depends_on:
      - rabbitmq
    volumes:
      - ./api-gateway:/app
      - /app/node_modules

  api-server:
    build: ./api-server
    ports:
      - "8001:8001" # Example port
    env_file:
      - ./api-server/.env
    depends_on:
      - mongodb
      - rabbitmq
    volumes:
      - ./api-server:/app
      - /app/node_modules

  video-conversion:
    build: ./video-conversion
    ports:
      - "8002:8002" # Example port
    env_file:
      - ./video-conversion/.env
    depends_on:
      - redis
      - rabbitmq
    volumes:
      - ./video-conversion:/app
      - ./video-conversion/uploads:/app/uploads # Mount uploads volume
      - /app/node_modules

  mongodb:
    image: mongo:latest
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"  # For AMQP
      - "15672:15672" # For Management UI

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

volumes:
  mongo-data:

Enter fullscreen mode Exit fullscreen mode

To start everything:

docker-compose up -d --build
Enter fullscreen mode Exit fullscreen mode

This command builds the images for our services (if they don't exist) and starts all containers in the background.

5. Conclusion & What's Next

We covered a lot of ground! We set up the environment, built the direct-to-cloud upload flow with pre-signed URLs, and wired up the complex video conversion pipeline using FFmpeg, BullMQ for queuing, and RabbitMQ for communication and real-time updates.
You should now have a much clearer understanding of the core mechanics.

Coming up in Part 3:

  • Diving deeper into the API Server and MongoDB schema.
  • Exploring deployment strategies using Docker Swarm or Kubernetes.
  • Discussing error handling and monitoring.
  • Potential optimizations and scaling considerations.

Part-3: Check out

Explore the full implementation code from the GitHub repository:

Server-Code
Server-Client

Do follow me on my Socials to stay updated on tech stuff:

💼 LinkedIn
🐙 Github

Top comments (2)

Collapse
 
nevodavid profile image
Nevo David

Amazing work! This guide makes complex processes seem so manageable and clear.

Collapse
 
awalhossain profile image
Awal Hossain

My pleasure! 😊