Message Compensation Processing Due to DB Failure in EDA

Suddenly, none of the application servers could establish a network connection to the DB server. It was a complete system failure. Let's look at what goes wrong in the orderSheet server I'm responsible for when the DB has a problem, and why compensation processing is necessary.

// response message
Could not open JDBC Connection for transaction;
nested exception is java.sql.SQLException: Cannot get a connection, 
pool error Timeout waiting for idle object

When a user clicks the payment button on the orderSheet screen, the payment proceeds after passing through a validation check, and ultimately, the order completion process is carried out.

If a DB issue occurs before the payment is executed, the user is inconvenienced, but there is no data we need to compensate.

However, if a DB issue arises after the payment has been completed, we must perform compensation processing, such as canceling the payment.

The orderSheet server was designed around an EDA (event-driven architecture), but this segment communicated with the order/payment server directly via an API. When the DB server goes down, the order completion request (number 2) fails, then the order failure event handling task (number 4) is attempted and it also fails (number 6).

cf) As of February 2024, the communication in this segment has also been switched to a Kafka publish/subscribe structure.

(Diagram: the order completion / order failure flow, steps 1 through 6, between the orderSheet server and the order/payment server)

Now let's go into detail. When a user clicks the payment button on the orderSheet screen, the orderSheet server proceeds with validation and payment, and then receives a CompleteOrder command (number 1) to start the final order completion process.

@Component
@RequiredArgsConstructor
public static class CommandMessageHandler {
  private final OrderCommandService orderCommandService; // hypothetical delegate name

  // number 1
  public void onCompleteOrder(CompleteOrder completeOrder) {
    // delegate to the service method shown below (the original snippet called itself here)
    orderCommandService.completeOrder(completeOrder);
  }
}

If the API call fails, an onCompleteOrderFailed event is published and handed over to number 4.

public void completeOrder(CompleteOrder completeOrder) { 
  CompleteOrderRequestDto dto = toCompleteOrderRequestDto(completeOrder);

  try {    
    // number 2
    apiClient.completeOrder(dto).block();
  } catch (Exception ex) {
    // publish onCompleteOrderFailed event(go to number 4)
  }
}
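
For reference, here is a minimal sketch of what the publish step inside that catch block could look like, assuming Spring Cloud Stream's StreamBridge; the binding name and the CompleteOrderFailed constructor are placeholders, not the actual code of the orderSheet server.

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class CompleteOrderFailedPublisher {
  private final StreamBridge streamBridge;

  // number 3: publish the failure event so the consumer at number 4 can react to it
  public void publish(CompleteOrder completeOrder) {
    CompleteOrderFailed event = new CompleteOrderFailed(completeOrder); // hypothetical payload shape
    streamBridge.send("completeOrderFailed-out-0", event);              // hypothetical binding name
  }
}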

Since the DB is down, the order failure processing request (number 5) also fails. However, it does not end with the error merely being logged in the catch block; an exception is still thrown up to the KafkaListener (the listener's transaction cannot even obtain a JDBC connection, as the error message at the top shows).

@Component
@RequiredArgsConstructor
public static class MessageMessageHandler {
  private final OrderFailureService orderFailureService; // hypothetical delegate name

  // number 4
  public void onCompleteOrderFailed(CompleteOrderFailed completeOrderFailed) {
    // delegate to the service method shown below (the original snippet called itself here)
    orderFailureService.onCompleteOrderFailed(completeOrderFailed);
  }
}


public void onCompleteOrderFailed(CompleteOrderFailed completeOrderFailed) {
  try {
    // number 5
    apiClient.rollbackForConsistence(completeOrderFailed);
  } catch (Exception ex) {
    log.error("error!!", ex);
  }
}

In org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder, the DefaultAfterRollbackProcessor is used as the afterRollbackProcessor. When the DefaultAfterRollbackProcessor object is created, a BackOff is provided as well. Looking at createBackOff, you can see that it is built from maxAttempts in ConsumerProperties, which defaults to 3.

maxAttempts signifies the number of attempts to process a message in case of failure (including the first attempt). It is a configuration of the RetryTemplate provided by the framework. Since we were using the default setting, it attempts to consume a total of 3 times.

messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
  (record, exception) -> {
    MessagingException payload =
        new MessagingException(((RecordMessageConverter) messageConverter)
              .toMessage(record, null, null, null),
              "Transaction rollback limit exceeded", exception);
    try {
      errorInfrastructure.getErrorChannel()
          .send(new ErrorMessage(
              payload,
              Collections.singletonMap(
                IntegrationMessageHeaderAccessor.SOURCE_DATA,
                record
              )
            )
          );
    }
    catch (Exception e) {
      /*
      * When there is no DLQ, the FinalRethrowingErrorMessageHandler will re-throw
      * the payload; that will subvert the recovery and cause a re-seek of the failed
      * record, so we ignore that here.
      */
      if (!e.equals(payload)) {
        throw e;
      }
    }
  }, createBackOff(extendedConsumerProperties),
  new KafkaTemplate<>(transMan.getProducerFactory()),
  extendedConsumerProperties.getExtension().isTxCommitRecovered()));

The consumer attempted to process the event 3 times, and since all attempts failed, the following error message is ultimately logged.

Transaction rollback limit exceeded; nested exception is 
org.springframework.kafka.listener.ListenerExecutionFailedException
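
If the default of 3 attempts is not what you want, the retry count can be adjusted per binding. A minimal sketch, assuming the standard Spring Cloud Stream consumer property and a hypothetical binding name:

# maxAttempts counts the first delivery as well; the binding name below is a placeholder
spring.cloud.stream.bindings.onCompleteOrderFailed-in-0.consumer.max-attempts=5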

Now let's look into compensation processing.
After the DB server is normalized, one way to correct the data is by manually republishing the onCompleteOrderFailed event, allowing the order failure process (number 4) to be reattempted.

This re-runs the order failure handling, and the subsequent rollback process then straightens out the inconsistent data.

To manually republish the onCompleteOrderFailed event, it is necessary to first know the topic, partition, and offset information.

The topic can be found in the application property settings, and the partition and offset can be located in the error logs (the offset where the producer published the order failure event after an API error occurred in number 3).

(Screenshot: error log showing the topic, partition, and offset of the failed record)

Once the offset is identified, the next step is to find the message at that offset. We located it by searching through our company's operational tools. Then, we copied the message value, generated a new key for it, and manually republished the event.

Unfortunately, since this is a personal blog, I can't go into more detail on this part.
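
To illustrate the idea (this is not our internal tool), here is a minimal sketch of re-reading a record at a known topic/partition/offset with a plain Kafka consumer and republishing the same value under a newly generated key; the topic name, partition, offset, and bootstrap servers are placeholders.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ManualRepublish {
  public static void main(String[] args) {
    String topic = "complete-order-failed"; // placeholder topic name
    int partition = 3;                      // taken from the error log
    long offset = 1042L;                    // taken from the error log

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("enable.auto.commit", "false");
    consumerProps.put("key.deserializer", StringDeserializer.class.getName());
    consumerProps.put("value.deserializer", StringDeserializer.class.getName());

    String value;
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
      TopicPartition tp = new TopicPartition(topic, partition);
      consumer.assign(List.of(tp));
      consumer.seek(tp, offset); // jump straight to the failed record
      List<ConsumerRecord<String, String>> polled =
          consumer.poll(Duration.ofSeconds(5)).records(tp);
      if (polled.isEmpty()) {
        throw new IllegalStateException("no record found at offset " + offset);
      }
      value = polled.get(0).value(); // copy the original message value
    }

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", StringSerializer.class.getName());
    producerProps.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      // republish the same payload under a freshly generated key
      producer.send(new ProducerRecord<>(topic, UUID.randomUUID().toString(), value));
      producer.flush();
    }
  }
}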

Finally, what happens when a DB outage occurs under the Kafka pub/sub structure rather than API communication? There is no API error response to react to, but command/event objects end up stuck in a particular state.

We collect the objects that remain stuck beyond a certain period and run compensation processing on them.
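
As a rough sketch of how such a sweep could work (the repository, service, and state names here are hypothetical, not our actual implementation): a scheduled job periodically picks up orders that have stayed in an intermediate state for too long and hands them to compensation processing.

import java.time.LocalDateTime;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;

@Component
@RequiredArgsConstructor
public class StuckOrderCompensationJob {
  private final OrderRepository orderRepository;         // hypothetical repository
  private final CompensationService compensationService; // hypothetical service

  @Scheduled(fixedDelay = 60_000) // sweep once a minute
  public void compensateStuckOrders() {
    LocalDateTime threshold = LocalDateTime.now().minusMinutes(10);
    // orders where payment finished but order completion never arrived, older than the threshold
    orderRepository.findByStateAndModifiedAtBefore(OrderState.PAYMENT_COMPLETED, threshold)
        .forEach(compensationService::compensate); // e.g. cancel the payment
  }
}

(@Scheduled requires scheduling to be enabled, for example with @EnableScheduling on a configuration class.)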
