ndr_brt for CherryChain

Handle backpressure between Kafka and a database with Vert.x

As we discussed in the past, asynchronous programming brings many advantages when developing reactive and responsive applications. However, it also brings challenges, and one of the main ones is the backpressure problem.

What is backpressure?

In physics, it is "a resistance or force opposing the desired flow of fluid through pipes" (Wikipedia).

We can translate the problem to a familiar scenario: persisting messages polled from a bus. The bus holds a huge number of messages and our application polls them very fast, but the database underneath is slow.

[Image: real world example]

How can funnel overflow be avoided?

In a synchronous scenario there is no backpressure issue: the synchronous nature of the computation blocks polling from the bus until the current message has been processed.
In the async world, however, polling runs with no knowledge of what is happening on the database. If the database can't keep up with the messages coming from the bus, they remain "in between", that is, in the memory of our service.
This can lead to failures or, at worst, bring the whole service down.
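
To get a feel for the numbers, here is a rough model of that "in between" buffer. It is only a sketch: the class name BacklogModel, the notion of discrete ticks and the rates used in main are assumptions made for illustration, not measurements of a real system.

```java
public class BacklogModel {

    /**
     * Returns how many messages are still held in memory after the given number of ticks,
     * when polledPerTick messages arrive on every tick and the database can persist
     * at most persistedPerTick of them in the same tick.
     */
    public static long backlogAfter(int ticks, int polledPerTick, int persistedPerTick) {
        long backlog = 0;
        for (int t = 0; t < ticks; t++) {
            backlog += polledPerTick;                        // async polling never waits for the database
            backlog -= Math.min(backlog, persistedPerTick);  // the database drains what it can
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Hypothetical rates: polling outpaces the database, so the backlog keeps growing.
        System.out.println("fast bus, slow db: " + backlogAfter(60, 1000, 200));
        // Hypothetical rates: the database keeps up, so nothing accumulates.
        System.out.println("slow bus, fast db: " + backlogAfter(60, 200, 1000));
    }
}
```

When the poll rate is higher than the persist rate, the difference accumulates on every tick, which is exactly the memory growth described above.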

Let's develop an application that persists messages in a database, and evolve it step by step to handle backpressure.

Automatic polling

At first, our verticle performs these operations:

  • initialize the JDBC client
  • initialize the Kafka client
  • subscribe to the topic
  • persist the records

The code is quite simple, and it works well with small amounts of messages.
As the load grows, a problem appears: using the handler of the Vert.x Kafka consumer means there is no control over the message rate, so the consumer polls continuously regardless of the persistence rate, and the messages waiting to be persisted pile up in memory.

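import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;

public class MainVerticle extends AbstractVerticle {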

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println("Message persisted"))
          .onFailure(cause -> System.err.println("Message not persisted " + cause));
      });
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }


  private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) {
    Promise<UpdateResult> promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams("insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
    // TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
    // TODO: convert the record into params for the sql command
    return null;
  }
}

Explicit polling

To handle backpressure we need explicit polling: instead of setting a handler on the Kafka consumer, we call poll manually (in the following code, with a poll timeout of 100 ms).
With this approach, each poll is performed only once the previously polled batch of messages has been persisted.
We achieve this by collecting the persist future of every message with CompositeFuture.all, which succeeds only when all of them have succeeded; only then is the next poll issued.
If at least one of the futures fails, the composite fails and polling stops.
There are various ways to make the service handle the failure, e.g. sending the message to a Dead Letter Queue, but we will not cover them here.

The problem with this code is that a message whose persistence fails is lost: the consumer is configured with auto-commit, so the topic offset is committed automatically, not by our code, whether or not the message was persisted.

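// Imports needed in addition to those of the previous listing
import io.vertx.core.CompositeFuture;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class MainVerticle extends AbstractVerticle {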

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println("All messages persisted");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting messages: " + cause))
    ;
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}
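
The gating idea can also be tried without Kafka or a database. The following sketch uses only the JDK and is not Vert.x code: CompletableFuture.allOf stands in for CompositeFuture.all, a scheduled executor with a fixed 20 ms delay stands in for the slow database, and the names GatedPolling and drain are hypothetical. It records how many writes are in flight at the same time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GatedPolling {

    /** Processes the batches one at a time and returns the peak number of in-flight writes. */
    public static int drain(List<List<String>> batches) {
        ScheduledExecutorService slowDb = Executors.newScheduledThreadPool(4);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            poll(batches.iterator(), slowDb, inFlight, peak, done);
            done.join(); // wait until every batch has been persisted
            return peak.get();
        } finally {
            slowDb.shutdown();
        }
    }

    // Takes the next batch only when called; it is called again only after the batch is persisted.
    private static void poll(Iterator<List<String>> source, ScheduledExecutorService slowDb,
                             AtomicInteger inFlight, AtomicInteger peak, CompletableFuture<Void> done) {
        if (!source.hasNext()) {
            done.complete(null);
            return;
        }
        List<CompletableFuture<Void>> writes = new ArrayList<>();
        for (String record : source.next()) {
            writes.add(persist(slowDb, inFlight, peak));
        }
        CompletableFuture.allOf(writes.toArray(new CompletableFuture[0]))
            .whenComplete((ok, error) -> {
                if (error != null) {
                    done.completeExceptionally(error);       // one failure stops the polling
                } else {
                    poll(source, slowDb, inFlight, peak, done); // next poll only after the whole batch
                }
            });
    }

    // Simulated slow write: completes 20 ms after being issued.
    private static CompletableFuture<Void> persist(ScheduledExecutorService slowDb,
                                                   AtomicInteger inFlight, AtomicInteger peak) {
        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        CompletableFuture<Void> written = new CompletableFuture<>();
        slowDb.schedule(() -> {
            inFlight.decrementAndGet();
            written.complete(null);
        }, 20, TimeUnit.MILLISECONDS);
        return written;
    }

    public static void main(String[] args) {
        int peak = drain(Arrays.asList(
            Arrays.asList("a", "b", "c"), Arrays.asList("d", "e"), Arrays.asList("f")));
        System.out.println("peak in-flight writes: " + peak);
    }
}
```

Because the next batch is requested only after allOf completes, the peak can never exceed the size of the largest batch, however slow the simulated database is.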

Manual commit

By setting the ENABLE_AUTO_COMMIT_CONFIG property to false, the service takes ownership of the topic offset commit.
The commit is performed only after every message in the batch has been persisted; with this trick, at-least-once delivery is achieved.

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause))
    ;
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}
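
What "at least once" means in practice can be shown with a small in-memory model. This is a sketch under stated assumptions, not Kafka code: the class CommitAfterPersist is hypothetical, a list stands in for the topic, another list for the database, and each call to consumeFromCommittedOffset models a consumer that starts reading from the last committed offset (for example after a restart).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitAfterPersist {
    private final List<String> topic;   // stand-in for a topic partition
    private final int failingWrite;     // which write attempt fails (1-based), simulated once
    private int writeAttempts = 0;
    public int committedOffset = 0;     // the offset the broker would remember
    public final List<String> database = new ArrayList<>();

    public CommitAfterPersist(List<String> topic, int failingWrite) {
        this.topic = topic;
        this.failingWrite = failingWrite;
    }

    /** Persists everything after the committed offset; commits only if every write succeeded. */
    public boolean consumeFromCommittedOffset() {
        int end = topic.size();
        for (String record : topic.subList(committedOffset, end)) {
            if (++writeAttempts == failingWrite) {
                return false;           // a write failed: the offset is left untouched
            }
            database.add(record);       // non-transactional write, stays even if a later one fails
        }
        committedOffset = end;          // commit only after the whole batch is persisted
        return true;
    }

    public static void main(String[] args) {
        CommitAfterPersist consumer = new CommitAfterPersist(Arrays.asList("a", "b", "c"), 2);
        System.out.println("first attempt committed: " + consumer.consumeFromCommittedOffset());
        System.out.println("second attempt committed: " + consumer.consumeFromCommittedOffset());
        System.out.println("database content: " + consumer.database);
    }
}
```

No record is lost, but a record persisted before the failure is written again on the second attempt. That is the price of at-least-once delivery, and a reason to make the persist query idempotent, as an "insert or update" query is.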

Bonus feature: achieve ordering

With a little effort it's possible to preserve ordering:
future composition lets every persist operation wait for the completion of the previous one.
This is done by chaining the async computations one after another, so that each one runs only when the previous future succeeds.
It is a handy pattern whenever operations must be serialized.

public class MainVerticle extends AbstractVerticle {
  ...
  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
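        .reduce(Future.<UpdateResult>succeededFuture(),
          // start each persist only after the previous one has succeeded
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          // the combiner only matters for parallel streams; this stream is sequential
          (a,b) -> a
        )
      )
      .compose(lastResult -> {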
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }
  ...
}
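
The same chaining trick can be tried in isolation. The sketch below is JDK-only and not Vert.x code: CompletableFuture.thenCompose plays the role of Future.compose, a scheduled executor simulates writes of different durations, and the names SequentialPersist and persistInOrder are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SequentialPersist {

    /** Runs one simulated write per delay, strictly one after another, and returns the completion order. */
    public static List<Integer> persistInOrder(List<Integer> delaysMillis) {
        ScheduledExecutorService db = Executors.newScheduledThreadPool(4);
        List<Integer> written = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> chain = delaysMillis.stream()
                .reduce(CompletableFuture.<Void>completedFuture(null),
                    // each write is issued only when the previous one has completed
                    (acc, delay) -> acc.thenCompose(ignored -> persist(db, delay, written)),
                    // the combiner only matters for parallel streams; this stream is sequential
                    (a, b) -> a);
            chain.join();
            return new ArrayList<>(written);
        } finally {
            db.shutdown();
        }
    }

    // Simulated write that takes delayMillis to complete and then records itself.
    private static CompletableFuture<Void> persist(ScheduledExecutorService db, int delayMillis,
                                                   List<Integer> written) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        db.schedule(() -> {
            written.add(delayMillis);
            done.complete(null);
        }, delayMillis, TimeUnit.MILLISECONDS);
        return done;
    }

    public static void main(String[] args) {
        // The slowest write comes first, yet the completion order follows the input order.
        System.out.println(persistInOrder(Arrays.asList(30, 10, 20)));
    }
}
```

Had the three writes been started together, the shortest one would normally finish first; chaining them trades throughput for a guaranteed order.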

Conclusion

Backpressure is a fundamental topic when working with async programming.
It does not come for free out of the Vert.x box, but it can be achieved with some simple tricks.

Top comments (3)

semo • Edited

Hi. I can't see why there should be a need for a backpressure mechanism, because messages are consumed by a Consumer when it's not blocked/busy and when it has enough resources. I see a need for a backpressure mechanism when talking about a push-based message queue, for example Redis. But here we are talking explicitly about Kafka. So why should I think of such a seemingly impossible issue? Let's assume a DB is slow and single inserts or updates take their time; then you could at least pause() and then resume() consuming. Where does one need the described mechanism? What may I have overlooked here?

Kristina Gocheva

Great article! You've given a really easy-to-understand example. However, I was wondering about something I didn't see you mention. In the first case, when you don't use the polling principle, upon failure you see exactly which record has failed. Is it true that with polling you can only get the batch of records in which a failing record appeared, and not the single record itself? And another question: could you explain what would happen if, for example, 3 of the 10 records in the batch fail? Does this fail the whole batch of 10 records?

ndr_brt

With polling you can get a single record by setting the config max.poll.records to 1, but it's not good for performance.
With polling you get a batch of messages, but you can process it one at a time, so it's possible to know which one is failing. The advice is to avoid committing when at least one message fails.
So yes, with this approach one or more failures = the whole batch fails.