Alright, in Part 1, we got the 30,000-foot view of our microservices architecture for video streaming. We talked about the why (handling uploads, conversion, smooth playback) and the what (API Gateway, API Server, Video Conversion Service, HLS).
If you missed Part 1, you can find the link here: Part-1
Now, it's time to roll up our sleeves and get our hands dirty with the how. In this part, we'll focus on:
- Setting up your development environment.
- Implementing the direct-to-cloud upload flow using pre-signed URLs.
- Wiring up the video conversion pipeline using FFmpeg, BullMQ, and RabbitMQ.
1. Setting Up Shop: Your Development Environment
First things first, let's make sure you have the necessary tools installed.
Prerequisites:
- Node.js: v18.0.0 or later (check with node -v).
- Yarn (or npm): For package management (check with yarn -v).
- Docker & Docker Compose: To easily run our services and dependencies like Redis and RabbitMQ. Install Docker.
- FFmpeg: The video processing powerhouse. If you missed it in Part 1, installation guides are easy to find online (e.g., brew install ffmpeg on Mac, or check the official FFmpeg site).
- Code Editor: VS Code, WebStorm, Sublime Text - your choice!
- Git: For cloning the project repository.
Get the Code:
git clone https://github.com/AwalHossain/video-streaming-server
cd video-streaming-server
Environment Variables (.env): The Secret Sauce
Each of our services needs some configuration, like database connection strings, API keys, and secrets. We manage these using .env files. You'll find a .env.example file in each service directory (api-gateway, api-server, video-conversion).
Action: For each service directory:
- Copy .env.example to a new file named .env.
- Open the .env file and fill in the values specific to your setup.
Here are some key variables you'll need to configure:
- PORT: The port each service will run on (e.g., 8000, 8001, 8002).
- MONGO_URI: Your MongoDB connection string (for api-server).
- RABBITMQ_ENDPOINT: Your RabbitMQ server address (e.g., amqp://user:password@localhost:5672).
- REDIS_URL: Your Redis connection URL (for video-conversion, e.g., redis://localhost:6379).
- JWT_SECRET: A long, random secret string for signing authentication tokens (used by api-gateway and api-server).
- Cloud Storage Credentials (e.g., for DigitalOcean Spaces or AWS S3):
  - DO_SPACES_ACCESS_KEY / AWS_ACCESS_KEY_ID
  - DO_SPACES_SECRET_KEY / AWS_SECRET_ACCESS_KEY
  - DO_SPACES_ENDPOINT / AWS_ENDPOINT (if using a non-AWS, S3-compatible provider)
  - DO_SPACES_BUCKET_NAME / AWS_BUCKET_NAME
  - DO_SPACES_REGION / AWS_REGION
- CLIENT_URL1, CLIENT_URL2: The URLs of your frontend application(s) for CORS configuration.
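A missing variable tends to surface much later as a confusing connection error, so it can help to validate the environment once at startup. Here is a minimal sketch for the video-conversion service. The helper names (requireVar, loadConversionConfig) and the ConversionConfig shape are hypothetical and not taken from the repository; only the variable names come from the list above.

```typescript
// Hypothetical startup check, not part of the repository.
type Env = Record<string, string | undefined>;

interface ConversionConfig {
  port: number;
  rabbitMqEndpoint: string;
  redisUrl: string;
}

// Returns the variable's value or throws with the variable's name.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

function loadConversionConfig(env: Env): ConversionConfig {
  const port = Number(requireVar(env, 'PORT'));
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error('PORT must be an integer between 1 and 65535');
  }
  return {
    port,
    rabbitMqEndpoint: requireVar(env, 'RABBITMQ_ENDPOINT'),
    redisUrl: requireVar(env, 'REDIS_URL'),
  };
}
```

In a service entry point you would call loadConversionConfig(process.env) before connecting to anything, so a typo in .env fails fast with a readable message.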
2. The Grand Upload: Pre-signed URLs in Action
Remember how we wanted to avoid bogging down our servers with large file uploads? Pre-signed URLs are the answer! The client gets a temporary, secure link to upload directly to cloud storage.
API Gateway - The Gatekeeper:
- Routing: We need an endpoint to request the pre-signed URL.

// api-gateway/src/app/routes/index.ts
import { Router } from 'express';
import { AuthRoutes } from '../modules/auth/auth.routes';
import { VideoRoutes } from '../modules/video/video.route';

const router = Router();

const ModuleRoutes = [
  { path: '/auth', route: AuthRoutes },
  { path: '/videos', route: VideoRoutes }, // Our video routes
];

ModuleRoutes.forEach((route) => router.use(route.path, route.route));

export default router;

// api-gateway/src/app/modules/video/video.route.ts
import express from 'express';
import auth from '../../middleware/auth'; // <-- Authentication middleware
import { VideoController } from './video.controller';

const router = express.Router();

// POST endpoint protected by auth middleware
router.post(
  '/presigned-url',
  auth(), // Ensures only logged-in users can get a URL
  VideoController.getPresignedUrl
);

router.post(
  '/confirm-upload',
  auth(),
  VideoController.confirmUpload
);

// ... other video routes

export const VideoRoutes = router;
- Controller (getPresignedUrl): This function handles the request.

// api-gateway/src/app/modules/video/video.controller.ts
import { Request, Response } from 'express';
import { v4 as uuidv4 } from 'uuid';
import catchAsync from '../../../shared/catchAsyncError';
import sendResponse from '../../../shared/response';
import doSpacesUpload from '../../../utils/doSpacesUpload'; // The utility function

const getPresignedUrl = catchAsync(async (req: Request, res: Response) => {
  const { filename, contentType } = req.body; // Get info from client

  if (!filename || !contentType) {
    return res.status(400).json({ /* ... error */ });
  }

  const userId = req.user.id; // From auth middleware

  // Clean filename and make it unique
  const cleanFilename =
    filename
      .split('.')[0]
      .replace(/\s+/g, '-')
      // ... more cleaning ...
    + '-' + uuidv4() + '.' + filename.split('.').pop();

  // Call the utility to generate the URL
  const result = await doSpacesUpload.generatePresignedUrl({
    filename: cleanFilename,
    contentType: contentType,
    userId: userId,
    expirySeconds: 3600, // URL valid for 1 hour
  });

  sendResponse(res, {
    statusCode: 200,
    success: true,
    message: 'Presigned URL generated successfully',
    data: result, // Send URL and fileKey back to client
  });
});
- Utility (doSpacesUpload.ts): This interacts with the cloud storage SDK.

// api-gateway/src/utils/doSpacesUpload.ts
import AWS from 'aws-sdk'; // Using AWS SDK for S3 compatible storage
import config from '../config';

const s3 = new AWS.S3({
  // Configure S3 client
  endpoint: config.doSpaces.endpoint,
  accessKeyId: config.doSpaces.accessKey,
  secretAccessKey: config.doSpaces.secretKey,
  region: config.doSpaces.region,
  s3ForcePathStyle: true,
  signatureVersion: 'v4'
});

const generatePresignedUrl = async (options) => {
  const { filename, contentType, userId, expirySeconds = 3600 } = options;
  const timestamp = Date.now();

  // Define the path/key in the bucket
  const key = `uploads/${userId}/${timestamp}-${filename}`;

  const params = {
    Bucket: config.doSpaces.bucketName,
    Key: key,
    Expires: expirySeconds,
    // ACL: 'public-read', // Optional: If you want uploaded files to be public
    // ContentType: contentType // Often needed for direct browser uploads
  };

  try {
    // Generate the signed URL for PUT operation
    const url = await s3.getSignedUrlPromise('putObject', params);
    const fileUrl = `https://${config.doSpaces.bucketName}.${config.doSpaces.endpoint}/${key}`;

    return {
      uploadUrl: url, // The URL the client will use to PUT the file
      fileKey: key, // The path/key of the file in the bucket
      bucketName: config.doSpaces.bucketName,
      fileUrl, // Direct link to the file (if public)
      fileName: key.split('/').pop(),
    };
  } catch (error) {
    console.error('Error generating presigned URL:', error);
    throw error;
  }
};

export default { generatePresignedUrl };
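One detail worth a closer look is the filename cleaning. The controller keeps only the text before the first dot, so a name like my.video.final.mp4 would be reduced to my. Below is a sketch of a slightly more defensive version that splits on the last dot and strips characters that are awkward in storage keys. The helper names and the exact cleaning rules are my own assumptions, not the repository's implementation; the key layout (uploads/userId/timestamp-name) mirrors the utility above.

```typescript
// Hypothetical helpers for illustration; the cleaning rules are assumptions.
function sanitizeFilename(filename: string): { base: string; ext: string } {
  const dot = filename.lastIndexOf('.');
  // A leading dot (".env") or a trailing dot counts as "no extension".
  const hasExt = dot > 0 && dot < filename.length - 1;
  const rawBase = hasExt ? filename.slice(0, dot) : filename;
  const rawExt = hasExt ? filename.slice(dot + 1) : '';

  const base =
    rawBase
      .trim()
      .replace(/\s+/g, '-') // whitespace -> dashes
      .replace(/[^A-Za-z0-9._-]/g, '') // drop slashes and other unsafe characters
      .replace(/-+/g, '-') // collapse repeated dashes
      .slice(0, 100) || 'video'; // fall back if nothing usable is left

  const ext = rawExt.toLowerCase().replace(/[^a-z0-9]/g, '');
  return { base, ext };
}

// uniqueId and timestamp are passed in (e.g. uuidv4() and Date.now()) so the
// function stays deterministic and easy to test.
function buildObjectKey(
  userId: string,
  filename: string,
  uniqueId: string,
  timestamp: number,
): string {
  const { base, ext } = sanitizeFilename(filename);
  const suffix = ext ? `.${ext}` : '';
  return `uploads/${userId}/${timestamp}-${base}-${uniqueId}${suffix}`;
}
```

Because slashes are removed from the user-supplied part, the key always has exactly three path segments, which keeps one user's upload from landing under another prefix.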
Client-Side Upload (Conceptual):
Once the client receives the uploadUrl, fileKey, etc., it performs the upload.
// Simple example using fetch
async function uploadFile(presignedData, file) {
const { uploadUrl } = presignedData; // Returned by the /presigned-url endpoint
try {
const response = await fetch(uploadUrl, {
method: 'PUT',
headers: {
'Content-Type': file.type, // Use the actual file type
// 'x-amz-acl': 'public-read' // If ACL was set during URL generation
},
body: file
});
if (!response.ok) {
throw new Error(`Upload failed: ${response.statusText}`);
}
console.log('Upload successful!');
// Now call the confirmUpload endpoint
} catch (error) {
console.error('Upload error:', error);
}
}
Confirming the Upload:
After the direct upload finishes, the client must notify our backend.
// api-gateway/src/app/modules/video/video.controller.ts (confirmUpload)
const confirmUpload = catchAsync(async (req: Request, res: Response) => {
// Client sends back fileKey, originalName, etc. received earlier
const { fileKey, originalName, fileName, bucketName } = req.body;
const userId = req.user.id;
// 1. Notify user upload is complete (via RabbitMQ -> WebSocket)
RabbitMQ.sendToQueue(NOTIFY_EVENTS.NOTIFT_VIDEO_UPLOADING_BUCKET, {
userId, status: 'completed', /* ... */
});
// 2. Prepare metadata payload for API Server
const metadataPayload = {
originalName,
recordingDate: Date.now(),
author: userId,
fileName,
title: originalName.split('.')[0].replace(/[_]/g, ''), // Basic title generation
// ... other initial metadata
};
// 3. Send event to API Server to save metadata
broadcastVideoEvent(API_SERVER_EVENTS.INSERT_VIDEO_METADATA_EVENT, metadataPayload);
// 4. Prepare data for Video Conversion Service
const conversionData = { userId, fileName, bucketName, fileKey };
// 5. Send event to Video Conversion Service to start processing
broadcastVideoEvent(
VIDEO_CONVERSION_SERVER.SEND_VIDEO_METADATA_EVENT,
conversionData
);
sendResponse(res, { /* ... success response */ });
});
Phew! That's the upload flow. The key is the client uploads directly to storage, and then notifies our backend to kick off the next steps via RabbitMQ events.
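To see the three steps as one client-side flow, here is a rough sketch. It takes the fetch function as a parameter so it can be exercised without a network. The apiBase prefix, the Bearer token header, and the FetchLike / UploadableFile types are assumptions of mine; the route paths, the data wrapper in the response, and the confirm payload fields are taken from the snippets above.

```typescript
// Minimal shapes so the sketch stays self-contained; a thin wrapper around
// the browser's fetch can satisfy FetchLike.
interface HttpResponse {
  ok: boolean;
  status: number;
  json(): Promise<any>;
}

type FetchLike = (
  url: string,
  init: { method: string; headers?: Record<string, string>; body?: unknown },
) => Promise<HttpResponse>;

interface UploadableFile {
  name: string;
  type: string;
}

async function uploadVideo(
  fetchFn: FetchLike,
  apiBase: string, // assumption: wherever the gateway router is mounted
  token: string,
  file: UploadableFile,
): Promise<string> {
  const authHeaders = {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${token}`, // assumption: auth() reads a Bearer token
  };

  // 1. Ask the gateway for a pre-signed URL.
  const presignRes = await fetchFn(`${apiBase}/videos/presigned-url`, {
    method: 'POST',
    headers: authHeaders,
    body: JSON.stringify({ filename: file.name, contentType: file.type }),
  });
  if (!presignRes.ok) throw new Error(`Presign failed: ${presignRes.status}`);
  const { data } = await presignRes.json();

  // 2. PUT the file straight to cloud storage.
  const putRes = await fetchFn(data.uploadUrl, {
    method: 'PUT',
    headers: { 'Content-Type': file.type },
    body: file,
  });
  if (!putRes.ok) throw new Error(`Upload failed: ${putRes.status}`);

  // 3. Notify the gateway so it can emit the RabbitMQ events.
  const confirmRes = await fetchFn(`${apiBase}/videos/confirm-upload`, {
    method: 'POST',
    headers: authHeaders,
    body: JSON.stringify({
      fileKey: data.fileKey,
      fileName: data.fileName,
      bucketName: data.bucketName,
      originalName: file.name,
    }),
  });
  if (!confirmRes.ok) throw new Error(`Confirm failed: ${confirmRes.status}`);

  return data.fileKey;
}
```

Each step throws on failure, so the confirm call only happens once the object is actually in the bucket.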
3. Conversion Central: FFmpeg, BullMQ, and RabbitMQ
Now, the Video Conversion service takes over.
Receiving the Signal:
The service listens for the SEND_VIDEO_METADATA_EVENT on RabbitMQ. When it receives a message, it triggers the download process.
// video-conversion/src/server.ts (Conceptual listener setup)
import RabbitMQ from './shared/rabbitMQ';
import downloadBlob from './processor/downloadFile';
import { VIDEO_CONVERSION_SERVER } from './constant/events';
async function startServer() {
// ... other setup ...
await RabbitMQ.connect();
// Listen for the event from API Gateway
await RabbitMQ.consume(
VIDEO_CONVERSION_SERVER.SEND_VIDEO_METADATA_EVENT,
async (msg, ack) => {
try {
const data = JSON.parse(msg.content.toString());
logger.info('Received video metadata event:', data);
// Trigger the download process
await downloadBlob(data.bucketName, data.fileKey, data.userId);
ack(); // Acknowledge the message
} catch (error) {
errorLogger.error('Failed processing metadata event:', error);
// Decide if you should ack or nack based on the error
ack(); // Ack for now to prevent reprocessing loop on simple errors
}
}
);
// ... start express app ...
}
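The listener above acks on every failure, which avoids redelivery loops but also means a transient error silently drops the video. One possible way to make the "ack or nack" decision explicit is sketched below. Everything here is hypothetical: the function name, the permanent flag on errors, and the idea that the caller tracks an attempt count (for example in a message header). It also assumes the RabbitMQ wrapper could expose a nack callback next to ack, which the snippet above does not show.

```typescript
// What to do with a message whose handler threw.
type FailureAction = 'requeue' | 'reject';

// Hypothetical convention: handlers tag unrecoverable errors with `permanent: true`.
function isPermanent(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { permanent?: boolean }).permanent === true
  );
}

function decideOnFailure(error: unknown, attempt: number, maxAttempts = 3): FailureAction {
  // Malformed JSON will never parse on a retry.
  if (error instanceof SyntaxError) return 'reject';
  // For example, the object no longer exists in the bucket.
  if (isPermanent(error)) return 'reject';
  // Anything else is treated as transient until retries run out.
  return attempt < maxAttempts ? 'requeue' : 'reject';
}
```

With amqplib, 'requeue' would map to channel.nack(msg, false, true) and 'reject' to channel.nack(msg, false, false), which drops the message or routes it to a dead-letter exchange if the queue has one configured.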
Downloading the File (downloadFile.ts):
This function grabs the file from cloud storage.
// video-conversion/src/processor/downloadFile.ts
import fs from 'fs';
import path from 'path';
import { v4 as uuidv4 } from 'uuid';
import s3 from '../shared/s3Client'; // Configured S3 client
import initiateVideoProcessing from './initiateVideoProcessing'; // Next step
import RabbitMQ from '../shared/rabbitMQ';
import { API_GATEWAY_EVENTS } from '../constant/events';
async function downloadBlob(bucketName, fileKey, userId) {
try {
const fileName = path.basename(fileKey);
const originalName = fileName.substring(fileName.indexOf('-') + 1); // Extract original name
// Notify download start
RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_VIDEO_DOWNLOADING, { /* ... */ });
// Create a unique local directory for processing
const uploadFolder = `container-${uuidv4()}`;
const destination = path.normalize(path.join('uploads', uploadFolder, 'videos')).replace(/\\/g, '/'); // Forward slashes on Windows too
fs.mkdirSync(destination, { recursive: true });
const videoPath = path.normalize(path.join(destination, fileName)).replace(/\\/g, '/');
logger.info(`Downloading file from DO Spaces to ${videoPath}`);
// Perform the download using the S3 SDK stream
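await new Promise<void>((resolve, reject) => {
const readStream = s3.getObject({ Bucket: bucketName, Key: fileKey }).createReadStream();
const writeStream = fs.createWriteStream(videoPath);
readStream.on('error', reject); // S3/network errors are not forwarded by pipe()
writeStream.on('error', reject); // Disk errors
writeStream.on('finish', resolve);
readStream.pipe(writeStream);
});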
// Notify download complete
RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_VIDEO_DOWNLOADED, { /* ... */ });
// --- Trigger the next step: Add to processing queue ---
// (Note: Fetching metadata via API_SERVER_EVENTS.GET_VIDEO_METADATA_EVENT
// might happen here or be passed in the initial event)
const videoMetadata = { _id: 'some-id-from-db', /* ... other fields */ }; // Placeholder
await initiateVideoProcessing({ videoPath, destination, userId, videoMetadata });
} catch (error) {
errorLogger.error('Error in downloadBlob:', error);
RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_FAILED, { /* ... */ });
throw error;
}
}
Kicking Off the Queue (initiateVideoProcessing.ts):
After downloading, we add the first job to our BullMQ processing queue.
// video-conversion/src/processor/initiateVideoProcessing.ts
import { QUEUE_EVENTS, API_SERVER_EVENTS } from '../constant/events';
import { addQueueItem } from '../queues/addJobToQueue'; // BullMQ helper
import { getVideoDurationAndResolution } from './videoProcessingHandler';
import RabbitMQ from '../shared/rabbitMQ';
const initiateVideoProcessing = async ({ videoPath, destination, userId, videoMetadata }) => {
// Get basic video info like duration
const { videoDuration } = await getVideoDurationAndResolution(videoPath);
const payload = {
...videoMetadata,
duration: videoDuration,
id: videoMetadata._id, // Crucial: DB id for tracking
path: videoPath, // Local path after download
destination, // Local processing directory
userId,
};
// Optional: Update DB immediately with duration/local path
RabbitMQ.sendToQueue(API_SERVER_EVENTS.UPDATE_METADATA_EVENT, {
id: videoMetadata._id,
videoPath, // May not be needed in DB long-term
duration: videoDuration,
});
// Add the first job to the queue!
await addQueueItem(QUEUE_EVENTS.VIDEO_UPLOADED, payload); // Start the chain
};
export default initiateVideoProcessing;
The BullMQ Workforce:
BullMQ manages our jobs.
- Queue Setup (addJobToQueue.ts): Defines the queues based on event names.

// video-conversion/src/queues/addJobToQueue.ts
import { Queue } from 'bullmq';
import { ALL_EVENTS as QUEUE_EVENTS } from '../constant/events';
import { RedisClient } from '../shared/redis'; // Redis connection

export const queues = Object.values(QUEUE_EVENTS).map((queueName) => {
  return {
    name: queueName,
    queueObj: new Queue(queueName, { connection: RedisClient.redisConnection }),
  };
});

export const addQueueItem = async (queueName, item) => {
  const queue = queues.find((q) => q.name === queueName);
  if (!queue) throw new Error(`Queue ${queueName} not found`);

  // Add job to the queue
  await queue.queueObj.add(queueName, item, {
    removeOnComplete: true, // Clean up successful jobs
    removeOnFail: false, // Keep failed jobs for inspection
  });
};
- Worker Setup (jobWorker.ts): Listens for jobs and calls the right handler.

// video-conversion/src/worker/jobWorker.ts
import { Worker } from 'bullmq';
import { QUEUE_EVENTS } from '../constant/events';
import { QUEUE_EVENT_HANDLERS } from '../processor/videoEventHandler'; // Maps event name to function
import { RedisClient } from '../shared/redis';
import { logger } from '../shared/logger';

export const listenQueueEvent = (queueName) => {
  const worker = new Worker(
    queueName,
    async (job) => {
      // Job processor function
      const handler = QUEUE_EVENT_HANDLERS[queueName]; // Find the right handler
      if (handler) {
        logger.info(`Processing job ${job.id} from queue ${queueName}`);
        return await handler(job); // Execute the handler
      }
      throw new Error('No handler found for queue: ' + queueName);
    },
    { connection: RedisClient.redisConnection, concurrency: 3 } // Process up to 3 jobs concurrently
  );

  worker.on('failed', (job, err) => {
    logger.error(`Job ${job?.id} failed in queue ${queueName}: ${err.message}`, err);
  });

  logger.info(`${queueName} worker started.`);
};

export const setupAllQueueEvent = () => {
  // Called at startup
  Object.values(QUEUE_EVENTS).forEach((queueName) => {
    listenQueueEvent(queueName);
  });
  // videoLifecycleHandler(); // Setup lifecycle listeners (e.g., for uploads)
  return true;
};
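As configured above, a failed job is kept for inspection but never retried. BullMQ's job options also accept attempts and backoff, so one option is to build the options object in a small helper. This is only a sketch: retryingJobOptions and its defaults (3 attempts, 5 second base delay) are my own choices, not values from the repository.

```typescript
// Plain object shaped like the BullMQ job options used in addQueueItem,
// extended with retry settings.
interface RetryingJobOptions {
  attempts: number;
  backoff: { type: 'exponential' | 'fixed'; delay: number };
  removeOnComplete: boolean;
  removeOnFail: boolean;
}

// Hypothetical helper; defaults are illustrative.
function retryingJobOptions(attempts = 3, baseDelayMs = 5000): RetryingJobOptions {
  if (!Number.isInteger(attempts) || attempts < 1) {
    throw new RangeError('attempts must be a positive integer');
  }
  if (baseDelayMs < 0) {
    throw new RangeError('baseDelayMs must not be negative');
  }
  return {
    attempts, // total tries, including the first one
    backoff: { type: 'exponential', delay: baseDelayMs }, // wait longer after each failure
    removeOnComplete: true, // clean up successful jobs
    removeOnFail: false, // keep the job once all attempts are used up
  };
}
```

In addQueueItem this would be passed as the third argument, e.g. queue.queueObj.add(queueName, item, retryingJobOptions()). If you enable retries, make sure each handler is safe to run twice, since a handler that already enqueued the next stage may do so again.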
The Processing Pipeline (videoEventHandler.ts):
This file contains the functions (handlers) that execute for each job type. They often add the next job in the sequence upon completion.
// video-conversion/src/processor/videoEventHandler.ts
import { Job } from 'bullmq';
import { QUEUE_EVENTS } from '../constant/events';
import { addQueueItem } from '../queues/addJobToQueue';
import processMp4ToHls from './hlsConvertProcessor';
import { generateThumbnail, processRawFileToMp4WithWatermark } from './mp4ConvertProcessor';
// ... other imports like fs, path, logger
const uploadedHandler = async (job: Job) => {
logger.info('Handler: VIDEO_UPLOADED', job.data.id);
// Simply trigger the next step
await addQueueItem(QUEUE_EVENTS.VIDEO_PROCESSING, job.data);
};
const processingHandler = async (job: Job) => { // Convert to MP4 if needed
logger.info('Handler: VIDEO_PROCESSING', job.data.id);
const filePath = job.data.path; // Local path of downloaded file
const uploadPath = `${job.data.destination}/processed`; // Output dir for MP4
const thumbnailPath = `${job.data.destination}/thumbnails`; // Output dir for thumbs
fs.mkdirSync(uploadPath, { recursive: true });
fs.mkdirSync(thumbnailPath, { recursive: true });
// Generate thumbnail (can happen in parallel or sequence)
await generateThumbnail(filePath, thumbnailPath, job.data); // Sends RabbitMQ event on completion
if (needsConversion(filePath)) { // Check if it's already mp4/webm etc.
logger.info(`Converting ${filePath} to MP4...`);
// Calls ffmpeg internally, adds VIDEO_PROCESSED job on completion
await processRawFileToMp4WithWatermark(filePath, uploadPath, {
...job.data, next: QUEUE_EVENTS.VIDEO_PROCESSED
});
} else {
logger.info(`Skipping MP4 conversion for ${filePath}.`);
// File is okay, move to next step directly
const destPath = `${uploadPath}/${path.basename(filePath)}`; // Define destination
fs.copyFileSync(filePath, destPath); // Just copy it
// Add the next job manually since we skipped the conversion function
await addQueueItem(QUEUE_EVENTS.VIDEO_PROCESSED, { ...job.data, path: destPath });
}
};
const processedHandler = async (job: Job) => { // MP4 ready, start HLS
logger.info('Handler: VIDEO_PROCESSED', job.data.id);
await addQueueItem(QUEUE_EVENTS.VIDEO_HLS_CONVERTING, job.data);
};
const hlsConvertingHandler = async (job: Job) => { // The core HLS conversion
logger.info('Handler: VIDEO_HLS_CONVERTING', job.data.id);
const hlsUploadPath = `${job.data.destination}/hls`; // Output dir for HLS files
fs.mkdirSync(hlsUploadPath, { recursive: true });
// Calls the main ffmpeg HLS logic, adds VIDEO_HLS_CONVERTED job on completion
await processMp4ToHls(job.data.path, hlsUploadPath, { // job.data.path is now the MP4 path
...job.data, next: QUEUE_EVENTS.VIDEO_HLS_CONVERTED
});
};
const hlsConvertedHandler = async (job: Job) => { // HLS done, trigger upload
logger.info('Handler: VIDEO_HLS_CONVERTED', job.data.id);
// This handler might trigger the upload of the HLS files to cloud storage
// via processHLSFile called from videoLifecycleHandler.ts
// It should also send a final notification event.
};
export const QUEUE_EVENT_HANDLERS = { // Mapping used by the worker
[QUEUE_EVENTS.VIDEO_UPLOADED]: uploadedHandler,
[QUEUE_EVENTS.VIDEO_PROCESSING]: processingHandler,
[QUEUE_EVENTS.VIDEO_PROCESSED]: processedHandler,
[QUEUE_EVENTS.VIDEO_HLS_CONVERTING]: hlsConvertingHandler,
[QUEUE_EVENTS.VIDEO_HLS_CONVERTED]: hlsConvertedHandler,
};
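The processingHandler calls needsConversion, which isn't shown in the snippet. A minimal sketch could look like the following, assuming the check is purely extension-based and that mp4 and webm are the formats allowed to skip conversion (the comment in the handler mentions "mp4/webm etc."). The real helper may well inspect the codecs instead, so treat this as a stand-in.

```typescript
// Assumption: containers the rest of the pipeline accepts without re-encoding.
const PASSTHROUGH_EXTENSIONS = new Set(['mp4', 'webm']);

// Returns the lowercase extension without the dot, or '' if there is none.
function getExtension(filePath: string): string {
  const lastSeparator = Math.max(filePath.lastIndexOf('/'), filePath.lastIndexOf('\\'));
  const fileName = filePath.slice(lastSeparator + 1);
  const dot = fileName.lastIndexOf('.');
  return dot > 0 ? fileName.slice(dot + 1).toLowerCase() : '';
}

// True when the file should go through the MP4 conversion step.
function needsConversion(filePath: string): boolean {
  return !PASSTHROUGH_EXTENSIONS.has(getExtension(filePath));
}
```

Keep in mind that an extension says nothing about what is inside the container. An .mp4 holding an unusual codec would pass this check, so probing the file with ffprobe is the more reliable route if you hit playback problems.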
FFmpeg Magic (hlsConvertProcessor.ts):
This is where the actual HLS conversion command is built and executed.
// video-conversion/src/processor/hlsConvertProcessor.ts
import ffmpegPath from 'ffmpeg-static';
import ffmpeg from 'fluent-ffmpeg';
import path from 'path';
import RabbitMQ from '../shared/rabbitMQ';
import { API_GATEWAY_EVENTS } from '../constant/events';
// ... other imports
ffmpeg.setFfmpegPath(ffmpegPath); // Tell fluent-ffmpeg where ffmpeg is
const processMp4ToHls = async (filePath, outputFolder, jobData) => {
const fileNameWithoutExt = path.basename(filePath, path.extname(filePath));
// --- Define HLS Renditions (Quality Levels) ---
// (Includes logic for vertical vs. landscape videos from your code)
const renditions = [
{ resolution: '854x480', bitrate: '800k', name: '480p' },
{ resolution: '1920x1080', bitrate: '5000k', name: '1080p' },
// Add more if needed
];
const renditionProgress: Record<string, number> = {}; // Track progress per rendition (typed so reduce() below sums numbers)
renditions.forEach(r => renditionProgress[r.name] = 0);
let lastReportedProgress = 0;
const promises = renditions.map((rendition) => {
return new Promise<void>((resolve, reject) => {
const outputFileName = `${fileNameWithoutExt}_${rendition.name}.m3u8`;
const segmentFileName = `${outputFolder}/${fileNameWithoutExt}_${rendition.name}_%03d.ts`;
ffmpeg(filePath) // Input MP4 file
.output(path.join(outputFolder, outputFileName)) // Output M3U8 playlist for this rendition
.outputOptions([
`-s ${rendition.resolution}`, // Set resolution
`-c:v libx264`, // Video codec
`-crf 23`, // Quality (lower is better)
`-preset fast`, // Encoding speed vs compression
`-b:v ${rendition.bitrate}`, // Target video bitrate
`-g 48`, // Keyframe interval
`-hls_time 10`, // Segment duration (seconds)
`-hls_playlist_type vod`, // Video on Demand playlist
`-hls_flags independent_segments`,
`-hls_segment_filename ${segmentFileName}`, // Naming pattern for TS files
])
.on('start', (cmd) => logger.info(`FFmpeg started for ${rendition.name}: ${cmd}`))
.on('progress', (progress) => { // --- Progress Reporting ---
renditionProgress[rendition.name] = progress.percent || 0;
const totalProgress = Object.values(renditionProgress).reduce((a, b) => a + b, 0);
const overallProgress = Math.round(totalProgress / renditions.length);
// Report progress in ~10% increments
if (overallProgress > 0 && overallProgress - lastReportedProgress >= 10) {
lastReportedProgress = overallProgress;
RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSING, {
userId: jobData.userId,
status: 'processing',
progress: overallProgress,
fileName: fileNameWithoutExt,
/* ... */
});
}
})
.on('end', () => {
logger.info(`FFmpeg finished for ${rendition.name}`);
resolve();
})
.on('error', (err) => {
errorLogger.error(`FFmpeg error for ${rendition.name}:`, err);
RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_FAILED, { /* ... failure data */ });
reject(err);
})
.run();
});
});
await Promise.all(promises); // Wait for all renditions
// --- Create Master Playlist ---
// (Code to generate the master manifest referencing all rendition playlists)
// fs.writeFileSync(path.join(outputFolder, `${fileNameWithoutExt}_master.m3u8`), masterPlaylistContent);
logger.info('HLS conversion completed for:', fileNameWithoutExt);
// Send final success notification
RabbitMQ.sendToQueue(API_GATEWAY_EVENTS.NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSED, {
userId: jobData.userId, status: 'completed', /* ... */
});
// Add the next job (e.g., upload HLS files) or mark as complete
// This is handled by videoLifecycleHandler in your code
// which listens for QUEUE_EVENTS.VIDEO_HLS_CONVERTED and calls processHLSFile
};
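The master playlist step is only hinted at in the comments above, so here is one way it could be generated. This is a sketch under assumptions: buildMasterPlaylist and bitrateToBitsPerSecond are hypothetical names, and BANDWIDTH is derived from the target video bitrate alone, which understates the real peak (audio and container overhead are ignored). The per-rendition file names follow the fileNameWithoutExt_name.m3u8 pattern used in processMp4ToHls.

```typescript
interface Rendition {
  resolution: string; // "WIDTHxHEIGHT", e.g. "854x480"
  bitrate: string; // ffmpeg-style, e.g. "800k" or "5M"
  name: string; // e.g. "480p"
}

// Converts "800k" -> 800000 and "5M" -> 5000000.
function bitrateToBitsPerSecond(bitrate: string): number {
  const match = /^(\d+(?:\.\d+)?)([kKmM]?)$/.exec(bitrate.trim());
  if (!match) throw new Error(`Unrecognised bitrate: ${bitrate}`);
  const value = Number(match[1]);
  const unit = match[2].toLowerCase();
  const multiplier = unit === 'm' ? 1000000 : unit === 'k' ? 1000 : 1;
  return Math.round(value * multiplier);
}

// Builds the text of a master playlist that points at each rendition playlist.
function buildMasterPlaylist(baseName: string, renditions: Rendition[]): string {
  const lines = ['#EXTM3U', '#EXT-X-VERSION:3'];
  for (const r of renditions) {
    const bandwidth = bitrateToBitsPerSecond(r.bitrate);
    lines.push(`#EXT-X-STREAM-INF:BANDWIDTH=${bandwidth},RESOLUTION=${r.resolution}`);
    lines.push(`${baseName}_${r.name}.m3u8`); // URI relative to the master playlist
  }
  return lines.join('\n') + '\n';
}
```

The returned string is what you would write to fileNameWithoutExt_master.m3u8 next to the rendition playlists. The player reads it first and then switches between renditions based on available bandwidth.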
Uploading Processed Files:
The videoLifecycleHandler.ts listens for the QUEUE_EVENTS.VIDEO_HLS_CONVERTED event emitted by BullMQ (when the hlsConvertedHandler completes). It then calls processHLSFile.ts, which in turn likely uses uploadToCloud.ts to upload the generated .m3u8 playlists and .ts segment files back to your cloud storage.
4. Tying it Together: RabbitMQ & Real-time Updates
We saw RabbitMQ used everywhere!
- API Gateway -> API Server (Save Metadata)
- API Gateway -> Video Conversion (Start Processing)
- Video Conversion -> API Gateway (Progress/Status Updates)
- Video Conversion -> API Server (Update Metadata like duration, status)
The key for real-time updates is:
- Video Conversion sends progress events (e.g., NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSING) to RabbitMQ, including the userId.
- API Gateway consumes these events from RabbitMQ.
- Using Socket.io, the API Gateway finds the WebSocket connection associated with that userId and emits the progress data directly to the connected client's browser.
// api-gateway/src/socket/index.ts (Conceptual Example)
import { Server as SocketIOServer, Socket } from 'socket.io';
import http from 'http';
import RabbitMQ from '../shared/rabbitMQ';
import { API_GATEWAY_EVENTS } from '../constants/event'; // Or NOTIFY_EVENTS
let io: SocketIOServer;
const users: Map<string, string> = new Map(); // Map userId to socketId
export const initSocketIO = (server: http.Server) => {
io = new SocketIOServer(server, { cors: { /* ... */ } });
io.on('connection', (socket: Socket) => {
console.log('A user connected:', socket.id);
// Store user association (needs authentication, e.g., JWT verification on connect)
const userId = getUserIdFromSocket(socket); // Implement this securely
if (userId) {
users.set(userId, socket.id);
console.log(`User ${userId} mapped to socket ${socket.id}`);
}
socket.on('disconnect', () => {
console.log('User disconnected:', socket.id);
// Clean up user mapping
users.forEach((socketId, uId) => {
if (socketId === socket.id) {
users.delete(uId);
}
});
});
});
// Listen for RabbitMQ progress events
listenForProgressUpdates();
return io;
};
const listenForProgressUpdates = async () => {
await RabbitMQ.consume(
API_GATEWAY_EVENTS.NOTIFY_EVENTS_VIDEO_BIT_RATE_PROCESSING, // Example event
(msg, ack) => {
try {
const data = JSON.parse(msg.content.toString());
const userId = data.userId;
const socketId = users.get(userId); // Find the user's socket
if (socketId && io) {
console.log(`Sending progress to user ${userId} (socket ${socketId})`);
// Emit event directly to the specific user's socket
io.to(socketId).emit('video_progress', data);
} else {
console.log(`Socket not found for user ${userId}, unable to send progress.`);
}
ack();
} catch (error) {
console.error("Error processing progress update:", error);
ack(); // Ack to avoid requeue loops
}
}
);
// ... listen for other notification events ...
};
// Helper to securely get userId from socket handshake/query
function getUserIdFromSocket(socket: Socket): string | null {
// IMPORTANT: Implement proper authentication here!
// E.g., verify JWT passed in handshake query or auth object
return socket.handshake.query.userId as string || null;
}
export { io }; // Export for potential use elsewhere
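One limitation of the users map above is that it stores a single socket per user, so opening a second tab overwrites the first and that tab stops receiving progress. A small registry that tracks several sockets per user is one way around this. The class below is a hypothetical sketch, not code from the repository.

```typescript
// Hypothetical replacement for the single-socket `users` map.
class UserSocketRegistry {
  private readonly socketsByUser = new Map<string, Set<string>>();
  private readonly userBySocket = new Map<string, string>();

  // Call from the 'connection' handler once the user is authenticated.
  add(userId: string, socketId: string): void {
    let sockets = this.socketsByUser.get(userId);
    if (!sockets) {
      sockets = new Set<string>();
      this.socketsByUser.set(userId, sockets);
    }
    sockets.add(socketId);
    this.userBySocket.set(socketId, userId);
  }

  // Call from the 'disconnect' handler; uses a reverse lookup instead of scanning every entry.
  remove(socketId: string): void {
    const userId = this.userBySocket.get(socketId);
    if (userId === undefined) return;
    this.userBySocket.delete(socketId);
    const sockets = this.socketsByUser.get(userId);
    if (!sockets) return;
    sockets.delete(socketId);
    if (sockets.size === 0) this.socketsByUser.delete(userId);
  }

  // All socket ids currently open for this user, in connection order.
  socketsFor(userId: string): string[] {
    return Array.from(this.socketsByUser.get(userId) ?? []);
  }
}
```

In the progress listener you would then loop over registry.socketsFor(userId) and call io.to(socketId).emit('video_progress', data) for each one, so every open tab gets the update.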
Docker Installation
The easiest way to run all services and their dependencies (MongoDB, RabbitMQ, Redis) is using Docker Compose. We have docker-compose.yml files defined. Let's look at the one in the root directory:
# docker-compose.yml (Root Directory - Simplified Example)
version: "3.7"
services:
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000" # Example port
    env_file:
      - ./api-gateway/.env
    depends_on:
      - rabbitmq
    volumes:
      - ./api-gateway:/app
      - /app/node_modules
  api-server:
    build: ./api-server
    ports:
      - "8001:8001" # Example port
    env_file:
      - ./api-server/.env
    depends_on:
      - mongodb
      - rabbitmq
    volumes:
      - ./api-server:/app
      - /app/node_modules
  video-conversion:
    build: ./video-conversion
    ports:
      - "8002:8002" # Example port
    env_file:
      - ./video-conversion/.env
    depends_on:
      - redis
      - rabbitmq
    volumes:
      - ./video-conversion:/app
      - ./video-conversion/uploads:/app/uploads # Mount uploads volume
      - /app/node_modules
  mongodb:
    image: mongo:latest
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672" # For AMQP
      - "15672:15672" # For Management UI
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
volumes:
  mongo-data:
To start everything:
docker-compose up -d --build
This command builds the images for our services (the --build flag rebuilds them even if they already exist) and starts all containers in the background (-d).
5. Conclusion & What's Next
We covered a lot of ground! We set up the environment, built the direct-to-cloud upload flow with pre-signed URLs, and wired up the complex video conversion pipeline using FFmpeg, BullMQ for queuing, and RabbitMQ for communication and real-time updates.
You should now have a much clearer understanding of the core mechanics.
Coming up in Part 3:
- Diving deeper into the API Server and MongoDB schema.
- Exploring deployment strategies using Docker Swarm or Kubernetes.
- Discussing error handling and monitoring.
- Potential optimizations and scaling considerations.
Explore the full implementation code in the GitHub repository: https://github.com/AwalHossain/video-streaming-server