DEV Community

Eduard Gert
Eduard Gert

Posted on • Edited on

16 1

Kafka, AVRO and TypeScript?

In this article I want to show a simple example of how you can produce and consume Kafka messages with the AVRO format using TypeScript/JavaScript and KafkaJS.

What is Kafka?

Apache Kafka is a very popular event streaming platform and used in a lot of companies right now. If you want to learn more about Kafka, check out the official website.

However, since the whole ecosystem is based on JVM (Java, Scala, Kotlin), I never really checked for clients in other languages.

Recently I was playing around with a project in TypeScript and since it would have been handy to stream the results directly into Kafka, I checked for a JavaScript client and found KafkaJS. And it even plays well with AVRO.

How to use it?

Here is a simple example for an AVRO producer and consumer.

Set up a new node project and install these two dependencies. The schema registry is required to work with AVRO schemas.

npm install kafkajs @kafkajs/confluent-schema-registry
Enter fullscreen mode Exit fullscreen mode

Configuring the Kafka connection

This example is in TypeScript but in JS it would work more or less in a similar way.
First import all the dependencies and configure all Kafka related settings.

import { Kafka } from "kafkajs";
import {
  SchemaRegistry,
  readAVSCAsync,
} from "@kafkajs/confluent-schema-registry";

const TOPIC = "my_topic";

// configure Kafka broker
const kafka = new Kafka({
  clientId: "some-client-id",
  brokers: ["localhost:29092"],
});

// If we use AVRO, we need to configure a Schema Registry
// which keeps track of the schema
const registry = new SchemaRegistry({
  host: "http://localhost:8085",
});

// create a producer which will be used for producing messages
const producer = kafka.producer();

const consumer = kafka.consumer({
  groupId: "group_id_1",
});

// declaring a TypeScript type for our message structure
declare type MyMessage = {
  id: string;
  value: number;
};
Enter fullscreen mode Exit fullscreen mode

Create an AVRO schema

Now we need to make sure we can encode messages in AVRO. Therefore we need to be able to read a schema from a file and register it in the schema registry.

This is how the schema in this example will look like. Pretty straightforward, two fields called id which is a string and value which is an integer.
Insert this to a file called schema.avsc, we will use the confluent-schema-registry package to read it and register the schema in the schema registry.

{
  "name": "example",
  "type": "record",
  "namespace": "com.my.company",
  "doc": "Kafka JS example schema",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "value",
      "type": "int"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

Register an AVRO schema

Here is the function which we will use to read an AVRO schema from a file and register it in the schema registry.

// This will create an AVRO schema from an .avsc file
const registerSchema = async () => {
  try {
    const schema = await readAVSCAsync("./avro/schema.avsc");
    const { id } = await registry.register(schema);
    return id;
  } catch (e) {
    console.log(e);
  }
};
Enter fullscreen mode Exit fullscreen mode

Produce a message using the AVRO schema

This is how we can build a producer. Before pushing a message (of type MyMessage which we defined above) we will encode it using the AVRO schema from the registry.

// push the actual message to kafka
const produceToKafka = async (registryId: number, message: MyMessage) => {
  await producer.connect();

  // compose the message: the key is a string
  // the value will be encoded using the avro schema
  const outgoingMessage = {
    key: message.id,
    value: await registry.encode(registryId, message),
  };

  // send the message to the previously created topic
  await producer.send({
    topic: TOPIC,
    messages: [outgoingMessage],
  });

  // disconnect the producer
  await producer.disconnect();
};
Enter fullscreen mode Exit fullscreen mode

Create a Kafka topic

You can skip this if the topic is already present. Before we can produce a message, we need to have a topic. This function also checks if the topic is already present in case you run this multiple times.

// create the kafka topic where we are going to produce the data
const createTopic = async () => {
  try {
    const topicExists = (await kafka.admin().listTopics()).includes(TOPIC);
    if (!topicExists) {
      await kafka.admin().createTopics({
        topics: [
          {
            topic: TOPIC,
            numPartitions: 1,
            replicationFactor: 1,
          },
        ],
      });
    }
  } catch (error) {
    console.log(error);
  }
};
Enter fullscreen mode Exit fullscreen mode

Now we create our producer and consumer functions which publish an example message and consume it again.

const produce = async () => {
  await createTopic();
  try {
    const registryId = await registerSchema();
    // push example message
    if (registryId) {
      const message: MyMessage = { id: "1", value: 1 };
      await produceToKafka(registryId, message);
      console.log(`Produced message to Kafka: ${JSON.stringify(message)}`);
    }
  } catch (error) {
    console.log(`There was an error producing the message: ${error}`);
  }
};

async function consume() {
  await consumer.connect();

  await consumer.subscribe({
    topic: TOPIC,
    fromBeginning: true,
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      if (message.value) {
        const value: MyMessage = await registry.decode(message.value);
        console.log(value);
      }
    },
  });
}
Enter fullscreen mode Exit fullscreen mode

And finally we execute both functions one after another.

produce()
  .then(() => consume())
Enter fullscreen mode Exit fullscreen mode

The console should print something like:

Produced message to Kafka: {"id":"1","value":1}
Consumed message from Kafka: Example { id: '1', value: 1 }
Enter fullscreen mode Exit fullscreen mode

Demo repository with this code

I created a repository to demo this example. There is a docker-compose file which takes care of setting up a Kafka Broker and a Schema Registry.

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

Top comments (0)

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay