Suddenly, none of the application servers could establish a network connection to the DB server: a complete system outage. Let's look at what goes wrong on the orderSheet server I am responsible for when the DB has a problem, and why compensation processing is necessary.
// response message
Could not open JDBC Connection for transaction;
nested exception is java.sql.SQLException: Cannot get a connection,
pool error Timeout waiting for idle object
When a user clicks the payment button on the orderSheet screen, a validation check runs, the payment is executed, and finally the order completion process is carried out.
If a DB issue occurs before the payment is executed, the user is inconvenienced, but there is nothing for us to compensate in the data.
However, if a DB issue arises after the payment has completed, we must perform compensation processing, such as canceling the payment.
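To make that boundary concrete, here is a minimal sketch of the checkout flow. All names (OrderSheetService, PaymentClient, OrderClient, PaymentResult) are purely illustrative; this is not the actual orderSheet code, just the shape of the decision.
// Illustrative only: shows where the compensation boundary sits in the checkout flow.
public class OrderSheetService {

    private final PaymentClient paymentClient;
    private final OrderClient orderClient;

    public OrderSheetService(PaymentClient paymentClient, OrderClient orderClient) {
        this.paymentClient = paymentClient;
        this.orderClient = orderClient;
    }

    public void checkout(OrderSheet orderSheet) {
        validate(orderSheet);                                  // DB failure here: inconvenient, but nothing to compensate
        PaymentResult payment = paymentClient.pay(orderSheet); // money moves here

        try {
            orderClient.completeOrder(orderSheet, payment);    // DB failure here leaves a paid-but-unordered state
        } catch (Exception ex) {
            paymentClient.cancel(payment);                     // so compensation (e.g. payment cancellation) is required
        }
    }

    private void validate(OrderSheet orderSheet) {
        // stock, price, coupon checks, etc.
    }
}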
The orderSheet server was designed around an event-driven architecture (EDA), but this segment communicated with the order/payment server directly via an API. If the DB server goes down, the order completion request (number 2) fails, then the order failure event handling (number 4) is attempted, and it fails as well (number 6).
cf) As of February 2024, the communication in this segment has also been switched to a Kafka publish/subscribe structure.
Now let's go into detail. When a user clicks the payment button on the orderSheet screen, the orderSheet server proceeds with validation and payment, and then receives an order completion event to start the final process.
@Component
@RequiredArgsConstructor
public static class CommandMessageHandler {

    // constructor-injected collaborator (field name is illustrative)
    private final OrderCommandService orderCommandService;

    // number 1: receive the CompleteOrder command and delegate to completeOrder() below
    public void onCompleteOrder(CompleteOrder completeOrder) {
        orderCommandService.completeOrder(completeOrder);
    }
}
If the API response fails, it publishes the onCompleteOrderFailed event and sends it to number 4.
public void completeOrder(CompleteOrder completeOrder) {
    CompleteOrderRequestDto dto = toCompleteOrderRequestDto(completeOrder);
    try {
        // number 2
        apiClient.completeOrder(dto).block();
    } catch (Exception ex) {
        // publish onCompleteOrderFailed event (go to number 4)
    }
}
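For reference, if you are on Spring Cloud Stream's functional model, the publishing hinted at in that catch block could look roughly like the sketch below. StreamBridge is the framework's programmatic send API; the binding name "completeOrderFailed-out-0" and the publisher class are illustrative assumptions, not our actual code.
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

// Illustrative publisher for the onCompleteOrderFailed event (consumed at number 4).
@Component
@RequiredArgsConstructor
public class CompleteOrderFailedPublisher {

    private final StreamBridge streamBridge;

    public void publish(CompleteOrderFailed event) {
        // Binding name is an assumption; use whatever output binding your configuration defines.
        streamBridge.send("completeOrderFailed-out-0", event);
    }
}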
Since the DB is down, the order failure processing request (number 5) also fails. At first glance the catch block just logs the error and the flow ends there, but in practice an exception is still thrown up to the KafkaListener.
@Component
@RequiredArgsConstructor
public static class MessageMessageHandler {

    // constructor-injected collaborator (field name is illustrative)
    private final OrderCommandService orderCommandService;

    // number 4: receive the CompleteOrderFailed event and delegate to onCompleteOrderFailed() below
    public void onCompleteOrderFailed(CompleteOrderFailed completeOrderFailed) {
        orderCommandService.onCompleteOrderFailed(completeOrderFailed);
    }
}
public void onCompleteOrderFailed(CompleteOrderFailed completeOrderFailed) {
    try {
        // number 5
        apiClient.rollbackForConsistence(completeOrderFailed);
    } catch (Exception ex) {
        log.error("error!!", ex);
    }
}
In org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder, a DefaultAfterRollbackProcessor is used as the afterRollbackProcessor. When the DefaultAfterRollbackProcessor object is created, a BackOff is passed in as well; if you look at createBackOff, you'll see it is built from maxAttempts in ConsumerProperties, which defaults to 3.
maxAttempts is the total number of attempts to process a message on failure, including the first attempt. It is a setting of the RetryTemplate provided by the framework. Since we were using the default, the message is consumed up to 3 times in total.
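To illustrate only the counting semantics (not the binder's actual code path), here is a tiny standalone example using Spring Retry's RetryTemplate with SimpleRetryPolicy(3): the callback runs one initial time plus two retries, i.e. 3 attempts in total, and then the last exception is rethrown to the caller.
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class MaxAttemptsDemo {

    public static void main(String[] args) {
        RetryTemplate retryTemplate = new RetryTemplate();
        // maxAttempts = 3 includes the first attempt: 1 initial call + 2 retries.
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));

        try {
            retryTemplate.execute(context -> {
                System.out.println("attempt #" + (context.getRetryCount() + 1));
                throw new IllegalStateException("DB is down"); // always fails
            });
        } catch (Exception ex) {
            // After the 3rd failed attempt the last exception is rethrown.
            System.out.println("all attempts failed: " + ex.getMessage());
        }
    }
}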
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
        (record, exception) -> {
            MessagingException payload =
                    new MessagingException(((RecordMessageConverter) messageConverter)
                            .toMessage(record, null, null, null),
                            "Transaction rollback limit exceeded", exception);
            try {
                errorInfrastructure.getErrorChannel()
                        .send(new ErrorMessage(
                                payload,
                                Collections.singletonMap(
                                        IntegrationMessageHeaderAccessor.SOURCE_DATA,
                                        record)));
            }
            catch (Exception e) {
                /*
                 * When there is no DLQ, the FinalRethrowingErrorMessageHandler will re-throw
                 * the payload; that will subvert the recovery and cause a re-seek of the failed
                 * record, so we ignore that here.
                 */
                if (!e.equals(payload)) {
                    throw e;
                }
            }
        }, createBackOff(extendedConsumerProperties),
        new KafkaTemplate<>(transMan.getProducerFactory()),
        extendedConsumerProperties.getExtension().isTxCommitRecovered()));
The event is consumed 3 times in total, and every attempt fails, so we ultimately end up with the following error message.
Transaction rollback limit exceeded; nested exception is
org.springframework.kafka.listener.ListenerExecutionFailedException
Now let's look into compensation processing.
After the DB server is back to normal, one way to correct the data is to manually republish the onCompleteOrderFailed event so that the order failure handling (number 4) runs again.
This re-runs the order failure processing, and the subsequent rollback logic cleans up the inconsistent data.
To manually republish the onCompleteOrderFailed event, it is necessary to first know the topic, partition, and offset information.
The topic can be found in the application property settings, and the partition and offset can be located in the error logs (the offset where the producer published the order failure event after an API error occurred in number 3).
Once the offset is identified, the next step is to find the message at that offset. We located it by searching through our company's operational tools. Then, we copied the message value, generated a new key for it, and manually republished the event.
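If such a tool is not available, the same steps can be reproduced with the plain Kafka client APIs. The sketch below is only illustrative: the topic, partition, offset, and bootstrap servers are placeholders, and the key/value types are assumed to be strings.
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class RepublishFailedEvent {

    public static void main(String[] args) {
        String topic = "complete-order-failed"; // from application properties (placeholder)
        int partition = 1;                      // from the error log (placeholder)
        long offset = 42L;                      // from the error log (placeholder)

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("enable.auto.commit", "false"); // manual assign/seek, no consumer group needed
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            // 1. Read the original message at the known topic/partition/offset.
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(List.of(tp));
            consumer.seek(tp, offset);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

            // 2. Republish the same value under a newly generated key.
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (record.offset() == offset) {
                    producer.send(new ProducerRecord<>(topic, UUID.randomUUID().toString(), record.value()));
                }
            }
            producer.flush();
        }
    }
}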
Unfortunately, since this is a personal blog, I can't go into more detail on that part.
Finally, what happens when a DB outage occurs in a Kafka pub/sub structure rather than over API communication? We no longer get an API error response; instead, command/event objects simply remain stuck in a particular state.
We collect the objects that stay stuck past a certain period and run compensation processing on them, along the lines of the sketch below.
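This is a minimal sketch of such a sweep, assuming hypothetical names (OrderRepository, OrderStatus, CompensationService, Order) and a Spring @Scheduled job; the interval, threshold, and state are illustrative, not our production values.
import java.time.LocalDateTime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical sweep job: finds orders stuck in an intermediate state and compensates them.
@Component
@RequiredArgsConstructor
public class StuckOrderSweeper {

    private final OrderRepository orderRepository;          // hypothetical repository
    private final CompensationService compensationService;  // hypothetical service

    // Every 10 minutes, pick up orders that have sat in PAYMENT_COMPLETED for over 30 minutes.
    @Scheduled(fixedDelay = 600_000)
    public void sweep() {
        LocalDateTime threshold = LocalDateTime.now().minusMinutes(30);
        List<Order> stuckOrders = orderRepository
                .findByStatusAndUpdatedAtBefore(OrderStatus.PAYMENT_COMPLETED, threshold);

        for (Order order : stuckOrders) {
            // e.g. cancel the payment and move the order to a failed state
            compensationService.compensate(order);
        }
    }
}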