Bob565656

Posted on

Streams consumer not working for AWS MSK Kafka

Hello everyone,

We are currently playing with AWS MSK (Kafka 2.2.1). It's a 3-node cluster, and we can successfully produce events to it.

We are also able to consume events from topics using some basic consumer code (subscribe or assign).
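
For context, the consumer that does work is essentially the stock example, roughly like this (topic name, group id and deserializers here are placeholders, not anything special):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties cprops = new Properties();
cprops.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
cprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
cprops.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
cprops.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
cprops.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(cprops)) {
    consumer.subscribe(Collections.singletonList("stream-test"));
    // A plain poll loop like this has no trouble receiving records from the cluster
    for (int i = 0; i < 100; i++) {
        ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
    }
}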

I wanted to start playing with Kafka Streams, but for the life of me I can't get the darn thing going... it doesn't seem to want to consume any events. I have tried googling the problem but haven't found anything relevant; all the example code I find is essentially the same and looks trivial to set up.

I'm wondering if there is some broker config that needs to be set up specifically for Streams to work correctly? I have enabled debug logging for Streams and I see messages like "Disconnecting from node -2 due to request timeout." The bootstrap broker URL is set correctly (it's not pointing at localhost). I'm not really sure what I'm doing wrong. Here is the sample code I'm using:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;

Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-stream-5");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");

// Simple topology: read the input topic and print every record to stdout
StreamsBuilder builder = new StreamsBuilder();
KStream<Long, String> kstream = builder.stream("stream-test"); // keys use the default Long serde
kstream.print(Printed.toSysOut());

try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
    System.out.println("Starting");
    streams.start();

    // Keep the app alive for ~100 seconds so the stream threads have time to poll
    for (int i = 0; i < 100; i++) {
        Thread.sleep(1000);
        System.out.println("Sleeping");
    }
}
System.out.println("Done");
