<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Sid</title>
    <description>The latest articles on DEV Community by Sid (@ssiidd).</description>
    <link>https://dev.to/ssiidd</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F641333%2F0498aaa2-6678-4122-be89-93209d6028c1.jpg</url>
      <title>DEV Community: Sid</title>
      <link>https://dev.to/ssiidd</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ssiidd"/>
    <language>en</language>
    <item>
      <title>Sentiment Analysis using Kafka, Apache Spark</title>
      <dc:creator>Sid</dc:creator>
      <pubDate>Tue, 02 Aug 2022 22:08:00 +0000</pubDate>
      <link>https://dev.to/ssiidd/sentiment-analysis-using-kafka-apache-spark-4ge</link>
      <guid>https://dev.to/ssiidd/sentiment-analysis-using-kafka-apache-spark-4ge</guid>
      <description>&lt;p&gt;Hello dev community! I would like to share my learnings on how to make an streaming service for sentiment analysis of customer interaction with e-commerce application.&lt;/p&gt;

&lt;p&gt;The technologies used are Apache-Kafka, Zookeeper, Apache-Spark, Docker, and Cassandra. &lt;/p&gt;

&lt;p&gt;Yeah, so let's start 😃&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Prerequisites:&lt;/strong&gt;&lt;br&gt;
I have used the following versions&lt;br&gt;
spark-2.1.1-bin-hadoop2.7&lt;br&gt;
kafka_2.11-0.9.0.0&lt;br&gt;
Cassandra 4.0.5&lt;br&gt;
Docker version 20.10.7, build 20.10.7-0ubuntu5~20.04.2&lt;br&gt;
openjdk version 11.0.14.1&lt;br&gt;
Python3.8&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The architecture&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3RyG7l2o--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/gbkdpcwcs37c0zynb24u.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3RyG7l2o--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/gbkdpcwcs37c0zynb24u.png" alt="Streaming architecture" width="880" height="399"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let's briefly cover each of these:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Apache Kafka is an event-streaming platform that can publish, subscribe to, store, and process streams of events.&lt;/li&gt;
&lt;li&gt;Apache ZooKeeper is a service used by a cluster (group of nodes) to coordinate between themselves and maintain shared data with robust synchronization.&lt;/li&gt;
&lt;li&gt;Cassandra is a high-performance distributed database designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. It is a type of NoSQL database.&lt;/li&gt;
&lt;li&gt;Docker is a containerization platform that packages applications into containers, simplifying building, shipping, and running them.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The ZooKeeper daemon (process), which runs under the name &lt;em&gt;QuorumPeerMain&lt;/em&gt;, is primarily used to track the status of nodes in the Kafka cluster and to maintain the list of Kafka topics and messages. The Kafka side consists of a kafka-producer and a kafka-consumer: the producer listens to click events on the web application and creates event data, which is published to a Kafka topic and then consumed by the kafka-consumer.&lt;br&gt;
This consumed data is sent to Spark using a Kafka-to-Spark connector.&lt;/p&gt;

&lt;p&gt;Note that Kafka is a data pipeline tool for streaming data, and Spark is a big-data processing tool.&lt;/p&gt;

&lt;p&gt;Spark consists of master and worker processes. The master runs the driver, i.e. the main() program where the Spark context is created; it interacts with the cluster manager to schedule job execution and assign tasks. The workers run processes in parallel to perform the tasks scheduled by the driver program.&lt;/p&gt;

&lt;p&gt;Cassandra is connected to Spark via a connector through which the processed data is delivered and stored in the database.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Implementation&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I'm using an Ubuntu virtual machine.&lt;br&gt;
Download the zip files and unzip them.&lt;/p&gt;

&lt;p&gt;Navigate into the spark directory and start all the spark daemons.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sbin/start-all.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can check the status with the help of the &lt;code&gt;jps&lt;/code&gt; command, which lists the instrumented Java HotSpot VMs on the target system.&lt;br&gt;
&lt;em&gt;Master&lt;/em&gt; and &lt;em&gt;Worker&lt;/em&gt; should appear in the list.&lt;/p&gt;

&lt;p&gt;Now, navigate to the zookeeper directory and start ZooKeeper, which acts as a metadata service for Kafka. To run this command in the background we use &lt;code&gt;nohup&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;nohup bin/zookeeper-server-start.sh config/zookeeper.properties &amp;gt;&amp;gt; nohup.out &amp;amp; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(press enter if required)&lt;br&gt;
The &lt;em&gt;QuorumPeerMain&lt;/em&gt; process starts, which you can verify using &lt;code&gt;jps&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Similarly, start the Kafka daemon&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;nohup bin/kafka-server-start.sh config/server.properties &amp;gt;&amp;gt; nohup.out &amp;amp;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should now see a &lt;em&gt;Kafka&lt;/em&gt; process in the &lt;code&gt;jps&lt;/code&gt; output&lt;/p&gt;

&lt;p&gt;I have used a Docker container for running Cassandra, but you can also run it without Docker using the command &lt;code&gt;nohup bin/cassandra -f&lt;/code&gt; and then check that you can access the &lt;code&gt;cqlsh&lt;/code&gt; shell. I had trouble accessing the cqlsh shell because I had installed and uninstalled multiple versions of Cassandra.&lt;/p&gt;

&lt;p&gt;Using Docker, pull the latest Cassandra image and create a container from it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker pull cassandra:latest
docker run --name cassandra -p 127.0.0.1:9042:9042 -p 127.0.0.1:9160:9160 -d cassandra
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To get access to the cqlsh shell inside the container, do the following.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker exec -it cassandra bash
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that &lt;em&gt;cassandra&lt;/em&gt; here is just the container name and can be anything else.&lt;br&gt;
You may face some issues, for example when you accidentally create multiple containers, which you can fix with a few docker commands by referring to the &lt;a href="https://docs.docker.com/get-started/overview/"&gt;documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Now, create a topic in Kafka; here &lt;code&gt;topic1&lt;/code&gt; is the topic name.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Before moving on to Spark, we will first set up the Cassandra database.&lt;br&gt;
In the cqlsh shell, create a keyspace and a table with the following schema.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE KEYSPACE sparkdata WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
use sparkdata;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--wxdj3Jvm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5aavyc1n1uv933gl0hs1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--wxdj3Jvm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/5aavyc1n1uv933gl0hs1.png" alt="Image description" width="880" height="193"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE cust_data (fname text, lname text, url text, product text, cnt counter, PRIMARY KEY (fname, lname, url, product));
SELECT * FROM cust_data;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Cassandra will do an upsert if you try to add a record whose primary key already exists; for the counter column &lt;code&gt;cnt&lt;/code&gt;, the written value is added to the existing count instead of replacing it. &lt;/p&gt;
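&lt;p&gt;To make the counter behaviour concrete, here is a small Python sketch (illustrative only, not part of the pipeline) that mimics how repeated writes to the same primary key accumulate in the counter column:&lt;/p&gt;

```python
# Illustrative sketch: mimic a Cassandra counter table in memory.
# The primary key is (fname, lname, url, product); cnt acts as a counter column.
from collections import defaultdict

cust_data = defaultdict(int)

def upsert(fname, lname, url, product, cnt=1):
    # Writing an existing primary key increments the counter
    # instead of creating a duplicate row.
    cust_data[(fname, lname, url, product)] += cnt

upsert("rahul", "patil", "http://127.03.32", "Microsoft Xbox")
upsert("rahul", "patil", "http://127.03.32", "Microsoft Xbox")
print(cust_data[("rahul", "patil", "http://127.03.32", "Microsoft Xbox")])  # 2
```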

&lt;p&gt;Now coming to Spark, run this command to get the required packages that help connect Kafka with Spark and Spark with Cassandra.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.0.2","org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You may face some errors, as I did, but re-executing the same command after trying a few other things eventually worked for me, strange isn't it? The errors could be because Bintray is no longer active.&lt;br&gt;
I tried &lt;a href="https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.0.0"&gt;this&lt;/a&gt; and &lt;a href="https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11/2.0.2"&gt;this&lt;/a&gt; source.&lt;br&gt;
You may need to download the packages manually.&lt;/p&gt;

&lt;p&gt;Open the Spark shell using &lt;code&gt;bin/spark-shell&lt;/code&gt; and import the packages we downloaded. Before proceeding, stop the running Spark session from the Scala prompt using &lt;code&gt;spark.stop&lt;/code&gt;, since a previous Spark instance is already running and only one Spark session can run at a time.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;spark.stop

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.streaming._

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Further, in the same shell, we create a Spark application and configure its settings. Here the app name is &lt;em&gt;KafkaSparkStreaming&lt;/em&gt; and &lt;code&gt;127.0.0.1&lt;/code&gt; is the IP address of the running Cassandra instance.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val sparkConf = new SparkConf().setAppName("KafkaSparkStreaming").set("spark.cassandra.connection.host", "127.0.0.1")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster. Here we create a StreamingContext with 20-second batches and open a stream between Kafka and Spark.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;val ssc = new StreamingContext(sparkConf, Seconds(20))
val topicpMap = "topic1".split(",").map((_, 1.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "sparkgroup", topicpMap).map(_._2)
lines.print();
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The message consumed by the kafka-consumer is published to Spark, which processes it by mapping the message fields to the corresponding columns of the table schema we created earlier, and saves it to the database.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;lines.map(line =&amp;gt; { val arr = line.split(","); (arr(0),arr(1),arr(2),arr(3),arr(4)) }).saveToCassandra("sparkdata", "cust_data", SomeColumns("fname", "lname","url","product","cnt"))

ssc.start      // starts the JobScheduler, which in turn starts the JobGenerator, which creates jobs

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
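&lt;p&gt;To see what that Scala &lt;code&gt;map&lt;/code&gt; does to each message, here is an equivalent sketch in plain Python (illustrative only, outside Spark):&lt;/p&gt;

```python
# Illustrative sketch of the per-line transformation the Scala map performs:
# a comma-separated click event is split into the five columns of cust_data.
def parse_event(line):
    fname, lname, url, product, cnt = line.split(",")
    return {"fname": fname, "lname": lname, "url": url,
            "product": product, "cnt": int(cnt)}

row = parse_event("rahul,patil,http://127.03.32,Microsoft Xbox,1")
print(row["product"])  # Microsoft Xbox
```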



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--kdiS3TxF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xstdigiho2h6gca346nq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--kdiS3TxF--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/xstdigiho2h6gca346nq.png" alt="Image description" width="880" height="231"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congrats, you made it till the implementation part. 🎉&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Testing&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Now, to test this, we create new sessions for the kafka-producer and kafka-consumer, leaving the old spark-shell and cqlsh sessions undisturbed.&lt;/p&gt;

&lt;p&gt;Since I haven't built a web interface yet, which I plan to do in the near future, we can test it manually on the terminal.&lt;/p&gt;

&lt;p&gt;Create kafka-producer and kafka-consumer in different terminals.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We then manually enter these lines one by one in the kafka-producer terminal, as if the web application were producing this data&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rahul,patil,http://127.03.32,Microsoft Xbox,1
rohan,G,http://127.24.43,iPhone 13 Pro,1

rahul,patil,http://127.03.32,Microsoft Xbox,1
aditya,chouglae,http://127.55.33,Boatstone 1000,1
rahul,patil,http://127.03.32,Sony PlayStation,1
rahul,patil,http://127.03.32,Microsoft Xbox,1
rohan,G,http://127.24.43,iPhone 13 Pro,1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--_TqpBiT6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0o3qcxkf8krvypsw2had.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--_TqpBiT6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/0o3qcxkf8krvypsw2had.png" alt="Image description" width="880" height="160"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--NM_iJsWZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/lgyuyagika5oc5n95azr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--NM_iJsWZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/lgyuyagika5oc5n95azr.png" alt="Image description" width="880" height="177"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--81JgHioJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/90qx5c5m4zitkx4ii2r4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--81JgHioJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/90qx5c5m4zitkx4ii2r4.png" alt="Image description" width="678" height="531"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Notice that each message is passed to the kafka-consumer, which streams it to Spark; Spark processes the message and stores it in Cassandra.&lt;br&gt;
We can observe that the count gets incremented because it is not part of the primary key. In this way we can tell how many times a user has visited a particular page of the e-commerce website. With this data we can analyse behaviour further and offer discounts to users so that product sales increase effectively, and also identify the factors holding sales back and what could be improved.&lt;/p&gt;
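&lt;p&gt;As a quick sanity check, tallying the sample events above in plain Python (outside the pipeline) reproduces the counts you should see in &lt;code&gt;cust_data&lt;/code&gt;:&lt;/p&gt;

```python
# Tally the sample producer lines by (fname, product),
# mirroring what the counter column accumulates per primary key.
from collections import Counter

events = [
    "rahul,patil,http://127.03.32,Microsoft Xbox,1",
    "rohan,G,http://127.24.43,iPhone 13 Pro,1",
    "rahul,patil,http://127.03.32,Microsoft Xbox,1",
    "aditya,chouglae,http://127.55.33,Boatstone 1000,1",
    "rahul,patil,http://127.03.32,Sony PlayStation,1",
    "rahul,patil,http://127.03.32,Microsoft Xbox,1",
    "rohan,G,http://127.24.43,iPhone 13 Pro,1",
]
counts = Counter()
for line in events:
    fname, lname, url, product, cnt = line.split(",")
    counts[(fname, product)] += int(cnt)

print(counts[("rahul", "Microsoft Xbox")])  # 3
```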

&lt;p&gt;Finally, we have completed the architecture as planned in the diagram successfully.&lt;/p&gt;

&lt;p&gt;References:&lt;br&gt;
&lt;a href="https://bit.ly/3zqU5dZ"&gt;This&lt;/a&gt; was the initial reference and many other sites like StackOverflow (ofcourse haha), cassandra docs, &lt;a href="https://www.datastax.com/"&gt;datastax&lt;/a&gt;, GitHub issues, and many more.&lt;/p&gt;

</description>
      <category>spark</category>
      <category>kafka</category>
      <category>cassandra</category>
      <category>docker</category>
    </item>
  </channel>
</rss>
