Written by Alexander Nnakwue✏️
Introduction
Most large tech companies get data from their users in various ways, and most of the time, this data comes in raw form. In an intelligible and usable format, data can help drive business needs. The challenge is to process and, if necessary, transform or clean the data to make sense of it.
Basic data streaming applications move data from a source bucket to a destination bucket. More complex applications that involve streams perform some magic on the fly, like altering the structure of the output data or enriching it with new attributes or fields.
In this post, we will learn how to build a minimal real-time data streaming application using Apache Kafka. The post will also address the following:
- Kafka and ZooKeeper as our tools
- Batch data processing and storage
- Installing and running Kafka locally
- Bootstrapping our application
- Installing dependencies
- Creating a Kafka topic
- Producing to the created topic
- Consuming from a topic
According to its website, Kafka is an open-source, highly distributed streaming platform. Built by the engineers at LinkedIn (now part of the Apache software foundation), it prides itself as a reliable, resilient, and scalable system that supports streaming events/applications. It is horizontally scalable, fault-tolerant by default, and offers high speed.
Kafka has a variety of use cases, one of which is to build data pipelines or applications that handle streaming events and/or processing of batch data in real-time.
Using Apache Kafka, we will look at how to build a data pipeline to move batch data. As a little demo, we will simulate a large JSON data store generated at a source.
Afterwards, we will write a producer script that produces/writes this JSON data from a source at, say, point A to a particular topic on our local broker/cluster Kafka setup. Finally, we will write a consumer script that consumes the stored data from the specified Kafka topic.
Note: Data transformation and/or enrichment is mostly handled as it is consumed from an input topic to be used by another application or an output topic.
This is a very common scenario in data engineering, as there is always a need to clean up, transform, aggregate, or even reprocess usually raw and temporarily stored data in a Kafka topic to make it conform to a particular standard or format.
Prerequisites
For you to follow along with this tutorial, you will need:
- The latest versions of Node.js and npm installed on your machine
- The latest Java version (JVM) installed on your machine
- Kafka installed on your local machine. In this tutorial, we will be running through installing Kafka locally on our machines
- A basic understanding of writing Node.js applications
However, before we move on, let’s review some basic concepts and terms about Kafka so we can easily follow along with this tutorial.
ZooKeeper
Kafka is highly dependent on ZooKeeper, which is the service it uses to keep track of its cluster state. ZooKeeper helps control the synchronization and configuration of Kafka brokers or servers, which involves selecting the appropriate leaders. For more detailed information on ZooKeeper, you can check its awesome documentation.
Topic
Kafka topics are a group of partitions or groups across multiple Kafka brokers. To have a clearer understanding, the topic acts as an intermittent storage mechanism for streamed data in the cluster. For each Kafka topic, we can choose to set the replication factor and other parameters like the number of partitions, etc.
Producers, consumers, and clusters
Producers are clients that produce or write data to Kafka brokers or Kafka topics to be more precise. Consumers, on the other hand, read data or — as the name implies — consume data from Kafka topics or Kafka brokers. A cluster is simply a group of brokers or servers that powers a current Kafka instance.
For more detailed information on all these vital concepts, you can check this section of the Apache Kafka documentation.
Installing Kafka
To install Kafka, all we have to do is download the binaries here and extract the archive. We do so by running the following command on our terminal or command prompt:
cd <location-of-downloaded-kafka-binary>
tar -xzf <downloaded-kafka-binary>
cd <name-of_kafka-binary>
The tar
command extracts the downloaded Kafka binary. After that, we navigate to the directory where Kafka is installed. We will see all the files shown below:
Note: The Kafka binaries can be downloaded on any path we so desire on our machines. Also, at the time of writing this article, the latest Kafka version is 2.3.0.
Additionally, if we go a level up (cd ..
), we will find a config
folder inside the downloaded Kafka binary directory. Here, we can configure our Kafka server and include any changes or configurations we may want. Now, let’s play along:
cd ..
ls
cd config
ls
nano server.properties
Now that we know where to configure our Kafka server, it is time to learn how to use Kafka. Later on, we will learn about the fields that we can reconfigure or update on the server.properties
file.
In this tutorial, we will be using the kafka-node client library for Node.js. Note that Kafka has other clients for other programming languages as well, so feel free to use Kafka for any other language of your choice.
Kafka to the rescue
Since we are using Node.js in this exercise, we will begin by bootstrapping a basic application with a minimal structure. To begin, we will create a new directory to house our project and navigate into it, as shown below:
mkdir kafka-sample-app
cd kafka-sample-app
Then we can go ahead and create a package.json
file by running the npm init
command.
Now we can follow the instructions to set up our project as usual. Our package.json
file should look like this when we are done:
{
"name": "kafka-producer_consumer_tutorial",
"version": "1.0.0",
"description": "Building a real-time data streaming application pipeline with Apache Kafka",
"main": "app.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node producer.js"
},
"author": "Alexander Nnakwue",
"license": "MIT",
"dependencies": {
"dotenv": "^8.2.0",
"kafka-node": "^4.1.3"
}
}
Here we have installed two dependencies we will need later on. To install our kafka-node client, we run npm install kafka-node
on the terminal. The documentation for kafka-node is available on npm. The dotenv
package is used for setting up environment variables for our app. To install the package, we can run npm install dotenv
.
Now that we are done installing the dependencies, we can now go ahead and create all the necessary files as shown in the figure below:
The figure above shows all the necessary files needed by our application. Let’s look at each file and understand what is going on.
First of all, to create a new topic manually from the terminal, we can use the command below:
./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
Note that we should not forget to update the <ZOOKEEPER_URL:PORT>
, <NO_OF_REPLICATIONS>
, <NO_OF_PARTITIONS>
, and <TOPIC_NAME>
with real values.
However, in this tutorial, we have a script that handles that for us. The code for creating a new topic can be found in the createTopic.js
file. The code is also shown below:
const kafka = require('kafka-node');
const config = require('./config');
const client = new kafka.KafkaClient({kafkaHost: config.KafkaHost});
const topicToCreate = [{
topic: config.KafkaTopic,
partitions: 1,
replicationFactor: 1
}
];
client.createTopics(topicToCreate, (error, result) => {
// result is an array of any errors if a given topic could not be created
console.log(result, 'topic created successfully');
});
Here, we import the Kafka client and connect to our Kafka setup. You might notice that we never configured a replication factor in our use case. However, this does not mirror a real-life scenario.
In production use cases, we can set up multiple Kafka brokers based on the volume of data or messages we intend to process. Let’s see how we can accomplish that in our local setup.
- Navigate to the config directory in our downloaded binary
cd config
- Open the Kafka
server.properties
file. This file contains all the config for our Kafka server setup. We can open the file using thenano server.properties
command - Now, we can create multiple copies of this file and just alter a few configurations on the other copied files. What we mean here is that in the duplicated files, we can go ahead and change some unique fields like the
broker.id
,log.dirs
, and the broker or host port. For more information on configuring our Kafka setup, you can check the documentation
After creating a topic, we can now produce or write data to it. The code for writing to a topic is found in the producer.js
file. The code is shown below:
const Kafka = require('kafka-node');
const config = require('./config');
const Producer = Kafka.Producer;
const client = new Kafka.KafkaClient({kafkaHost: config.KafkaHost});
const producer = new Producer(client, {requireAcks: 0, partitionerType: 2});
const pushDataToKafka =(dataToPush) => {
try {
let payloadToKafkaTopic = [{topic: config.KafkaTopic, messages: JSON.stringify(dataToPush) }];
console.log(payloadToKafkaTopic);
producer.on('ready', async function() {
producer.send(payloadToKafkaTopic, (err, data) => {
console.log('data: ', data);
});
producer.on('error', function(err) {
// handle error cases here
})
})
}
catch(error) {
console.log(error);
}
};
const jsonData = require('./app_json.js');
pushDataToKafka(jsonData);
Here, we imported the kafka-node library and set up our client to receive a connection from our Kafka broker. Once that connection is set up, we produce our data to the specified Kafka topic. Note that in real-world applications, we are meant to close the client’s connection once done by calling the client.close()
method.
Now when we run our start script with the ./start.sh
command, we get the data written to our Kafka topic.
npm start
To read data from the topic, we can use our consumer script in the consumer.js
file by running node ./consumer.js
. We get the following output:
The code for the consumer.js
file is also shown below:
const kafka = require('kafka-node');
const config = require('./config');
try {
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({idleConnection: 24 * 60 * 60 * 1000, kafkaHost: config.KafkaHost});
let consumer = new Consumer(
client,
[{ topic: config.KafkaTopic, partition: 0 }],
{
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: 'utf8',
// fromOffset: false
}
);
consumer.on('message', async function(message) {
console.log(
'kafka ',
JSON.parse(message.value)
);
})
consumer.on('error', function(error) {
// handle error
console.log('error', error);
});
}
catch(error) {
// catch error trace
console.log(error);
}
Here, we connect to the Kafka client and consume from the predefined Kafka topic.
Note: Once we are done with our setup and we want to start our application, we have to first start the ZooKeeper server. After that, we can start up our Kafka server. This is because Kafka depends on ZooKeeper to run.
To start the ZooKeeper server, we can run the following command from our terminal:
bin/zookeeper-server-start.sh config/zookeeper.properties
To start up our Kafka server, we can run:
bin/Kafka-server-start.sh config/server.properties
As an aside, we can check the number of available Kafka topics in the broker by running this command:
bin/Kafka-topics.sh --list --zookeeper localhost:2181
Finally, we can also consume data from a Kafka topic by running the consumer console command on the terminal, as shown below:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-example-topic --from-beginning
Additionally, Kafka provides a script to manually allow developers to create a topic on their cluster. The script is shown below:
./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>
Note: We need to compulsorily start the ZooKeeper and Kafka server respectively on separate terminal windows before we can go ahead and create a Kafka topic.
Finally, the code for this tutorial is available on this GitHub repo. To get a feel of the design philosophy used for Kafka, you can check this section of the documentation. In a future tutorial, we can look at other tools made available via the Kafka API, like Kafka streams and Kafka connect. For an introduction, you can check this section of the documentation.
Summary
In sum, Kafka can act as a publisher/subscriber kind of system, used for building a read-and-write stream for batch data just like RabbitMQ. It can also be used for building highly resilient, scalable, real-time streaming and processing applications. Note that this kind of stream processing can be done on the fly based on some predefined events.
Additionally, just like messaging systems, Kafka has a storage mechanism comprised of highly tolerant clusters, which are replicated and highly distributed. By replication we mean data can be spread across multiple different clusters, keeping data loss in the entire chain to the barest minimum.
Overall, Kafka can be incorporated into other systems as a standalone plugin. In this case, it can independently scale based on need. What this means is that we can scale producers and consumers independently, without causing any side effects for the entire application.
Finally, we have been able to see that building a data pipeline involves moving data from a source point, where it is generated (note that this can also mean data output from another application), to a destination point, where it is needed or consumed by another application. Now we can go ahead and explore other more complex use cases.
In case you might have any questions, don’t hesitate to engage me in the comment section below or hit me up on Twitter.
Editor's note: Seeing something wrong with this post? You can find the correct version here.
Plug: LogRocket, a DVR for web apps
LogRocket is a frontend logging tool that lets you replay problems as if they happened in your own browser. Instead of guessing why errors happen, or asking users for screenshots and log dumps, LogRocket lets you replay the session to quickly understand what went wrong. It works perfectly with any app, regardless of framework, and has plugins to log additional context from Redux, Vuex, and @ngrx/store.
In addition to logging Redux actions and state, LogRocket records console logs, JavaScript errors, stacktraces, network requests/responses with headers + bodies, browser metadata, and custom logs. It also instruments the DOM to record the HTML and CSS on the page, recreating pixel-perfect videos of even the most complex single-page apps.
Try it for free.
The post Building a real-time data streaming app with Apache Kafka appeared first on LogRocket Blog.
Top comments (5)
Hi, thanks for the tutorial.
I tried following the steps but I am getting the following error when running ./start.sh
(node:10533) UnhandledPromiseRejectionWarning: TypeError: Cannot read property 'length' of undefined
at /mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/kafka-node/lib/protocol/protocol.js:483:27
at Array.forEach ()
at _encodeMetadataRequest (/mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/kafka-node/lib/protocol/protocol.js:482:10)
at Object.encodeMetadataV1Request as encoder
at /mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/kafka-node/lib/kafkaClient.js:912:41
at /mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/async/dist/async.js:3880:24
at replenish (/mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/async/dist/async.js:1011:17)
at /mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/async/dist/async.js:1016:9
at eachOfLimit (/mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/async/dist/async.js:1041:24)
at /mnt/c/Users/thomastai/Projects/kafka_producer_consumer_tutorial/node_modules/async/dist/async.js:1046:16
(node:10533) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:10533) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
Hello there, I can't seem to reproduce same error. Can you please have a look at the GitHub repo where this project is hosted and compare with what you have done.
Also, make sure everything is properly setup on your end.
Thanks for checking it out and do let me know if you are able to get it up and running. I'm here to assist in anyway I can...
Thanks for this
You are very much welcome Marcos👍
hi i have the same problem if you guys found the solution i would greatly appreciate it sharing it with me