Event-driven architecture is evolving rapidly. Software architects have been developing event-driven architectural patterns to solve the problems that come with such a robust design. Executing a non-idempotent operation might not be as simple as publishing a message to a queue. A non-idempotent operation is an operation that changes the state of our application. There are numerous patterns that dictate when and how to publish a non-idempotent event. When a "Fire and Forget" event happens, we must be careful in dealing with the side effects.
One problem to deal with is the Dual-Write problem; Writing to two external systems in one transactional context. Often times a process involves writing to the database and then publishing an event to a message broker like Kafka. For example, in Event-Sourcing, after adding or updating a an entity to the database, a message is published to advertise the new state of the application.
This problem can be solved programmatically by implementing rollback mechanisms or by applying patterns like the Outbox pattern for example. The Listen to Yourself pattern is also one of the solutions to such a problem, and it solves it in a very simple way.
Instead of updating the database and then publishing an event, we only publish the event. The same component that publishes the event consumes the published event; hence the name "Listen to Yourself". In other words the component responsible for doing the operation listens to its own topic and consumes the messages that it sent to update its database. This means that when the event is published, the database update is still pending and may or may not happen.
Listen to yourself
Before diving into this, make sure you know what Event-driven architecture is and some common patterns. You can read about it here Event Driven Architectural patterns.
The "Listen to yourself" pattern adds the event publisher to the List of event consumers. You heard it right. The same component that is responsible for updating the database and firing the event becomes one of the event consumers. In other words, the publisher listens to itself. However, the database update will be postponed to after firing the event.
Why is this useful?
Minimize side effects
Executing all side effects concurrently minimizes the problems that would arise from a faulty message broker. If for example the message broker failed to write the event to the message queue for any reason, there will be no event and therefore no side effects as opposed to writing in the database first and then firing the event. This is useful for event-sourcing which treats an event as state change. If there is no event, there is no state change.Easy replay implementation
Event replays in event-sourcing is a very useful technique. If you want to implement a replay for the primary domain, it will be as easy as editing the primary domain consumer implementation, or extending its behavior. This way, we only update the database in the consumer implementation.Separation of concerns
When you structure your domain model carefully, the domain event becomes a single source of truth that can provide great consistency across your services. This means that if you want to move some subdomains to a separate services, it can be done in a very short time and the logic will not change as the subdomains are already embedded in the domain event. You can then easily replay all domain events from the new services and tada; You have a new consistent subdomain database. This is the most important aspect of the "Listen to Yourself" pattern and arguably the part where you should spend most of your time contemplating.
Listen to Yourself in practice
Consider an e-commerce store that allows the user to acquire free $10 in credits as soon as they sign up using a gift card.
Whenever a new user is created, $10 is added to his wallet.
In this scenario we have two domains.
The first domain is the "User".
The second domain is the "Gift".
The "Wallet" is a subdomain of the User.
Before you dive in make sure you have:
- Java 11 or above installed on your machine
- Apache Maven installed
- Running instances of Kafka and MongoDB (We will use Docker compose for running both instances so if you have docker installed you do not need to install anything else)
- Any IDE or Text Editor
You will find the full code source here https://github.com/Shredded-Mustard/ltys-user-svc
Let us take a look at our dependencies which we will use our pom.xml file to define
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
We will be Using the Spring framework's Web package along with Spring-Kafka.
Our controller will look like this
package com.noketchup.shop.user.controller;
import com.noketchup.shop.user.controller.dto.UserRequest;
import com.noketchup.shop.user.controller.dto.UserResponse;
import com.noketchup.shop.user.domain.UserDomainModel;
import com.noketchup.shop.user.domain.WalletDomainModel;
import com.noketchup.shop.user.producer.UserProducer;
import com.noketchup.shop.user.service.UserService;
import com.noketchup.shop.user.service.WalletService;
import com.noketchup.shop.user.service.mapper.UserMapper;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
@RestController
@RequiredArgsConstructor
@RequestMapping("user")
public class UserController {
private final UserService userService;
private final WalletService walletService;
private final UserProducer userProducer;
private final UserMapper mapper;
@GetMapping
public ResponseEntity<UserResponse> getUserByUniqueIdentifier(
@RequestParam(value = "email", required = false) String email,
@RequestParam(value = "phoneNumber", required = false) String phoneNumber,
@RequestParam(value = "username", required = false) String username
) {
UserResponse userResponse = userService.getUserByUniqueParam(email, phoneNumber, username);
return ResponseEntity.ok(userResponse);
}
@PostMapping
public ResponseEntity<String> createNewUser(@RequestBody @Valid UserRequest userRequest) {
//Validate all fields before taking any action
userService.validateUser(userRequest);
//Map Request to the Domain Model
UserDomainModel userDomainModelObject = mapper.mapToUserDomain(userRequest);
WalletDomainModel wallet = walletService.createNewEmptyWalletDomainObject();
userDomainModelObject.setWallets(List.of(wallet));
//Send Domain event
userProducer.sendUserDomainEvent(userDomainModelObject);
return ResponseEntity.accepted().body("User creation in progress");
}
}
We have two main endpoints.
The GET endpoint gets a single user by email
The POST endpoint Adds a new User to the database
Our producer interface needs only to have one method which is produceUserDomainEvent
package com.noketchup.shop.user.producer;
import com.noketchup.shop.user.domain.UserDomainModel;
public interface UserProducer {
void sendUserDomainEvent(UserDomainModel userDomainModelObject);
}
We also have to have a Consumer Interface which should also have just one method consumeUserDomainEventModel
package com.noketchup.shop.user.consumer;
import com.noketchup.shop.user.domain.UserDomainModel;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface UserConsumer {
void consumeUserDomainEventModel(ConsumerRecord<String, UserDomainModel> userDomainModel);
}
We also have to implement both interfaces like this
package com.noketchup.shop.user.producer;
import com.noketchup.shop.user.domain.UserDomainModel;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class UserProducerImpl implements UserProducer {
@Value("${config.user.domain.topic}")
private String userDomainTopic;
private final KafkaTemplate<String, UserDomainModel> userDomainKafkaTemplate;
public void sendUserDomainEvent(UserDomainModel userDomainModelObject) {
ProducerRecord<String, UserDomainModel> record = new ProducerRecord<String, UserDomainModel>(
userDomainTopic,
userDomainModelObject.getId().toString(),
userDomainModelObject
);
record.headers().add(new DomainEventHeader("NAME", "CREATE_NEW_USER"));
userDomainKafkaTemplate.send(record);
}
public static class DomainEventHeader implements Header {
private final String key;
private final String value;
public DomainEventHeader(String key, String value){
this.key = key;
this.value = value;
}
@Override
public String key() {
return key;
}
@Override
public byte[] value() {
return value.getBytes();
}
}
}
package com.noketchup.shop.user.consumer;
import com.mongodb.MongoException;
import com.noketchup.shop.user.domain.UserDomainModel;
import com.noketchup.shop.user.exception.RetryableException;
import com.noketchup.shop.user.service.UserService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
@RequiredArgsConstructor
public class UserConsumerImpl implements UserConsumer {
private final UserService userService;
@Transactional
@KafkaListener(topics = "${config.user.domain.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void consumeUserDomainEventModel(ConsumerRecord<String, UserDomainModel> userDomainModel) {
UserDomainModel userDomainModelObject = userDomainModel.value();
try {
userService.commitDomain(userDomainModelObject);
} catch (MongoException e) {
throw new RetryableException("Retryable Exception", e.getMessage());
}
}
}
And to define our Mongo Repository as well
package com.noketchup.shop.user.repository;
import com.noketchup.shop.user.model.User;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
import java.util.UUID;
@Repository
public interface UserRepository extends MongoRepository<User, UUID> {
}
Make sure to download the full code from https://github.com/Shredded-Mustard/ltys-user-svc or you can define the model and implement the User Service and Mappers in your own way.
And our properties file may look like this
spring.application.name=User Service
# Database Configuration
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=userdb
spring.data.mongodb.username=noketchupadmin
spring.data.mongodb.password=sausage
spring.data.mongodb.authentication-database = admin
spring.data.mongodb.auto-index-creation=true
# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:29092
config.user.domain.topic=user
spring.kafka.consumer.group-id=@artifactId@-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
#spring.kafka.listener.ack-mode=MANUAL
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.listener.fixed-backoff.max-attempts=4
spring.kafka.listener.fixed-backoff.interval=1000
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
To run Kafka and Mongo DB we will add a Docker Compose file
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:7.3.2
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
mongo:
image: mongo:latest
restart: always
ports:
- 27017:27017
environment:
MONGO_INITDB_DATABASE: userdb
MONGO_INITDB_ROOT_USERNAME: noketchupadmin
MONGO_INITDB_ROOT_PASSWORD: sausage
volumes:
- ./mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
mongo-express:
image: mongo-express:latest
restart: always
ports:
- 9081:8081
depends_on:
- mongo
environment:
ME_CONFIG_MONGODB_AUTH_USERNAME: noketchupadmin
ME_CONFIG_MONGODB_AUTH_PASSWORD: sausage
ME_CONFIG_MONGODB_URL: mongodb://noketchupadmin:sausage@mongo:27017/
ME_CONFIG_MONGODB_ENABLE_ADMIN: true
Now let us start our containers and run our application
> docker compose up
> mvn compile
> mvn exec:java -Dexec.mainClass=com.noketchup.shop.user.UserServiceApplication
Now we can add a new User
curl --location 'http://localhost:8080/user' \
--header 'Content-Type: application/json' \
--header 'Cookie: lng=en' \
--data-raw '{
"username": "mustard",
"dateOfBirth": "1996-03-02",
"mobileNumber": "0123456789",
"email": "mustard@onthebeat.com"
}'
We can also fetch our user like this
curl --location 'http://localhost:8080/user?email=mustard%40onthebeat.com' \
--header 'Cookie: lng=en'
and we get
{
"id": "74b1e501-5d5c-4e38-9257-4654c301fd34",
"username": "mustard",
"dateOfBirth": "1996-03-02",
"mobileNumber": "0123456789",
"email": "mustard@onthebeat.com"
}
To recap
- We first exposed our GET and POST endpoints
- We created the main domain event producer and consumer
- We Defined our Mongo Repository
- We configured Kafka and Mongodb
Not only do we have our addNewUser command implemented, we also have a free Event that we can consume from any other service to hook another command to an operation chain at any point in time.
One event to Rule them all
The event is already there, all you have to do is implement your consumer
In the Gifts service we will implement our consumer the same way we did in the User Service
@Component
@RequiredArgsConstructor
public class UserConsumerImpl implements UserConsumer {
private final UserService userService;
@Transactional
@KafkaListener(topics = "${config.user.domain.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void consumeUserDomainEventModel(ConsumerRecord<String, UserDomainModel> userDomainModel) {
UserDomainModel userDomainModelObject = userDomainModel.value();
// Add gift credits to the newly created user or DO WHATEVER YOU WANT based on that information
}
}
Top comments (0)