In a Spring application where messages are exchanged not directly but through Kafka in an event-driven approach, special attention must be paid to avoiding a NullPointerException.
Let's look at the code below.
In `onStartOrder`, the order process begins and a point reservation command message is published to Kafka. Then `onReservePoint` receives and processes that command message.
The point reservation command message contains a `pointReserveDto`, but a specification change made a new field necessary. So a `newObject` was created and additionally passed, and `onReservePoint`, which performs the point reservation, was modified to retrieve and use it.
Here a problem arises. The existing point reservation command messages in Kafka do not contain the `newObject` in their `pointReserveDto`. Although the Spring application has been deployed with a new version, it still consumes old-version messages and executes `pointReserveDto.getNewObject().getData()`, leading to a NullPointerException.
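To see this failure mode in isolation, here is a minimal plain-Java sketch. The `PointReserveDto` and `NewObject` classes below are simplified stand-ins for the article's DTOs, not the project's actual classes: an old-format message has no `newObject` field, so after deserialization the reference is simply `null`, and the chained getter throws.

```java
public class NpeDemo {

    // Simplified stand-ins for the DTOs in the article.
    static class NewObject {
        private final String data;
        NewObject(String data) { this.data = data; }
        String getData() { return data; }
    }

    static class PointReserveDto {
        private final NewObject newObject; // absent in old messages -> null
        PointReserveDto(NewObject newObject) { this.newObject = newObject; }
        NewObject getNewObject() { return newObject; }
    }

    public static void main(String[] args) {
        // An old-format message has no "newObject" field, so after
        // deserialization the reference is null.
        PointReserveDto oldMessage = new PointReserveDto(null);
        try {
            String data = oldMessage.getNewObject().getData(); // NPE here
            System.out.println(data);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException on old message");
        }
    }
}
```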
I have illustrated the entire process to provide a detailed explanation through code. It's beneficial to view the workflow and code together.
cf) The code for steps 5 and 6 has been omitted, as discussing persistence would make the explanation overly lengthy.
The starting point is the `kafkaListener`. When it receives a `StartOrder` command message from Kafka, it delegates the task to the `orderService`.
```java
@Configuration
@RequiredArgsConstructor
public class KafkaListener {

    @Component
    @RequiredArgsConstructor
    public static class KafkaCommandHandler {

        private final OrderService orderService;

        public void onStartOrder(StartOrder startOrder) {
            this.orderService.startOrder(startOrder);
        }
    }
}
```
In `orderService.startOrder`, a `PointReserveDto` is created, and due to the specification change a `NewObject` is also created and included. It is then passed to the `orderProcessor`.
```java
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderProcessor orderProcessor;

    public void startOrder(StartOrder startOrder) {
        String orderId = startOrder.getOrderId();

        // Create NewObject due to the specification change
        PointReserveDto pointReserveDto = PointReserveDto.builder()
                .newObject(
                        NewObject.builder()
                                .data("newData")
                                .build()
                )
                .build();

        orderProcessor.processStartOrder(orderId, pointReserveDto);
    }
}
```
In `orderProcessor`, the business logic is executed. First, it retrieves the `Order` object from the database using the `orderId` received from the `StartOrder` command. Then, using the `Order` object, it carries out the tasks related to starting the order. Upon completion, it calls `order.applyOrderStarted` to publish an event indicating that the `StartOrder` has finished.
```java
@Component
public class OrderProcessor {

    public void processStartOrder(String orderId, PointReserveDto pointReserveDto) {
        Order order = getOrderById(orderId);

        // Do the StartOrder process
        order.applyOrderStarted(pointReserveDto);
    }

    // In reality, it is necessary to query the database to retrieve the Order object.
    private Order getOrderById(String orderId) {
        return Order.builder().id(orderId).build();
    }
}
```
cf) The main purpose is to explain the process leading to a NullPointerException, so we'll skip the details, but it's worth noting that the structure of issuing events and commands from `Order` is a key point in an event-driven architecture (EDA).
Next, let's look at the following code. `order.applyOrderStarted` takes the received `pointReserveDto`, encapsulates it in an `OrderStarted` event, and adds it to the `pendingEvents` list. Lastly, through the `apply` method, it fills the `Order` object's `pointReserveDto` field.
Subsequently, it creates a `ReservePoint` command, encapsulates the `pointReserveDto` within it, and similarly adds it to the `pendingCommands` list. The remaining tasks involve publishing the messages in `pendingEvents` and `pendingCommands` to Kafka and saving the order in the database. This is the content of steps 5 and 6, and the related code is omitted.
```java
@Getter
@Builder
public class Order {

    String id;

    @Nullable
    PointReserveDto pointReserveDto;

    // @Builder.Default keeps Lombok's builder from leaving these lists null.
    @Builder.Default
    List<Event> pendingEvents = new ArrayList<>();

    @Builder.Default
    List<Command> pendingCommands = new ArrayList<>();

    public void applyOrderStarted(PointReserveDto pointReserveDto) {
        this.pendAndApplyEvent(
                OrderStarted.builder()
                        .orderId(this.getId())
                        .pointReserveDto(pointReserveDto)
                        .build(),
                this::apply
        );
        this.pendingCommands.add(
                ReservePoint.builder()
                        .orderId(this.getId())
                        .pointReserveDto(this.pointReserveDto)
                        .build()
        );
    }

    private <T extends Event> void pendAndApplyEvent(T event, Consumer<T> apply) {
        this.pendingEvents.add(event);
        apply.accept(event);
    }

    private void apply(OrderStarted orderStarted) {
        this.pointReserveDto = orderStarted.getPointReserveDto();
    }
}
```
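For reference, steps 5 and 6 (omitted above) boil down to draining the pending lists into Kafka and persisting the order. The sketch below uses hypothetical `MessagePublisher` and `OrderRepository` interfaces that are not in the source; in the real application these would be a Kafka producer and a database repository, and the publish and save would need to be coordinated (e.g. via an outbox pattern), so treat this as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class PendingDrainSketch {

    interface MessagePublisher { void publish(Object message); } // hypothetical
    interface OrderRepository { void save(OrderLike order); }    // hypothetical

    // Minimal stand-in for Order's pending lists.
    static class OrderLike {
        final List<Object> pendingEvents = new ArrayList<>();
        final List<Object> pendingCommands = new ArrayList<>();
    }

    static void publishAndSave(OrderLike order, MessagePublisher publisher, OrderRepository repository) {
        // Step 5: publish every pending event and command to Kafka.
        order.pendingEvents.forEach(publisher::publish);
        order.pendingCommands.forEach(publisher::publish);
        order.pendingEvents.clear();
        order.pendingCommands.clear();

        // Step 6: persist the order state.
        repository.save(order);
    }

    public static void main(String[] args) {
        OrderLike order = new OrderLike();
        order.pendingEvents.add("OrderStarted");
        order.pendingCommands.add("ReservePoint");

        List<Object> published = new ArrayList<>();
        publishAndSave(order, published::add, o -> { /* save stub */ });
        System.out.println(published); // messages handed to the publisher, in order
    }
}
```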
Because the `ReservePoint` command was published above, the `KafkaCommandHandler` consumes it again through `onReservePoint`.
```java
@Configuration
@RequiredArgsConstructor
public class KafkaListener {

    @Component
    @RequiredArgsConstructor
    public static class KafkaCommandHandler {

        private final OrderService orderService;

        public void onReservePoint(ReservePoint reservePoint) {
            this.orderService.reservePoint(reservePoint);
        }
    }
}
```
In `orderProcessor`, attempting to extract the `newObject.data` value from the received `pointReserveDto` leads to the NullPointerException.
```java
@Component
public class OrderProcessor {

    public void processReservePoint(
            String orderId,
            PointReserveDto pointReserveDto
    ) {
        Order order = getOrderById(orderId);

        // Do the ReservePoint process
        // FIXME: NPE!! pointReserveDto.getNewObject() is null for old-version messages
        String newData = pointReserveDto.getNewObject().getData();
    }

    // In reality, it is necessary to query the database to retrieve the Order object.
    private Order getOrderById(String orderId) {
        return Order.builder().id(orderId).build();
    }
}
```
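One way to tolerate old-format messages, a hedged sketch rather than necessarily the fix the team chose, is to treat `newObject` as optional and fall back to a default when it is absent. The classes and the `"defaultData"` fallback value below are illustrative assumptions:

```java
import java.util.Optional;

public class NullSafeAccess {

    static class NewObject {
        private final String data;
        NewObject(String data) { this.data = data; }
        String getData() { return data; }
    }

    static class PointReserveDto {
        private final NewObject newObject;
        PointReserveDto(NewObject newObject) { this.newObject = newObject; }
        NewObject getNewObject() { return newObject; }
    }

    static String extractData(PointReserveDto dto) {
        // Old messages carry no newObject; map() short-circuits on null,
        // so no NullPointerException is thrown.
        return Optional.ofNullable(dto.getNewObject())
                .map(NewObject::getData)
                .orElse("defaultData"); // assumed fallback value
    }

    public static void main(String[] args) {
        System.out.println(extractData(new PointReserveDto(null)));                     // old message
        System.out.println(extractData(new PointReserveDto(new NewObject("newData")))); // new message
    }
}
```

Whether a silent default is acceptable depends on the business rule; the alternative is to reject or dead-letter old-format messages explicitly.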
Top comments (2)
Thanks for the post @yangbongsoo!
Going deep into the problem, it looks like you need a way to version your message schemas. Specifically, with Kafka you can use the Kafka Schema Registry. With this tool you can register schemas and simplify the process of evolving them.
@hebertrfreitas Thanks for your comment.
Our team decided not to use the Kafka Schema Registry when setting up the initial architecture. Instead, we use a JSON message format.
This reflected the preferences of those leading the architecture design. There was no specific reason :)