
Shredded Mustard
"Listen to Yourself". Event sourcing for Domain Driven Design ... One Domain Event to Rule Them All

Event-driven architecture is evolving rapidly, and software architects keep devising event-driven architectural patterns to solve the problems that come with such a robust design. Executing a non-idempotent operation might not be as simple as publishing a message to a queue. A non-idempotent operation is one that cannot be safely repeated, because every execution changes the state of our application. There are numerous patterns that dictate when and how to publish such an event, and with a "fire and forget" event in particular we must be careful in dealing with the side effects.


One problem to deal with is the dual-write problem: writing to two external systems in one transactional context. Often a process involves writing to the database and then publishing an event to a message broker like Kafka. For example, in event sourcing, after adding or updating an entity in the database, a message is published to advertise the new state of the application.

This problem can be solved programmatically by implementing rollback mechanisms, or by applying patterns such as the Outbox pattern. The Listen to Yourself pattern is another solution, and it solves the problem in a very simple way.

Instead of updating the database and then publishing an event, we only publish the event. The same component that publishes the event also consumes it; hence the name "Listen to Yourself". In other words, the component responsible for the operation listens to its own topic and consumes the messages it sent in order to update its database. This means that at the moment the event is published, the database update is still pending and may or may not happen.
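The mechanics can be sketched in plain Java, with an in-memory queue standing in for the Kafka topic and a map standing in for the database. All names here are illustrative, not taken from the service we build later:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory sketch of the "Listen to Yourself" flow described above.
public class ListenToYourselfSketch {

  static final class UserEvent {
    final String id;
    final String username;
    UserEvent(String id, String username) { this.id = id; this.username = username; }
  }

  private final Queue<UserEvent> topic = new ArrayDeque<>();       // stand-in for Kafka
  private final Map<String, UserEvent> database = new HashMap<>(); // stand-in for MongoDB

  // The command handler only publishes; it never writes to the database itself.
  public void handleCreateUser(String id, String username) {
    topic.add(new UserEvent(id, username));
  }

  // The same component consumes its own events; this is where the write happens.
  public void pollAndConsume() {
    UserEvent event;
    while ((event = topic.poll()) != null) {
      database.put(event.id, event); // upsert keyed by id, so redelivery is safe
    }
  }

  public boolean isPersisted(String id) {
    return database.containsKey(id);
  }

  public static void main(String[] args) {
    ListenToYourselfSketch svc = new ListenToYourselfSketch();
    svc.handleCreateUser("u1", "mustard");
    System.out.println(svc.isPersisted("u1")); // false: the write is still pending
    svc.pollAndConsume();
    System.out.println(svc.isPersisted("u1")); // true: consumed and persisted
  }
}
```

Note the window between publishing and consuming: the user exists as an event before it exists as a database row, which is exactly the trade-off the pattern makes.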

Listen to Yourself

Before diving into this, make sure you know what event-driven architecture is and some of its common patterns. You can read about them here: Event Driven Architectural patterns.

The "Listen to Yourself" pattern adds the event publisher to the list of event consumers. You heard that right: the same component that is responsible for updating the database and firing the event becomes one of the event consumers. In other words, the publisher listens to itself, and the database update is postponed until after the event is fired.

Why is this useful?

  1. Minimize side effects
    Driving all side effects from the event minimizes the problems that would arise from a faulty message broker. If, for example, the message broker fails to write the event to the topic for any reason, there is no event and therefore no side effects, as opposed to writing to the database first and then firing the event. This is useful for event sourcing, which treats the event as the state change: if there is no event, there is no state change.

  2. Easy replay implementation
    Event replays are a very useful event-sourcing technique. Implementing a replay for the primary domain is as easy as editing the primary domain consumer implementation, or extending its behavior, because the database is only ever updated in the consumer implementation.

  3. Separation of concerns
    When you structure your domain model carefully, the domain event becomes a single source of truth that can provide great consistency across your services. This means that if you want to move some subdomains to a separate service, it can be done very quickly, and the logic will not change because the subdomains are already embedded in the domain event. You can then easily replay all domain events from the new service and tada: you have a new, consistent subdomain database. This is the most important aspect of the "Listen to Yourself" pattern, and arguably the part where you should spend most of your time contemplating.
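Points 2 and 3 can be sketched without any broker at all: a hypothetical new service rebuilds its own store purely by replaying the event log from the beginning (the List stands in for a Kafka topic read from offset 0; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of an event replay: because every state change exists as an event,
// a fresh database can be rebuilt by re-consuming the log from the start.
public class ReplaySketch {

  static final class UserCreated {
    final String id;
    final String username;
    UserCreated(String id, String username) { this.id = id; this.username = username; }
  }

  // Applying the full log to an empty store rebuilds the same state the
  // original consumer produced incrementally, event by event.
  static Map<String, String> replay(List<UserCreated> log) {
    Map<String, String> store = new HashMap<>();
    for (UserCreated e : log) {
      store.put(e.id, e.username); // idempotent apply: replaying twice is safe
    }
    return store;
  }

  public static void main(String[] args) {
    List<UserCreated> log = new ArrayList<>();
    log.add(new UserCreated("u1", "mustard"));
    log.add(new UserCreated("u2", "ketchup"));

    Map<String, String> original = replay(log); // built as the events arrived
    Map<String, String> rebuilt = replay(log);  // built later by a new service
    System.out.println(original.equals(rebuilt)); // true
  }
}
```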

Listen to Yourself in practice

Consider an e-commerce store that grants the user $10 in free credits, in the form of a gift card, as soon as they sign up.

Whenever a new user is created, $10 is added to their wallet.

In this scenario we have two domains.

The first domain is the "User".
The second domain is the "Gift".

The "Wallet" is a subdomain of the User.

Before you dive in, make sure you have:

  1. Java 11 or above installed on your machine
  2. Apache Maven installed
  3. Running instances of Kafka and MongoDB (we will run both with Docker Compose, so if you have Docker installed you do not need anything else)
  4. Any IDE or text editor

You will find the full source code here: https://github.com/Shredded-Mustard/ltys-user-svc

Let us take a look at the dependencies, which we define in our pom.xml file:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
            <version>3.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

We will be using the Spring framework's Web starter along with Spring Kafka.


Our controller will look like this

package com.noketchup.shop.user.controller;

import com.noketchup.shop.user.controller.dto.UserRequest;
import com.noketchup.shop.user.controller.dto.UserResponse;
import com.noketchup.shop.user.domain.UserDomainModel;
import com.noketchup.shop.user.domain.WalletDomainModel;
import com.noketchup.shop.user.producer.UserProducer;
import com.noketchup.shop.user.service.UserService;
import com.noketchup.shop.user.service.WalletService;
import com.noketchup.shop.user.service.mapper.UserMapper;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@RestController
@RequiredArgsConstructor
@RequestMapping("user")
public class UserController {

  private final UserService userService;
  private final WalletService walletService;
  private final UserProducer userProducer;
  private final UserMapper mapper;

  @GetMapping
  public ResponseEntity<UserResponse> getUserByUniqueIdentifier(
          @RequestParam(value = "email", required = false) String email,
          @RequestParam(value = "phoneNumber", required = false) String phoneNumber,
          @RequestParam(value = "username", required = false) String username
  ) {
    UserResponse userResponse = userService.getUserByUniqueParam(email, phoneNumber, username);
    return ResponseEntity.ok(userResponse);
  }

  @PostMapping
  public ResponseEntity<String> createNewUser(@RequestBody @Valid UserRequest userRequest) {
    //Validate all fields before taking any action
    userService.validateUser(userRequest);

    //Map Request to the Domain Model
    UserDomainModel userDomainModelObject = mapper.mapToUserDomain(userRequest);
    WalletDomainModel wallet = walletService.createNewEmptyWalletDomainObject();
    userDomainModelObject.setWallets(List.of(wallet));

    //Send Domain event
    userProducer.sendUserDomainEvent(userDomainModelObject);
    return ResponseEntity.accepted().body("User creation in progress");
  }
}

We have two main endpoints.

The GET endpoint fetches a single user by email, phone number, or username
The POST endpoint adds a new user to the database


Our producer interface needs only one method, sendUserDomainEvent:

package com.noketchup.shop.user.producer;


import com.noketchup.shop.user.domain.UserDomainModel;

public interface UserProducer {
  void sendUserDomainEvent(UserDomainModel userDomainModelObject);
}

We also need a consumer interface, which likewise has just one method, consumeUserDomainEventModel:

package com.noketchup.shop.user.consumer;

import com.noketchup.shop.user.domain.UserDomainModel;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface UserConsumer {
  void consumeUserDomainEventModel(ConsumerRecord<String, UserDomainModel> userDomainModel);
}

We then implement both interfaces like this:

package com.noketchup.shop.user.producer;

import com.noketchup.shop.user.domain.UserDomainModel;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class UserProducerImpl implements UserProducer {
  @Value("${config.user.domain.topic}")
  private String userDomainTopic;


  private final KafkaTemplate<String, UserDomainModel> userDomainKafkaTemplate;

  public void sendUserDomainEvent(UserDomainModel userDomainModelObject) {
    ProducerRecord<String, UserDomainModel> record = new ProducerRecord<String, UserDomainModel>(
            userDomainTopic,
            userDomainModelObject.getId().toString(),
            userDomainModelObject
    );

    record.headers().add(new DomainEventHeader("NAME", "CREATE_NEW_USER"));
    userDomainKafkaTemplate.send(record);
  }

  public static class DomainEventHeader implements Header {

    private final String key;
    private final String value;

    public DomainEventHeader(String key, String value){
      this.key = key;
      this.value = value;
    }

    @Override
    public String key() {
      return key;
    }

    @Override
    public byte[] value() {
      return value.getBytes();
    }
  }
}
package com.noketchup.shop.user.consumer;

import com.mongodb.MongoException;
import com.noketchup.shop.user.domain.UserDomainModel;
import com.noketchup.shop.user.exception.RetryableException;
import com.noketchup.shop.user.service.UserService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;


@Component
@RequiredArgsConstructor
public class UserConsumerImpl implements UserConsumer {
  private final UserService userService;

  @Transactional
  @KafkaListener(topics = "${config.user.domain.topic}", groupId = "${spring.kafka.consumer.group-id}")
  public void consumeUserDomainEventModel(ConsumerRecord<String, UserDomainModel> userDomainModel) {
    UserDomainModel userDomainModelObject = userDomainModel.value();
    try {
      userService.commitDomain(userDomainModelObject);
    } catch (MongoException e) {
      throw new RetryableException("Retryable Exception", e.getMessage());
    }

  }
}

And we define our Mongo repository as well:

package com.noketchup.shop.user.repository;

import com.noketchup.shop.user.model.User;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;
import java.util.UUID;

@Repository
public interface UserRepository extends MongoRepository<User, UUID> {
}
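The repository compiles as-is, but the GET endpoint also needs lookups by email, phone number, and username. Assuming the User document exposes matching fields (an assumption — check the full source for the actual model), Spring Data MongoDB can derive those queries from method names, roughly like this sketch:

```java
package com.noketchup.shop.user.repository;

import com.noketchup.shop.user.model.User;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;
import java.util.UUID;

@Repository
public interface UserRepository extends MongoRepository<User, UUID> {
  // Derived queries; the field names assume the User document has
  // email, mobileNumber, and username properties.
  Optional<User> findByEmail(String email);
  Optional<User> findByMobileNumber(String mobileNumber);
  Optional<User> findByUsername(String username);
}
```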

Make sure to download the full source code from https://github.com/Shredded-Mustard/ltys-user-svc, or define the model and implement the UserService and mappers in your own way.

And our properties file may look like this

spring.application.name=User Service

# Database Configuration
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=userdb
spring.data.mongodb.username=noketchupadmin
spring.data.mongodb.password=sausage
spring.data.mongodb.authentication-database = admin
spring.data.mongodb.auto-index-creation=true

# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:29092
config.user.domain.topic=user

spring.kafka.consumer.group-id=@artifactId@-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
#spring.kafka.listener.ack-mode=MANUAL
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.listener.fixed-backoff.max-attempts=4
spring.kafka.listener.fixed-backoff.interval=1000
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
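Note that the two fixed-backoff keys are not standard Spring Boot properties. One way to honor them (a sketch, not necessarily how the repo wires it) is to bind them into a Spring Kafka DefaultErrorHandler bean, which the listener container factory would then use:

```java
package com.noketchup.shop.user.config;

import com.noketchup.shop.user.exception.RetryableException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

  @Bean
  public DefaultErrorHandler errorHandler(
          @Value("${spring.kafka.listener.fixed-backoff.interval}") long interval,
          @Value("${spring.kafka.listener.fixed-backoff.max-attempts}") long maxAttempts) {
    // FixedBackOff's second argument counts retries, hence max-attempts - 1
    // retries after the initial delivery attempt.
    DefaultErrorHandler handler =
            new DefaultErrorHandler(new FixedBackOff(interval, maxAttempts - 1));
    // Assumption: retry only the RetryableException our consumer throws.
    handler.addRetryableExceptions(RetryableException.class);
    return handler;
  }
}
```

This way a transient MongoException in the consumer is retried up to four times before the record is given up on.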

To run Kafka and MongoDB we will add a Docker Compose file:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  mongo:
    image: mongo:latest
    restart: always
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_DATABASE: userdb
      MONGO_INITDB_ROOT_USERNAME: noketchupadmin
      MONGO_INITDB_ROOT_PASSWORD: sausage
    volumes:
      - ./mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
  mongo-express:
    image: mongo-express:latest
    restart: always
    ports:
      - 9081:8081
    depends_on:
      - mongo
    environment:
      ME_CONFIG_MONGODB_AUTH_USERNAME: noketchupadmin
      ME_CONFIG_MONGODB_AUTH_PASSWORD: sausage
      ME_CONFIG_MONGODB_URL: mongodb://noketchupadmin:sausage@mongo:27017/
      ME_CONFIG_MONGODB_ENABLE_ADMIN: true

Now let us start our containers and run our application

> docker compose up
> mvn compile
> mvn exec:java -Dexec.mainClass=com.noketchup.shop.user.UserServiceApplication

Now we can add a new User

curl --location 'http://localhost:8080/user' \
--header 'Content-Type: application/json' \
--header 'Cookie: lng=en' \
--data-raw '{
    "username": "mustard",
    "dateOfBirth": "1996-03-02",
    "mobileNumber": "0123456789",
    "email": "mustard@onthebeat.com"
}'

We can also fetch our user like this

curl --location 'http://localhost:8080/user?email=mustard%40onthebeat.com' \
--header 'Cookie: lng=en'

and we get

{
    "id": "74b1e501-5d5c-4e38-9257-4654c301fd34",
    "username": "mustard",
    "dateOfBirth": "1996-03-02",
    "mobileNumber": "0123456789",
    "email": "mustard@onthebeat.com"
}

To recap:

  1. We exposed our GET and POST endpoints
  2. We created the main domain event producer and consumer
  3. We defined our Mongo repository
  4. We configured Kafka and MongoDB

Not only do we have our addNewUser command implemented, we also have a free event that any other service can consume to hook another command into the operation chain at any point in time.

One Event to Rule Them All

The event is already there; all you have to do is implement your consumer.

In the Gift service we will implement our consumer the same way we did in the User service:

@Component
@RequiredArgsConstructor
public class UserConsumerImpl implements UserConsumer {
  private final UserService userService;

  @Transactional
  @KafkaListener(topics = "${config.user.domain.topic}", groupId = "${spring.kafka.consumer.group-id}")
  public void consumeUserDomainEventModel(ConsumerRecord<String, UserDomainModel> userDomainModel) {
    UserDomainModel userDomainModelObject = userDomainModel.value();
    // Add gift credits to the newly created user or DO WHATEVER YOU WANT based on that information
  }
}
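Since Kafka may deliver the same event more than once, the gift-crediting logic needs to be idempotent. A broker-free sketch of that idea, with an in-memory wallet store and a processed-event set (all names hypothetical, not from the repo):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the gift service's side of the pattern: credit $10 once per user
// event, guarding against redelivery with a set of processed event ids.
public class GiftCreditSketch {

  private final Map<String, Integer> walletCredits = new HashMap<>();
  private final Set<String> processedEventIds = new HashSet<>();

  public void onUserCreated(String eventId, String userId) {
    // Skip duplicates so a Kafka redelivery cannot credit the gift twice.
    if (!processedEventIds.add(eventId)) {
      return;
    }
    walletCredits.merge(userId, 10, Integer::sum);
  }

  public int creditsOf(String userId) {
    return walletCredits.getOrDefault(userId, 0);
  }

  public static void main(String[] args) {
    GiftCreditSketch gifts = new GiftCreditSketch();
    gifts.onUserCreated("evt-1", "u1");
    gifts.onUserCreated("evt-1", "u1"); // redelivered event: ignored
    System.out.println(gifts.creditsOf("u1")); // 10
  }
}
```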

