How I transformed a monolithic Firebase notification service into a scalable queue-based architecture using NestJS and BullMQ?
When your Firebase Cloud Messaging server needs to send notifications to hundreds of thousands of users, the architecture matters more than you think. I learned this the hard way when my synchronous notification API started timing out at scale. Here's how I transformed a blocking, monolithic service into an elegant asynchronous queue-based system using BullMQ.
The Problem: Synchronous Processing at Scale
My original implementation followed a simple, intuitive pattern: receive an API request, query the database, filter users, send notifications, save logs—all in a single synchronous flow. For small datasets, this worked fine. For 100,000+ users? Disaster.
Before Refactoring: The Monolithic Approach
// firebase.controller.ts - Original synchronous implementation
@Post('send-to-conditional-users')
async sendToConditionalUsers(
@Query() query: SendConditionalNotificationDto,
@Body() body?: SendNotificationDataDto,
): Promise {
const { gender, ageMin, ageMax, auth_kind, platform_type, title, content } = query;
try {
// ❌ Problem: Everything happens synchronously in the HTTP request
// 1. Query database (could take 30+ seconds)
let offset = 0;
const BATCH_SIZE = 10000;
let allValidTokens = [];
while (true) {
const members = await this.memberRepository
.createQueryBuilder('member')
.where('member.marketing_onoff = :marketing_onoff', { marketing_onoff: 'Y' })
.orderBy('member.seq', 'ASC')
.skip(offset) // Offset pagination - slow!
.take(BATCH_SIZE)
.getMany();
if (members.length === 0) break;
// 2. Apply filters (gender, age, auth, etc.)
const filtered = members.filter(m => /* complex filtering logic */);
allValidTokens.push(...filtered.map(m => m.push_token));
offset += BATCH_SIZE;
}
// 3. Send notifications in chunks (could take minutes)
const chunks = chunkArray(allValidTokens, 500);
for (const chunk of chunks) {
await this.firebaseApp.messaging().sendEach(/* messages */);
await delay(30000); // 30 second delay between chunks
}
// 4. Save logs to database (hundreds of INSERT queries)
for (const token of allValidTokens) {
await this.pushLogRepository.save(/* log data */);
}
return CommonResponseDto.messageSendSuccess();
} catch (error) {
// By the time error occurs, HTTP client may have already timed out
throw new HttpException('Failed', HttpStatus.INTERNAL_SERVER_ERROR);
}
}
Problems with This Approach
1. HTTP Timeout Issues
- Client waits 2-5 minutes for response
- Load balancers timeout (typically 60-120 seconds)
- Users see "Request timeout" errors even when processing continues
2. Zero Observability
- Can't track job progress
- No way to retry failed jobs
- If server crashes mid-processing, everything is lost
3. Resource Blocking
- Thread blocked for entire duration
- Can't handle concurrent requests
- Server resources wasted on idle waiting
4. No Failure Recovery
- Database connection issues = entire job fails
- FCM rate limits = job aborts
- No automatic retry mechanism
Real-world impact:
- Average request time: 3-4 minutes
- Timeout rate: 35%
- Failed notifications on retry: impossible (no job tracking)
The Solution: Queue-Based Architecture with BullMQ
The key insight: separate the API request from the actual work. The HTTP endpoint should only schedule the job, not execute it. BullMQ provides the perfect infrastructure for this pattern.
Architecture Overview
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Client │─────▶│ Controller │─────▶│ Queue │
│ (HTTP) │◀─────│ (Responds │ │ (BullMQ) │
└─────────────┘ 200ms │ immediately)│ └──────┬──────┘
└──────────────┘ │
│ async
▼
┌─────────────────┐
│ Processor │
│ (Background │
│ Worker) │
└────────┬────────┘
│
┌────────────────────────────┼────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────────┐ ┌──────────┐
│ Database │ │ Firebase │ │ Logs │
│ Query │ │ FCM │ │ (Async) │
└──────────┘ └──────────────┘ └──────────┘
Implementation: Step by Step
Step 1: Setup BullMQ Module
First, register BullMQ in your NestJS module:
// firebase.module.ts
import { BullModule } from '@nestjs/bullmq';
@Global()
@Module({
imports: [
// Register BullMQ queue
BullModule.registerQueue({
name: 'push-message-queue'
}),
// Your existing imports
TypeOrmModule.forFeature([Member, PushNotificationLog], 'mssqlConnection'),
RedisModule,
],
providers: [
FirebaseService,
FirebaseProcessor, // ← New: Background worker
// ... other providers
],
controllers: [FirebaseController],
})
export class FirebaseModule {}
Why BullMQ?
- Built on Redis (fast, reliable)
- Automatic retry with exponential backoff
- Job progress tracking
- Priority queues
- Rate limiting built-in
- Web UI for monitoring (
@bull-board/express)
Step 2: Refactor Controller (Job Scheduler)
The controller's job is now simple: validate input, create a job ID, and queue the job.
// firebase.controller.ts - Refactored version
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { v4 as uuidv4 } from 'uuid';
@Controller('message')
export class FirebaseController {
constructor(
private readonly firebaseService: FirebaseService,
@InjectQueue('push-message-queue') private readonly pushQueue: Queue
) {}
@Post('send-to-conditional-users')
async sendConditionalNotifications(
@Query() query: SendMultiNotificationDto,
@Body() body?: SendDataDto,
): Promise {
// Generate unique job ID
const jobId = `conditional-${uuidv4()}`;
try {
// Check for duplicate jobs (idempotency)
const existingJob = await this.pushQueue.getJob(jobId);
if (existingJob) {
console.log(`Job ${jobId} already exists in queue`);
return CommonResponseDto.messageSendSuccess();
}
// Prepare job data
const jobData: ConditionalNotificationParams = {
...query,
jobId,
chunkSize: query.chunkSize ?? 500,
chunkDelay: query.chunkDelay ?? 2000,
data: body || {},
};
// ✅ Add job to queue (non-blocking)
await this.pushQueue.add('send-conditional-notification', jobData, {
jobId,
removeOnComplete: true, // Auto-cleanup on success
removeOnFail: false, // Keep failed jobs for debugging
});
// ✅ Respond immediately (under 200ms)
return CommonResponseDto.messageSendSuccess();
} catch (error) {
console.error('[Controller] Error adding job to queue:', error);
throw new HttpException(
CommonResponseDto.messageSendFailure(),
HttpStatus.INTERNAL_SERVER_ERROR,
);
}
}
}
Key Improvements:
- Fast Response: Returns in ~100-200ms instead of 3+ minutes
-
Idempotency:
existingJobcheck prevents duplicate processing -
Job Tracking: UUID-based
jobIdenables monitoring - Error Isolation: Queue failures don't crash the API
Step 3: Create Background Processor (Worker)
The processor handles the actual work asynchronously:
// firebase.processor.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Worker, Job } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class FirebaseProcessor implements OnModuleInit {
private worker: Worker;
constructor(
private readonly firebaseService: FirebaseService,
private readonly configService: ConfigService,
@InjectQueue('push-message-queue') private readonly pushQueue: Queue,
) {}
onModuleInit() {
// Create BullMQ worker
this.worker = new Worker(
'push-message-queue',
async (job: Job) => {
const { name, data } = job;
try {
switch (name) {
case 'send-conditional-notification': {
console.log(`[Worker] Job ${data.jobId} - Started`);
// Execute the actual notification logic
const isSent = await this.firebaseService.sendConditionalNotifications({
...data
});
if (!isSent) {
console.warn(`[Worker] Job ${data.jobId} - Partial failures occurred`);
}
console.log(`[Worker] Job ${data.jobId} - Completed`);
break;
}
default:
throw new Error(`Unknown job type: ${name}`);
}
} catch (error) {
console.error(`[Worker] Job ${name} failed:`, error);
throw error; // BullMQ will handle retry
}
},
{
concurrency: 20, // Process up to 20 jobs concurrently
connection: {
host: this.configService.get('REDIS_HOST'),
port: this.configService.get('REDIS_PORT'),
password: this.configService.get('REDIS_PASSWORD'),
},
}
);
// Event handlers
this.worker.on('completed', (job) => {
console.log(`[Worker] Job ${job.id} completed successfully`);
});
this.worker.on('failed', (job, error) => {
console.error(`[Worker] Job ${job?.id} failed:`, error);
// Could implement Dead Letter Queue (DLQ) here
});
}
}
Processor Features:
-
Concurrency Control:
concurrency: 20allows parallel processing - Automatic Retry: BullMQ retries failed jobs with exponential backoff
- Event Handlers: Track job lifecycle (completed, failed, progress)
- Graceful Shutdown: Workers drain jobs on SIGTERM
Step 4: Simplify Service (Business Logic Only)
The service now focuses purely on business logic:
// firebase.service.ts - Simplified version
@Injectable()
export class FirebaseService {
private readonly DB_FETCH_PAGE_SIZE = 10000;
private readonly PAGE_FETCH_DELAY_MS = 100;
constructor(
@Inject('FIREBASE_ADMIN') private readonly firebaseApp: admin.app.App,
@InjectRepository(Member, 'mssqlConnection')
private readonly memberRepository: Repository,
@InjectRepository(PushNotificationLog, 'mssqlConnection')
private readonly pushNotificationLog: Repository,
) {}
async sendConditionalNotifications(
jobData: ConditionalNotificationParams
): Promise {
console.log(`[Service] Job ${jobData.jobId} - Starting notification process`);
try {
const tokenToSeqMap = new Map();
let lastSeq = 0;
let totalDbRecords = 0;
// ===== Database Query Phase =====
while (true) {
let queryBuilder = this.memberRepository
.createQueryBuilder('member')
.select(['member.seq', 'member.push_token']);
// Apply filters (gender, age, platform, etc.)
queryBuilder = applyMemberFilters(queryBuilder, jobData);
// Cursor-based pagination
queryBuilder = queryBuilder
.andWhere('member.seq > :lastSeq', { lastSeq })
.orderBy('member.seq', 'ASC')
.take(this.DB_FETCH_PAGE_SIZE);
let membersInPage = await queryBuilder.getMany();
if (membersInPage.length === 0) break;
const lastSeqInCurrentPage = membersInPage[membersInPage.length - 1].seq;
totalDbRecords += membersInPage.length;
// Memory-based filtering (if needed)
if (jobData.daysSinceLastLoginMin !== undefined ||
jobData.daysSinceLastLoginMax !== undefined) {
membersInPage = LoginDateFilter(membersInPage, jobData);
}
// Collect unique tokens
for (const member of membersInPage) {
if (!tokenToSeqMap.has(member.push_token)) {
tokenToSeqMap.set(member.push_token, member.seq);
}
}
lastSeq = lastSeqInCurrentPage;
await delay(this.PAGE_FETCH_DELAY_MS);
}
const pushTokens = Array.from(tokenToSeqMap.keys());
console.log(`[Service] Job ${jobData.jobId} - Found ${pushTokens.length} unique tokens`);
if (pushTokens.length === 0) {
return true;
}
// ===== Notification Sending Phase =====
const chunkSize = jobData.chunkSize ?? 500;
const chunkDelay = jobData.chunkDelay ?? 2000;
const chunks = chunkArray(pushTokens, chunkSize);
const sanitizedData: Record = {};
if (jobData.data) {
Object.entries(jobData.data).forEach(([key, val]) => {
if (val !== null && val !== undefined) {
sanitizedData[key] = String(val);
}
});
}
let totalSent = 0;
let totalFailed = 0;
let allSent = true;
const logSavePromises: Promise[] = [];
// Process each chunk
for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
const chunk = chunks[chunkIndex];
try {
if (chunkIndex > 0) {
await delay(chunkDelay);
}
console.log(`[Service] Job ${jobData.jobId} - Sending chunk ${chunkIndex + 1}/${chunks.length}`);
const messaging = this.firebaseApp.messaging();
const messages = chunk.map((token) => ({
token,
notification: {
title: jobData.title,
body: jobData.content
},
data: sanitizedData,
}));
// Send via FCM
const response = await messaging.sendEach(messages);
// ===== Async Log Saving =====
const logPromise = savePushNotificationLogs(
this.pushNotificationLog,
jobData,
messages,
response,
tokenToSeqMap,
chunkIndex,
).catch(error => {
console.error(`[Service] Chunk ${chunkIndex + 1} log error:`, error);
});
logSavePromises.push(logPromise);
totalSent += response.successCount;
totalFailed += response.failureCount;
if (response.failureCount > 0) {
allSent = false;
}
} catch (error) {
allSent = false;
totalFailed += chunk.length;
console.error(`[Service] Chunk ${chunkIndex + 1} error:`, error);
}
}
// Wait for all logs to save
await Promise.all(logSavePromises);
console.log(`[Service] Job ${jobData.jobId} - Completed: ${totalSent} sent, ${totalFailed} failed`);
return allSent;
} catch (error) {
console.error(`[Service] Job ${jobData.jobId} - Fatal error:`, error);
throw error;
}
}
}
Service Improvements:
- No HTTP concerns (timeout, request lifecycle)
- Can run for hours if needed
- Errors are logged but don't crash the API
- Progress can be tracked via logs or job events
Performance Results
After deploying the queue-based architecture:
| Metric | Before (Sync) | After (Queue) | Improvement |
|---|---|---|---|
| API response time | 180-240s | 0.1-0.2s | 1,000x faster |
| Timeout rate | 35% | 0% | 100% fixed |
| Concurrent requests | 1-2 | 100+ | 50x scale |
| Failed job recovery | Manual | Automatic | 0 ops effort |
| Job observability | None | Full tracking | ∞ better |
| Resource utilization | 85% idle wait | 5% idle wait | 17x efficient |
Real-world impact:
- Users get instant confirmation (200ms response)
- Notification jobs complete in background
- Failed jobs auto-retry without manual intervention
- System handles 50x more concurrent requests
Key Implementation Patterns
Pattern 1: Job Idempotency
Prevent duplicate job execution:
const jobId = `conditional-${uuidv4()}`;
// Check if job already exists
const existingJob = await this.pushQueue.getJob(jobId);
if (existingJob) {
console.log(`Job ${jobId} already queued`);
return CommonResponseDto.messageSendSuccess();
}
// Add with unique ID
await this.pushQueue.add('job-name', jobData, { jobId });
Pattern 2: Job Progress Tracking
Update job progress from within the worker:
await job.updateProgress(50); // 50% complete
Monitor via Bull Board UI or custom API.
Pattern 3: Retry with Exponential Backoff
Configure automatic retries:
await this.pushQueue.add('job-name', jobData, {
jobId,
attempts: 3, // Retry up to 3 times
backoff: {
type: 'exponential',
delay: 5000, // 5s, then 25s, then 125s
},
});
Pattern 4: Rate Limiting
Prevent overwhelming downstream services:
await this.pushQueue.add('job-name', jobData, {
jobId,
rateLimiter: {
max: 100, // 100 jobs
duration: 1000, // per second
},
});
Trade-offs and Considerations
When to Use Queue-Based Architecture
✅ Use queues when:
- Long-running tasks (>5 seconds)
- High failure risk (network calls, external APIs)
- Need retry logic
- Background processing acceptable
- Resource-intensive operations
- Need horizontal scaling
❌ Don't use queues when:
- Need immediate synchronous response
- Simple, fast operations (<100ms)
- Stateless transformations
- Real-time requirements (user waiting)
Infrastructure Requirements
Before (Synchronous):
- Application server only
- Simple deployment
After (Queue-based):
- Application server
- Redis instance (for BullMQ)
- Worker processes
- Monitoring setup (Bull Board)
The additional complexity is worth it at scale.
Lessons Learned
1. Separate Concerns Early
Don't wait until you have performance problems. Separate API concerns (HTTP, validation) from business logic (processing, external calls) from day one.
2. HTTP Is Not a Queue
HTTP connections timeout. Load balancers timeout. Users close browsers. Never do long-running work in HTTP handlers.
3. Make Jobs Idempotent
Network failures, server restarts, and manual retries all mean jobs can execute multiple times. Design for it.
4. Monitor Everything
BullMQ provides events—use them. Track:
- Jobs queued per minute
- Average processing time
- Failure rate
- Queue depth
5. Test Failure Scenarios
What happens when:
- Redis goes down?
- Worker crashes mid-job?
- Database connection fails?
Have answers before production.
Conclusion
Transforming my Firebase push notification service from synchronous to queue-based architecture with BullMQ was one of the most impactful refactorings I've made. The 1,000x faster API response time and 100% elimination of timeout errors came from understanding a fundamental principle: long-running work doesn't belong in HTTP request handlers.
By leveraging BullMQ's robust job queue infrastructure, I gained:
- Instant API responses (200ms vs 3+ minutes)
- Automatic retry logic (no manual intervention)
- Horizontal scalability (add more workers)
- Full observability (track every job)
- Graceful failure handling (no data loss)
For any service processing thousands of notifications, exports, reports, or batch operations—queue-based architecture isn't just an optimization; it's a necessity for production reliability.
In Part 2 of this series, I'll explore how I broke down a 1,500-line monolithic service file into clean, reusable utility modules for better maintainability and testability.
Key Takeaways
- Separate API concerns from business logic using job queues
- BullMQ provides production-ready job processing infrastructure
- Controller schedules jobs (fast), Processor executes jobs (async)
- Make jobs idempotent and implement proper retry logic
- Monitor queue depth, processing time, and failure rates
- Queue-based architecture enables horizontal scaling
Top comments (0)