If you've ever worked on a system where "update this record" becomes a landmine — bugs appearing from nowhere, audit logs you can't trust, state that's impossible to replay — you've felt the pain that CQRS and Event Sourcing were designed to solve.
This isn't a "what is CQRS" post. There are a hundred of those. This is a guide to actually implementing it in NestJS, including the parts nobody talks about: the projections, the event store, the versioning headaches, and what you gain (and lose) by going all-in.
I've been using this stack in NoteCore, a local-first developer knowledge base I'm building, and the patterns here come directly from production code.
Let's go.
The Problem With Traditional CRUD
Before diving into code, let's establish why this matters.
In classic CRUD, your write and read models are the same table:
// The classic way
async updateNote(id: string, dto: UpdateNoteDto) {
  return this.noteRepository.update(id, {
    title: dto.title,
    content: dto.content,
    updatedAt: new Date(),
  });
}
This looks fine until:
- You need to know what changed (audit trail, undo/redo)
- Your reads and writes have completely different shapes (dashboard aggregates vs. raw entities)
- You're building a distributed system where multiple services need to react to changes
- You want to replay history to rebuild state or add new projections retroactively
CQRS + Event Sourcing solves all of this — but it comes with a real cost in complexity. I'll be honest about both sides.
The Big Picture
Before any code, here's the mental model:
Command (intent)
↓
Command Handler (validates, loads aggregate, applies business logic)
↓
Domain Event (fact: "this happened")
↓
Event Store (append-only log of all events)
↓
Projections (rebuild read models from events)
↓
Query Handler (reads from projected read models)
Commands represent intent: CreateNote, ArchiveNote, AddTag.
Events are immutable facts: NoteCreated, NoteArchived, TagAdded.
The event store is your source of truth — not the DB table.
Projections are derived views built by replaying events.
The key insight: you never update state directly. You emit events, and state is always derived by replaying those events.
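To make that concrete, here's a minimal, framework-free sketch of "state is derived by replaying events". The names `evolve` and `replay` and the trimmed-down event shapes are mine for illustration; they are not NestJS APIs or NoteCore code.

```typescript
// Simplified event shapes for illustration only.
type NoteEvent =
  | { type: 'NoteCreated'; noteId: string; title: string }
  | { type: 'TagAdded'; noteId: string; tag: string }
  | { type: 'NoteArchived'; noteId: string };

interface NoteState {
  id: string;
  title: string;
  tags: string[];
  archived: boolean;
}

const emptyNote: NoteState = { id: '', title: '', tags: [], archived: false };

// Pure function: (previous state, event) -> next state. No I/O, no mutation.
function evolve(state: NoteState, event: NoteEvent): NoteState {
  switch (event.type) {
    case 'NoteCreated':
      return { ...state, id: event.noteId, title: event.title };
    case 'TagAdded':
      return { ...state, tags: [...state.tags, event.tag] };
    case 'NoteArchived':
      return { ...state, archived: true };
  }
}

// Current state is a left fold over the whole stream.
function replay(events: NoteEvent[]): NoteState {
  return events.reduce(evolve, emptyNote);
}

const noteHistory: NoteEvent[] = [
  { type: 'NoteCreated', noteId: 'n1', title: 'CQRS notes' },
  { type: 'TagAdded', noteId: 'n1', tag: 'architecture' },
  { type: 'NoteArchived', noteId: 'n1' },
];

const currentState = replay(noteHistory);
// Replaying a prefix gives the state as it was before the last event.
const stateBeforeArchive = replay(noteHistory.slice(0, 2));
```

Replaying only the first two events yields the note as it looked before it was archived, which is the basis for audit trails and undo.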
Project Structure
Here's how I structure a CQRS module:
src/
└── notes/
    ├── application/
    │   ├── commands/
    │   │   ├── create-note/
    │   │   │   ├── create-note.command.ts
    │   │   │   └── create-note.handler.ts
    │   │   └── archive-note/
    │   │       ├── archive-note.command.ts
    │   │       └── archive-note.handler.ts
    │   └── queries/
    │       ├── get-note/
    │       │   ├── get-note.query.ts
    │       │   └── get-note.handler.ts
    │       └── list-notes/
    │           ├── list-notes.query.ts
    │           └── list-notes.handler.ts
    ├── domain/
    │   ├── note.aggregate.ts
    │   └── events/
    │       ├── note-created.event.ts
    │       ├── note-archived.event.ts
    │       └── tag-added.event.ts
    ├── infrastructure/
    │   ├── event-store/
    │   │   ├── event-store.service.ts
    │   │   ├── event-deserializer.service.ts
    │   │   └── stored-event.entity.ts
    │   └── projections/
    │       ├── note-projection.service.ts
    │       └── note-read.entity.ts
    └── notes.module.ts
This looks like a lot of files. It is. That's the cost. But every file has exactly one responsibility — and when something breaks, you know exactly where to look.
Part 1: The Aggregate
The aggregate is the heart of this pattern. It's where your business logic lives, and it's responsible for:
- Validating commands
- Emitting domain events
- Rebuilding its own state from a stream of events
// domain/note.aggregate.ts
import { AggregateRoot } from '@nestjs/cqrs';
import { NoteCreatedEvent } from './events/note-created.event';
import { NoteArchivedEvent } from './events/note-archived.event';
import { TagAddedEvent } from './events/tag-added.event';

type NoteEvent = NoteCreatedEvent | NoteArchivedEvent | TagAddedEvent;

export class Note extends AggregateRoot {
  private id: string;
  private title: string;
  private content: string;
  private tags: string[] = [];
  private archived: boolean = false;
  private version: number = 0;

  // Factory method: prefer this over the constructor for creation
  static create(id: string, title: string, content: string, authorId: string): Note {
    const note = new Note();
    note.apply(new NoteCreatedEvent(id, title, content, authorId));
    return note;
  }

  // Reconstitute from an event stream (Event Sourcing).
  // Takes deserialized domain events, not raw StoredEvent rows.
  static reconstitute(events: NoteEvent[]): Note {
    const note = new Note();
    // loadFromHistory() is AggregateRoot's built-in replay: it runs each
    // on* handler without buffering the event as uncommitted
    note.loadFromHistory(events);
    return note;
  }

  archive(): void {
    if (this.archived) {
      throw new Error('Note is already archived');
    }
    this.apply(new NoteArchivedEvent(this.id));
  }

  addTag(tag: string): void {
    if (this.tags.includes(tag)) {
      throw new Error(`Tag "${tag}" already exists`);
    }
    this.apply(new TagAddedEvent(this.id, tag));
  }

  // Event handlers: these mutate state.
  // AggregateRoot.apply() finds them by name ("on" + event class name)
  onNoteCreatedEvent(event: NoteCreatedEvent): void {
    this.id = event.noteId;
    this.title = event.title;
    this.content = event.content;
    this.version++;
  }

  onNoteArchivedEvent(event: NoteArchivedEvent): void {
    this.archived = true;
    this.version++;
  }

  onTagAddedEvent(event: TagAddedEvent): void {
    this.tags.push(event.tag);
    this.version++;
  }

  getId(): string { return this.id; }
  // Number of events applied so far (the store numbers versions from 0)
  getVersion(): number { return this.version; }
}
Notice the separation:
- Business validation happens before apply(), in archive() and addTag()
- State mutation happens in on* handlers, always in response to events
- The aggregate can be rebuilt entirely from its event history via reconstitute()
This means your Note object's state at any point in time is a pure function of the events that happened to it. That's powerful.
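If the `AggregateRoot` machinery feels like magic, here's a hand-rolled miniature of the same idea. `MiniNote`, `record`, `mutate`, and `pullUncommitted` are hypothetical names for this sketch, not the @nestjs/cqrs API. It shows the two paths side by side: new events are validated, applied, and buffered, while historical events are only applied.

```typescript
class NoteCreated {
  readonly type = 'NoteCreated';
  constructor(readonly noteId: string, readonly title: string) {}
}

class NoteArchived {
  readonly type = 'NoteArchived';
  constructor(readonly noteId: string) {}
}

type NoteEvent = NoteCreated | NoteArchived;

class MiniNote {
  private id = '';
  private archived = false;
  private uncommitted: NoteEvent[] = [];

  static create(id: string, title: string): MiniNote {
    const note = new MiniNote();
    note.record(new NoteCreated(id, title));
    return note;
  }

  // Replay path: mutate state only, buffer nothing.
  static fromHistory(events: NoteEvent[]): MiniNote {
    const note = new MiniNote();
    for (const e of events) note.mutate(e);
    return note;
  }

  archive(): void {
    // Validation runs against state that was itself derived from events.
    if (this.archived) throw new Error('Note is already archived');
    this.record(new NoteArchived(this.id));
  }

  // Hands the buffered events to the caller (to persist) and clears the buffer.
  pullUncommitted(): NoteEvent[] {
    const out = this.uncommitted;
    this.uncommitted = [];
    return out;
  }

  // Command path: mutate state and remember the event for persistence.
  private record(event: NoteEvent): void {
    this.mutate(event);
    this.uncommitted.push(event);
  }

  private mutate(event: NoteEvent): void {
    if (event instanceof NoteCreated) this.id = event.noteId;
    else this.archived = true;
  }
}

const freshNote = MiniNote.create('n1', 'Hello');
freshNote.archive();
const persistedEvents = freshNote.pullUncommitted();

// A rebuilt aggregate enforces the same rule as the original instance.
const rebuiltNote = MiniNote.fromHistory(persistedEvents);
let secondArchiveRejected = false;
try {
  rebuiltNote.archive();
} catch {
  secondArchiveRejected = true;
}
```

The rebuilt note rejects a second archive even though nobody ever set its `archived` flag directly; the flag came entirely from replay.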
Part 2: Domain Events
Events are simple value objects — immutable facts. They should carry enough data to rebuild state without needing to look anything up.
// domain/events/note-created.event.ts
export class NoteCreatedEvent {
constructor(
public readonly noteId: string,
public readonly title: string,
public readonly content: string,
public readonly authorId: string,
public readonly occurredAt: Date = new Date(),
) {}
}
// domain/events/note-archived.event.ts
export class NoteArchivedEvent {
constructor(
public readonly noteId: string,
public readonly occurredAt: Date = new Date(),
) {}
}
// domain/events/tag-added.event.ts
export class TagAddedEvent {
constructor(
public readonly noteId: string,
public readonly tag: string,
public readonly occurredAt: Date = new Date(),
) {}
}
Golden rule: Once an event is persisted, it's immutable. You never update events. You never delete events. If you made a mistake, you emit a compensating event.
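Here's a small sketch of what a compensating event looks like in practice. `NoteUnarchived` is a hypothetical event I'm inventing for the example (it isn't one of the events defined above), and `AppendOnlyLog` is an in-memory stand-in for the real store.

```typescript
type ArchiveEvent =
  | { readonly type: 'NoteArchived'; readonly noteId: string }
  | { readonly type: 'NoteUnarchived'; readonly noteId: string; readonly reason: string };

// The log exposes append and read, and deliberately nothing else.
class AppendOnlyLog {
  private readonly events: ArchiveEvent[] = [];

  append(event: ArchiveEvent): void {
    this.events.push(Object.freeze(event));
  }

  all(): readonly ArchiveEvent[] {
    return this.events;
  }
}

// The latest archive-related event for the note decides its current status.
function isArchived(noteId: string, events: readonly ArchiveEvent[]): boolean {
  let archived = false;
  for (const e of events) {
    if (e.noteId !== noteId) continue;
    archived = e.type === 'NoteArchived';
  }
  return archived;
}

const archiveLog = new AppendOnlyLog();
archiveLog.append({ type: 'NoteArchived', noteId: 'n1' });
// The mistake is corrected by appending a new fact, not by deleting the old one.
archiveLog.append({ type: 'NoteUnarchived', noteId: 'n1', reason: 'archived by mistake' });

const archivedNow = isArchived('n1', archiveLog.all());
const recordedEvents = archiveLog.all().length;
```

The note ends up unarchived, but the history still shows that it was archived once and why that was reversed.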
Part 3: The Event Store
This is the part most tutorials skip. Where do the events actually go?
You need an append-only store. Here's a PostgreSQL-backed implementation:
// infrastructure/event-store/stored-event.entity.ts
import { Entity, Column, PrimaryGeneratedColumn, CreateDateColumn, Index } from 'typeorm';
@Entity('event_store')
@Index(['aggregateId', 'version'], { unique: true })
export class StoredEvent {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column()
@Index()
aggregateId: string;
@Column()
aggregateType: string;
@Column()
eventType: string;
@Column({ type: 'jsonb' })
payload: Record<string, unknown>;
@Column()
version: number;
@CreateDateColumn()
occurredAt: Date;
}
// infrastructure/event-store/event-store.service.ts
import { Injectable, ConflictException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import { StoredEvent } from './stored-event.entity';
@Injectable()
export class EventStoreService {
constructor(
@InjectRepository(StoredEvent)
private readonly eventRepo: Repository<StoredEvent>,
private readonly dataSource: DataSource,
) {}
async append(
aggregateId: string,
aggregateType: string,
events: DomainEvent[],
expectedVersion: number,
): Promise<void> {
await this.dataSource.transaction(async (manager) => {
// Optimistic concurrency check
const currentVersion = await manager
.createQueryBuilder(StoredEvent, 'e')
.where('e.aggregateId = :aggregateId', { aggregateId })
.select('MAX(e.version)', 'max')
.getRawOne();
const latestVersion = currentVersion?.max ?? -1;
if (latestVersion !== expectedVersion) {
throw new ConflictException(
`Concurrency conflict: expected version ${expectedVersion}, got ${latestVersion}`,
);
}
let version = expectedVersion + 1;
for (const event of events) {
await manager.save(StoredEvent, {
aggregateId,
aggregateType,
eventType: event.constructor.name,
payload: JSON.parse(JSON.stringify(event)),
version: version++,
});
}
});
}
async loadEvents(aggregateId: string): Promise<StoredEvent[]> {
return this.eventRepo.find({
where: { aggregateId },
order: { version: 'ASC' },
});
}
async loadEventsSince(aggregateId: string, version: number): Promise<StoredEvent[]> {
return this.eventRepo
.createQueryBuilder('e')
.where('e.aggregateId = :aggregateId', { aggregateId })
.andWhere('e.version > :version', { version })
.orderBy('e.version', 'ASC')
.getMany();
}
}
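The expectedVersion parameter is critical: it gives you optimistic concurrency. If two requests try to update the same aggregate simultaneously, only one will succeed. The version check rejects a writer that loaded a stale stream, and the unique index on (aggregateId, version) catches any race that slips past the check inside the transaction. The loser gets a ConflictException (or a unique-constraint error from the database, which you should map to the same response) and can reload and retry.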
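To see the conflict in isolation, here's an in-memory sketch of the same check. `InMemoryEventStore` and `ConcurrencyError` are illustrative stand-ins, not the TypeORM-backed service above, and events are reduced to their type names.

```typescript
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
    // Keeps instanceof working when compiling to older JS targets.
    Object.setPrototypeOf(this, ConcurrencyError.prototype);
  }
}

interface StoredRow {
  aggregateId: string;
  version: number;
  eventType: string;
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredRow[]>();

  // -1 means "no events yet", matching the convention used in the article.
  currentVersion(aggregateId: string): number {
    return (this.streams.get(aggregateId)?.length ?? 0) - 1;
  }

  append(aggregateId: string, eventTypes: string[], expectedVersion: number): void {
    const latest = this.currentVersion(aggregateId);
    if (latest !== expectedVersion) {
      throw new ConcurrencyError(`expected version ${expectedVersion}, got ${latest}`);
    }
    const stream = this.streams.get(aggregateId) ?? [];
    eventTypes.forEach((eventType, i) =>
      stream.push({ aggregateId, version: expectedVersion + 1 + i, eventType }),
    );
    this.streams.set(aggregateId, stream);
  }
}

const demoStore = new InMemoryEventStore();
demoStore.append('n1', ['NoteCreated'], -1);

// Two requests load the aggregate at the same time and both see version 0.
const versionSeenByA = demoStore.currentVersion('n1');
const versionSeenByB = demoStore.currentVersion('n1');

demoStore.append('n1', ['TagAdded'], versionSeenByA); // A wins, stream moves to version 1

let conflictDetected = false;
try {
  demoStore.append('n1', ['NoteArchived'], versionSeenByB); // B is now stale
} catch (err) {
  conflictDetected = err instanceof ConcurrencyError;
}

// B reloads the stream, re-runs its business logic, and retries.
demoStore.append('n1', ['NoteArchived'], demoStore.currentVersion('n1'));
const finalVersion = demoStore.currentVersion('n1');
```

The important part of the retry is the reload: B must re-run its validation against the new state, because A's event may have made B's command invalid.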
Part 4: Commands and Handlers
Commands express user intent. A command handler loads the aggregate, calls a method, persists the events, and publishes them.
// application/commands/create-note/create-note.command.ts
export class CreateNoteCommand {
constructor(
public readonly noteId: string,
public readonly title: string,
public readonly content: string,
public readonly authorId: string,
) {}
}
// application/commands/create-note/create-note.handler.ts
import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
import { CreateNoteCommand } from './create-note.command';
import { EventStoreService } from '../../../infrastructure/event-store/event-store.service';
import { Note } from '../../../domain/note.aggregate';
@CommandHandler(CreateNoteCommand)
export class CreateNoteHandler implements ICommandHandler<CreateNoteCommand> {
constructor(
private readonly eventStore: EventStoreService,
private readonly publisher: EventPublisher,
) {}
async execute(command: CreateNoteCommand): Promise<void> {
const { noteId, title, content, authorId } = command;
// Create the aggregate — events are buffered inside it
const note = Note.create(noteId, title, content, authorId);
// Merge the aggregate with EventPublisher so events get dispatched
const mergedNote = this.publisher.mergeObjectContext(note);
// Persist events to the store
const uncommittedEvents = mergedNote.getUncommittedEvents();
await this.eventStore.append(
noteId,
'Note',
uncommittedEvents,
-1, // -1 means "this aggregate should not exist yet"
);
// Dispatch events to NestJS event bus (for projections / sagas)
mergedNote.commit();
}
}
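// application/commands/archive-note/archive-note.handler.ts
import { NotFoundException } from '@nestjs/common';
import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
import { ArchiveNoteCommand } from './archive-note.command';
import { EventStoreService } from '../../../infrastructure/event-store/event-store.service';
import { EventDeserializerService } from '../../../infrastructure/event-store/event-deserializer.service';
import { Note } from '../../../domain/note.aggregate';

@CommandHandler(ArchiveNoteCommand)
export class ArchiveNoteHandler implements ICommandHandler<ArchiveNoteCommand> {
  constructor(
    private readonly eventStore: EventStoreService,
    private readonly publisher: EventPublisher,
    private readonly eventDeserializer: EventDeserializerService,
  ) {}

  async execute(command: ArchiveNoteCommand): Promise<void> {
    const { noteId } = command;

    // Load event stream and reconstitute the aggregate
    const storedEvents = await this.eventStore.loadEvents(noteId);
    if (storedEvents.length === 0) {
      throw new NotFoundException(`Note ${noteId} not found`);
    }

    // The store numbers events from 0, so the expected version is the version
    // of the last stored event. Capture it before applying anything new:
    // note.getVersion() changes as soon as archive() applies its event.
    const expectedVersion = storedEvents[storedEvents.length - 1].version;

    const domainEvents = storedEvents.map((e) =>
      this.eventDeserializer.deserialize(e),
    );
    const note = Note.reconstitute(domainEvents);
    const mergedNote = this.publisher.mergeObjectContext(note);

    // Business logic: will throw if already archived
    mergedNote.archive();

    // Persist new events
    const uncommittedEvents = mergedNote.getUncommittedEvents();
    await this.eventStore.append(
      noteId,
      'Note',
      uncommittedEvents,
      expectedVersion,
    );
    mergedNote.commit();
  }
}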
Part 5: Projections (The Read Side)
Here's where CQRS really shines. Your read models are completely independent of your write model. They're optimized for queries, not for enforcing business rules; that stays the write side's job.
// infrastructure/projections/note-read.entity.ts
import { Entity, Column, PrimaryColumn, UpdateDateColumn } from 'typeorm';
@Entity('notes_read')
export class NoteReadEntity {
@PrimaryColumn()
id: string;
@Column()
title: string;
@Column({ type: 'text' })
content: string;
@Column()
authorId: string;
@Column({ type: 'text', array: true, default: '{}' })
tags: string[];
@Column({ default: false })
archived: boolean;
@Column()
version: number;
@UpdateDateColumn()
lastProjectedAt: Date;
}
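// infrastructure/projections/note-projection.service.ts
import { Injectable } from '@nestjs/common';
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { NoteReadEntity } from './note-read.entity';
import { NoteCreatedEvent } from '../../domain/events/note-created.event';
import { NoteArchivedEvent } from '../../domain/events/note-archived.event';
import { TagAddedEvent } from '../../domain/events/tag-added.event';

type NoteEvent = NoteCreatedEvent | NoteArchivedEvent | TagAddedEvent;

// commit() publishes to the @nestjs/cqrs EventBus, so the projection subscribes
// with @EventsHandler. @OnEvent from @nestjs/event-emitter listens on a
// different bus and would never receive these events.
@Injectable()
@EventsHandler(NoteCreatedEvent, NoteArchivedEvent, TagAddedEvent)
export class NoteProjectionService implements IEventHandler<NoteEvent> {
  constructor(
    @InjectRepository(NoteReadEntity)
    private readonly readRepo: Repository<NoteReadEntity>,
  ) {}

  // Called by the event bus
  async handle(event: NoteEvent): Promise<void> {
    await this.handleEvent(event);
  }

  // Also callable directly, e.g. when replaying history during a rebuild
  async handleEvent(event: NoteEvent): Promise<void> {
    if (event instanceof NoteCreatedEvent) {
      await this.onNoteCreated(event);
    } else if (event instanceof TagAddedEvent) {
      await this.onTagAdded(event);
    } else if (event instanceof NoteArchivedEvent) {
      await this.onNoteArchived(event);
    }
  }

  private async onNoteCreated(event: NoteCreatedEvent): Promise<void> {
    await this.readRepo.save({
      id: event.noteId,
      title: event.title,
      content: event.content,
      authorId: event.authorId,
      tags: [],
      archived: false,
      version: 1,
    });
  }

  private async onNoteArchived(event: NoteArchivedEvent): Promise<void> {
    await this.readRepo.update(event.noteId, {
      archived: true,
    });
  }

  private async onTagAdded(event: TagAddedEvent): Promise<void> {
    const note = await this.readRepo.findOneBy({ id: event.noteId });
    if (!note) return;
    await this.readRepo.update(event.noteId, {
      tags: [...note.tags, event.tag],
    });
  }
}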
Your read model can be denormalized however you want. Want a full-text search index? Add it here. Want a separate notes_by_tag table? Add another projection. The event stream is the source of truth — projections are just cached views.
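As a sketch of "add another projection", here's what a notes-by-tag read model could look like, kept in memory so it runs on its own. `NotesByTagProjection` and the rule that archived notes drop out of the index are my assumptions for the example; a real version would write to its own table.

```typescript
// Simplified event shapes for illustration only.
type NoteEvent =
  | { type: 'NoteCreated'; noteId: string; title: string }
  | { type: 'TagAdded'; noteId: string; tag: string }
  | { type: 'NoteArchived'; noteId: string };

class NotesByTagProjection {
  private readonly idsByTag = new Map<string, Set<string>>();
  private readonly tagsByNote = new Map<string, string[]>();

  handle(event: NoteEvent): void {
    switch (event.type) {
      case 'NoteCreated':
        this.tagsByNote.set(event.noteId, []);
        break;
      case 'TagAdded': {
        const ids = this.idsByTag.get(event.tag) ?? new Set<string>();
        ids.add(event.noteId);
        this.idsByTag.set(event.tag, ids);
        const tags = this.tagsByNote.get(event.noteId) ?? [];
        tags.push(event.tag);
        this.tagsByNote.set(event.noteId, tags);
        break;
      }
      case 'NoteArchived': {
        // Assumed rule for this read model: archived notes leave the index.
        const tags = this.tagsByNote.get(event.noteId) ?? [];
        for (const tag of tags) {
          this.idsByTag.get(tag)?.delete(event.noteId);
        }
        break;
      }
    }
  }

  notesFor(tag: string): string[] {
    const ids = this.idsByTag.get(tag);
    return ids ? Array.from(ids).sort() : [];
  }
}

const tagProjection = new NotesByTagProjection();
const tagEvents: NoteEvent[] = [
  { type: 'NoteCreated', noteId: 'n1', title: 'First' },
  { type: 'NoteCreated', noteId: 'n2', title: 'Second' },
  { type: 'TagAdded', noteId: 'n1', tag: 'cqrs' },
  { type: 'TagAdded', noteId: 'n2', tag: 'cqrs' },
  { type: 'NoteArchived', noteId: 'n1' },
];
tagEvents.forEach((e) => tagProjection.handle(e));

const notesTaggedCqrs = tagProjection.notesFor('cqrs');
```

This projection consumes exactly the same events as the main read model but answers a different question, and neither knows the other exists.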
Part 6: Queries
Queries are the simple side of CQRS. They just read from the projection:
// application/queries/get-note/get-note.query.ts
export class GetNoteQuery {
constructor(
public readonly noteId: string,
public readonly requesterId: string,
) {}
}
// application/queries/get-note/get-note.handler.ts
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { GetNoteQuery } from './get-note.query';
import { NoteReadEntity } from '../../../infrastructure/projections/note-read.entity';
import { NoteDto } from '../dtos/note.dto';
@QueryHandler(GetNoteQuery)
export class GetNoteHandler implements IQueryHandler<GetNoteQuery> {
constructor(
@InjectRepository(NoteReadEntity)
private readonly readRepo: Repository<NoteReadEntity>,
) {}
async execute(query: GetNoteQuery): Promise<NoteDto | null> {
const note = await this.readRepo.findOneBy({ id: query.noteId });
if (!note) return null;
return {
id: note.id,
title: note.title,
content: note.content,
tags: note.tags,
archived: note.archived,
};
}
}
Queries have zero business logic. They just map database rows to DTOs.
Part 7: The Module
Wiring it all together:
// notes.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { TypeOrmModule } from '@nestjs/typeorm';
// Entities
import { StoredEvent } from './infrastructure/event-store/stored-event.entity';
import { NoteReadEntity } from './infrastructure/projections/note-read.entity';
// Command Handlers
import { CreateNoteHandler } from './application/commands/create-note/create-note.handler';
import { ArchiveNoteHandler } from './application/commands/archive-note/archive-note.handler';
// Query Handlers
import { GetNoteHandler } from './application/queries/get-note/get-note.handler';
import { ListNotesHandler } from './application/queries/list-notes/list-notes.handler';
// Services
import { EventStoreService } from './infrastructure/event-store/event-store.service';
import { NoteProjectionService } from './infrastructure/projections/note-projection.service';
import { EventDeserializerService } from './infrastructure/event-store/event-deserializer.service';
const CommandHandlers = [CreateNoteHandler, ArchiveNoteHandler];
const QueryHandlers = [GetNoteHandler, ListNotesHandler];
@Module({
imports: [
CqrsModule,
TypeOrmModule.forFeature([StoredEvent, NoteReadEntity]),
],
providers: [
...CommandHandlers,
...QueryHandlers,
EventStoreService,
NoteProjectionService,
EventDeserializerService,
],
})
export class NotesModule {}
Part 8: Projection Rebuilding
One of the killer features of Event Sourcing: you can rebuild any projection from scratch at any time.
// infrastructure/projections/projection-rebuilder.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import { StoredEvent } from '../event-store/stored-event.entity';
import { NoteReadEntity } from './note-read.entity';
import { EventDeserializerService } from '../event-store/event-deserializer.service';
import { NoteProjectionService } from './note-projection.service';
@Injectable()
export class ProjectionRebuilderService {
private readonly logger = new Logger(ProjectionRebuilderService.name);
constructor(
@InjectRepository(StoredEvent)
private readonly eventRepo: Repository<StoredEvent>,
@InjectRepository(NoteReadEntity)
private readonly readRepo: Repository<NoteReadEntity>,
private readonly deserializer: EventDeserializerService,
private readonly projection: NoteProjectionService,
private readonly dataSource: DataSource,
) {}
async rebuildAll(): Promise<void> {
this.logger.log('Starting full projection rebuild...');
// Wipe the read model
await this.readRepo.clear();
// Load all events, ordered by aggregate and version
const allEvents = await this.eventRepo.find({
where: { aggregateType: 'Note' },
order: { aggregateId: 'ASC', version: 'ASC' },
});
this.logger.log(`Replaying ${allEvents.length} events...`);
for (const stored of allEvents) {
const event = this.deserializer.deserialize(stored);
// Manually invoke projection handlers (bypassing the event bus)
await this.projection.handleEvent(event);
}
this.logger.log('Projection rebuild complete.');
}
}
This is something you can't do with traditional CRUD. You want to add a new read model for full-text search? Write the projection, run the rebuilder against your event history, and it's done. No data migration. No backfills. Just replay.
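One caveat with the rebuilder above: it loads every event into memory at once. For a larger store you'd probably page through the log instead. Here's a sketch of that loop, kept synchronous and in-memory so it runs on its own. The global `position` column and the `PageLoader` signature are assumptions of mine (the `StoredEvent` entity above has no such column); in a real store the loader would be an awaited, keyset-paginated query.

```typescript
interface StoredRow {
  position: number; // assumed global, monotonically increasing sequence number
  eventType: string;
  noteId: string;
}

// Stand-in for: SELECT ... WHERE position > :after ORDER BY position LIMIT :limit
type PageLoader = (afterPosition: number, limit: number) => StoredRow[];

function rebuildInBatches(
  loadPage: PageLoader,
  batchSize: number,
  reset: () => void,
  apply: (row: StoredRow) => void,
): number {
  reset(); // wipe the read model first
  let cursor = -1;
  let replayed = 0;
  for (;;) {
    const page = loadPage(cursor, batchSize);
    if (page.length === 0) return replayed;
    for (const row of page) {
      apply(row);
      replayed++;
    }
    // Continue after the last row seen, so no row is read twice.
    cursor = page[page.length - 1].position;
  }
}

// Demo: five events, replayed two at a time into a set of active note ids.
const eventLog: StoredRow[] = [
  { position: 0, eventType: 'NoteCreatedEvent', noteId: 'a' },
  { position: 1, eventType: 'NoteCreatedEvent', noteId: 'b' },
  { position: 2, eventType: 'NoteArchivedEvent', noteId: 'a' },
  { position: 3, eventType: 'NoteCreatedEvent', noteId: 'c' },
  { position: 4, eventType: 'NoteArchivedEvent', noteId: 'c' },
];

let pagesLoaded = 0;
const loadFromLog: PageLoader = (afterPosition, limit) => {
  pagesLoaded++;
  return eventLog.filter((r) => r.position > afterPosition).slice(0, limit);
};

const activeNoteIds = new Set<string>(['stale-row']);
const replayedCount = rebuildInBatches(
  loadFromLog,
  2,
  () => activeNoteIds.clear(),
  (row) => {
    if (row.eventType === 'NoteCreatedEvent') activeNoteIds.add(row.noteId);
    if (row.eventType === 'NoteArchivedEvent') activeNoteIds.delete(row.noteId);
  },
);
```

Memory use is bounded by the batch size rather than by the size of the event history.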
Part 9: Event Versioning (The Hard Part)
Here's where most tutorials end and real life begins. Events live forever — but your code doesn't. What happens when NoteCreatedEvent needs a new field?
Strategy 1: Additive changes only
Add optional fields with defaults. Old events are still valid:
export class NoteCreatedEvent {
constructor(
public readonly noteId: string,
public readonly title: string,
public readonly content: string,
public readonly authorId: string,
public readonly workspaceId: string = 'default', // ← new, optional
public readonly occurredAt: Date = new Date(),
) {}
}
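The same idea at the payload level, as a sketch: when an old event comes back out of storage it simply lacks the new field, so the reader supplies the default. `readNoteCreated` is a hypothetical helper for this example, not part of the deserializer shown later.

```typescript
interface NoteCreatedPayload {
  noteId: string;
  title: string;
  content: string;
  authorId: string;
  workspaceId?: string; // added later, so older stored events lack it
}

// Parses a stored payload and fills in the default for the newer field.
function readNoteCreated(rawJson: string): Required<NoteCreatedPayload> {
  const parsed = JSON.parse(rawJson) as NoteCreatedPayload;
  return { ...parsed, workspaceId: parsed.workspaceId ?? 'default' };
}

const oldStoredEvent = '{"noteId":"n1","title":"t","content":"c","authorId":"u1"}';
const newStoredEvent =
  '{"noteId":"n2","title":"t","content":"c","authorId":"u1","workspaceId":"team-a"}';

const fromOldEvent = readNoteCreated(oldStoredEvent);
const fromNewEvent = readNoteCreated(newStoredEvent);
```

Old events pick up `'default'`, new events keep what was written, and nothing in the store has to change.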
Strategy 2: Upcasters for breaking changes
If you need to change the shape significantly, write an upcaster:
// infrastructure/event-store/upcasters/note-created-v1-to-v2.upcaster.ts
export class NoteCreatedV1ToV2Upcaster {
upcast(oldPayload: NoteCreatedEventV1): NoteCreatedEventV2 {
return {
...oldPayload,
workspaceId: 'default', // backfill missing field
_eventVersion: 2,
};
}
}
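// infrastructure/event-store/event-deserializer.service.ts
import { Injectable } from '@nestjs/common';
import { StoredEvent } from './stored-event.entity';
import { NoteCreatedV1ToV2Upcaster } from './upcasters/note-created-v1-to-v2.upcaster';
import { NoteCreatedEvent } from '../../domain/events/note-created.event';
import { NoteArchivedEvent } from '../../domain/events/note-archived.event';
import { TagAddedEvent } from '../../domain/events/tag-added.event';

@Injectable()
export class EventDeserializerService {
  private upcasters = new Map([
    ['NoteCreatedEvent_v1', new NoteCreatedV1ToV2Upcaster()],
  ]);

  deserialize(stored: StoredEvent): DomainEvent {
    // The jsonb payload is untyped at this point
    let payload: Record<string, any> = stored.payload;

    // Apply upcasters if needed
    const upcasterKey = `${stored.eventType}_v${payload._eventVersion ?? 1}`;
    const upcaster = this.upcasters.get(upcasterKey);
    if (upcaster) {
      payload = upcaster.upcast(payload as NoteCreatedEventV1);
    }

    // JSON turned the Date into an ISO string; restore it so replayed
    // events keep their original timestamp instead of "now"
    const occurredAt = new Date(payload.occurredAt);

    // Map to the correct event class
    switch (stored.eventType) {
      case 'NoteCreatedEvent':
        return new NoteCreatedEvent(
          payload.noteId,
          payload.title,
          payload.content,
          payload.authorId,
          payload.workspaceId,
          occurredAt,
        );
      case 'NoteArchivedEvent':
        return new NoteArchivedEvent(payload.noteId, occurredAt);
      case 'TagAddedEvent':
        return new TagAddedEvent(payload.noteId, payload.tag, occurredAt);
      default:
        throw new Error(`Unknown event type: ${stored.eventType}`);
    }
  }
}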
Versioning is a first-class concern you need to think about before you start writing events to production.
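Once an event has been through more than one breaking change, a single upcaster lookup isn't enough; you need to walk the chain until the payload reaches the latest shape. Here's a sketch of that loop. The v2 to v3 step (renaming `content` to `body`) is a made-up change purely for illustration, and `upcastToLatest` is a hypothetical helper.

```typescript
type Payload = Record<string, unknown>;

interface Upcaster {
  from: number; // the payload version this upcaster accepts
  upcast(payload: Payload): Payload;
}

const noteCreatedUpcasters: Upcaster[] = [
  // v1 -> v2: backfill the missing workspaceId
  { from: 1, upcast: (p) => ({ ...p, workspaceId: 'default', _eventVersion: 2 }) },
  // v2 -> v3 (hypothetical): rename content to body
  {
    from: 2,
    upcast: (p) => {
      const { content, ...rest } = p;
      return { ...rest, body: content, _eventVersion: 3 };
    },
  },
];

function upcastToLatest(payload: Payload, upcasters: Upcaster[]): Payload {
  let current = payload;
  for (;;) {
    // Payloads written before versioning existed count as version 1.
    const version = typeof current._eventVersion === 'number' ? current._eventVersion : 1;
    const next = upcasters.find((u) => u.from === version);
    if (!next) return current;
    const upgraded = next.upcast(current);
    // Guard against an upcaster that forgets to bump the version (infinite loop).
    if (upgraded._eventVersion !== version + 1) {
      throw new Error(`Upcaster from v${version} must produce v${version + 1}`);
    }
    current = upgraded;
  }
}

const storedV1Payload: Payload = { noteId: 'n1', title: 't', content: 'hello', authorId: 'u1' };
const latestPayload = upcastToLatest(storedV1Payload, noteCreatedUpcasters);
```

Each upcaster only needs to know about its immediate successor, so adding v4 later means adding one entry rather than revisiting the old ones.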
Performance: Snapshots
Loading 10,000 events to reconstitute an aggregate is slow. Snapshots solve this:
// infrastructure/snapshots/snapshot.entity.ts
@Entity('aggregate_snapshots')
export class SnapshotEntity {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column()
aggregateId: string;
@Column({ type: 'jsonb' })
state: Record<string, unknown>;
@Column()
version: number;
@CreateDateColumn()
createdAt: Date;
}
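// In ArchiveNoteHandler (modified)
async execute(command: ArchiveNoteCommand): Promise<void> {
  let note: Note;
  let baseVersion: number;

  // Try loading from snapshot first
  const snapshot = await this.snapshotRepo.findOne({
    where: { aggregateId: command.noteId },
    order: { version: 'DESC' },
  });

  if (snapshot) {
    // Note.fromSnapshot() is assumed here: a static factory that restores
    // the aggregate's fields from the saved state
    note = Note.fromSnapshot(snapshot.state);
    // Load only events since the snapshot
    const newEvents = await this.eventStore.loadEventsSince(
      command.noteId,
      snapshot.version,
    );
    // Arrow function keeps `this` bound inside deserialize()
    note.loadFromHistory(newEvents.map((e) => this.eventDeserializer.deserialize(e)));
    // Expected version is the latest stored event, not the snapshot itself
    baseVersion = newEvents.length > 0
      ? newEvents[newEvents.length - 1].version
      : snapshot.version;
  } else {
    const allEvents = await this.eventStore.loadEvents(command.noteId);
    if (allEvents.length === 0) {
      throw new NotFoundException(`Note ${command.noteId} not found`);
    }
    note = Note.reconstitute(allEvents.map((e) => this.eventDeserializer.deserialize(e)));
    baseVersion = allEvents[allEvents.length - 1].version;
  }
  // ... rest of handler (pass baseVersion to eventStore.append as expectedVersion)
}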
Take a snapshot every N events (100–500 is a reasonable range).
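Here's a sketch of the two pieces that rule implies: deciding when to snapshot, and restoring from a snapshot plus the tail of events after it. `shouldSnapshot`, `restore`, and the simplified state shape are illustrative names of mine, not part of the handler above.

```typescript
interface NoteSnapshot {
  version: number; // version of the last event folded into the snapshot
  state: { archived: boolean; tags: string[] };
}

interface TailEvent {
  version: number;
  type: 'TagAdded' | 'NoteArchived';
  tag?: string;
}

// Snapshot once at least `interval` events have accumulated since the last one.
function shouldSnapshot(currentVersion: number, lastSnapshotVersion: number, interval: number): boolean {
  return currentVersion - lastSnapshotVersion >= interval;
}

function restore(snapshot: NoteSnapshot, tail: TailEvent[]) {
  // Copy the snapshot state so replay never mutates the stored snapshot.
  const state = { archived: snapshot.state.archived, tags: [...snapshot.state.tags] };
  let version = snapshot.version;
  for (const e of tail) {
    if (e.version <= snapshot.version) continue; // already part of the snapshot
    if (e.type === 'TagAdded' && e.tag !== undefined) state.tags.push(e.tag);
    if (e.type === 'NoteArchived') state.archived = true;
    version = e.version;
  }
  return { state, version };
}

const savedSnapshot: NoteSnapshot = { version: 99, state: { archived: false, tags: ['a'] } };
const eventsAfterSnapshot: TailEvent[] = [
  { version: 100, type: 'TagAdded', tag: 'b' },
  { version: 101, type: 'NoteArchived' },
];

const restoredNote = restore(savedSnapshot, eventsAfterSnapshot);
const snapshotDueNow = shouldSnapshot(restoredNote.version, savedSnapshot.version, 100);
const snapshotDueLater = shouldSnapshot(199, savedSnapshot.version, 100);
```

Snapshots are an optimization only: if one is lost or its shape changes, you can delete it and fall back to a full replay.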
When NOT to Use This
I'd be doing you a disservice if I didn't say this clearly:
Don't reach for CQRS + Event Sourcing as a default.
Use it when you have:
- A genuine need for an audit trail (financial systems, compliance, undo/redo)
- Different scaling requirements for reads vs. writes
- Multiple consumers of your domain events (other services, analytics, notifications)
- Enough domain complexity that the overhead is worth it
Don't use it for:
- Simple CRUD apps (users table, settings, basic content management)
- Small teams without prior ES experience — the learning curve is steep
- Systems where eventual consistency is a hard problem (and you haven't thought it through)
The read model in CQRS is eventually consistent by default. After a command succeeds, the projection might lag by milliseconds. For most use cases this is fine. For others (like showing the user their freshly created note immediately after creation), you need to handle it explicitly — either by returning the data directly from the command handler, or by waiting for the projection to catch up.
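One way to handle this explicitly is a version check on the read side: the command returns the version it wrote, and the query only answers once the projection has reached it. Here's a sketch under those assumptions. `readAtLeast`, the `stale` result, and the 50 ms retry hint are all hypothetical, and it presumes the projection stores the version of the last event it applied.

```typescript
interface NoteRow {
  id: string;
  title: string;
  version: number; // version of the last event applied to this row
}

type ReadResult =
  | { status: 'ok'; note: NoteRow }
  | { status: 'stale'; retryAfterMs: number };

// Returns the row only if the projection has caught up to minVersion.
// A missing row is treated as "not projected yet", which is only safe when
// the caller knows the note exists because it just wrote it.
function readAtLeast(rows: Map<string, NoteRow>, id: string, minVersion: number): ReadResult {
  const row = rows.get(id);
  if (!row || row.version < minVersion) {
    return { status: 'stale', retryAfterMs: 50 };
  }
  return { status: 'ok', note: row };
}

const readModelRows = new Map<string, NoteRow>();
const versionWrittenByCommand = 0; // the create command stored event version 0

// The client asks immediately, before the projection has processed the event.
const beforeCatchUp = readAtLeast(readModelRows, 'n1', versionWrittenByCommand);

// The projection catches up.
readModelRows.set('n1', { id: 'n1', title: 'Hello', version: 0 });
const afterCatchUp = readAtLeast(readModelRows, 'n1', versionWrittenByCommand);
```

The client can turn a `stale` result into a short retry or a loading state, so users never see a "not found" for something they just created.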
The Full Flow: Putting It Together
Here's a controller that uses both sides:
// notes.controller.ts
import { Controller, Post, Get, Body, Param } from '@nestjs/common';
import { CommandBus, QueryBus } from '@nestjs/cqrs';
import { v4 as uuid } from 'uuid';
import { CreateNoteCommand } from './application/commands/create-note/create-note.command';
import { ArchiveNoteCommand } from './application/commands/archive-note/archive-note.command';
import { GetNoteQuery } from './application/queries/get-note/get-note.query';
@Controller('notes')
export class NotesController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post()
async create(@Body() dto: CreateNoteDto) {
const noteId = uuid();
await this.commandBus.execute(
new CreateNoteCommand(noteId, dto.title, dto.content, dto.authorId),
);
return { id: noteId };
}
@Post(':id/archive')
async archive(@Param('id') id: string) {
await this.commandBus.execute(new ArchiveNoteCommand(id));
}
@Get(':id')
async getNote(@Param('id') id: string) {
return this.queryBus.execute(new GetNoteQuery(id, 'user'));
}
}
The controller knows nothing about aggregates, events, or projections. It dispatches commands and queries — that's it.
What I Learned Building NoteCore With This Stack
A few things I wish I'd known upfront:
1. Design your events around facts, not operations. NoteContentUpdated is better than UpdateNoteCommand. The event should describe what happened, not what was requested.
2. Keep aggregates small and focused. The bigger the aggregate, the more contention you'll have on optimistic locking. A Workspace that contains everything is a concurrency nightmare.
3. Eventual consistency is a UX problem, not just a technical one. Decide early how you'll handle it in your frontend — loading states, optimistic updates, or explicit polling.
4. The event deserializer is where you pay your technical debt. Keep it clean. It grows over time and becomes load-bearing infrastructure.
5. Projections can diverge. Have a rebuild mechanism ready from day one. You will need it.
Conclusion
CQRS and Event Sourcing aren't silver bullets. They're power tools — they make certain problems dramatically easier, and they add real complexity everywhere else.
The things you get for free: full audit history, time travel debugging, the ability to add new read models without touching existing code, horizontal scaling of reads and writes independently.
The things you pay for: more files, eventual consistency, event versioning, serialization concerns, and a steeper learning curve for new team members.
For NoteCore — where I need local-first sync, undo/redo, and the ability to add AI-powered features retroactively against the full history of user actions — it's absolutely the right call.
For a simple REST API that does CRUD on a few tables? Stick with the simple thing.
Building NoteCore in public — follow the journey on Twitter/X or check out HookScope if you work with webhooks.
If this was useful, a ❤️ helps other developers find it. Comments and questions welcome — especially if you've hit different tradeoffs in your own projects.