DEV Community

Yash Tailor

Posted on

Circuit Breaker for Kafka using Resilience4j

Kafka is one of the most powerful distributed messaging systems. It is used to exchange data asynchronously between systems. Consider a microservice architecture containing several microservices. Each microservice consumes messages from a Kafka topic, does some kind of processing (maybe an HTTP call, a DB interaction, etc.) and then produces a message on another Kafka topic. What will happen if, say, the downstream service or the DB is down? Let's say we have configured a custom error handler which propagates all the errors encountered during consumption to an error topic.

In that case, as Kafka keeps consuming messages, each one will fail and end up on the error topic. We all know the importance of a circuit breaker: it is a pattern used to detect and handle faults in communication between services, preventing them from cascading and causing further damage.

One of the best parts about Kafka is that the consumer has full control over when to poll for messages. If we know that a dependency is down, we should ideally stop consuming messages so that we don't accumulate more errors. We can always replay the error messages so that they are processed again, but why waste resources processing a message if we know that it is going to fail? (Also think about transactional systems.)

Hence, we need a circuit breaker for our Kafka listeners. Resilience4j is a lightweight fault-tolerance library for Java applications; its circuit breaker module provides built-in methods to create custom circuit breakers and listen to their state transitions. Before we jump into configuring a circuit breaker with Resilience4j, let us have a quick look at how a circuit breaker works.

A circuit breaker works with three main states:

  1. A CLOSED state is where the circuit breaker allows the processing of requests.
  2. An OPEN state is where the circuit breaker doesn't allow the processing of requests.
  3. A HALF OPEN state is where the circuit breaker allows a limited number of requests through to check whether the service is still unhealthy (in which case it transitions back to the OPEN state) or healthy again (in which case it transitions to the CLOSED state).

Let us apply these states to our Kafka listeners. Initially, our Kafka listener is in the CLOSED state, where it is allowed to consume messages. Once the circuit breaker identifies that the errors caused while processing those messages have reached a threshold, it transitions to the OPEN state, in which no more messages are consumed. At this point, we wait for some time, hoping that whatever caused the errors recovers. For example, if the errors were caused because the service we make an HTTP call to was down, we hope that it comes back up. After waiting for a specified time, the circuit breaker transitions to the HALF OPEN state, where it permits a fixed number of calls to check the health of the listener. If the failures among those calls reach a specified threshold, it transitions back to the OPEN state; otherwise, the listener is healthy and it goes back to the CLOSED state.
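To make the flow concrete, here is a minimal, library-free sketch of that state machine. `SketchBreaker` is a hypothetical class, not Resilience4j's implementation: it assumes a count-based window, treats a failure rate equal to or above the threshold as tripping the breaker, and takes the clock as a parameter so the OPEN wait can be tested without sleeping.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

/** Hypothetical sketch of CLOSED -> OPEN -> HALF_OPEN -> CLOSED/OPEN (not Resilience4j). */
class SketchBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;          // calls evaluated while CLOSED
    private final float failureThreshold;  // percent, e.g. 50f
    private final int permittedInHalfOpen; // trial calls evaluated while HALF_OPEN
    private final long waitMillis;         // how long to stay OPEN
    private final LongSupplier clockMillis;

    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private long openedAt;

    SketchBreaker(int windowSize, float failureThreshold, int permittedInHalfOpen,
                  long waitMillis, LongSupplier clockMillis) {
        this.windowSize = windowSize;
        this.failureThreshold = failureThreshold;
        this.permittedInHalfOpen = permittedInHalfOpen;
        this.waitMillis = waitMillis;
        this.clockMillis = clockMillis;
    }

    /** Current state; moves OPEN -> HALF_OPEN on its own once the wait has elapsed. */
    State state() {
        if (state == State.OPEN && clockMillis.getAsLong() - openedAt >= waitMillis) {
            transition(State.HALF_OPEN);
        }
        return state;
    }

    /** The listener may consume in CLOSED and HALF_OPEN, but not in OPEN. */
    boolean allowsConsumption() {
        return state() != State.OPEN;
    }

    /** Records the outcome of processing one message. */
    void record(boolean failed) {
        State current = state();
        if (current == State.OPEN) {
            return; // nothing is consumed while OPEN
        }
        outcomes.addLast(failed);
        int size = current == State.CLOSED ? windowSize : permittedInHalfOpen;
        while (outcomes.size() > size) {
            outcomes.removeFirst(); // slide the count-based window
        }
        if (outcomes.size() < size) {
            return; // not enough calls yet to compute a failure rate
        }
        long failures = outcomes.stream().filter(f -> f).count();
        if (failures * 100f / size >= failureThreshold) {
            transition(State.OPEN);
        } else if (current == State.HALF_OPEN) {
            transition(State.CLOSED);
        }
    }

    private void transition(State next) {
        state = next;
        outcomes.clear(); // each state starts evaluating from scratch
        if (next == State.OPEN) {
            openedAt = clockMillis.getAsLong();
        }
    }
}
```

For example, `new SketchBreaker(6, 50f, 4, 120_000, System::currentTimeMillis)` uses the same numbers as the Resilience4j configuration later in this post: three failures in the last six calls open it, and after two minutes four trial calls decide between OPEN and CLOSED.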

This is not always enough, though. If we see consecutive transitions such as OPEN -> HALF OPEN -> OPEN -> HALF OPEN -> ..., the problem probably has a bigger cause. In that case, we would want to stay in the OPEN state and allow a manual trigger to transition to the CLOSED state once the issue is resolved.

Now that we know the different transitions of a circuit breaker, let's see how we can configure a circuit breaker for a Kafka listener using Resilience4j in an existing Spring Boot application.

First of all, let's add the Resilience4j circuit breaker library (here through the Spring Cloud Circuit Breaker starter):

implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j'

Now that we have added Resilience4j, let's create our own circuit breaker registry. The circuit breaker registry helps us create and manage custom circuit breakers.

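import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
public class CustomCircuitBreakerRegistry {

    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .failureRateThreshold(50)                          // open at >= 50% failed calls
                .enableAutomaticTransitionFromOpenToHalfOpen()     // leave OPEN even if no calls arrive
                .permittedNumberOfCallsInHalfOpenState(4)          // trial calls in HALF OPEN
                .slidingWindowSize(6)                              // last 6 calls (count-based by default)
                .waitDurationInOpenState(Duration.ofSeconds(120))  // stay OPEN for 2 minutes
                .build();
        return CircuitBreakerRegistry.of(config);
    }
}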

Here, we have configured the following:

  • slidingWindowSize : this is the number of calls considered when deciding whether the circuit breaker should transition (6)
  • failureRateThreshold : this is the failure rate, in percent, at or above which the circuit breaker transitions to the OPEN state (50%)

Together, these two settings mean that if at least 3 (50% of 6 = 3) of the last 6 calls fail, the circuit breaker transitions to the OPEN state.

  • waitDurationInOpenState : this is the duration of time the circuit breaker should stay in the OPEN state after which it can transition to HALF OPEN state
  • enableAutomaticTransitionFromOpenToHalfOpen : without this setting, the circuit breaker only moves from OPEN to HALF OPEN when a call is attempted after the wait duration has elapsed, which makes the system reactive rather than proactive. We need to enable it: once our consumer is paused, no new calls reach the circuit breaker, so it would otherwise stay OPEN forever.
  • permittedNumberOfCallsInHalfOpenState : this is the number of calls permitted in the HALF OPEN state (4). After those 4 calls, if at least 2 (50% of 4 = 2) of them have failed, the circuit breaker transitions back to the OPEN state; otherwise, it transitions to the CLOSED state.
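The arithmetic behind these numbers can be checked with a small hypothetical helper (plain Java, not part of Resilience4j). It assumes the rule stated above: the breaker trips when the failure rate is equal to or greater than `failureRateThreshold`.

```java
/** Hypothetical helper reproducing the threshold arithmetic (not a Resilience4j API). */
final class ThresholdMath {
    private ThresholdMath() {}

    /** True when failures/calls, as a percentage, reaches the threshold. */
    static boolean trips(int failures, int calls, float thresholdPercent) {
        return failures * 100f / calls >= thresholdPercent;
    }

    /** Smallest number of failures out of {@code calls} that trips the breaker. */
    static int minFailuresToTrip(int calls, float thresholdPercent) {
        return (int) Math.ceil(calls * thresholdPercent / 100f);
    }
}
```

`ThresholdMath.minFailuresToTrip(6, 50f)` gives 3 for the CLOSED window of 6 calls, and `ThresholdMath.minFailuresToTrip(4, 50f)` gives 2 for the 4 trial calls in HALF OPEN.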

There can be multiple Kafka listeners in a microservice, and we want an independent circuit breaker for each of them. For that, we will use Spring Kafka's ConsumerStartingEvent, which is published whenever a listener container's consumer is starting. At that point, we create a circuit breaker for that listener's topic.

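import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;

@Component
@Slf4j
public class CustomKafkaListenerContainerManager implements ApplicationListener<ConsumerStartingEvent> {

    @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // Whenever a new consumer is starting, we configure a circuit breaker for its topic
    @Override
    public void onApplicationEvent(ConsumerStartingEvent event) {
        // getContainer() returns the parent (concurrent) container when a child consumer emits the event
        List<KafkaMessageListenerContainer> containerList =
                event.getContainer(ConcurrentMessageListenerContainer.class).getContainers();
        containerList.forEach(container -> {
            String[] topics = container.getContainerProperties().getTopics();
            if (topics != null) { // null for listeners using topicPattern or topicPartitions
                Arrays.stream(topics).findFirst().ifPresent(this::addCktBreaker);
            }
        });
    }
}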

Here, whenever a new consumer is starting, we get its topic and add a circuit breaker for it. Now, let's see how addCktBreaker can be written:

@Autowired CustomCircuitBreakerRegistry circuitBreakerRegistry;

public void addCktBreaker(String topic) {
    circuitBreakerRegistry.circuitBreakerRegistry()
            .circuitBreaker(topic) // returns the existing breaker with this name, or creates one
            .getEventPublisher()
            .onStateTransition(transition -> {
                CircuitBreaker.StateTransition stateTransition = transition.getStateTransition();
                log.info("state transition: {} -> {}", stateTransition.getFromState(), stateTransition.getToState());
                switch (stateTransition.getToState()) {
                    case OPEN:
                        pauseKafkaConsumers(topic, CircuitBreaker.State.OPEN);
                        break;
                    case HALF_OPEN:
                        resumeKafkaConsumers(topic, CircuitBreaker.State.HALF_OPEN);
                        break;
                    default:
                        break; // e.g. CLOSED: the consumer is already running
                }
            });
}

Here, we create a circuit breaker named after the topic (the advantage of this will become clear later) and listen to its state transition events. We want the following: when the circuit breaker goes into the OPEN state, we pause the Kafka consumer, and when it goes into the HALF OPEN state, we resume the Kafka consumer so that a few trial messages can be processed.
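One detail worth keeping in mind: ConsumerStartingEvent is published for every consumer thread, so with a concurrent container (and again after a container restart) `addCktBreaker` can run several times for the same topic. `circuitBreaker(topic)` returns the same breaker each time, but every `onStateTransition(...)` call registers one more handler, so pause and resume would fire once per handler: harmless, since they can be repeated, but noisy. A set of already-registered topics avoids that. The sketch below is hypothetical and library-free (`TopicTransitionHandlers`, `registerOnce` and its own `State`/`Action` enums are assumptions, not Spring Kafka or Resilience4j types); it also mirrors the OPEN -> pause, HALF_OPEN -> resume mapping of the switch above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch: one transition handler per topic, however often consumers start. */
class TopicTransitionHandlers {
    enum State { CLOSED, OPEN, HALF_OPEN }
    enum Action { PAUSE, RESUME, NONE }

    // Thread-safe because consumer-starting callbacks run on different consumer threads
    private final Set<String> registeredTopics = ConcurrentHashMap.newKeySet();

    /** Mirrors the switch in addCktBreaker. */
    static Action actionFor(State toState) {
        switch (toState) {
            case OPEN:
                return Action.PAUSE;
            case HALF_OPEN:
                return Action.RESUME;
            default:
                return Action.NONE; // CLOSED: the consumer is already running
        }
    }

    /** Runs {@code register} only the first time a topic is seen; returns whether it ran. */
    boolean registerOnce(String topic, Consumer<String> register) {
        if (!registeredTopics.add(topic)) {
            return false; // a handler for this topic already exists
        }
        register.accept(topic);
        return true;
    }
}
```

In the listener manager, the equivalent would be a hypothetical `registeredTopics` field checked at the top of `addCktBreaker`, returning early when `registeredTopics.add(topic)` is false.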

Let's write the methods that pause and resume the consumers:

public MessageListenerContainer getMessageListenerContainer(String topic) {
    // First listener container subscribed to this topic, or null if there is none
    return kafkaListenerEndpointRegistry
            .getAllListenerContainers()
            .stream()
            .filter(container -> {
                String[] topics = container.getContainerProperties().getTopics();
                return topics != null && Arrays.asList(topics).contains(topic);
            })
            .findFirst()
            .orElse(null);
}

public void pauseKafkaConsumers(String topic, CircuitBreaker.State state) {
    log.info("circuit breaker in the state: {}", state.name());
    MessageListenerContainer container = getMessageListenerContainer(topic);
    if (container == null) {
        log.info("No container found for the topic {}", topic);
        return;
    }
    container.pause();
}

public void resumeKafkaConsumers(String topic, CircuitBreaker.State state) {
    log.info("circuit breaker in the state: {}", state.name());
    MessageListenerContainer container = getMessageListenerContainer(topic);
    if (container == null) {
        log.info("No container found for the topic {}", topic);
        return;
    }
    container.resume();
}

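Here, we get the Kafka listener container subscribed to the topic with the same name as the circuit breaker and call its pause() or resume() method accordingly. A paused container keeps polling Kafka, so the consumer stays in its group without triggering a rebalance, but it receives no records until it is resumed.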

Now that our configuration is set up, we have to feed the errors from our Kafka listeners into these circuit breakers. To do that, we make use of our custom error handler.

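import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class CustomKafkaErrorHandler implements ErrorHandler {

    @Autowired CustomCircuitBreakerRegistry circuitBreakerRegistry;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
        if (data == null) { // no record, so no topic to attribute the error to
            log.warn("Error without a consumer record", thrownException);
            return;
        }
        log.info("Circuit breaker error handler for topic {}", data.topic());
        // Record a failed call on the breaker named after the record's topic;
        // the duration arguments only feed Resilience4j's slow-call metrics
        circuitBreakerRegistry.circuitBreakerRegistry()
                .circuitBreaker(data.topic())
                .onError(1, TimeUnit.MINUTES, thrownException);
    }
}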

Here, whenever processing a Kafka record fails, the error is redirected to the error handler, where we take the record's topic and record an error on the circuit breaker of the same name. In this way, we indirectly make our circuit breakers aware of the errors.
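Note that, as written, the handler only ever reports failures, so every call in the sliding window is a failure. With the configuration above, any six errors, however far apart, give a 100% failure rate and open the breaker; and in HALF OPEN a healthy listener reports nothing, so the breaker has no trial calls to evaluate and does not move back to CLOSED on its own. Resilience4j's `CircuitBreaker` also offers `onSuccess(long duration, TimeUnit durationUnit)`, so successfully processed records could be reported too, for example at the end of the listener method. The hypothetical, library-free `OutcomeWindow` below contrasts the two situations with a count-based window of six calls.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical count-based window showing why successes need to be recorded as well. */
class OutcomeWindow {
    private final int size;
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed record

    OutcomeWindow(int size) {
        this.size = size;
    }

    void record(boolean failed) {
        outcomes.addLast(failed);
        if (outcomes.size() > size) {
            outcomes.removeFirst(); // keep only the last `size` calls
        }
    }

    /** Failure rate in percent, or -1 while the window is not yet full. */
    float failureRate() {
        if (outcomes.size() < size) {
            return -1f;
        }
        long failed = outcomes.stream().filter(f -> f).count();
        return failed * 100f / size;
    }
}
```

If only 1 record in 10 fails, a window fed by the error handler alone still reports 100%, while a window that also sees the successes reports roughly 17%, well below the 50% threshold.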

Now, let's handle the scenario where the circuit breaker keeps cycling OPEN -> HALF OPEN -> OPEN. Let's go back to our CustomKafkaListenerContainerManager and track these repeated failures.

public class CustomKafkaListenerContainerManager ... {
    @Getter public HashMap<String, Integer> errorManager = new HashMap<>();

    @Getter public static final int THRESHOLD = 5;

    public void addCktBreaker(String topic) {
        circuitBreakerRegistry.circuitBreakerRegistry().circuitBreaker(topic).getEventPublisher().onStateTransition(transition -> {
            switch (transition.getStateTransition().getToState()) {
                // ...
            }
        });
    }
}
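One possible way to implement this tracking is sketched below. `ReopenTracker` is hypothetical and library-free, not the author's code: it reuses the idea of the `errorManager` map and the `THRESHOLD` of 5, counts consecutive HALF OPEN -> OPEN transitions per topic, resets the count when the breaker closes, and reports when the count reaches the threshold. It assumes a `ConcurrentHashMap` in place of the `HashMap`, since transition events can arrive from different threads (consumer threads and the automatic OPEN -> HALF OPEN transition). In Resilience4j, staying OPEN until a manual trigger could map to `CircuitBreaker.transitionToForcedOpenState()`, with `transitionToClosedState()` as the manual reset; how to wire those calls into the transition handler is left as an assumption here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: detect repeated OPEN -> HALF_OPEN -> OPEN cycles per topic. */
class ReopenTracker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    static final int THRESHOLD = 5;
    private final Map<String, Integer> errorManager = new ConcurrentHashMap<>();

    /**
     * Feed every state transition of a topic's breaker.
     * Returns true when the breaker should be held open until a manual reset.
     */
    boolean onTransition(String topic, State from, State to) {
        if (to == State.CLOSED) {
            errorManager.remove(topic); // recovered: the next cycle starts from zero
            return false;
        }
        if (from == State.HALF_OPEN && to == State.OPEN) {
            // a failed trial; the initial CLOSED -> OPEN is not counted
            int reopenings = errorManager.merge(topic, 1, Integer::sum);
            return reopenings >= THRESHOLD;
        }
        return false;
    }

    int reopenings(String topic) {
        return errorManager.getOrDefault(topic, 0);
    }
}
```

With `THRESHOLD = 5`, the first four failed trials let the breaker keep cycling, and the fifth consecutive HALF OPEN -> OPEN transition returns true, the point at which the breaker would be held open for manual recovery.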
