DEV Community

minericefield

[DDD] Tactical Design Patterns Part 4: Consistency

More than a year ago, I wrote about tactical design patterns in Domain-Driven Design (DDD).
English is not my first language, so please excuse any awkward phrasing.
This article is Part 4 of the series and focuses on patterns for maintaining consistency.

  1. Domain Layer
  2. Application Layer
  3. Presentation/Infrastructure Layer
  4. Consistency (this article)

GitHub Repository

https://github.com/minericefield/ddd-onion-lit

Please refer to the beginning of Part 1 for other details regarding the architecture and themes.

Transaction Management

Referring to IDDD (Implementing Domain-Driven Design), we will review the design approach for transaction management.

  • Fundamentally, an aggregate defines the boundary of strong consistency.
  • A single command should modify only one aggregate.
    • Reading from multiple aggregates is allowed, but only one should be changed per command.

At this stage, both the aggregates and commands are properly designed. Now, we just need to ensure transactional consistency in each use case that has side effects.

Transaction control should be handled at the Application Layer.
Its role is to coordinate the scenario flow.
Letting the Application Layer take charge of things like task progression and data consistency helps increase conceptual cohesion.

Interface Design

This time, we’ve prepared the following interface:

// application/shared/repository-transactor.ts
export abstract class RepositoryTransactor {
  abstract handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation>;
}

This interface abstracts away the entire transaction process, including things like commit and rollback.
By accepting operations with side effects (manipulation) and executing them as a transaction, it ensures atomicity and consistency in repository operations.

There may be situations where, when an exception occurs during a transaction, we need to instruct other external drivers or different stores to roll back or retry in a different way.
To avoid the misconception that this interface guarantees consistency across all such systems, we chose to name it RepositoryTransactor.
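
One practical consequence of hiding commit and rollback behind this abstraction is that use cases can be exercised without a database. The following is a minimal sketch under that assumption, not code from the repository: `PassThroughRepositoryTransactor` is a hypothetical test double that only records which path was taken.

```typescript
// Same shape as the abstract class shown above.
abstract class RepositoryTransactor {
  abstract handle<T>(manipulation: () => Promise<T>): Promise<T>;
}

// Hypothetical test double: there is no real transaction here,
// it only counts how often each outcome occurred.
class PassThroughRepositoryTransactor extends RepositoryTransactor {
  committed = 0;
  rolledBack = 0;

  async handle<T>(manipulation: () => Promise<T>): Promise<T> {
    try {
      const result = await manipulation();
      this.committed += 1; // a real implementation would commit here
      return result;
    } catch (error) {
      this.rolledBack += 1; // a real implementation would roll back here
      throw error;
    }
  }
}
```

A use case that depends only on `RepositoryTransactor` can receive this double in unit tests and the TypeORM-backed implementation in production.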

Thread-Local Storage

The simplest way to implement a Transactor is to put a reference that identifies the data store connection or session into thread-local storage (as in Java), where the repository implementation can read it later.
Node.js has no per-request threads, so we'll use AsyncLocalStorage from node:async_hooks as the equivalent, and the implementation would probably look like the following:

export class TypeormRepositoryTransactor implements RepositoryTransactor {
  constructor(
    private readonly dataSource: DataSource,
    private readonly als: AsyncLocalStorage<EntityManager>,
  ) {}

  async handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation> {
    return this.dataSource.transaction((entityManager) => {
      return this.als.run(entityManager, manipulation);
    });
  }
}

On the other hand, there may be a need to consistently retain different types of information within the same scope throughout the entire request-response lifecycle.
In this application, we’ve decided to initialize the corresponding thread-local store at the beginning of each interaction, so that arbitrary data can be referenced and stored within it.
With that in mind, we’ll go ahead and implement the store and the Transactor.

// infrastructure/node-async-hooks/als-thread-context.ts
export class AlsThreadContext {
  constructor(private readonly als: AsyncLocalStorage<Map<string, any>>) {}

  run<T>(run: () => Promise<T>): Promise<T> {
    const threadId = Math.random().toString(32).substring(2);

    return this.als.run(new Map([['threadId', threadId]]), run);
  }

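  get<T>(key: string): T | undefined {
    // getStore() returns undefined outside of run(), e.g. in headless usage
    return this.als.getStore()?.get(key);
  }

  set(key: string, value: any) {
    const store = this.als.getStore();
    if (!store) {
      throw new Error('Thread context is not initialized; call run() first.');
    }
    store.set(key, value); // or enterWith
  }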
}

We use a Map for the store, and while we're at it, we assign a random threadId.
(If the store is already initialized — for example, during event handling — it might also be interesting to push the previous context like a stack.)
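
The stack idea mentioned in the parenthetical could be sketched as follows. This is a hypothetical variant, not the repository's implementation: `NestedThreadContext` and the `threadIdStack` key are names invented for this illustration. Each nested `run` creates a fresh store and records the chain of enclosing ids.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

type Store = Map<string, unknown>;

// Hypothetical variant of AlsThreadContext that nests instead of overwriting.
class NestedThreadContext {
  private readonly als = new AsyncLocalStorage<Store>();

  run<T>(callback: () => T): T {
    const parent = this.als.getStore();
    const threadId = Math.random().toString(32).substring(2);
    const store: Store = new Map<string, unknown>([["threadId", threadId]]);
    // Keep the chain of enclosing ids, outermost first.
    const parentStack =
      (parent?.get("threadIdStack") as string[] | undefined) ?? [];
    store.set("threadIdStack", [...parentStack, threadId]);
    return this.als.run(store, callback);
  }

  get<T>(key: string): T | undefined {
    return this.als.getStore()?.get(key) as T | undefined;
  }
}

// Usage: an outer interaction that triggers an inner one (e.g. event handling).
const nestedContext = new NestedThreadContext();
const depths = nestedContext.run(() => {
  const outer = nestedContext.get<string[]>("threadIdStack")?.length;
  const inner = nestedContext.run(
    () => nestedContext.get<string[]>("threadIdStack")?.length,
  );
  return [outer, inner];
});
// depths is [1, 2]
```

With such a chain, a log line emitted while handling an event could show both its own id and the id of the request that caused it.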

It's also possible to implement similar functionality more easily by simply holding the store instance using request-scoped injection without relying on node:async_hooks.
However, in our case, the store is designed to be accessible when needed, even in headless applications, without being tied to specific operations.

We hook this into middleware like so:

// app.module.ts
export class AppModule implements NestModule {
  constructor(private readonly alsThreadContext: AlsThreadContext) {}

  configure(consumer: MiddlewareConsumer) {
    consumer
      .apply((_req, _res, next) => {
        this.alsThreadContext.run(next);
      })
      .forRoutes('*');
  }
}

Transactor Implementation

We implement the Transactor using the store.

// infrastructure/mysql/typeorm/transactors/typeorm-repository-transactor.ts
export class TypeormRepositoryTransactor implements RepositoryTransactor {
  constructor(
    private readonly dataSource: DataSource,
    private readonly alsThreadContext: AlsThreadContext,
  ) {}

  async handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation> {
    return this.dataSource.transaction((entityManager) => {
      this.alsThreadContext.set(
        ENTITY_MANAGER_THREAD_CONTEXT_KEY,
        entityManager,
      );

      return manipulation();
    });
  }
}

Here, we store the EntityManager in the thread-local store.
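
One thing to watch with this approach is that the reference stays in the store after the transaction has finished, so a later read in the same context could pick up a manager whose transaction is already over. A possible refinement, sketched here with hypothetical stand-ins (`fakeTransaction`, a plain `Map` as the store, and the key name are all assumptions so the example runs without TypeORM), is to remove the key in a `finally` block:

```typescript
// Hypothetical stand-ins so the sketch runs without TypeORM.
type FakeEntityManager = { id: number };
const context = new Map<string, unknown>(); // plays the role of the thread-local store
const ENTITY_MANAGER_KEY = "entityManager"; // hypothetical key name

async function fakeTransaction<T>(
  work: (manager: FakeEntityManager) => Promise<T>,
): Promise<T> {
  // A real data source would begin, then commit or roll back around this call.
  return work({ id: 1 });
}

async function handle<T>(manipulation: () => Promise<T>): Promise<T> {
  return fakeTransaction(async (manager) => {
    context.set(ENTITY_MANAGER_KEY, manager);
    try {
      return await manipulation();
    } finally {
      // Remove the reference so later reads fall back to the default connection.
      context.delete(ENTITY_MANAGER_KEY);
    }
  });
}
```

The key is removed on both the success path and the error path, because `finally` runs in either case.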

Repository Implementation

We retrieve TypeORM repositories using the EntityManager.

First, we created a helper class to get TypeORM repositories from the EntityManager:

// infrastructure/mysql/typeorm/repositories/shared/get-typeorm-repositories.ts
export default class GetTypeormRepositories {
  constructor(
    private readonly threadContext: AlsThreadContext,
    private readonly dataSource: DataSource,
  ) {}

  handle<T extends readonly Models[keyof Models][]>(
    ...models: T
  ): { [K in keyof T]: Repository<InstanceType<T[K]>> } {
    const entityManager = this.threadContext.get<EntityManager>(
      ENTITY_MANAGER_THREAD_CONTEXT_KEY,
    );

    /**
     * Array.prototype.map does not preserve tuple types,
     * so the result is cast back to the mapped tuple type.
     */
    return models.map((model) =>
      (entityManager ?? this.dataSource).getRepository(model),
    ) as { [K in keyof T]: Repository<InstanceType<T[K]>> };
  }
}

In the actual repository implementations, we use this helper to access the TypeORM repositories:

// infrastructure/mysql/typeorm/repositories/task.typeorm-repository.ts
export class TaskTypeormRepository implements TaskRepository {
  constructor(private readonly getRepositories: GetTypeormRepositories) {}

  async update(task: Task) {
    const [taskRepository, taskAssignmentRepository, taskCommentRepository] =
      this.getRepositories.handle(
        models.Task,
        models.TaskAssignment,
        models.TaskComment,
      );

    await taskRepository.update(task.id.value, { name: task.name.value });

    await taskAssignmentRepository.delete({ taskId: task.id.value });
    if (task.userId) {
      await taskAssignmentRepository.save({
        taskId: task.id.value,
        userId: task.userId.value,
      });
    }

    await taskCommentRepository.delete({ taskId: task.id.value });
    await taskCommentRepository.save(
      task.comments.map((comment) => ({
        id: comment.id.value,
        userId: comment.userId.value,
        content: comment.content,
        postedAt: comment.postedAt,
        taskId: task.id.value,
      })),
    );
  }

  async find() {
    const [taskRepository] = this.getRepositories.handle(models.Task);
    const tasks = await taskRepository.find({
      relations: {
        taskAssignment: true,
        taskComments: true,
      },
    });
    return tasks.map((task) =>
      Task.reconstitute(
        new TaskId(task.id),
        new TaskName(task.name),
        task.taskComments.map(
          (taskComment) =>
            new Comment(
              new CommentId(taskComment.id),
              new UserId(taskComment.userId),
              taskComment.content,
              taskComment.postedAt,
            ),
        ),
        task.taskAssignment?.userId && new UserId(task.taskAssignment.userId),
      ),
    );
  }
}

Use Case Implementation

We’ll wrap the use case execution with the Transactor.

// application/task/create-task.usecase.ts
export class CreateTaskUseCase {
  constructor(
    private readonly taskRepository: TaskRepository,
    private readonly taskIdFactory: TaskIdFactory,
    private readonly repositoryTransactor: RepositoryTransactor,
  ) {}

  /**
   * @throws {TaskNameCharactersExceededException}
   */
  async handle(
    requestDto: CreateTaskUseCaseRequestDto,
  ): Promise<CreateTaskUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Create task.
       */
      const task = Task.create(
        await this.taskIdFactory.handle(),
        new TaskName(requestDto.taskName),
      );

      /**
       * Store it.
       */
      await this.taskRepository.insert(task);

      return new CreateTaskUseCaseResponseDto(task);
    });
  }
}

For now, we simply take the approach of wrapping the entire operation with the Transactor.
This guarantees that the aggregate’s state changes will be atomically persisted.
That concludes the section on transaction control.

Logging

This is a bit of a tangent, but since we’ve embedded a threadId into the store, let’s try injecting it into the logs.
It should be useful for tracing and monitoring.

// infrastructure/nestjs-common/nestjs-common-console-logger.ts
export class NestjsCommonConsoleLogger extends ConsoleLogger implements Logger {
  constructor(private readonly alsThreadContext: AlsThreadContext) {
    super();
  }

  log(message: any) {
    super.log({
      threadId: this.alsThreadContext.get('threadId'),
      message: message,
    });
  }
}

It's quite minimal in functionality, but we’ve built a simple logger that outputs the threadId alongside the message.

After inserting a few sample log calls, the output looks like this:

{
  "threadId": "131j1fffra",
  "message": "Request url: /tasks/061a7db5-2d24-416e-9564-d1813db2cb91, method: GET"
}
{
  "threadId": "131j1fffra",
  "message": "task found in FindTaskUseCase: task name: Review the project documentation"
}
{
  "threadId": "131j1fffra",
  "message": "Response time: 95ms"
}

The first and last logs are output from the presentation layer's interceptor.
The middle one is from the use case in the application layer.
Now, thanks to the threadId, we can trace the entire interaction.

Consistency Across Aggregates

Consistency between different aggregates is ensured through eventual consistency.
As an example, let’s add the following requirements and implement a simple scenario:

  • When a user is newly created, onboarding tasks are assigned.
  • The onboarding tasks are:
    • Create your GitHub account
    • Review the project documentation

Domain Event

First, we’ve prepared a domain event for user creation.

// domain/user/user-created.domain-event.ts
export class UserCreated extends DomainEvent {
  constructor(readonly userId: UserId) {
    super();
  }

  get name() {
    return UserCreated.name;
  }
}

Event Publishing

Event Publishing Infrastructure

For this example, we use @nestjs/event-emitter as the sole event publishing mechanism.

// domain/shared/domain-event-publisher.ts
export abstract class DomainEventPublisher {
  abstract handle(...domainEvents: DomainEvent[]): void;
}
// infrastructure/nestjs-event-emitter/nestjs-event-emitter-domain-event-publisher.ts
export class NestjsEventEmitterDomainEventPublisher
  implements DomainEventPublisher
{
  constructor(private readonly eventEmitter: EventEmitter2) {}

  handle(...domainEvents: DomainEvent[]) {
    domainEvents.forEach((domainEvent) => {
      // The returned promise is not awaited, so publishing does not block the caller.
      this.eventEmitter.emitAsync(domainEvent.name, domainEvent);
    });
  }
}

In a real-world application, depending on the event, it may be necessary to translate it into a published language and send it to other contexts or external systems.
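
As an illustration of translating an internal event into a form meant for other contexts, here is a minimal sketch. Everything in it is hypothetical: `UserCreatedEvent` is a simplified stand-in for the domain event, and `UserRegisteredV1` with its `type` string is an invented public contract, not something defined in the repository.

```typescript
// Hypothetical internal event, a simplified stand-in for UserCreated.
class UserCreatedEvent {
  readonly userId: string;

  constructor(userId: string) {
    this.userId = userId;
  }
}

// Hypothetical public contract shared with other contexts.
interface UserRegisteredV1 {
  type: "user.registered.v1";
  userId: string;
  occurredAt: string; // ISO 8601 timestamp
}

// Maps the internal event to the public shape; only primitives cross the boundary.
function toPublicEvent(
  event: UserCreatedEvent,
  occurredAt: Date,
): UserRegisteredV1 {
  return {
    type: "user.registered.v1",
    userId: event.userId,
    occurredAt: occurredAt.toISOString(),
  };
}
```

Keeping the mapping in one function means the internal event class can change without breaking consumers, as long as the public shape stays stable.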

When to Generate and Publish Events

There are various options for when to publish events and how to implement it.

  • Publishing events directly from entities.
    • For example, when user.changeName is executed, it internally generates and publishes a UserChangedName event.
      • In some cases, the event is generated and published via a static method inside the domain event class.
      • In others, the entity simply creates the event, and passes it to a publisher.
    • This approach can be combined with an ORM that supports automatic change tracking (dirty checking) via implicit copies.
    • While it allows for maintaining the purity of the domain model, it also has some drawbacks:
      • Performance overhead
      • Difficulty in testing and debugging
      • It may not work well when coordinating across multiple contexts or with asynchronous processing
  • Storing events in entities and deferring publication until a later stage
    • This approach avoids the drawbacks mentioned above.
    • For example, in the dotnet sample code, domain events are published together with entity persistence.
      • In the repository pattern, this means that events are published from the repository.
      • Either way, deferred publication fits naturally with accumulating events within the entity.
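
The "publish from the repository" variant described in the list could look roughly like this. It is a sketch under assumptions, not the approach taken in this article: `EventPublishingRepository`, `EventSource`, and the `Publish` function type are hypothetical names, and the decorator simply wraps another repository.

```typescript
interface DomainEvent {
  readonly name: string;
}
interface EventSource {
  readonly events: DomainEvent[];
}
interface Repository<A> {
  insert(aggregate: A): Promise<void>;
}
type Publish = (...events: DomainEvent[]) => void;

// Hypothetical decorator: persists first, then publishes
// whatever the aggregate has accumulated.
class EventPublishingRepository<A extends EventSource>
  implements Repository<A>
{
  private readonly inner: Repository<A>;
  private readonly publish: Publish;

  constructor(inner: Repository<A>, publish: Publish) {
    this.inner = inner;
    this.publish = publish;
  }

  async insert(aggregate: A): Promise<void> {
    await this.inner.insert(aggregate);
    this.publish(...aggregate.events);
  }
}
```

The trade-off is that callers can no longer forget to publish, but the repository takes on a second responsibility.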

In our case, we’ll take a simple approach:
Let the entity accumulate events, and publish them from the application service.

  • This stays true to the original responsibility of the repository.
  • If, for some reason, events ever need to be published synchronously before the transaction is committed (which is generally not desirable), this approach makes it easier to support.
  • There is a risk of forgetting to publish the events, but this approach makes the flow easier to trace.

Storing Domain Events

We added a minimal mechanism for storing domain events in the User aggregate root.
(There's no clearing of events implemented for now.)

// domain/shared/domain-events-storable-aggregate-root.ts
export abstract class DomainEventStorableAggregateRoot<T extends DomainEvent> {
  private _events: T[] = [];

  get events(): T[] {
    return [...this._events];
  }

  protected addEvent(event: T) {
    this._events = [...this._events, event];
  }
}
// domain/user/user.aggregate-root.ts
export class User extends DomainEventStorableAggregateRoot<
  UserFooBar | UserCreated
> {
  private constructor(
    readonly id: UserId,
    readonly name: string,
    readonly emailAddress: UserEmailAddress,
  ) {
    super();
  }

  static create(id: UserId, name: string, emailAddress: UserEmailAddress) {
    const user = new User(id, name, emailAddress);
    user.addEvent(new UserCreated(id));
    return user;
  }

  static reconstitute(
    id: UserId,
    name: string,
    emailAddress: UserEmailAddress,
  ) {
    return new User(id, name, emailAddress);
  }
}

The call to user.addEvent(new UserCreated(id)) is made inside static create.
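
Since clearing of events is not implemented, one possible way to add it is a method that hands over the accumulated events and resets the buffer in one step. The sketch below is an assumption, not part of the repository: `pullEvents` and the `Counter` aggregate are hypothetical names used only for illustration.

```typescript
abstract class EventStoringAggregateRoot<T> {
  private _events: T[] = [];

  get events(): T[] {
    return [...this._events];
  }

  protected addEvent(event: T): void {
    this._events = [...this._events, event];
  }

  // Hypothetical addition: return the accumulated events and reset the buffer,
  // so the same event cannot be published twice.
  pullEvents(): T[] {
    const pulled = this._events;
    this._events = [];
    return pulled;
  }
}

// Hypothetical aggregate used only to exercise the base class.
class Counter extends EventStoringAggregateRoot<string> {
  increment(): void {
    this.addEvent("incremented");
  }
}

const counter = new Counter();
counter.increment();
const first = counter.pullEvents(); // ["incremented"]
const second = counter.pullEvents(); // []
```

An application service would then publish the result of `pullEvents()` instead of reading `events`.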

Triggering Event Publication from the Application Service

We call domainEventPublisher from the application service:

// application/user/create-user.usecase.ts
export class CreateUserUseCase {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly userIdFactory: UserIdFactory,
    private readonly userEmailAddressIsNotDuplicated: UserEmailAddressIsNotDuplicated,
    private readonly repositoryTransactor: RepositoryTransactor,
    private readonly domainEventPublisher: DomainEventPublisher,
  ) {}

  /**
   * @throws {InvalidUserEmailAddressFormatException}
   * @throws {DuplicatedUserEmailAddressException}
   */
  async handle(
    requestDto: CreateUserUseCaseRequestDto,
  ): Promise<CreateUserUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Create userEmailAddress.
       */
      const userEmailAddress = new UserEmailAddress(requestDto.emailAddress);
      await this.userEmailAddressIsNotDuplicated.handle(userEmailAddress);

      /**
       * Create user.
       */
      const user = User.create(
        await this.userIdFactory.handle(),
        requestDto.name,
        userEmailAddress,
      );

      /**
       * Store it.
       */
      await this.userRepository.insert(user);

      /**
       * Publish domain events.
       */
      this.domainEventPublisher.handle(...user.events);

      return new CreateUserUseCaseResponseDto(user);
    });
  }
}

Because domainEventPublisher uses emitAsync with asynchronous handlers, the handlers do not run as part of this transaction.
Even so, you could call it explicitly outside of repositoryTransactor to emphasize that publishing is decoupled from the transaction.
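
Calling the publisher outside the transactor could be sketched as follows. The `transact` and `publish` functions are hypothetical stand-ins for `repositoryTransactor.handle` and `domainEventPublisher.handle`, and the `log` array exists only to make the ordering visible. A possible benefit, assuming handlers read through a separate connection, is that they never look for a row that has not been committed yet.

```typescript
const log: string[] = [];

// Hypothetical stand-in for the transactor: runs the work, then "commits".
async function transact<T>(work: () => Promise<T>): Promise<T> {
  const result = await work();
  log.push("commit");
  return result;
}

// Hypothetical stand-in for the domain event publisher.
function publish(...events: string[]): void {
  events.forEach((event) => log.push(`publish:${event}`));
}

async function createUser(): Promise<string> {
  // Only persistence happens inside the transaction.
  const user = await transact(async () => {
    log.push("insert");
    return { id: "u-1", events: ["UserCreated"] };
  });

  // Reached only after a successful commit; a failure throws before this line.
  publish(...user.events);
  return user.id;
}
```

After `await createUser()`, `log` contains `insert`, `commit`, `publish:UserCreated`, in that order.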

Handling Events

In IDDD, events are received by adapters and passed into application services
(since IDDD follows the hexagonal architecture).
We’ll follow a similar approach here.

Below is the application service that creates onboarding tasks:

// application/task/create-onboarding-tasks.usecase.ts
export class CreateOnboardingTasksUseCase {
  constructor(
    private readonly taskRepository: TaskRepository,
    private readonly userRepository: UserRepository,
    private readonly createOnboardingTasks: CreateOnboardingTasks,
    private readonly repositoryTransactor: RepositoryTransactor,
    private readonly logger: Logger,
  ) {}

  /**
   * @throws {NotFoundApplicationException}
   */
  async handle(
    requestDto: CreateOnboardingTasksUseCaseRequestDto,
  ): Promise<CreateOnboardingTasksUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Find user.
       */
      const userId = new UserId(requestDto.userId);
      if (!(await this.userRepository.findOneById(userId))) {
        throw new NotFoundApplicationException('User not found.');
      }

      /**
       * Create onboarding tasks.
       */
      const tasks = await this.createOnboardingTasks.handle(userId);
      await this.taskRepository.insertMany(tasks);

      this.logger.log(`Onboarding tasks created for user id: ${userId.value}`);

      return new CreateOnboardingTasksUseCaseResponseDto(tasks);
    });
  }
}

The onboarding tasks are generated via a domain service, and finally persisted.
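
The domain service itself is not shown here, so the following is only a guess at its general shape, based on the two onboarding tasks listed in the requirements. `OnboardingTask` is a hypothetical simplified type; the real service presumably builds full Task aggregates with generated ids.

```typescript
// Hypothetical simplified task shape; the real Task aggregate is richer.
interface OnboardingTask {
  name: string;
  userId: string;
}

// The two onboarding tasks from the requirements above.
const ONBOARDING_TASK_NAMES = [
  "Create your GitHub account",
  "Review the project documentation",
] as const;

// Builds one task per onboarding item, each assigned to the given user.
function createOnboardingTasks(userId: string): OnboardingTask[] {
  return ONBOARDING_TASK_NAMES.map((name) => ({ name, userId }));
}
```

Keeping the list of task names inside the domain layer means the use case only orchestrates: find the user, ask the service for tasks, persist them.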

Now we invoke the application service from an event listener:

// infrastructure/nestjs-event-emitter/nestjs-event-emitter-user-created-domain-event-listener.ts
export class NestjsEventEmitterUserCreatedDomainEventListener {
  constructor(
    private readonly createOnboardingTasksUseCase: CreateOnboardingTasksUseCase,
    private readonly alsThreadContext: AlsThreadContext,
  ) {}

  @OnEvent(UserCreated.name, { async: true, promisify: true })
  async handle(userCreated: UserCreated) {
    return this.alsThreadContext.run(async () => {
      await this.createOnboardingTasksUseCase.handle({
        userId: userCreated.userId.value,
      });
    });
  }
}

With this setup, we’ve achieved consistency across multiple aggregates using eventual consistency.
