DEV Community

koseran
koseran

Posted on

<!Kafka Help Alert!>

Hello, I'm working on a university project and I'm really stuck on this. I must create my own Kafka serializer. I have made a lot of attempts, but I keep getting this error...

The Producer Error

this is my Producer

package org.example.kafkaApplication.Producer;
import org.example.kafkaApplication.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerMain {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

        try (org.apache.kafka.clients.producer.Producer<String, Task> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 20; i++) {
                String taskId = "Task" + i;
                String studentId = "Student" + i;
                String subject = "Subject" + (i % 4); // 4 different work topics
                String dateOfSubmission = "2023-01-01";

                Task task = new Task(taskId, studentId, subject, dateOfSubmission);
                ProducerRecord<String, Task> record = new ProducerRecord<>("task.events", task);
                producer.send(record);
            }
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

this is my Object class

package org.example.kafkaApplication.Producer;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

/**
 * A submitted piece of student work, serialized to and from JSON with Jackson.
 *
 * <p>The JSON form contains the properties {@code taskId}, {@code studentId},
 * {@code subject} and {@code dateOfSubmission}, in that order; {@code null}
 * properties are omitted.
 */
// These two annotations must sit on the class: placed inside the body they
// attach to the first field instead of configuring the whole type.
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
        "taskId",
        "studentId",
        "subject",
        "dateOfSubmission"
})
public class Task {

    @JsonProperty("taskId")
    private String taskId;

    @JsonProperty("studentId")
    private String studentId;

    @JsonProperty("subject")
    private String subject;

    @JsonProperty("dateOfSubmission")
    private String dateOfSubmission;

    /**
     * Creates an empty task.
     *
     * <p>Jackson needs a no-argument constructor (or an annotated creator) to
     * instantiate the class when deserializing; the setters then fill the fields.
     */
    public Task() {
    }

    /**
     * Creates a fully populated task.
     *
     * @param taskId           identifier of the task
     * @param studentId        identifier of the submitting student
     * @param subject          subject the task belongs to
     * @param dateOfSubmission submission date as text
     */
    public Task(String taskId, String studentId, String subject, String dateOfSubmission) {
        this.taskId = taskId;
        this.studentId = studentId;
        this.subject = subject;
        this.dateOfSubmission = dateOfSubmission;
    }

    /** @return identifier of the task */
    public String getTaskId() {
        return taskId;
    }

    /** @param taskId identifier of the task */
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    /** @return identifier of the submitting student */
    public String getStudentId() {
        return studentId;
    }

    /** @param studentId identifier of the submitting student */
    public void setStudentId(String studentId) {
        this.studentId = studentId;
    }

    /** @return subject the task belongs to */
    public String getSubject() {
        return subject;
    }

    /** @param subject subject the task belongs to */
    public void setSubject(String subject) {
        this.subject = subject;
    }

    /** @return submission date as text */
    public String getDateOfSubmission() {
        return dateOfSubmission;
    }

    /** @param dateOfSubmission submission date as text */
    public void setDateOfSubmission(String dateOfSubmission) {
        this.dateOfSubmission = dateOfSubmission;
    }

}

Enter fullscreen mode Exit fullscreen mode

this is my JSON schema

{
  "type": "object",
  "javaType" : "org.example.kafkaApplication.Producer.Task",
  "properties": {
    "taskId": {
      "type": "string"
    },
    "studentId": {
      "type": "string"
    },
    "subject": {
      "type": "string"
    },
    "dateOfSubmission": {
      "type": "string"
    }
  },
  "required": ["taskId", "studentId", "subject", "dateOfSubmission"],
  "additionalProperties": false
}

Enter fullscreen mode Exit fullscreen mode

and this is my pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Kafka demo application: Java 17, Kafka clients, Jackson, SLF4J. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>KafkaApplication</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single source of truth for all Jackson artifacts; matches the version the
             dependencies were previously pinned to. -->
        <jackson.version>2.16.1</jackson.version>
    </properties>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <!-- ${java.version} is not defined in this POM and would resolve to the
                         JVM's full version string (e.g. 17.0.9), which javac rejects as a
                         source/target level. Use the properties declared above instead. -->
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- Json Schema to POJO plugin-->
            <plugin>
                <groupId>org.jsonschema2pojo</groupId>
                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
                <version>0.5.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <!-- NOTE(review): generating into src/main/java mixes generated and
                                 hand-written sources; confirm this does not clash with a
                                 hand-written class of the same name. -->
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            <includeAdditionalProperties>false</includeAdditionalProperties>
                            <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>

        <!-- NOTE(review): both logging backends below are test-scoped, so no SLF4J backend
             is on the runtime classpath and two are on the test classpath; confirm intent. -->
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.5</version>
            <scope>test</scope>
        </dependency>


        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.4.14</version>
            <scope>test</scope>
        </dependency>

        <!-- Jackson: jackson-core was declared twice; one declaration is enough. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>


        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

    </dependencies>


</project>
Enter fullscreen mode Exit fullscreen mode

Do you have any idea what is causing this?

Image of Datadog

How to Diagram Your Cloud Architecture

Cloud architecture diagrams provide critical visibility into the resources in your environment and how they’re connected. In our latest eBook, AWS Solution Architects Jason Mimick and James Wenzel walk through best practices on how to build effective and professional diagrams.

Download the Free eBook

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs