As business applications grow and retain more users, one of the hardest problems is maintaining performance while serving an ever-increasing user base. In this article, I'll walk through how I transformed our portfolio performance processing system from a simple linear approach into a batching system capable of handling over 1,000,000 users efficiently.
WHERE THE PROBLEM BEGINS
Initially, during the early days of this startup, our system processed portfolio performance reports, cron jobs, and other transactional or async processes in a straightforward manner: iterate through all users and send their monthly summaries. This worked fine when we had thousands of users, but as we scaled to over 1,000,000 active users, several critical issues emerged:
- Memory Exhaustion - Loading all users into memory simultaneously
- Database Connection Overload - Too many concurrent database queries
- Email Service Rate Limiting - Overwhelming our email provider
- Processing Timeouts - Jobs taking hours to complete
- Poor Error Recovery - One failed user could break the entire process
THE WAY FORWARD
My approach was to implement a two-tier batching strategy that processes users in manageable chunks while distributing the load across time intervals.
import { Injectable, Logger } from '@nestjs/common';

interface User {
  id: number;
  email: string;
  first_name: string;
  last_name: string;
  created_at: Date;
}

interface JobLog {
  id: number;
  job_name: string;
  job_status: JobLogStatus;
  job_response?: string;
  created_at: Date;
}

enum JobLogStatus {
  PENDING = 'PENDING',
  IN_PROGRESS = 'IN_PROGRESS',
  COMPLETED = 'COMPLETED',
  FAILED = 'FAILED',
}

interface TimeInterval {
  from: Date;
  to: Date;
}

interface BatchProcessingConfig {
  batchSize: number;
  delayBetweenBatches: number;
  maxRetries: number;
  concurrencyLimit: number;
}
@Injectable()
export class PortfolioPerformanceProcessor {
  private readonly logger = new Logger(PortfolioPerformanceProcessor.name);

  // Configuration for batch processing
  private readonly config: BatchProcessingConfig = {
    batchSize: 100,
    delayBetweenBatches: 2000, // 2 seconds
    maxRetries: 3,
    concurrencyLimit: 5,
  };

  constructor(
    private readonly userRepository: UserRepository,
    private readonly statisticService: StatisticService,
    private readonly mailService: MailService,
    private readonly jobLogRepository: JobLogRepository,
    private readonly loggerRepository: LoggerRepository,
  ) {}
  /**
   * Main entry point for processing monthly portfolio performance
   * Uses temporal segmentation and batching for optimal performance
   */
  async processMonthlyPortfolioPerformanceInBatches(): Promise<string> {
    const newJobLog = await this.initializeJobLog();
    try {
      // Generate monthly intervals from start date to now
      const intervals = this.generateProcessingIntervals();
      let totalProcessed = 0;
      let totalErrors = 0;
      this.logger.log(`Starting portfolio processing for ${intervals.length} time intervals`);

      // Process each time interval sequentially to manage load
      for (const [index, interval] of intervals.entries()) {
        this.logger.log(`Processing interval ${index + 1}/${intervals.length}: ${interval.from.toISOString()} to ${interval.to.toISOString()}`);
        const intervalResult = await this.processTimeInterval(interval);
        totalProcessed += intervalResult.processed;
        totalErrors += intervalResult.errors;

        // Add delay between intervals to prevent overwhelming the system
        if (index < intervals.length - 1) {
          await this.delay(1000);
        }
      }

      await this.completeJobLog(newJobLog.id, totalProcessed, totalErrors);
      return `Portfolio processing completed. Processed: ${totalProcessed}, Errors: ${totalErrors}`;
    } catch (error) {
      await this.failJobLog(newJobLog.id, error);
      throw error;
    }
  }
  /**
   * Process all users within a specific time interval using cursor-based pagination
   */
  private async processTimeInterval(interval: TimeInterval): Promise<{ processed: number; errors: number }> {
    let lastId: number | undefined = undefined;
    let totalProcessed = 0;
    let totalErrors = 0;
    let batch: User[];

    do {
      // Fetch users in batches using cursor-based pagination
      batch = await this.userRepository.getUsersByDateRangeInBatches(
        interval.from,
        interval.to,
        this.config.batchSize,
        lastId,
      );
      if (batch.length === 0) break;
      this.logger.debug(`Processing batch of ${batch.length} users`);

      // Process current batch with error handling
      const batchResult = await this.processBatch(batch);
      totalProcessed += batchResult.processed;
      totalErrors += batchResult.errors;

      // Update cursor for next iteration
      lastId = batch[batch.length - 1].id;

      // Add delay between batches to prevent rate limiting
      if (batch.length === this.config.batchSize) {
        await this.delay(this.config.delayBetweenBatches);
      }
    } while (batch.length === this.config.batchSize);

    return { processed: totalProcessed, errors: totalErrors };
  }
Instead of processing all 1,000,000+ users at once, I segment users by their registration date into monthly intervals. This keeps each interval to a manageable number of users, and because failures are counted and logged per user, one failed user does not stop the rest of the run.
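To make the segmentation concrete, here is a minimal, self-contained sketch of how a date range splits into half-open monthly intervals, where `from` is included and `to` is excluded. The function name `monthlyIntervalsUtc` and the sample range are illustrative assumptions, not the production helper. The sketch works in UTC so that interval boundaries do not shift with the server's time zone.

```typescript
interface TimeInterval {
  from: Date;
  to: Date;
}

// Hypothetical helper for illustration: splits [start, end) into calendar-month
// intervals in UTC. Each interval starts exactly where the previous one ended.
function monthlyIntervalsUtc(start: Date, end: Date): TimeInterval[] {
  const intervals: TimeInterval[] = [];
  let from = new Date(start);
  while (from < end) {
    // First instant of the next calendar month (month 12 rolls over to next year)
    const nextMonth = new Date(Date.UTC(from.getUTCFullYear(), from.getUTCMonth() + 1, 1));
    const to = nextMonth < end ? nextMonth : new Date(end);
    intervals.push({ from, to });
    from = to;
  }
  return intervals;
}

const sample = monthlyIntervalsUtc(
  new Date('2022-01-01T00:00:00Z'),
  new Date('2022-03-15T00:00:00Z'),
);
for (const { from, to } of sample) {
  console.log(`${from.toISOString()} -> ${to.toISOString()}`);
}
// 2022-01-01T00:00:00.000Z -> 2022-02-01T00:00:00.000Z
// 2022-02-01T00:00:00.000Z -> 2022-03-01T00:00:00.000Z
// 2022-03-01T00:00:00.000Z -> 2022-03-15T00:00:00.000Z
```

Because every interval begins at the previous interval's end, a user whose `created_at` falls anywhere in the range belongs to exactly one interval.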
  /**
   * Process a single batch of users with bounded concurrency and error handling
   */
  private async processBatch(users: User[]): Promise<{ processed: number; errors: number }> {
    let processed = 0;
    let errors = 0;

    // Run at most `concurrencyLimit` users at a time so a full batch does not
    // start every query and email send simultaneously
    for (let i = 0; i < users.length; i += this.config.concurrencyLimit) {
      const chunk = users.slice(i, i + this.config.concurrencyLimit);
      const results = await Promise.allSettled(
        chunk.map(user => this.processUserWithRetry(user)),
      );

      // Count outcomes and log any rejections for monitoring
      results.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          processed++;
        } else {
          errors++;
          this.logError(`Failed to process user ${chunk[index].id}`, result.reason);
        }
      });
    }

    return { processed, errors };
  }
  /**
   * Process individual user with retry logic
   */
  private async processUserWithRetry(user: User, attempt: number = 1): Promise<void> {
    try {
      await this.processUser(user);
    } catch (error) {
      if (attempt < this.config.maxRetries) {
        this.logger.warn(`Retry ${attempt}/${this.config.maxRetries} for user ${user.id}`);
        // Exponential backoff: 1s, 2s, 4s, ... (doubles after each failed attempt)
        await this.delay(1000 * 2 ** (attempt - 1));
        return this.processUserWithRetry(user, attempt + 1);
      }
      throw error;
    }
  }
  /**
   * Core user processing logic
   */
  private async processUser(user: User): Promise<void> {
    try {
      // Fetch portfolio statistics
      const portfolio = await this.statisticService.getWeeklyStats(user.id);

      // Send email if user has valid email
      if (this.isValidEmail(user.email)) {
        const displayName = this.formatUserDisplayName(user);
        await this.mailService.sendWeeklyPortfolioSummary(
          user.email,
          displayName,
          portfolio,
        );
      }
    } catch (error) {
      this.logError(`Error processing portfolio for user ${user.id}`, error);
      throw error;
    }
  }
  /**
   * Generate monthly intervals for processing
   */
  private generateProcessingIntervals(): TimeInterval[] {
    const start = new Date('2022-01-01');
    const now = new Date();
    return UTILITIES.generateMonthlyIntervals(start, now);
  }

  /**
   * Initialize job logging
   */
  private async initializeJobLog(): Promise<JobLog> {
    return this.jobLogRepository.saveJobLog({
      job_name: 'processMonthlyPortfolioPerformanceInBatches',
      job_status: JobLogStatus.IN_PROGRESS,
    });
  }

  /**
   * Complete job logging with success status
   */
  private async completeJobLog(jobId: number, processed: number, errors: number): Promise<void> {
    await this.jobLogRepository.updateJobLog(jobId, {
      job_status: JobLogStatus.COMPLETED,
      job_response: `Portfolio processing completed. Processed: ${processed} users, Errors: ${errors}`,
    });
  }

  /**
   * Mark job as failed
   */
  private async failJobLog(jobId: number, error: any): Promise<void> {
    await this.jobLogRepository.updateJobLog(jobId, {
      job_status: JobLogStatus.FAILED,
      job_response: `Job failed: ${error.message}`,
    });
  }
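  /**
   * Enhanced error logging with structured data
   */
  private async logError(message: string, error: any): Promise<void> {
    this.logger.error(message, error?.stack);
    try {
      await this.loggerRepository.saveLogger({
        log_message: `${message}: ${error?.message} - ${JSON.stringify({
          stack: error?.stack,
          timestamp: new Date().toISOString(),
        })}`,
        log_level: 'ERROR',
        log_path: 'src/crons/portfolio-processor.service.ts',
      });
    } catch (logFailure) {
      // Callers invoke logError without awaiting it, so a failed log write
      // must not surface as an unhandled promise rejection
      this.logger.error('Failed to persist error log', (logFailure as Error)?.stack);
    }
  }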
  /**
   * Utility methods
   */
  private async delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  private isValidEmail(email: string): boolean {
    // Boolean() keeps the return type a strict boolean when email is empty or missing
    return Boolean(email) && email.includes('@') && email.length > 5;
  }

  private formatUserDisplayName(user: User): string {
    return user.first_name && user.last_name
      ? `${user.first_name} ${user.last_name}`
      : 'There';
  }
}
We process users in batches of 100, which for our workload gave a good balance between:
- Memory usage (keeping it low)
- Database connection efficiency
- Processing speed
- Error isolation
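The batch loop is easier to reason about with a toy model. The sketch below pages through an in-memory array with an id cursor, the same idea as fetching rows with `id > lastId` and a `LIMIT`. The names `fetchBatch` and `processAll` and the sample data are hypothetical and exist only for illustration. Only one batch is held at a time, which is what keeps memory usage flat as the user count grows.

```typescript
interface Row {
  id: number;
}

// Stand-in for the database query: rows with id > lastId, ordered by id, up to `limit`.
function fetchBatch(rows: Row[], limit: number, lastId?: number): Row[] {
  return rows
    .filter(row => lastId === undefined || row.id > lastId)
    .sort((a, b) => a.id - b.id)
    .slice(0, limit);
}

// Walks the whole data set one batch at a time and returns the batch sizes seen.
function processAll(rows: Row[], batchSize: number): number[] {
  const batchSizes: number[] = [];
  let lastId: number | undefined;
  while (true) {
    const batch = fetchBatch(rows, batchSize, lastId);
    if (batch.length === 0) break;
    batchSizes.push(batch.length);
    lastId = batch[batch.length - 1].id; // advance the cursor past this batch
    if (batch.length < batchSize) break; // a short batch means the end was reached
  }
  return batchSizes;
}

const rows: Row[] = Array.from({ length: 250 }, (_, i) => ({ id: i + 1 }));
const sizes = processAll(rows, 100);
console.log(sizes); // [ 100, 100, 50 ]
```

Unlike `OFFSET`-based paging, the cursor does not need to skip over already-processed rows, so the cost of fetching a batch does not grow as the job moves deeper into the table, provided the cursor column is indexed.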
// Enhanced Repository with optimized queries
@Injectable()
export class UserRepository {
  constructor(private readonly db: DatabaseConnection) {}

  /**
   * Optimized cursor-based pagination for large datasets
   * Uses indexed columns for efficient querying
   */
  async getUsersByDateRangeInBatches(
    fromDate: Date,
    toDate: Date,
    batchSize: number,
    lastId?: number,
  ): Promise<User[]> {
    // Compare against undefined so a cursor value of 0 is still applied
    const hasCursor = lastId !== undefined;
    const query = `
      SELECT id, email, first_name, last_name, created_at
      FROM users
      WHERE created_at >= $1
      AND created_at < $2
      ${hasCursor ? 'AND id > $4' : ''}
      AND email IS NOT NULL
      AND email != ''
      ORDER BY id ASC
      LIMIT $3
    `;
    const params = hasCursor
      ? [fromDate, toDate, batchSize, lastId]
      : [fromDate, toDate, batchSize];
    return this.db.query(query, params);
  }
}
// Utility class for date and batch management
export class UTILITIES {
  /**
   * Generate monthly intervals between two dates
   */
  static generateMonthlyIntervals(start: Date, end: Date): TimeInterval[] {
    const intervals: TimeInterval[] = [];
    let from = new Date(start);
    while (from < end) {
      // First instant of the next calendar month, computed in UTC so it lines up
      // with date-only strings such as new Date('2022-01-01')
      const nextMonth = new Date(Date.UTC(from.getUTCFullYear(), from.getUTCMonth() + 1, 1));
      const to = nextMonth > end ? end : nextMonth;
      intervals.push({ from, to });
      // Start the next interval exactly where this one ended so no user
      // falls into a gap between intervals
      from = new Date(to);
    }
    return intervals;
  }

  /**
   * Async delay utility
   */
  static delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
// Enhanced mail service with rate limiting and retry logic
@Injectable()
export class MailService {
  private readonly logger = new Logger(MailService.name);
  private emailQueue: Array<{ email: string; name: string; portfolio: any }> = [];
  private isProcessingQueue = false;

  /**
   * Queue-based email sending to handle rate limits
   */
  async sendWeeklyPortfolioSummary(
    email: string,
    name: string,
    portfolio: any
  ): Promise<void> {
    // Add to queue instead of immediate sending
    this.emailQueue.push({ email, name, portfolio });

    // Start processing if not already running. Deliberately not awaited:
    // this method resolves once the email is queued, not once it is delivered
    if (!this.isProcessingQueue) {
      void this.processEmailQueue();
    }
  }

  /**
   * Process email queue with rate limiting
   */
  private async processEmailQueue(): Promise<void> {
    this.isProcessingQueue = true;
    try {
      while (this.emailQueue.length > 0) {
        const batch = this.emailQueue.splice(0, 10); // Process 10 emails at once
        const promises = batch.map(({ email, name, portfolio }) =>
          this.sendEmailWithRetry(email, name, portfolio)
        );
        await Promise.allSettled(promises);

        // Rate limiting delay
        if (this.emailQueue.length > 0) {
          await UTILITIES.delay(1000); // 1 second between batches
        }
      }
    } finally {
      // Always release the flag so a later call can restart the queue
      this.isProcessingQueue = false;
    }
  }

  /**
   * Send individual email with retry logic
   */
  private async sendEmailWithRetry(
    email: string,
    name: string,
    portfolio: any,
    attempt: number = 1
  ): Promise<void> {
    try {
      // Your actual email sending logic here
      await this.actualEmailSend(email, name, portfolio);
    } catch (error) {
      if (attempt < 3) {
        this.logger.warn(`Email retry ${attempt}/3 for ${email}`);
        await UTILITIES.delay(2000 * attempt);
        return this.sendEmailWithRetry(email, name, portfolio, attempt + 1);
      }
      this.logger.error(`Failed to send email to ${email} after 3 attempts`, (error as Error).stack);
      throw error;
    }
  }

  private async actualEmailSend(email: string, name: string, portfolio: any): Promise<void> {
    // Implementation depends on your email provider
    // This could be SendGrid, AWS SES, etc.
  }
}
// Advanced monitoring and metrics
@Injectable()
export class ProcessingMetrics {
  private processingStats = {
    totalUsers: 0,
    processedUsers: 0,
    failedUsers: 0,
    averageProcessingTime: 0,
    emailsSent: 0,
    startTime: null as Date | null,
  };

  startProcessing(): void {
    this.processingStats.startTime = new Date();
    this.resetCounters();
  }

  recordUserProcessed(): void {
    this.processingStats.processedUsers++;
  }

  recordUserFailed(): void {
    this.processingStats.failedUsers++;
  }

  recordEmailSent(): void {
    this.processingStats.emailsSent++;
  }

  getProcessingReport(): string {
    const { processedUsers, failedUsers, emailsSent, startTime } = this.processingStats;
    const duration = startTime ? Date.now() - startTime.getTime() : 0;
    const minutes = duration / 1000 / 60;
    const attempted = processedUsers + failedUsers;
    // Guard against division by zero before any user or any time has been recorded
    const successRate = attempted > 0 ? (processedUsers / attempted) * 100 : 0;
    const usersPerMinute = minutes > 0 ? processedUsers / minutes : 0;
    return `
      Processing Report:
      - Duration: ${minutes.toFixed(2)} minutes
      - Users Processed: ${processedUsers}
      - Users Failed: ${failedUsers}
      - Emails Sent: ${emailsSent}
      - Success Rate: ${successRate.toFixed(2)}%
      - Processing Rate: ${usersPerMinute.toFixed(2)} users/minute
    `;
  }

  private resetCounters(): void {
    this.processingStats.processedUsers = 0;
    this.processingStats.failedUsers = 0;
    this.processingStats.emailsSent = 0;
  }
}
// Database optimization utilities
export class DatabaseOptimizations {
  /**
   * Example of an optimized query with proper indexing
   */
  static getOptimizedUserQuery(): string {
    return `
      -- Ensure these indexes exist for optimal performance:
      -- CREATE INDEX CONCURRENTLY idx_users_created_at_id ON users(created_at, id);
      -- CREATE INDEX CONCURRENTLY idx_users_email_not_null ON users(email) WHERE email IS NOT NULL;
      SELECT id, email, first_name, last_name, created_at
      FROM users
      WHERE created_at >= $1
      AND created_at < $2
      AND id > COALESCE($4, 0)
      AND email IS NOT NULL
      AND email != ''
      ORDER BY id ASC
      LIMIT $3
    `;
  }
}
// Configuration management
export interface SystemConfig {
  database: {
    maxConnections: number;
    queryTimeout: number;
  };
  email: {
    rateLimit: number;
    batchSize: number;
  };
  processing: {
    userBatchSize: number;
    intervalDelayMs: number;
    maxConcurrentJobs: number;
  };
}

export const PRODUCTION_CONFIG: SystemConfig = {
  database: {
    maxConnections: 20,
    queryTimeout: 30000,
  },
  email: {
    rateLimit: 100, // emails per minute
    batchSize: 10,
  },
  processing: {
    userBatchSize: 100,
    intervalDelayMs: 2000,
    maxConcurrentJobs: 3,
  },
};
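Fixed delays put a floor under total runtime, so it is worth doing the arithmetic before choosing values. The sketch below is a rough estimate under stated assumptions: `pacingFloorMs` and `delayForRateLimitMs` are hypothetical helpers, the estimate ignores interval boundaries and actual processing time, and the inputs are the sample values shown in this article rather than measured production settings.

```typescript
// Minimum total time spent sleeping between batches
// (one delay between each consecutive pair of batches).
function pacingFloorMs(totalItems: number, batchSize: number, delayMs: number): number {
  const batches = Math.ceil(totalItems / batchSize);
  return Math.max(batches - 1, 0) * delayMs;
}

// Delay between batches needed to stay at or under a per-minute rate limit.
function delayForRateLimitMs(batchSize: number, perMinuteLimit: number): number {
  return Math.ceil((batchSize * 60000) / perMinuteLimit);
}

// 1,000,000 users, batches of 100, 2000 ms between batches
const userFloorMs = pacingFloorMs(1000000, 100, 2000);
// 10 emails per batch against a limit of 100 emails per minute
const emailDelayMs = delayForRateLimitMs(10, 100);

console.log(`User batch delays alone: ${(userFloorMs / 60000).toFixed(1)} minutes`); // 333.3 minutes
console.log(`Delay per 10-email batch: ${emailDelayMs} ms`); // 6000 ms
```

With these sample values the batch delays alone add up to about five and a half hours, and a limit of 100 emails per minute implies a 6-second pause per 10-email batch rather than 1 second. Actual runtime therefore depends heavily on the delay and batch settings deployed, so they are worth tuning against the provider's real limits.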
Scaling from thousands to millions of users requires fundamental changes in how we approach batch processing. The key is breaking down large problems into manageable chunks while maintaining system reliability and performance.
The batching strategy reduced our processing time from over 6 hours to under 45 minutes while improving reliability and monitoring capabilities. Most importantly, it's designed to scale further as our user base continues to grow.
Always remember, premature optimization is the root of all evil, but once you hit scale, proper optimization becomes essential for business continuity.