Introduction
This post is a brief example to use the Outbox Design Pattern in Spring Boot and Apache Kafka.
Explanation
Here is a short explanation from the great website (microservices.io) :
The solution is for the service that sends the message to first store the message in the database as part of the transaction that updates the business entities. A separate process then sends the messages (outbox entity in JSON format) to the message broker.
Use cases
Utilize the transactional outbox pattern under the following circumstances:
- When constructing an event-driven application wherein a database modification triggers an event notification.
- When aiming to guarantee atomicity across operations involving two separate services.
- When incorporating the event sourcing pattern into your system architecture.
What do i need ?
IDE (to launch Spring Boot in local mode)
- IntelliJ IDEA
- Visual Code
Java (version 21)
- Spring Boot (3.2.3)
- Spring Kafka (to communicate with Apache Kafka - principally for the consumer part)
- Spring Data (for the Rest Controller)
- Lombok (1.18.30)
- MapStruct (1.5.5.Final)
- Postgresql
- Swagger (springdoc 2.3.0)
Docker
- Apache Kafka
- Apache Kafka-ui
- Zookeeper
- Postgresql (or H2 if you want to work with in-memory DB)
Starting with Spring Boot Application
Folder Structure
Below is the folder structure with the configuration of services, kafka configuration and controller.
java
└── com.outbox.pattern
└── controller/
├── OutboxService.java
└── domain/
├── OrderDomain.java
├── OutboxEventDomain.java
└── entity/
├── OrderEntity.java
├── OutboxEventEntity.java
└── exception/
├── OrderProcessingException.java
└── integration/
├── KafkaProducerConfig.java
└── repositories/
├── OrderRepository.java
├── OutboxRepository.java
└── Services/
└── orders/
├── OrdersService.java
├── OrdersServiceImpl.java
└── outbox/
├── OutboxService.java
├── OutboxServiceImpl.java
└── utils/
├── OrderMapper.java
├── OrderStatus.java
├── OutboxPatternApplication.java
resources/
├── application.yaml
Application Spring Boot
OutboxPatternApplication.java
is the starter of Spring Boot application
@SpringBootApplication
@EnableScheduling
@EnableJpaAuditing
public class OutboxPatternApplication {
public static void main(String[] args) {
SpringApplication.run(OutboxPatternApplication.class, args);
}
}
Controller
OrderController.java
is the Rest Controller to create or cancel an order.
We can call it via the swagger ui (http://localhost:8080/swagger-ui/index.html#/order-controller)
@RestController
@RequestMapping("/order")
public class OrderController {
private final OrdersService ordersService;
@Autowired
public OrderController(OrdersService ordersService) {
this.ordersService = ordersService;
}
// Post an order
@PostMapping(produces = "application/json")
public ResponseEntity<UUID> createOrder(@RequestBody @Valid OrderDomain orderDomain) {
UUID order = ordersService.createOrder(orderDomain);
return ResponseEntity.status(HttpStatus.CREATED)
.body(order);
}
// Cancel an order
@DeleteMapping(value = "/{orderUUID}", produces = "application/json")
public ResponseEntity<Boolean> cancelOrder(@PathVariable @Valid UUID orderUUID) {
return ResponseEntity.status(HttpStatus.OK)
.body(ordersService.cancelOrder(orderUUID));
}
}
Domain
OrderDomain.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderDomain {
private String product_name;
private int quantity;
}
OutboxEventDomain.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OutboxEventDomain {
private OrderStatus eventType;
private String eventPayload;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
come from the MapStruct. It is used to generate the Getter / Setter and the constructor.
Entity - DB entities
OrderEntity.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "orders")
@EntityListeners(AuditingEntityListener.class)
public class OrderEntity {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@Column(name = "status", nullable = false)
@Enumerated(EnumType.STRING)
private OrderStatus status;
@Column(name = "product_name", nullable = false)
private String product_name;
@Column(name = "quantity", nullable = false)
private int quantity;
@Column(name ="created_date", nullable = false)
@CreatedDate
private Instant orderDate;
}
@EntityListeners(AuditingEntityListener.class)
is used in combinaison with @CreatedDate
to retrieve automatically the creation date of the record.
OutboxEventEntity.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "outbox")
public class OutboxEventEntity {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
@Column(name = "event_type", nullable = false)
private OrderStatus eventType;
@Column(name ="event_payload", nullable = false)
private String eventPayload;
}
Exception - Exception Handling
OrderProcessingException.java
public class OrderProcessingException extends RuntimeException {
public OrderProcessingException(String message, String e) {
super(message);
}
}
Integration - Integration of Kafka
KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String kafkaHost;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
${spring.kafka.bootstrap-servers}
this parameter is a property from the application.yaml
Repository - Connection to the DB
OrderRepository.java
public interface OrderRepository extends CrudRepository<OrderEntity, UUID> {
@Modifying
@Query("""
UPDATE OrderEntity o
SET o.status = :status
WHERE o.id = :id
""")
void updateStatus(@Param(value = "id") UUID id,
@Param(value = "status") OrderStatus status);
}
OutboxRepository.java
@Repository
public interface OutboxRepository extends CrudRepository<OutboxEventEntity, UUID> {
//Empty Interface
}
Services - Communication between the Rest Controller and the DB
OrdersService.java
public interface OrdersService {
UUID createOrder(OrderDomain orderDTO);
boolean cancelOrder(UUID orderUUID);
}
OrdersServiceImpl.java
@Service
public class OrdersServiceImpl implements OrdersService {
private final OutboxRepository outboxRepository;
private final OrderRepository orderRepository;
private final OrderMapper orderMapper;
private final ObjectMapper objectMapper;
@Autowired
public OrdersServiceImpl(OutboxRepository outboxRepository, OrderRepository orderRepository, OrderMapper orderMapper, ObjectMapper objectMapper) {
this.outboxRepository = outboxRepository;
this.orderRepository = orderRepository;
this.orderMapper = orderMapper;
this.objectMapper = objectMapper;
}
@Override
@Transactional
public UUID createOrder(OrderDomain orderDomain) {
OrderEntity orderEntity = createOrderInDatabase(orderDomain);
saveOrderEventToDatabase(orderEntity, OrderStatus.CREATED);
return orderEntity.getId();
}
private OrderEntity createOrderInDatabase(OrderDomain orderDomain) {
try {
OrderEntity orderEntity = orderMapper.orderEntityToOrderDomain(orderDomain);
orderEntity.setStatus(OrderStatus.CREATED);
return orderRepository.save(orderEntity);
} catch (Exception e) {
throw new OrderProcessingException("Error processing order creation", e.getMessage());
}
}
private void saveOrderEventToDatabase(OrderEntity orderEntity, OrderStatus eventType) {
OutboxEventEntity outboxEvent = new OutboxEventEntity();
try {
outboxEvent.setEventPayload(objectMapper.writeValueAsString(orderEntity));
outboxEvent.setEventType(eventType);
outboxRepository.save(outboxEvent);
} catch (JsonProcessingException e) {
throw new OrderProcessingException("Error processing order event creation", e.getMessage());
}
}
@Override
@Transactional
public boolean cancelOrder(UUID orderUUID) {
OrderEntity orderEntity = getOrderEntity(orderUUID);
orderRepository.updateStatus(orderUUID, OrderStatus.CANCELLED);
saveOrderEventToDatabase(orderEntity, OrderStatus.CANCELLED);
return true;
}
private OrderEntity getOrderEntity(UUID orderUUID) {
return orderRepository.findById(orderUUID)
.orElseThrow(() -> new OrderProcessingException(String.format("Order entity with id %s not found", orderUUID), "NOT_FOUND"));
}
}
OutboxService.java
public interface OutboxService {
void eventProcessing();
}
OutboxServiceImpl.java
In this OutboxServiceImpl, we have created two different partition in Kafka to split the message payload for CREATED and CANCELLED.
NOTE: It is not the use case of partition for same domain but it has been added for learning purpose.
@Service
public class OutboxServiceImpl implements OutboxService {
private static final Logger LOG = LoggerFactory.getLogger(OutboxServiceImpl.class);
private final KafkaTemplate<String, String> kafkaTemplate;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
@Value("${configuration.kafka.outbox-topic}")
private String outboxTopic;
@Autowired
public OutboxServiceImpl(KafkaTemplate<String, String> kafkaTemplate, OutboxRepository outboxRepository, ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
@Override
@Scheduled(fixedRateString = "${configuration.kafka.scheduled}")
public void eventProcessing() {
List<OutboxEventEntity> listOfOutboxEventEntities = new ArrayList<>();
outboxRepository.findAll().forEach(listOfOutboxEventEntities::add);
LOG.info("Number of outbox events: {}", listOfOutboxEventEntities.size());
if (!listOfOutboxEventEntities.isEmpty()) {
for (OutboxEventEntity outboxEventEntity : listOfOutboxEventEntities) {
LOG.info("Sending event to Kafka");
String eventType = determineEventType(outboxEventEntity.getEventType());
if (eventType != null) {
sendEventToKafka(eventType, outboxEventEntity);
}
outboxRepository.deleteById(outboxEventEntity.getId());
}
}
}
private String determineEventType(OrderStatus eventType) {
return switch (eventType) {
case CREATED -> "CREATED";
case CANCELLED -> "CANCELLED";
default -> null;
};
}
private void sendEventToKafka(String eventType, OutboxEventEntity outboxEventEntity) {
try {
CompletableFuture<SendResult<String, String>> sendResult = kafkaTemplate.send(outboxTopic, eventType, objectMapper.writeValueAsString(outboxEventEntity));
SendResult<String, String> result = sendResult.get();
LOG.info("Partition: {}", result.getRecordMetadata().partition());
} catch (JsonProcessingException | InterruptedException | ExecutionException e) {
LOG.error("Error sending event to Kafka: {}", e.getMessage());
}
}
}
Utils - MapStruct to map entities, domains and OrderStatusEnum
OrderMapper.java
@Mapper(
componentModel = MappingConstants.ComponentModel.SPRING,
unmappedTargetPolicy = ReportingPolicy.IGNORE
)
public interface OrderMapper {
OrderMapper INSTANCE = Mappers.getMapper(OrderMapper.class);
OrderDomain orderDomainToOrderEntity(OrderEntity orderDomain);
OrderEntity orderEntityToOrderDomain(OrderDomain orderDomain);
}
OrderStatus.java
public enum OrderStatus {
CREATED,
CANCELLED
}
Configuration of Spring Boot and Kafka
application.yaml
configuration of Spring Boot and Kafka
spring:
application:
name: outbox-pattern
datasource:
url: jdbc:postgresql://${YOUR_IP_ADDRESS}:5432/microservices
username: postgres
password: postgres
jpa:
database: postgresql
show-sql: true
kafka:
bootstrap-servers: ${YOUR_IP_ADDRESS}:9092
consumer:
group-id: outbox-group-id
configuration:
kafka:
scheduled: 5000
outbox-topic: outbox-topic
${YOUR_IP_ADDRESS}
replace it with your IP
Here is the dependencies
in my pom.xml
that i have used to generate this example :
<project>
...
<properties>
<java.version>21</java.version>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<lombok.version>1.18.30</lombok.version>
<mapstruct.version>1.5.5.Final</mapstruct.version>
<springdoc.version>2.3.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>21</source>
<target>21</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path>
<path>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</path>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-mapstruct-binding</artifactId>
<version>0.2.0</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Postgresql
create table outbox
(
event_payload varchar not null,
event_type varchar not null,
id uuid not null
);
create table orders
(
id uuid not null,
product_name varchar not null,
quantity integer not null,
created_date timestamp with time zone,
status varchar not null
);
Docker
To test all services in local, we are using the docker-compose below to create Kafka-ui, Kafka and Postgresql environment.
You need to have ZooKeeper up and running to have Kafka working.
Copy the file below and run docker compose up
in the same folder as the file.
version: "3.9"
services:
postgresql:
image: postgres:latest
container_name: postgresql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=microservices
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- postgres:/var/lib/postgresql/data
networks: [ "microservices" ]
zoo:
image: confluentinc/cp-zookeeper:7.3.0
hostname: zoo
container_name: zoo
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo:2888:3888
volumes:
- "./zookeeper:/zookeeper"
networks: [ "microservices" ]
kafka:
image: confluentinc/cp-kafka:7.3.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo
volumes:
- "./kafka_data:/kafka"
networks: [ "microservices" ]
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8086:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092
networks: [ "microservices" ]
networks:
microservices:
name: microservices
# Define named volumes
volumes:
postgres:
mongo-data:
driver: local
Kafka Administration
In the kafka UI, you have to create a Topic with 2 partitions.
Here is the Kafka topic created with 2 partitions :
Access to the application and Kafka
swagger-ui : http://localhost:8080/swagger-ui/index.html#/
kafka UI : http://localhost:8086
Github code
Here is the link to the project : Github page
Going further
Regarding the application, it is a package that have all component in one jar.
It could be also possible to extract the polling service (with Kafka consumer) to a specific package to deploy it for exemple in a specific pod on Kubernetes to achieve the high availability or scalability implementing load balancing and handling failover scenarios.
Error Handling: Enhance error handling in your services. For example, in the cancelOrder method, if the order is not found, you throw a OrderProcessingException, which is good. However, you might want to catch exceptions thrown during Kafka message sending and handle them appropriately, perhaps by logging or retrying.
Security: Ensure that sensitive information such as database passwords or Kafka server configurations are properly secured, especially if you're sharing this code or deploying it in production environments.
Testing: While you have provided the endpoints via Swagger for testing, consider writing unit tests and integration tests to ensure the correctness of your code, especially for critical business logic and error handling scenarios.
Thank you for reading!
If you have any questions, feedback, or suggestions, please feel free to leave them in the comments below. I'm eager to hear from you and respond to your thoughts!
Top comments (0)