DEV Community

koseran
koseran

Posted on

<!Kafka Help Alert!>

Hello i'm working in a University project and i have really stack at this. I must create my own Kafka Serializer.I have do a lot of tries but i have this error...

The Produxer Error

this is my Producer

package org.example.kafkaApplication.Producer;
import org.example.kafkaApplication.JsonSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class ProducerMain {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

        try (org.apache.kafka.clients.producer.Producer<String, Task> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 20; i++) {
                String taskId = "Task" + i;
                String studentId = "Student" + i;
                String subject = "Subject" + (i % 4); // 4 different work topics
                String dateOfSubmission = "2023-01-01";

                Task task = new Task(taskId, studentId, subject, dateOfSubmission);
                ProducerRecord<String, Task> record = new ProducerRecord<>("task.events", task);
                producer.send(record);
            }
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

this is my Object class

package org.example.kafkaApplication.Producer;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

public class Task  {

    @JsonInclude(JsonInclude.Include.NON_NULL)
    @JsonPropertyOrder({
            "taskId",
            "studentId",
            "subject",
            "dateOfSubmission",
    })

    @JsonProperty("taskId")
    private String taskId;

    @JsonProperty("studentId")
    private String studentId;

    @JsonProperty("subject")
    private String subject;

    @JsonProperty("dateOfSubmission")
    private String dateOfSubmission;

    public Task(String taskId, String studentId, String subject, String dateOfSubmission) {
        this.taskId = taskId;
        this.studentId = studentId;
        this.subject = subject;
        this.dateOfSubmission = dateOfSubmission;
    }
    @JsonProperty("taskId")
    public String getTaskId() {
        return taskId;
    }

    @JsonProperty("taskId")
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }
    @JsonProperty("studentId")
    public String getStudentId() {
        return studentId;
    }
    @JsonProperty("studentId")
    public void setStudentId(String studentId) {
        this.studentId = studentId;
    }
    @JsonProperty("subject")
    public String getSubject() {
        return subject;
    }
    @JsonProperty("subject")
    public void setSubject(String subject) {
        this.subject = subject;
    }
    @JsonProperty("dateOfSubmission")
    public String getDateOfSubmission() {
        return dateOfSubmission;
    }

    @JsonProperty("dateOfSubmission")
    public void setDateOfSubmission(String dateOfSubmission) {
        this.dateOfSubmission = dateOfSubmission;
    }

}

Enter fullscreen mode Exit fullscreen mode

this is my jshonScema

{
  "type": "object",
  "javaType" : "org.example.kafkaApplication.Producer.Task",
  "properties": {
    "taskId": {
      "type": "string"
    },
    "studentId": {
      "type": "string"
    },
    "subject": {
      "type": "string"
    },
    "dateOfSubmission": {
      "type": "string"
    }
  },
  "required": ["taskId", "studentId", "subject", "dateOfSubmission"],
  "additionalProperties": false
}

Enter fullscreen mode Exit fullscreen mode

and this is my pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>KafkaApplication</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jackson.version>2.16.2</jackson.version>
    </properties>

    <build>
        <plugins>
            <!-- Maven Compiler Plugin-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- Json Schema to POJO plugin-->
            <plugin>
                <groupId>org.jsonschema2pojo</groupId>
                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
                <version>0.5.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            <includeAdditionalProperties>false</includeAdditionalProperties>
                            <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.7</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.5</version>
            <scope>test</scope>
        </dependency>


        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.4.14</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.16.1</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.16.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.16.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.16.1</version>
        </dependency>


        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

    </dependencies>


</project>
Enter fullscreen mode Exit fullscreen mode

Have you any idea?

Top comments (0)