More than a year ago, I wrote about tactical design patterns in Domain-Driven Design (DDD).
This article is Part 4 of the series and focuses on patterns for maintaining consistency.
- Domain Layer
- Application Layer
- Presentation/Infrastructure Layer
- Consistency (this article)
GitHub Repository
https://github.com/minericefield/ddd-onion-lit
Please refer to the beginning of Part 1 for other details regarding the architecture and themes.
Transaction Management
Referring to IDDD (Implementing Domain-Driven Design), we will review the design approach for transaction management.
- Fundamentally, an aggregate defines the boundary of strong consistency.
- A single command should modify only one aggregate.
- Reading from multiple aggregates is allowed, but only one should be changed per command.
At this stage, both the aggregates and commands are properly designed. Now, we just need to ensure transactional consistency in each use case that has side effects.
Transaction control should be handled at the Application Layer.
Its role is to coordinate the scenario flow.
Letting the Application Layer take charge of things like task progression and data consistency helps increase conceptual cohesion.
Interface Design
This time, we’ve prepared the following interface:
```typescript
// application/shared/repository-transactor.ts
export abstract class RepositoryTransactor {
  abstract handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation>;
}
```
This interface abstracts away the entire transaction process, including `commit` and `rollback`.
By accepting operations with side effects (manipulation) and executing them as a transaction, it ensures atomicity and consistency in repository operations.
When an exception occurs during a transaction, we may also need to instruct other external drivers or different data stores to roll back, or to retry in their own way.
To avoid the misconception that this interface guarantees consistency across all such systems, we chose to name it RepositoryTransactor.
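To illustrate the contract, here is a hypothetical in-memory implementation (not part of the article's codebase) that mimics commit and rollback by snapshotting a `Map`. A use case wrapped in `handle` either keeps all of its writes or none of them:

```typescript
abstract class RepositoryTransactor {
  abstract handle<T>(manipulation: () => Promise<T>): Promise<T>;
}

class InMemoryRepositoryTransactor extends RepositoryTransactor {
  constructor(private readonly store: Map<string, unknown>) {
    super();
  }

  async handle<T>(manipulation: () => Promise<T>): Promise<T> {
    // "Begin": snapshot the current state.
    const snapshot = new Map(this.store);
    try {
      // "Commit": on success the mutations simply remain in place.
      return await manipulation();
    } catch (error) {
      // "Rollback": restore the snapshot, then rethrow.
      this.store.clear();
      snapshot.forEach((value, key) => this.store.set(key, value));
      throw error;
    }
  }
}
```

The calling use case stays oblivious to how atomicity is achieved, which is exactly the point of the abstraction.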
Thread-Local Storage
The simplest way to implement a Transactor is to directly assign a reference that can identify the connection or process of the data store to a thread-local store (like in Java), which can later be accessed in the repository implementation.
We’ll use node:async_hooks for this, and the implementation would probably look like the following:
```typescript
export class TypeormRepositoryTransactor implements RepositoryTransactor {
  constructor(
    private readonly dataSource: DataSource,
    private readonly als: AsyncLocalStorage<EntityManager>,
  ) {}

  async handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation> {
    return this.dataSource.transaction((entityManager) => {
      return this.als.run(entityManager, manipulation);
    });
  }
}
```
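On the repository side, the stored `EntityManager` can be read back with `als.getStore()`. Here is a self-contained sketch of that fallback logic, with stand-in interfaces in place of TypeORM's real types:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Stand-ins for the TypeORM types, so the sketch runs on its own.
interface EntityManager { id: string }
interface DataSource { manager: EntityManager }

// Hypothetical repository-side counterpart: inside `als.run(...)` the store
// holds the transaction-scoped EntityManager; outside, we fall back to the
// DataSource's default manager.
class AlsAwareRepositoryBase {
  constructor(
    private readonly dataSource: DataSource,
    private readonly als: AsyncLocalStorage<EntityManager>,
  ) {}

  protected get manager(): EntityManager {
    return this.als.getStore() ?? this.dataSource.manager;
  }
}
```

Because `AsyncLocalStorage` propagates across `await`, every repository call made inside the `handle` callback resolves to the same transactional manager without passing it explicitly.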
On the other hand, there may be a need to consistently retain different types of information within the same scope throughout the entire request-response lifecycle.
In this application, we’ve decided to initialize the corresponding thread-local store at the beginning of each interaction, so that arbitrary data can be referenced and stored within it.
With that in mind, we’ll go ahead and implement the store and the Transactor.
```typescript
// infrastructure/node-async-hooks/als-thread-context.ts
export class AlsThreadContext {
  constructor(private readonly als: AsyncLocalStorage<Map<string, any>>) {}

  run<T>(run: () => Promise<T>): Promise<T> {
    const threadId = Math.random().toString(32).substring(2);
    return this.als.run(new Map([['threadId', threadId]]), run);
  }

  get<T>(key: string): T | undefined {
    return this.als.getStore().get(key);
  }

  set(key: string, value: any) {
    this.als.getStore().set(key, value); // or enterWith
  }
}
```
We use a `Map` for the store, and while we're at it, we assign a random `threadId`.
(If the store is already initialized — for example, during event handling — it might also be interesting to push the previous context like a stack.)
It's also possible to implement similar functionality more easily by simply holding the store instance using request-scoped injection without relying on node:async_hooks.
However, in our case, the store is designed to be accessible when needed, even in headless applications, without being tied to specific operations.
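The per-call isolation this store provides can be checked with two interleaved runs. The following is a self-contained restatement of `AlsThreadContext` (with optional chaining added so it compiles under strict null checks) plus a small concurrency demo:

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

class AlsThreadContext {
  constructor(private readonly als: AsyncLocalStorage<Map<string, any>>) {}

  run<T>(run: () => Promise<T>): Promise<T> {
    const threadId = Math.random().toString(32).substring(2);
    return this.als.run(new Map([['threadId', threadId]]), run);
  }

  get<T>(key: string): T | undefined {
    return this.als.getStore()?.get(key);
  }

  set(key: string, value: any) {
    this.als.getStore()?.set(key, value);
  }
}

// Two interleaved "requests" each see only their own store, even across awaits.
async function demo(): Promise<[string?, string?]> {
  const ctx = new AlsThreadContext(new AsyncLocalStorage());
  const results: [string?, string?] = [undefined, undefined];
  await Promise.all([
    ctx.run(async () => {
      ctx.set('user', 'alice');
      await new Promise((r) => setTimeout(r, 10)); // let the other run interleave
      results[0] = ctx.get('user');
    }),
    ctx.run(async () => {
      ctx.set('user', 'bob');
      results[1] = ctx.get('user');
    }),
  ]);
  return results;
}
```

Even though the second run writes `'bob'` while the first is suspended, the first still reads back `'alice'`: each `run` call gets its own `Map`.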
We hook this into middleware like so:
```typescript
// app.module.ts
export class AppModule implements NestModule {
  constructor(private readonly alsThreadContext: AlsThreadContext) {}

  configure(consumer: MiddlewareConsumer) {
    consumer
      .apply((_req, _res, next) => {
        this.alsThreadContext.run(next);
      })
      .forRoutes('*');
  }
}
```
Transactor Implementation
We implement the Transactor using the store.
```typescript
// infrastructure/mysql/typeorm/transactors/typeorm-repository-transactor.ts
export class TypeormRepositoryTransactor implements RepositoryTransactor {
  constructor(
    private readonly dataSource: DataSource,
    private readonly alsThreadContext: AlsThreadContext,
  ) {}

  async handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation> {
    return this.dataSource.transaction((entityManager) => {
      this.alsThreadContext.set(
        ENTITY_MANAGER_THREAD_CONTEXT_KEY,
        entityManager,
      );
      return manipulation();
    });
  }
}
```
Here, we store the EntityManager in the thread-local store.
Repository Implementation
We retrieve TypeORM repositories using the EntityManager.
First, we created a helper class to get TypeORM repositories from the EntityManager:
```typescript
// infrastructure/mysql/typeorm/repositories/shared/get-typeorm-repositories.ts
export default class GetTypeormRepositories {
  constructor(
    private readonly threadContext: AlsThreadContext,
    private readonly dataSource: DataSource,
  ) {}

  handle<T extends readonly Models[keyof Models][]>(
    ...models: T
  ): { [K in keyof T]: Repository<InstanceType<T[K]>> } {
    const entityManager = this.threadContext.get<EntityManager>(
      ENTITY_MANAGER_THREAD_CONTEXT_KEY,
    );
    /**
     * map cannot recognize tuple types
     */
    return models.map((model) =>
      (entityManager ?? this.dataSource).getRepository(model),
    ) as { [K in keyof T]: Repository<InstanceType<T[K]>> };
  }
}
```
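The mapped-tuple return type is the interesting part: each positional constructor argument maps to a correspondingly typed repository. Here is a stripped-down, self-contained illustration of the same pattern, with stand-in types instead of TypeORM's:

```typescript
// Stand-in "repository": just wraps the model's constructor.
class Repo<T> {
  constructor(readonly model: new () => T) {}
  create(): T {
    return new this.model();
  }
}

class TaskModel { kind = 'task'; }
class UserModel { kind = 'user'; }

// Same trick as GetTypeormRepositories.handle: a variadic tuple of
// constructors maps to a tuple of Repo<InstanceType<...>> in the same order.
function getRepos<T extends readonly (new () => any)[]>(
  ...models: T
): { [K in keyof T]: Repo<InstanceType<T[K]>> } {
  // map cannot recognize tuple types, hence the assertion
  return models.map((model) => new Repo(model)) as {
    [K in keyof T]: Repo<InstanceType<T[K]>>;
  };
}
```

Destructuring the result gives fully typed repositories in argument order, which is what makes the `const [a, b, c] = this.getRepositories.handle(...)` call sites type-safe.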
In the actual repository implementations, we use this helper to access the TypeORM repositories:
```typescript
// infrastructure/mysql/typeorm/repositories/task.typeorm-repository.ts
export class TaskTypeormRepository implements TaskRepository {
  constructor(private readonly getRepositories: GetTypeormRepositories) {}

  async update(task: Task) {
    const [taskRepository, taskAssignmentRepository, taskCommentRepository] =
      this.getRepositories.handle(
        models.Task,
        models.TaskAssignment,
        models.TaskComment,
      );
    await taskRepository.update(task.id.value, { name: task.name.value });
    await taskAssignmentRepository.delete({ taskId: task.id.value });
    task.userId &&
      (await taskAssignmentRepository.save({
        taskId: task.id.value,
        userId: task.userId.value,
      }));
    await taskCommentRepository.delete({ taskId: task.id.value });
    await taskCommentRepository.save(
      task.comments.map((comment) => ({
        id: comment.id.value,
        userId: comment.userId.value,
        content: comment.content,
        postedAt: comment.postedAt,
        taskId: task.id.value,
      })),
    );
  }

  async find() {
    const [taskRepository] = this.getRepositories.handle(models.Task);
    const tasks = await taskRepository.find({
      relations: {
        taskAssignment: true,
        taskComments: true,
      },
    });
    return tasks.map((task) =>
      Task.reconstitute(
        new TaskId(task.id),
        new TaskName(task.name),
        task.taskComments.map(
          (taskComment) =>
            new Comment(
              new CommentId(taskComment.id),
              new UserId(taskComment.userId),
              taskComment.content,
              taskComment.postedAt,
            ),
        ),
        task.taskAssignment?.userId && new UserId(task.taskAssignment.userId),
      ),
    );
  }
}
```
Use Case Implementation
We’ll wrap the use case execution with the Transactor.
```typescript
// application/task/create-task.usecase.ts
export class CreateTaskUseCase {
  constructor(
    private readonly taskRepository: TaskRepository,
    private readonly taskIdFactory: TaskIdFactory,
    private readonly repositoryTransactor: RepositoryTransactor,
  ) {}

  /**
   * @throws {TaskNameCharactersExceededException}
   */
  async handle(
    requestDto: CreateTaskUseCaseRequestDto,
  ): Promise<CreateTaskUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Create task.
       */
      const task = Task.create(
        await this.taskIdFactory.handle(),
        new TaskName(requestDto.taskName),
      );
      /**
       * Store it.
       */
      await this.taskRepository.insert(task);
      return new CreateTaskUseCaseResponseDto(task);
    });
  }
}
```
For now, we simply take the approach of wrapping the entire operation with the Transactor.
This guarantees that the aggregate’s state changes will be atomically persisted.
That concludes the section on transaction control.
Logging
This is a bit of a tangent, but since we’ve embedded a threadId into the store, let’s try injecting it into the logs.
It should be useful for tracing and monitoring.
```typescript
// infrastructure/nestjs-common/nestjs-common-console-logger.ts
export class NestjsCommonConsoleLogger extends ConsoleLogger implements Logger {
  constructor(private readonly alsThreadContext: AlsThreadContext) {
    super();
  }

  log(message: any) {
    super.log({
      threadId: this.alsThreadContext.get('threadId'),
      message: message,
    });
  }
}
```
It's quite minimal in functionality, but we’ve built a simple logger that outputs the threadId alongside the message.
Here is some sample log output:
```json
{
  "threadId": "131j1fffra",
  "message": "Request url: /tasks/061a7db5-2d24-416e-9564-d1813db2cb91, method: GET"
}
{
  "threadId": "131j1fffra",
  "message": "task found in FindTaskUseCase: task name: Review the project documentation"
}
{
  "threadId": "131j1fffra",
  "message": "Response time: 95ms"
}
```
The first and last logs are output from the presentation layer's interceptor.
The middle one is from the use case in the application layer.
Now, thanks to the threadId, we can trace the entire interaction.
Consistency Across Aggregates
Consistency between different aggregates is ensured through eventual consistency.
As an example, let’s add the following requirements and implement a simple scenario:
- When a user is newly created, onboarding tasks are assigned.
- The onboarding tasks are:
- Create your GitHub account
- Review the project documentation
Domain Event
First, we’ve prepared a domain event for user creation.
```typescript
// domain/user/user-created.domain-event.ts
export class UserCreated extends DomainEvent {
  constructor(readonly userId: UserId) {
    super();
  }

  get name() {
    return UserCreated.name;
  }
}
```
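The `DomainEvent` base class itself isn't shown in this article. A minimal hypothetical sketch might look like the following (the `occurredAt` field is an assumption for illustration, not taken from the repository):

```typescript
// Hypothetical minimal base class: records when the event occurred and
// exposes a name the publisher can use as the event topic.
abstract class DomainEvent {
  readonly occurredAt = new Date();
  abstract get name(): string;
}

class UserId {
  constructor(readonly value: string) {}
}

class UserCreated extends DomainEvent {
  constructor(readonly userId: UserId) {
    super();
  }

  get name() {
    return UserCreated.name; // the class name doubles as the event name
  }
}
```

Using the static `name` of the class keeps the event name and the listener's subscription key trivially in sync.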
Event Publishing
Event Publishing Infrastructure
For this example, we've settled on @nestjs/event-emitter as the event publishing mechanism.
```typescript
// domain/shared/domain-event-publisher.ts
export abstract class DomainEventPublisher {
  abstract handle(...domainEvents: DomainEvent[]): void;
}
```

```typescript
// infrastructure/nestjs-event-emitter/nestjs-event-emitter-domain-event-publisher.ts
export class NestjsEventEmitterDomainEventPublisher
  implements DomainEventPublisher
{
  constructor(private readonly eventEmitter: EventEmitter2) {}

  handle(...domainEvents: DomainEvent[]) {
    domainEvents.forEach((domainEvent) => {
      this.eventEmitter.emitAsync(domainEvent.name, domainEvent);
    });
  }
}
```
In a real-world application, depending on the event, it may be necessary to transform it into a public language and send it to other contexts or external systems.
When to Generate and Publish Events
There are various options for when to publish events and how to implement it.
- Publishing events directly from entities.
  - For example, when `user.changeName` is executed, it internally generates and publishes a `UserChangedName` event.
  - In some cases, the event is generated and published via a static method inside the domain event class.
  - In others, the entity simply creates the event and passes it to a publisher.
  - This approach can be combined with an ORM that supports automatic change tracking (dirty checking) via implicit copies.
  - While it allows for maintaining the purity of the domain model, it also has some drawbacks:
    - Performance overhead
    - Difficulty in testing and debugging
    - It may not work well when coordinating across multiple contexts or with asynchronous processing
- Storing events in entities and deferring publication until a later stage.
  - This approach avoids the drawbacks mentioned above.
  - For example, in the dotnet sample code, domain events are published together with entity persistence.
    - In the repository pattern, this means that events are published from the repository.

Both approaches align well with the pattern of accumulating events within the entity.
In our case, we’ll take a simple approach:
Let the entity accumulate events, and publish them from the application service.
- This stays true to the original responsibility of the repository.
- If, for some reason, events ever need to be published synchronously before the transaction is committed (which is generally not desirable), this approach makes it easier to support.
- There is a risk of forgetting to publish the events, but this approach makes the flow easier to trace.
Storing Domain Events
We added a minimal mechanism for storing domain events in the User aggregate root.
(There's no clearing of events implemented for now.)
```typescript
// domain/shared/domain-events-storable-aggregate-root.ts
export abstract class DomainEventStorableAggregateRoot<T extends DomainEvent> {
  private _events: T[] = [];

  get events(): T[] {
    return [...this._events];
  }

  protected addEvent(event: T) {
    this._events = [...this._events, event];
  }
}
```
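If clearing ever becomes necessary, e.g. to avoid re-publishing when the same instance lives past one use case, one hypothetical extension (not implemented in the article's code) is a pull-style accessor that hands the events over and empties the buffer in one step:

```typescript
// Sketch: the article's base class plus a hypothetical pullEvents method.
// The generic constraint is relaxed here so the sketch stands alone.
abstract class DomainEventStorableAggregateRoot<T> {
  private _events: T[] = [];

  get events(): T[] {
    return [...this._events];
  }

  protected addEvent(event: T) {
    this._events = [...this._events, event];
  }

  // Hypothetical addition: return the accumulated events and clear the
  // buffer atomically, so each event reaches the publisher exactly once.
  pullEvents(): T[] {
    const events = this._events;
    this._events = [];
    return events;
  }
}
```

The application service would then call `publisher.handle(...user.pullEvents())` instead of reading `user.events`.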
```typescript
// domain/user/user.aggregate-root.ts
export class User extends DomainEventStorableAggregateRoot<
  UserFooBar | UserCreated
> {
  private constructor(
    readonly id: UserId,
    readonly name: string,
    readonly emailAddress: UserEmailAddress,
  ) {
    super();
  }

  static create(id: UserId, name: string, emailAddress: UserEmailAddress) {
    const user = new User(id, name, emailAddress);
    user.addEvent(new UserCreated(id));
    return user;
  }

  static reconstitute(
    id: UserId,
    name: string,
    emailAddress: UserEmailAddress,
  ) {
    return new User(id, name, emailAddress);
  }
}
```
The call to `user.addEvent(new UserCreated(id))` is made inside the static `create` method.
Triggering Event Publication from the Application Service
We call domainEventPublisher from the application service:
```typescript
// application/user/create-user.usecase.ts
export class CreateUserUseCase {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly userIdFactory: UserIdFactory,
    private readonly userEmailAddressIsNotDuplicated: UserEmailAddressIsNotDuplicated,
    private readonly repositoryTransactor: RepositoryTransactor,
    private readonly domainEventPublisher: DomainEventPublisher,
  ) {}

  /**
   * @throws {InvalidUserEmailAddressFormatException}
   * @throws {DuplicatedUserEmailAddressException}
   */
  async handle(
    requestDto: CreateUserUseCaseRequestDto,
  ): Promise<CreateUserUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Create userEmailAddress.
       */
      const userEmailAddress = new UserEmailAddress(requestDto.emailAddress);
      await this.userEmailAddressIsNotDuplicated.handle(userEmailAddress);
      /**
       * Create user.
       */
      const user = User.create(
        await this.userIdFactory.handle(),
        requestDto.name,
        userEmailAddress,
      );
      /**
       * Store it.
       */
      await this.userRepository.insert(user);
      /**
       * Publish domain events.
       */
      this.domainEventPublisher.handle(...user.events);
      return new CreateUserUseCaseResponseDto(user);
    });
  }
}
```
Although `domainEventPublisher` uses `emitAsync` (with asynchronous handlers), you could also explicitly call it outside of `repositoryTransactor` to emphasize that publishing is decoupled from the transaction.
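Sketched with stand-in fakes (none of these classes are in the article's codebase), that "publish after commit" variant looks like this; the ordering recorded in the log is the point:

```typescript
type DomainEvent = { name: string };

// Fake transactor that records when the "commit" happens.
class FakeTransactor {
  readonly log: string[] = [];

  async handle<T>(manipulation: () => Promise<T>): Promise<T> {
    const result = await manipulation();
    this.log.push('commit');
    return result;
  }
}

// Fake publisher that records each published event.
class FakePublisher {
  constructor(private readonly log: string[]) {}

  handle(...domainEvents: DomainEvent[]) {
    domainEvents.forEach((event) => this.log.push(`publish:${event.name}`));
  }
}

// Publish only after the transactor has resolved: listeners can never
// observe state that might still be rolled back.
async function createUser(transactor: FakeTransactor, publisher: FakePublisher) {
  const user = await transactor.handle(async () => {
    transactor.log.push('insert');
    return { events: [{ name: 'UserCreated' }] };
  });
  publisher.handle(...user.events);
  return user;
}
```

The trade-off is symmetry: publishing inside the transactor keeps the whole scenario in one callback, while publishing outside makes the commit-then-publish ordering explicit.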
Handling Events
In IDDD, events are received by adapters and passed into application services
(since IDDD follows the hexagonal architecture).
We’ll follow a similar approach here.
Below is the application service that creates onboarding tasks:
```typescript
// application/task/create-onboarding-tasks.usecse.ts
export class CreateOnboardingTasksUseCase {
  constructor(
    private readonly taskRepository: TaskRepository,
    private readonly userRepository: UserRepository,
    private readonly createOnboardingTasks: CreateOnboardingTasks,
    private readonly repositoryTransactor: RepositoryTransactor,
    private readonly logger: Logger,
  ) {}

  /**
   * @throws {NotFoundApplicationException}
   */
  async handle(
    requestDto: CreateOnboardingTasksUseCaseRequestDto,
  ): Promise<CreateOnboardingTasksUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Find user.
       */
      const userId = new UserId(requestDto.userId);
      if (!(await this.userRepository.findOneById(userId))) {
        throw new NotFoundApplicationException('User not found.');
      }
      /**
       * Create onboarding tasks.
       */
      const tasks = await this.createOnboardingTasks.handle(userId);
      await this.taskRepository.insertMany(tasks);
      this.logger.log(`Onboarding tasks created for user id: ${userId.value}`);
      return new CreateOnboardingTasksUseCaseResponseDto(tasks);
    });
  }
}
```
The onboarding tasks are generated via a domain service, and finally persisted.
Now we invoke the application service from an event listener:
```typescript
// infrastructure/nestjs-event-emitter/nestjs-event-emitter-user-created-domain-event-listener.ts
export class NestjsEventEmitterUserCreatedDomainEventListener {
  constructor(
    private readonly createOnboardingTasksUseCase: CreateOnboardingTasksUseCase,
    private readonly alsThreadContext: AlsThreadContext,
  ) {}

  @OnEvent(UserCreated.name, { async: true, promisify: true })
  async handle(userCreated: UserCreated) {
    return this.alsThreadContext.run(async () => {
      await this.createOnboardingTasksUseCase.handle({
        userId: userCreated.userId.value,
      });
    });
  }
}
```
With this setup, we’ve achieved consistency across multiple aggregates using eventual consistency.