
John Bull


Wash away the CRUD with Kafka Streaming Part3

Kafka Streams with your web app.

Pipelines

In the second post, we expanded on our API by creating a Kafka consumer that reads the data from our producer, and a front end built with Vue.js that displays the data. (Post2)

You can find the first post, with the intro, here. (Post1)

Now we want to do some processing on our data. Let's stick with our initial data feed idea and work with a CSV file. Let's say someone in the HR department enters some data whenever an employee's status changes in some way, for example when the employee leaves the company or gets promoted. All we need to do is create an event ID for that change and pass it into our message system. One advantage of this approach is that we have a log of all changes going back for as long as the topic in Kafka has been running (subject to the topic's retention settings). Let's get to it.
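To make the "log of all changes" idea a bit more concrete, here is a minimal sketch that replays an ordered list of events and collects the history for each employee. A plain array stands in for the Kafka topic, and both `historyByEmployee` and the `emp_hired_01` event ID are hypothetical names made up for this illustration; only `emp_chng_02` is used in this series.

```javascript
// A plain array stands in for the Kafka topic (an assumption for illustration).
// Events are ordered oldest first, the way a topic partition keeps them.
const eventLog = [
  // "emp_hired_01" is a hypothetical event ID, not one used in this series.
  { f_name: "Berta", l_name: "Manby", event_id: "emp_hired_01" },
  { f_name: "Elsi", l_name: "Geharke", event_id: "emp_hired_01" },
  { f_name: "Berta", l_name: "Manby", event_id: "emp_chng_02" }
];

// Replay the log and collect every event ID per employee, in order.
function historyByEmployee(log) {
  const history = new Map();
  for (const event of log) {
    const name = `${event.f_name} ${event.l_name}`;
    if (!history.has(name)) {
      history.set(name, []);
    }
    history.get(name).push(event.event_id);
  }
  return history;
}

// Look up everything that has happened to one employee.
console.log(historyByEmployee(eventLog).get("Berta Manby"));
```

Because nothing is overwritten, the full history stays available, which is the difference from updating a row in place in a CRUD-style table.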

So we are going to have another CSV file with employees, and we are going to add a field called event_id, as follows.

f_name,l_name,hire_date,event_id
Berta,Manby,10/17/2015,emp_chng_02
Elsi,Geharke,10/15/2017,emp_chng_02
Fay,Kirtley,12/10/2016,emp_chng_02
Sig,Dzeniskevich,6/16/2015,emp_chng_02
Nikolas,Pounsett,12/24/2018,emp_chng_02
Sutherland,Dudney,2/5/2012,emp_chng_02
Vivianna,Zolini,3/11/2014,emp_chng_02
Bran,McQuade,8/7/2017,emp_chng_02

At the moment we just have one event ID that denotes any kind of change to the employee's status, but you can see how we could have unique IDs for the different statuses we need to account for.

Next, we are going to create another producer to read these events into our employees topic in Kafka.
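As a rough sketch of what unique IDs per status could look like, the lookup below maps event IDs to a human-readable description. Only `emp_chng_02` comes from this post; `emp_left_03`, `emp_promo_04`, and the `describeEvent` helper are hypothetical names invented for the example.

```javascript
// Lookup table from event ID to a description.
// Only emp_chng_02 appears in this post; the other IDs are hypothetical.
const EVENT_TYPES = {
  emp_chng_02: "general change",
  emp_left_03: "left the company", // hypothetical ID
  emp_promo_04: "promoted" // hypothetical ID
};

// Return the description for an event ID, or a fallback for unknown IDs.
function describeEvent(eventId) {
  if (Object.prototype.hasOwnProperty.call(EVENT_TYPES, eventId)) {
    return EVENT_TYPES[eventId];
  }
  return "unknown event";
}

console.log(describeEvent("emp_chng_02"));
console.log(describeEvent("something_else"));
```

Keeping the mapping in one place means the producer and the front end can agree on what each ID means without hard-coding strings in several files.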

//employeeChanges.js

const fs = require("fs");
const parse = require("csv-parse");

const kafka = require("kafka-node");
const Producer = kafka.Producer;
const KeyedMessage = kafka.KeyedMessage;
const client = new kafka.Client("localhost:2181");
const producer = new Producer(client);

const topic = "employees";
const inputFile = "./dataFiles/MOCK_Employee_Changes.csv";

let dataArray = [];

producer.on("error", function(err) {
  console.error("Problem with producing Kafka message " + err);
});

producer.on("ready", function() {
  // Send a batch of messages to Kafka and log the result.
  const produceDataMessage = messageArray => {
    console.log(messageArray);

    producer.send(messageArray, function(err, data) {
      if (err) {
        console.log(
          "Sorry, failed to produce a message to kafka with error ==== " + err
        );
        return;
      }
      console.log(data);
    });
  };

  // Turn the CSV row at index currentData into a message, send it,
  // then schedule the next row after a short delay.
  const handleData = currentData => {
    // Stop once every row has been sent.
    if (currentData >= dataArray.length) {
      return;
    }

    const line = dataArray[currentData];
    const dataNode = {
      f_name: line[0],
      l_name: line[1],
      hire_date: line[2],
      event_id: line[3]
    };

    // NOTE: dataNode has no `code` property, so the message key is undefined.
    // Use a real field (for example event_id) if you need keyed messages.
    const dataNodeKM = new KeyedMessage(dataNode.code, JSON.stringify(dataNode));

    // Send exactly one message per CSV row.
    produceDataMessage([{ topic: topic, messages: dataNodeKM, partition: 0 }]);

    const delay = 90;
    setTimeout(handleData.bind(null, currentData + 1), delay);
  };

  // Parse the whole file, then start at index 1 to skip the header row.
  const parser = parse({ delimiter: "," }, function(err, data) {
    if (err) {
      console.error("Problem parsing " + inputFile + ": " + err);
      return;
    }
    dataArray = data;
    handleData(1);
  });

  fs.createReadStream(inputFile).pipe(parser);
});

There isn't a huge difference between this and our other producer, other than the added event_id field. When we run this script, we should see some messages in the terminal that show the event ID.

{ topic: 'employees',
messages:
KeyedMessage {
magic: 0,
attributes: 0,
key: undefined,
value:
'{"f_name":"Bran","l_name":"McQuade","hire_date":"8/7/2017","event_id":"emp_chng_02"}',
timestamp: 1555012556198 }
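On the reading side, the `value` field is just a JSON string, so it has to be parsed before we can look at `event_id`. The helper below is a minimal sketch that assumes the consumer hands us an object with a string `value` property; `parseEmployeeMessage` is a hypothetical name, not part of kafka-node.

```javascript
// Parse the JSON string carried in a message's `value` field.
// Returns the employee object, or null if the value is not valid JSON.
function parseEmployeeMessage(message) {
  try {
    return JSON.parse(message.value);
  } catch (err) {
    return null;
  }
}

// Sample message shaped like the terminal output above.
const sampleMessage = {
  topic: "employees",
  value:
    '{"f_name":"Bran","l_name":"McQuade","hire_date":"8/7/2017","event_id":"emp_chng_02"}'
};

const employee = parseEmployeeMessage(sampleMessage);
console.log(employee.f_name, employee.event_id);
```

Returning `null` for malformed values is one possible design choice; it lets the caller skip a bad message instead of crashing the consumer.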

Now let's add some code to our frontend so that the employees with changes will be displayed.

In our Employees component, we are already checking the event ID of each record returned by our API call and pushing the employees with changes to their own array on line 44.

if (element.event_id === "emp_chng_02") {
  this.employeesWithChanges.push(element);
} else {
  this.employees.push(element);
}
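Outside of the component, the same check can be written as a small standalone function, which makes it easier to try out without Vue. This is only a sketch: `splitEmployees` and `CHANGE_EVENT_ID` are hypothetical names, and the sample records simply reuse names from the CSV in this post.

```javascript
const CHANGE_EVENT_ID = "emp_chng_02";

// Split API results into unchanged employees and employees with a change event.
function splitEmployees(elements) {
  const employees = [];
  const employeesWithChanges = [];
  for (const element of elements) {
    if (element.event_id === CHANGE_EVENT_ID) {
      employeesWithChanges.push(element);
    } else {
      employees.push(element);
    }
  }
  return { employees, employeesWithChanges };
}

// One record without an event ID and one with the change event.
const result = splitEmployees([
  { f_name: "Berta", l_name: "Manby", hire_date: "10/17/2015" },
  { f_name: "Bran", l_name: "McQuade", hire_date: "8/7/2017", event_id: "emp_chng_02" }
]);
console.log(result.employeesWithChanges.length, result.employees.length);
```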

So now all we need to do is add that array to the template, like so.


<div class="employee-card" v-for="employee in employeesWithChanges">
  <div>{{ employee.f_name }} {{ employee.l_name }}</div>
  <div>Hire date: {{ employee.hire_date }}</div>

  <div class="alert">This employee had a change</div>
</div>


Now we should see our employees that had changes show up in the front end. It should look something like this.

[Image: employees with changes]

Based on this example, it is hopefully easier to see how we might use message streaming to implement a web application without a traditional database and query-based system, writing and reading data with Kafka producers and consumers instead. Confluent has built some helpful database connectors that read changes from your databases into the event system. Using the various tools for Kafka, a team can effectively marry their legacy systems with their more modern systems and pipe all that data into the cloud. It was quite a journey for me, and to be honest, I didn't get Kafka Streams or understand how it could be used until I started looking for ways to use it in real-world-ish scenarios. Hopefully, Kafka Streams and message systems can become one of the many tools in your tool belt too.

Please reach out to me: follow me on Twitter @jbull328 or find me on LinkedIn.
