
Testing a Kafka consumer with Avro schema messages in your Spring Boot application with Testcontainers

Written by Anja Gruss
Originally published at kreuzwerker.de on June 3rd, 2021

How many Testcontainers do you need to test the consumption of an Avro message in your Kafka consumer?

The short answer is, as usual: it depends. Let’s assume this situation:

You have a Spring Boot microservice to manage user data. That microservice listens to incoming events from Kafka (such as user creations, updates or deletions), transforms them into your own business objects, writes them to a PostgreSQL database, and provides them to your frontend via a REST interface. The overall infrastructure provides Avro messages and the Confluent schema registry.

The sample project uses Apache Maven with the avro-maven-plugin to download the schema files and generate the sources, but of course there are plugins for Gradle too.
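
For reference, the source-generation part of such a setup might look roughly like this in the pom.xml (a sketch; the plugin version and the schema directories are assumptions, not taken from the sample project):

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- Where the .avsc schema files live and where the generated Java sources go -->
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>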

Now you want to test that your Kafka consumer reads the events, transforms them into your database entities, and saves them.

When you search the internet for ways to test a Spring Boot Kafka consumer with Avro schemas, you find quite a few variants: using the MockSchemaRegistryClient, writing your own custom Avro de-/serializers, setting up a Testcontainers ecosystem with a Kafka, a Zookeeper and a Confluent Schema Registry container, or using the EmbeddedKafka broker provided by the spring-kafka-test dependency.

All of these solutions have valid pros and cons, and I will come back to them later in this post. Here, I will present a solution that uses a minimal set of Testcontainers and offers, in my view, the best compromise between control, speed and efficiency.

Let me walk you through this smaller setup, simulating the situation above. We have a Spring Boot application, a PostgreSQL database and our Kafka consumer. The application needs to listen to Kafka messages for users that were added or modified, and update the database entries accordingly. You can find the full code repository here. I added the UserEvent class for compilation purposes only; normally it would end up in your generated classes via the Avro plugin.
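
To make the test target concrete: on the consumer side, a @KafkaListener method maps the Avro-generated UserEvent onto our own entity and saves it. A minimal sketch, with illustrative names for the topic, entity, getters and repository (not necessarily the ones used in the repository):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class UserEventListener {

  private final UserRepository userRepository;

  public UserEventListener(UserRepository userRepository) {
    this.userRepository = userRepository;
  }

  // UserEvent is the Avro-generated class; topic name and getters are assumptions
  @KafkaListener(topics = "user-events")
  public void onUserEvent(UserEvent event) {
    // Transform the Avro record into our own entity and persist it
    UserEntity entity = new UserEntity(event.getId(), event.getFirstName(), event.getLastName());
    userRepository.save(entity);
  }
}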

Let's address the first problem: how do we interact with the Schema Registry from our testing environment? We need a mock. Hidden in Confluent's schema registry package, in the AbstractKafkaAvroSerDeConfig class, you can find this comment on the schema registry URL:

Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas. If you wish to get a connection to a mocked schema registry for testing, you can specify a scope using the 'mock://' pseudo-protocol. For example, 'mock://my-scope-name' corresponds to 'MockSchemaRegistry.getClientForScope("my-scope-name")'.

This means we can configure the Kafka producer and consumer with an imaginary schema registry URL that merely needs to start with "mock://", and we automatically get the MockSchemaRegistryClient. We neither have to instantiate and configure the MockSchemaRegistryClient explicitly, nor do we need a Confluent Schema Registry container. And since the Kafka Testcontainer can run with an embedded Zookeeper, there is no need for a separate Zookeeper container either, bringing the messaging side down to a single container. This way I ended up with only two Testcontainers: Kafka and the database.

Set up and configure your containers


  protected static final PostgreSQLContainer<?> postgreSQLContainer =
      new PostgreSQLContainer<>("postgres:10.9")
          .withPassword("postgres")
          .withUsername("postgres")
          .withExposedPorts(5432)
          .withReuse(true);

  protected static final KafkaContainer kafkaContainer =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"))
          .withEmbeddedZookeeper()
          .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093 ,BROKER://0.0.0.0:9092")
          .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
          .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
          .withEnv("KAFKA_BROKER_ID", "1")
          .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
          .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
          .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
          .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
          .withNetwork(network);

  static {
    Startables.deepStart(Stream.of(postgreSQLContainer, kafkaContainer)).join();
  }
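
The Spring Boot application under test also needs to be pointed at these containers and at the mock:// registry URL. One way to wire that up, assuming Spring Boot 2.2.6 or later, is a @DynamicPropertySource method in the shared test base class (a sketch; the actual repository may wire this differently):

  @DynamicPropertySource
  static void registerProperties(DynamicPropertyRegistry registry) {
    // Point the application at the running containers
    registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl);
    registry.add("spring.datasource.username", postgreSQLContainer::getUsername);
    registry.add("spring.datasource.password", postgreSQLContainer::getPassword);
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    // The mock:// pseudo-protocol routes the Avro serde to the MockSchemaRegistry
    registry.add("spring.kafka.properties.schema.registry.url", () -> "mock://testUrl");
  }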

And create a Kafka producer and consumer with their configurations for the tests.

  public static KafkaProducer<Object, Object> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafkatest");
    return new KafkaProducer<>(props);
  }
  public static KafkaConsumer<String, Object> createEventConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkatest");
    return new KafkaConsumer<>(props);
  }

Here we only add the mock-prefixed schema registry URL; no special serializers or deserializers are needed. Don't forget to set the specific.avro.reader property to "true" in your Kafka consumer configuration, so that the deserializer returns your specific generated classes instead of a GenericRecord, which would otherwise make the consumer fail with a ClassCastException.

Now in your tests you create the events you expect the producer to send, have your Kafka consumer subscribe to the topic, fetch the ConsumerRecord via KafkaTestUtils, and hand it to your own listener. Then you verify the result of whatever your listener did. That's all.
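
Put together, such a test might look roughly like this, assuming the listener and repository from the sketch above are autowired into the test, and with the same illustrative topic name and event fields (not the exact code from the repository):

  @Test
  void consumesUserEventAndPersistsUser() throws Exception {
    UserEvent event = new UserEvent("42", "Jane", "Doe"); // Avro-generated all-args constructor

    // Produce the event the way the upstream system would
    try (KafkaProducer<Object, Object> producer = createProducer()) {
      producer.send(new ProducerRecord<>("user-events", event.getId(), event)).get();
    }

    // Read it back and hand it to our own listener
    try (KafkaConsumer<String, Object> consumer = createEventConsumer()) {
      consumer.subscribe(List.of("user-events"));
      ConsumerRecord<String, Object> record =
          KafkaTestUtils.getSingleRecord(consumer, "user-events");
      listener.onUserEvent((UserEvent) record.value());
    }

    // Verify the listener persisted the user
    assertThat(userRepository.findById("42")).isPresent();
  }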

Pros and Cons? 🍿

Of course, runtime is an issue. Testcontainers need to start up and establish networking before they are fully available. A much faster alternative to my setup is to move all of the infrastructure into memory, using EmbeddedKafka as the broker and replacing PostgreSQL with an H2 in-memory database. You can find a working EmbeddedKafka version in the branch "embeddedKafka" (my naming skill is highly imaginative).
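
For comparison, the in-memory variant essentially replaces the containers with an @EmbeddedKafka annotation from spring-kafka-test; a sketch (the topic name is again illustrative):

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = "user-events",
    // expose the in-memory broker address under the standard Spring Boot property
    bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class UserEventListenerEmbeddedKafkaTest {
  // test methods identical to the container-based variant
}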

Admittedly, in terms of timing, this minimalistic setup with EmbeddedKafka and H2 is pretty hard to beat. The test contains the same methods as the one on the main branch, and takes on average 5 seconds to run on my machine.

My version with one Kafka and one Postgres container takes on average 15 seconds to run.

Now imagine how long it would take with four Testcontainers (Schema Registry, Kafka and Zookeeper, plus the database container), and with more code and more complex business logic to test. 😕

But when using EmbeddedKafka, you need to pay attention to dependency versions. The Confluent Schema Registry client brings along its own Zookeeper version, and depending on the version of the Schema Registry client you might end up with a different Zookeeper version than the one expected by the spring-kafka-test dependency, which results in a ClassNotFoundException when running the test. For more details, check the version matrix by Confluent and the Kafka client matrix by Spring.

So I stuck with the container setup because of these version constraints. For this demo application it did not matter at all, but to avoid unexpected behavior I recommend staying as close to your production versions as possible. Likewise, H2 might not always be an option, depending on which features of your production database you rely on (such as constraints, the json datatype, or certain join statements). So this is a compromise between control and test runtime.

Also, did I already mention ‘it depends’?

You can find the code on GitHub, with the main branch containing the two-container demonstration and the embeddedKafka branch containing the container-free version.

What's your experience with testing in that kind of setup? Do you have questions or suggestions? Do get in touch with us!

