DEV Community

Clivern


Getting Started With Apache Kafka and Java

Installation

To add a dependency using Maven, use the following:

<dependency>
    <groupId>com.clivern</groupId>
    <artifactId>kafka-sdk</artifactId>
    <version>0.1.0</version>
</dependency>

To add a dependency using Gradle, use the following:

dependencies {
    implementation 'com.clivern:kafka-sdk:0.1.0'
}

To add a dependency using Scala SBT, use the following:

libraryDependencies += "com.clivern" % "kafka-sdk" % "0.1.0"

To create a Kafka topic:

import java.util.HashMap;
import com.clivern.kafka.Configs;
import com.clivern.kafka.Utils;


HashMap<String, String> map = new HashMap<>();
map.put("bootstrap.servers", "localhost:9092");
Utils.createTopic("clivern", Configs.fromMap(map));

Kafka Producer:

import com.clivern.kafka.Configs;
import com.clivern.kafka.Producer;
import com.clivern.kafka.Kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


Configs configs = new Configs();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

Producer producer = (new Kafka()).newProducer(configs);

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>("clivern", null, "Hello World " + i);

    producer.send(record).flush();
}

producer.close();

Kafka Consumer:

import com.clivern.kafka.Configs;
import com.clivern.kafka.Consumer;
import com.clivern.kafka.Kafka;
import com.clivern.kafka.HandlerCallbackInterface;
import com.clivern.kafka.FailureCallbackInterface;
import com.clivern.kafka.SuccessCallbackInterface;
import com.clivern.kafka.exception.MissingHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;


Configs configs = new Configs();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "clivern");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer consumer = (new Kafka()).newConsumer(configs);

HandlerCallbackInterface<ConsumerRecord<String, String>> handler =
        (record) -> {
            System.out.println("Message Received: " + record.value());

            // Throw error if message has error
            if (record.value().equals("error")) {
                throw new Exception("Error!");
            }
        };

SuccessCallbackInterface<ConsumerRecord<String, String>> onSuccess =
        (record) -> {
            System.out.println("Message Succeeded: " + record.value());
        };

FailureCallbackInterface<ConsumerRecord<String, String>> onFailure =
        (record, exception) -> {
            System.out.println(
                    "Message " + record.value() + " Failed: " + exception.getMessage());
        };

consumer.subscribe("clivern")
        .handler(handler)
        .onSuccess(onSuccess)
        .onFailure(onFailure)
        .run();
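
The consumer above wires three callbacks together. The sketch below shows one plausible reading of that flow: the handler runs for each record, a thrown exception routes the record to the failure callback, and a clean return routes it to the success callback. This is an assumption drawn from the callback names, not the SDK's confirmed behavior, and every type here (`Handler`, `OnSuccess`, `OnFailure`, `dispatch`) is a hypothetical stand-in using plain strings instead of `ConsumerRecord`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: these types are hypothetical stand-ins, not kafka-sdk classes.
final class CallbackFlowSketch {
    interface Handler<T> { void handle(T record) throws Exception; }
    interface OnSuccess<T> { void accept(T record); }
    interface OnFailure<T> { void accept(T record, Exception error); }

    // Runs the handler for one record and routes the outcome to exactly one callback.
    static <T> void dispatch(T record, Handler<T> handler,
                             OnSuccess<T> onSuccess, OnFailure<T> onFailure) {
        try {
            handler.handle(record);
        } catch (Exception e) {
            onFailure.accept(record, e);
            return;
        }
        onSuccess.accept(record);
    }

    // Feeds string "records" through dispatch and returns the resulting log lines.
    static List<String> demo(List<String> records) {
        List<String> log = new ArrayList<>();
        // Same rule as the handler in the article: the value "error" fails.
        Handler<String> handler = value -> {
            if (value.equals("error")) {
                throw new Exception("Error!");
            }
        };
        for (String value : records) {
            dispatch(value, handler,
                    ok -> log.add("Message Succeeded: " + ok),
                    (bad, e) -> log.add("Message " + bad + " Failed: " + e.getMessage()));
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints:
        // Message Succeeded: Hello World 0
        // Message error Failed: Error!
        demo(Arrays.asList("Hello World 0", "error")).forEach(System.out::println);
    }
}
```

Whether the real SDK also commits offsets or retries at these points is not stated in the article, so the sketch leaves that out.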

Remember to replace localhost with the host of your Kafka broker.
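
One way to avoid hard-coding the broker address is to read it from the environment and fall back to the local default. The helper below is a minimal sketch using only the Java standard library. The class name `BootstrapServers` and the variable name `KAFKA_BOOTSTRAP_SERVERS` are assumptions made for illustration and are not part of kafka-sdk.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of kafka-sdk: resolves the broker address
// from an environment-style map, falling back to the local default.
final class BootstrapServers {
    // Assumed variable name; use whatever your deployment defines.
    static final String ENV_KEY = "KAFKA_BOOTSTRAP_SERVERS";
    static final String DEFAULT = "localhost:9092";

    // Returns the configured address, or the default when unset or blank.
    static String resolve(Map<String, String> env) {
        String value = env.get(ENV_KEY);
        return (value == null || value.trim().isEmpty()) ? DEFAULT : value.trim();
    }

    // Builds a plain map of the kind the topic example passes to Configs.fromMap(map).
    static HashMap<String, String> toConfigMap(Map<String, String> env) {
        HashMap<String, String> map = new HashMap<>();
        map.put("bootstrap.servers", resolve(env));
        return map;
    }

    public static void main(String[] args) {
        System.out.println(toConfigMap(System.getenv()));
    }
}
```

The resulting map can stand in for the hand-built `map` in the topic example, and `resolve(System.getenv())` can replace the literal `"localhost:9092"` in the producer and consumer configs.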

If you are new to Kafka, see this introductory tutorial: https://dev.to/clivern/getting-started-with-kafka-3mbi
