DEV Community

Fahim Hasnain Fahad
Fahim Hasnain Fahad

Posted on

Scale to 10M Users: CQRS in NestJS for API Performance

Scale to 10M Users: CQRS in NestJS for API Performance

In today's digital landscape, building applications that can handle millions of users requires thoughtful architecture decisions. Command Query Responsibility Segregation (CQRS) is one such pattern that helps maintain performance at scale. Let's explore!

What is CQRS and Why Use It?

CQRS splits your application into two models:

  • Command model: Handles create, update, and delete operations
  • Query model: Manages read operations

This separation addresses the reality that most applications have asymmetric read/write loads—typically with reads far outnumbering writes.

Setting Up CQRS in NestJS

First, install the required packages:

# Installs every package the snippets below import: the Nest CQRS, TypeORM,
# Mongoose and microservices integrations plus the underlying drivers.
npm install @nestjs/cqrs @nestjs/typeorm @nestjs/mongoose @nestjs/microservices typeorm mongoose uuid kafkajs redis mongodb pg
Enter fullscreen mode Exit fullscreen mode

Directory Structure

src/
├── commands/
│   ├── handlers/
│   └── impl/
├── queries/
│   ├── handlers/
│   └── impl/
├── events/
│   ├── handlers/
│   └── impl/
├── models/
├── services/
├── controllers/
└── app.module.ts
Enter fullscreen mode Exit fullscreen mode

Basic Implementation

Let's start with our app.module.ts:

import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { MongooseModule } from '@nestjs/mongoose';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CommandHandlers } from './commands/handlers';
import { QueryHandlers } from './queries/handlers';
import { EventHandlers } from './events/handlers';
import { Controllers } from './controllers';

/**
 * Root module: registers the CQRS buses, the MongoDB connection used by the
 * read side (Mongoose) and the PostgreSQL connection used by the write side
 * (TypeORM), plus all controllers and command/query/event handlers.
 *
 * Connection settings come from environment variables and fall back to the
 * local-development values, so running with no configuration connects to the
 * same local databases as before.
 *
 * NOTE(review): the handlers in this article inject `Repository<User>`, the
 * `UserReadModel` Mongoose model and a `'KAFKA_SERVICE'` client. The
 * registrations that provide those are not shown in this module — confirm
 * they are declared elsewhere.
 */
@Module({
  imports: [
    CqrsModule,
    MongooseModule.forRoot(
      process.env.MONGO_URI || 'mongodb://localhost:27017/cqrs_read',
    ),
    TypeOrmModule.forRoot({
      type: 'postgres',
      host: process.env.PG_HOST || 'localhost',
      // Unset, empty or non-numeric values fall back to the default port.
      port: Number(process.env.PG_PORT) || 5432,
      username: process.env.PG_USER || 'postgres',
      // Never commit a real password; supply it via PG_PASSWORD outside local dev.
      password: process.env.PG_PASSWORD || 'password',
      database: process.env.PG_DATABASE || 'cqrs_write',
      entities: [__dirname + '/**/*.entity{.ts,.js}'],
      // Automatic schema sync can drop columns/data, so it is only enabled
      // outside production; use migrations there instead.
      synchronize: process.env.NODE_ENV !== 'production',
    }),
  ],
  controllers: [...Controllers],
  providers: [
    ...CommandHandlers,
    ...QueryHandlers,
    ...EventHandlers,
  ],
})
export class AppModule {}
Enter fullscreen mode Exit fullscreen mode

Now, let's define a command:

// commands/impl/create-user.command.ts
/**
 * Command describing a request to create a user.
 * Instances are plain immutable data holders dispatched on the command bus.
 */
export class CreateUserCommand {
  /** E-mail address of the user to create. */
  public readonly email: string;
  /** Display name of the user to create. */
  public readonly name: string;

  constructor(email: string, name: string) {
    this.email = email;
    this.name = name;
  }
}
Enter fullscreen mode Exit fullscreen mode

And its handler:

// commands/handlers/create-user.handler.ts
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { CreateUserCommand } from '../impl/create-user.command';
import { User } from '../../models/user.entity';
import { UserCreatedEvent } from '../../events/impl/user-created.event';

/**
 * Write-side handler for {@link CreateUserCommand}.
 *
 * Saves a new `User` through the TypeORM repository and then publishes a
 * `UserCreatedEvent` on the event bus so the read model can be updated.
 */
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    @InjectRepository(User)
    private userRepository: Repository<User>,
    private eventBus: EventBus,
  ) {}

  /**
   * Creates and stores the user described by the command.
   *
   * @param command - carries the e-mail and name of the user to create
   * @returns the entity as returned by the repository's `save`
   */
  async execute(command: CreateUserCommand): Promise<User> {
    const email = command.email;
    const name = command.name;

    const draft = new User();
    draft.email = email;
    draft.name = name;

    const persisted = await this.userRepository.save(draft);

    // The event is published only after the save resolved, using the
    // identifier taken from the saved entity.
    const createdEvent = new UserCreatedEvent(persisted.id, email, name);
    this.eventBus.publish(createdEvent);

    return persisted;
  }
}
Enter fullscreen mode Exit fullscreen mode

Query Side with MongoDB

For read operations, we'll use MongoDB: each user is stored as a denormalized document shaped for our queries, so a read is a single lookup by `userId`:

// queries/impl/get-user.query.ts
/**
 * Query asking for a single user by identifier.
 */
export class GetUserQuery {
  /** Identifier of the user to look up. */
  public readonly id: string;

  constructor(id: string) {
    this.id = id;
  }
}

// queries/handlers/get-user.handler.ts
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { GetUserQuery } from '../impl/get-user.query';
import { UserReadModel } from '../../models/user.read-model';

/**
 * Read-side handler for {@link GetUserQuery}.
 *
 * Looks the user up in the MongoDB read model by its `userId` field.
 */
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
  constructor(
    @InjectModel(UserReadModel.name)
    private userModel: Model<UserReadModel>,
  ) {}

  /**
   * Fetches the read-model document for the requested user.
   *
   * @param query - carries the identifier to match against `userId`
   * @returns the matching document, or `null` when none exists. The read
   *   model is filled asynchronously, so a user that was just created may
   *   not be visible yet and callers must handle `null`.
   */
  async execute(query: GetUserQuery): Promise<UserReadModel | null> {
    return this.userModel.findOne({ userId: query.id }).exec();
  }
}
Enter fullscreen mode Exit fullscreen mode

Synchronizing Read/Write Models with Kafka

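To keep our read model updated, we'll use Kafka to propagate domain events from the write side to the read side. (This is event-driven synchronization rather than full event sourcing: PostgreSQL remains the source of truth.)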

// events/handlers/user-created.handler.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { UserCreatedEvent } from '../impl/user-created.event';

/**
 * Forwards in-process {@link UserCreatedEvent}s to the `user-created`
 * Kafka topic through the injected `'KAFKA_SERVICE'` client.
 */
@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
  constructor(
    @Inject('KAFKA_SERVICE') private kafkaClient: ClientKafka
  ) {}

  /**
   * Emits the event's id, email and name, plus an ISO-8601 timestamp taken
   * at emit time, on the `user-created` topic.
   *
   * @param event - the event published by the write side
   */
  handle(event: UserCreatedEvent) {
    const { id, email, name } = event;
    const payload = {
      id,
      email,
      name,
      timestamp: new Date().toISOString()
    };

    this.kafkaClient.emit('user-created', payload);
  }
}
Enter fullscreen mode Exit fullscreen mode

And our consumer service:

// services/read-model-sync.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { Consumer, Kafka } from 'kafkajs';
import { UserReadModel } from '../models/user.read-model';

/**
 * Fields of the `user-created` payload that the read model stores.
 * NOTE(review): `id` is typed as string to match `GetUserQuery.id`; confirm
 * against the `User` entity's primary-key type.
 */
interface UserCreatedPayload {
  id: string;
  email: string;
  name: string;
}

/**
 * Consumes `user-created` messages from Kafka and upserts the corresponding
 * document into the MongoDB read model.
 */
@Injectable()
export class ReadModelSyncService implements OnModuleInit {
  private consumer: Consumer;

  constructor(
    @InjectModel(UserReadModel.name)
    private userModel: Model<UserReadModel>,
  ) {
    const kafka = new Kafka({
      clientId: 'read-model-sync',
      brokers: ['localhost:9092'],
    });
    this.consumer = kafka.consumer({ groupId: 'read-model-sync-group' });
  }

  /**
   * Connects the consumer, subscribes to `user-created` from the beginning
   * of the topic and starts processing messages.
   */
  async onModuleInit() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'user-created', fromBeginning: true });

    await this.consumer.run({
      eachMessage: async ({ topic, message }) => {
        if (topic !== 'user-created') {
          return;
        }

        const eventData = this.parseUserCreated(message.value);
        if (eventData === null) {
          // Throwing here would make the consumer retry a message that can
          // never succeed, so malformed messages are logged and skipped.
          console.error(
            `Skipping malformed message on topic ${topic} at offset ${message.offset}`,
          );
          return;
        }

        // Upsert keyed on userId so replaying the topic is harmless.
        await this.userModel.updateOne(
          { userId: eventData.id },
          {
            userId: eventData.id,
            email: eventData.email,
            name: eventData.name,
            updatedAt: new Date()
          },
          { upsert: true }
        );
      },
    });
  }

  /**
   * Disconnects the Kafka consumer when the module is torn down so the
   * connection is not left open on shutdown.
   */
  async onModuleDestroy() {
    await this.consumer.disconnect();
  }

  /**
   * Decodes a raw message value into a payload.
   *
   * @param raw - message value; `null` for messages without a value
   * @returns the payload, or `null` when the value is missing, is not valid
   *   JSON, or is not an object carrying an `id`
   */
  private parseUserCreated(raw: Buffer | null): UserCreatedPayload | null {
    if (raw === null) {
      return null;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw.toString());
    } catch {
      return null;
    }

    if (typeof parsed !== 'object' || parsed === null || !('id' in parsed)) {
      return null;
    }
    // Only the presence of `id` is checked at runtime; the remaining fields
    // are trusted to match what the producer emits.
    return parsed as UserCreatedPayload;
  }
}
Enter fullscreen mode Exit fullscreen mode

To further enhance read performance, we can add Redis caching in front of the read queries.

Top comments (0)