Here are all the ways you can configure Micronaut Kafka, both regular applications and streams, to use particular serialisers and deserialisers.
A key thing to remember is properties are used first and then the configured serde registries are used.
(as an aside I found it helpful to know a serde is simply and object with both a serialiser and a deserialiser)
For regular kafka
If you want all consumers and producers to have the same config
kafka:
key:
serializer: org.apache.kafka.common.serialization.UUIDSerializer
deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
value:
serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
If you want all producers to have the same config
kafka:
producers:
default:
key.serializer: org.apache.kafka.common.serialization.UUIDSerializer
value.serializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
If you want just consumers to have the same config
kafka:
consumers:
default:
key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
If you want just one consumer (with group id “my-consumer-group”) to be configured
kafka:
consumers:
my-consumer-group:
key.deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
value.deserializer: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
The same goes for producers, you can use their id to configure them specifically.
If you want to configure the serialisers and deserialisers in code
Create an instance of a io.micronaut.configuration.kafka.serde.SerdeRegistry
:
@Singleton
public class AvroSerdeRegistry implements SerdeRegistry {
@Inject
private SchemaRegistryClient schemaRegistryClient;
@SuppressWarnings("unchecked")
@Override
public <T> Serde<T> getSerde(Class<T> type) {
if (Arrays.asList(type.getInterfaces()).contains(SpecificRecord.class)) {
SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient);
Map<String, String> config = Map.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true",
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "");
serde.configure(config, false);
return (Serde<T>) serde;
}
return null;
}
@Override
public int getOrder() {
// Before JSON Serde
return -1;
}
}
The order is important as this class will be picked up by the CompositeSerdeRegistry and all the SerdeRegistry will be tried in order until one returns a non-null Serde.
Option 2 for code configuration
If you want more control than just ordered Serde Registries, or if you want to make sure JsonSerde is never used, replace the CompositeSerdeRegistry:
@Singleton
@Replaces(CompositeSerdeRegistry.class)
public class StringSerdeRegistry implements SerdeRegistry {
@Override
public <T> Serde<T> getSerde(Class<T> type) {
return (Serde<T>) Serdes.String();
}
}
This will only ever serialise or deserialise Strings.
For Kafka Streams
By properties, for all streams
kafka:
streams:
#Micronaut default - applies this config to all streams
default:
#Kafka streams default
default:
key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
By properties, for just one stream called bank-transfer
kafka:
streams:
bank-transfer:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
In code as properties for just one stream
@Factory
public class TransferStreamFactory {
public static final String BANK_TRANSFER = "bank-transfer";
public static final String INPUT = "transfer-commands";
public static final String OUTPUT = "transfer-events";
@Singleton
@Named(BANK_TRANSFER)
KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.UUID().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
KStream<UUID, MakeTransfer> source = builder.stream(INPUT);
source.mapValues(value -> TransferEvent.newBuilder()
.setTransferId(UUID.randomUUID())
.setSrcAccountId(value.getSrcAccountId())
.setDestAccountId(value.getDestAccountId())
.setAmount(value.getAmount())
.build())
.to(OUTPUT);
return source;
}
}
In code for just one stream, with different configuration for input and output
@Factory
public class TransferStreamFactory {
public static final String BANK_TRANSFER = "bank-transfer";
public static final String INPUT = "transfer-commands";
public static final String OUTPUT = "transfer-events";
@Singleton
@Named(BANK_TRANSFER)
KStream<UUID, MakeTransfer> bankTransferStream(ConfiguredStreamBuilder builder) {
Properties props = builder.getConfiguration();
Map<String, Object> serdeConfig = Map.of(SCHEMA_REGISTRY_URL_CONFIG, props.get(SCHEMA_REGISTRY_URL_CONFIG));
SpecificAvroSerde<MakeTransfer> inputValueSerde = new SpecificAvroSerde<>();
inputValueSerde.configure(serdeConfig, false);
SpecificAvroSerde<TransferEvent> outputValueSerde = new SpecificAvroSerde<>();
outputValueSerde.configure(serdeConfig, false);
KStream<UUID, MakeTransfer> source = builder.stream(INPUT, Consumed.with(Serdes.UUID(), inputValueSerde));
source.mapValues(value -> TransferEvent.newBuilder()
.setTransferId(UUID.randomUUID())
.setSrcAccountId(value.getSrcAccountId())
.setDestAccountId(value.getDestAccountId())
.setAmount(value.getAmount())
.build())
.to(OUTPUT, Produced.with(Serdes.UUID(), outputValueSerde));
return source;
}
}
In code
You can also, by not configuring any serdes in properties or the stream code allow Micronaut to pick the serde from the SerdeRegistry, so the same advice above about adding new Serde Registries (or replacing existing ones) apply in Kafka Streams!
Conclusion
There’s lots of flexibility here. The best approach is to configure everything to be the same - so use the default properties and then use other more specific approaches as the need arises e.g. if all messages have UUID keys apart from one, configure all producers and consumers to use UUID serdes and then for the one consumer that needs it configure a String serde. This keeps configuration and repetition low. I would also favour properties over class and code configuration as it means less classes to understand when others open the project and less clutter around the business logic.
Top comments (0)