In this post, I will share my data engineering course final project from the previous semester. The task was to create an ETL process for streaming data: tweets fetched from Twitter through Kafka. There were some additional tasks, like using Airflow to schedule all of the tasks and creating a dashboard. In this project we stream tweets about Windows 11, so the final output after the streaming and ETL process is a sentiment analysis of Windows 11.
Installing Kafka
Before installing Kafka, make sure that you have Java 1.8.0 and Scala. Then head to https://kafka.apache.org/downloads and download the archive that matches the Scala version installed on your computer; my laptop has Scala 2.12, so we are going to download Kafka 3.0.0. Installing Kafka is pretty easy: you just need to extract the file somewhere, for example in your home directory. After extracting it, change the snapshot directory in zookeeper.properties and the log directory in server.properties to point at your Kafka installation, like this.
for snapshot directory
dataDir=/home/USER/kafka/data/zookeeper
for log directory
log.dirs=/home/USER/kafka/data/kafka
After setting both directories, you can use Kafka right away. You can add Kafka to your PATH environment variable for easier use. Additionally, if your Kafka broker won't start, uncomment the listeners line in server.properties, as shown in the image below.
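For reference, the commented-out line in a stock server.properties usually looks like #listeners=PLAINTEXT://:9092; after removing the leading # (and adjusting the host if yours differs) it reads something like this.
listeners=PLAINTEXT://localhost:9092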
The Kafka installation is now complete, and you can run both the ZooKeeper server and the Kafka server with the commands below (make sure you run ZooKeeper first).
zookeeper-server-start.sh ../../config/zookeeper.properties
kafka-server-start.sh ../../config/server.properties
Installing Airflow
The reason we used Linux for this project is that installing Airflow is somewhat harder on Windows. To install Airflow, first make sure you already have pip for Python. After that, you can install Airflow right away by running "pip install apache-airflow" and waiting until the installation is completed. You can add Airflow to your PATH environment variable, just like we did for Kafka.
After the installation is completed, initialize a new metadata database by running airflow db init. This command only needs to be run once, the first time you use Airflow.
Finally, we can use Airflow. To use it, you need to open two different consoles and run airflow webserver -p 8080 and airflow scheduler, one in each console, like in the image below.
Now you can access Airflow at localhost:8080, but you need credentials first. To get them, create a new account by running airflow users create -e EMAIL -f FIRSTNAME -l LASTNAME [-p PASSWORD] -r ROLE [--random-password] -u USERNAME. Then log in to Airflow with the account you have just created. After you log in, the homepage will look like the image below.
Streaming data and Simple ETL
Now we are on the main subject: streaming data and the ETL process. First you need to create a topic by running kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic windows_11 (Kafka 3.x no longer accepts the old --zookeeper flag here).
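You can confirm the topic exists by listing all topics with kafka-topics.sh --list --bootstrap-server localhost:9092.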
Next, we need to create several Python files for the simple ETL process, since we are going to schedule every Python program with Airflow.
Stream
To stream data, we create a Python file. Since we are going to fetch data from Twitter, make sure you have a Twitter developer account to get access tokens and keys. To stream data, you can use the block of code below (change the consumer key/secret and access token/secret to yours). After that, save the code into a Python file named stream.py.
# import libraries
import tweepy
from kafka import KafkaProducer
from datetime import datetime, timedelta
import csv
import pandas as pd

# Twitter consumer and access credentials (replace with yours)
consumer_key = "YOUR_KEY"
consumer_secret = "YOUR_KEY_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"

# set up authentication
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# instantiate the API
api = tweepy.API(auth, wait_on_rate_limit=True)

# adjust time to local time (GMT+7 / UTC+7)
def normalize_time(time):
    mytime = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
    mytime += timedelta(hours=7)
    return mytime.strftime("%Y-%m-%d %H:%M:%S")

# instantiate the Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10, 1))

# topic name, search keyword, and query limits
topic_name = 'windows_11'
search_key = "windows 11"
maxId = -1
maxTweets = 3000
tweetCount = 0
tweetsPerQuery = 500

# CSV declaration, change the path to yours
csvFile = open("/home/stndb01/Documents/Data_Engineering/Proyek/" + search_key + ".csv", "a+", newline="", encoding="utf-8")
csvWriter = csv.writer(csvFile)

# list declarations
tweet_id = []
tweet_username = []
tweet_text = []

# loop that fetches tweets through the Twitter API until the specified limit is reached
while tweetCount < maxTweets:
    if maxId <= 0:
        # getting the first batch of tweets
        newTweets = api.search_tweets(q=search_key, lang="en", count=tweetsPerQuery)
    else:
        # getting data from the second batch onward
        newTweets = api.search_tweets(q=search_key, lang="en", count=tweetsPerQuery, max_id=str(maxId - 1))
    # take selected attributes from each tweet
    for i in newTweets:
        record = str(i.user.id_str)
        record += ';'
        record += str(i.user.name)
        record += ';'
        # record += str(normalize_time(str(i.created_at)))
        # record += ';'
        # record += str(i.full_text.encode('utf-8'))
        record += str(i.text.encode('utf-8'))
        record += ';'
        print(str.encode(record))
        producer.send(topic_name, str.encode(record))
        tweet_id.append(str(i.user.id_str))
        tweet_username.append(str(i.user.name))
        tweet_text.append(str(i.text.encode('utf-8')))
        tweets = [str(i.user.id_str), str(i.user.name), str(i.text.encode('utf-8'))]
        csvWriter.writerow(tweets)
    # update tweetCount and maxId
    tweetCount += len(newTweets)
    maxId = newTweets[-1].id

dictTweets = {"id": tweet_id, "username": tweet_username, "text": tweet_text}
df = pd.DataFrame(dictTweets)
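Before wiring everything into Airflow, it can be useful to check that the tweets are actually reaching the topic. Below is a minimal consumer sketch, assuming the same kafka-python library used by the producer above; run it in a separate console while stream.py is producing.
# quick check: read back records from the windows_11 topic
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'windows_11',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # start from the beginning of the topic
    consumer_timeout_ms=10000,      # stop iterating after 10 seconds without new messages
)

for message in consumer:
    print(message.value.decode('utf-8'))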
Extract
The next step is to extract the data from the CSV file we've just created. In the extract step we copy the data from the CSV into a new temporary CSV with the suffix _e; this file will be deleted in a later step. Save the code below into a new Python file named extract.py.
import pandas as pd
kolom = ['id', 'username', 'tweet']
df_windows = pd.read_csv('/home/stndb01/Documents/Data_Engineering/Proyek/windows 11.csv', names = kolom)
df_windows.to_csv('/home/stndb01/Documents/Data_Engineering/Proyek/windows_11_e.csv')
Transform
The CSV file that we've just extracted contains duplicated data, so in this step we transform the data by removing the duplicates. The code below reads the previous windows_11_e.csv, drops the duplicated rows, and saves the result to a new CSV called windows_11_t.csv. Save the code below into a new Python file named transform.py.
import pandas as pd
kolom = ['id', 'username', 'tweet']
df_windows = pd.read_csv('/home/stndb01/Documents/Data_Engineering/Proyek/windows_11_e.csv', names = kolom)
# df_windows.duplicated().value_counts()
df_windows.drop_duplicates(inplace=True, ignore_index=True)
df_windows.to_csv('/home/stndb01/Documents/Data_Engineering/Proyek/windows_11_t.csv')
Load
After we transform the data, we load it into a local database; for this project we used SQLite3. The code below loads the data and also deletes both temporary CSV files created in the extract and transform steps. Save the code below into a new Python file named load.py.
import sqlite3 as s3
import pandas as pd
import os
# creating connection to database
conn = s3.connect('/home/stndb01/Documents/Data_Engineering/Proyek/windows11_data.db')
# Object cursor to run SQL query
cur = conn.cursor()
# load dataframe to datamart
df_windows = pd.read_csv('/home/stndb01/Documents/Data_Engineering/Proyek/windows_11_t.csv')
df_windows.to_sql('windows11_table',conn,if_exists='replace',index=False)
os.remove("/home/stndb01/Documents/Data_Engineering/Proyek/windows_11_e.csv")
os.remove("/home/stndb01/Documents/Data_Engineering/Proyek/windows_11_t.csv")
Scheduling
The files for the stream and ETL processes have now been created, so we can schedule those tasks. In a new Python file we define a DAG (Directed Acyclic Graph) that contains every task. The DAG code looks like the code below. Save it into a new Python file named scheduling-proyek.py.
from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow import DAG

# insert your airflow account
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

with DAG(
    'Proyek_Akhir',
    default_args=default_args,
    description='Scheduling for the final project tasks',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['proyek'],
) as dag:
    stream = BashOperator(
        task_id='Stream_data_twitter',
        bash_command='python /home/stndb01/Documents/Data_Engineering/Proyek/stream.py'
        # bash_command='python stream.py',
    )
    extract = BashOperator(
        task_id='Extract_data_twitter',
        bash_command='python /home/stndb01/Documents/Data_Engineering/Proyek/extract.py'
        # bash_command='python extract.py',
    )
    transform = BashOperator(
        task_id='Transform_data_twitter',
        bash_command='python /home/stndb01/Documents/Data_Engineering/Proyek/transform.py'
        # bash_command='python transform.py',
    )
    load = BashOperator(
        task_id='Load_data_twitter',
        bash_command='python /home/stndb01/Documents/Data_Engineering/Proyek/load.py'
        # bash_command='python load.py',
    )

    # set the execution order: set_downstream means the task on the left
    # runs before the task on the right
    stream.set_downstream(extract)
    extract.set_downstream(transform)
    transform.set_downstream(load)
After you save the file, move it into the dags folder inside your Airflow home directory and run python scheduling-proyek.py. Then return to the Airflow homepage and you will see a new DAG for your project, like in the image below.
Finally, click the DAG and you will be redirected to a new page that contains a tree graph showing how you chained all your tasks. Click the play button and the tasks will run at the interval you set. Since the start date was set to two days ago and the interval to one day, all tasks will run in order, and every task that has completed is marked in green. The result will look like the image below.
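If you don't want to wait for the scheduler, you can also run a single task in isolation with airflow tasks test Proyek_Akhir Stream_data_twitter 2021-12-01 (the date is just an example execution date); this executes the task without recording anything in the Airflow database, which is handy for debugging the bash commands.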
Sentiment Analysis
To do the sentiment analysis, we load the data that was saved to the SQLite database. The code and result look like this.
import sqlite3 as s3
import pandas as pd

# creating a connection to the database
conn = s3.connect('windows11_data.db')
# creating a cursor
cur = conn.cursor()

query = '''select * from windows11_table'''
df = pd.read_sql(query, conn)
df
Since we only removed duplicated tweets in the transform step, there are still some things left that can cause problems in the analysis, such as emoji, URL links, and symbols. Because the tweets were saved in byte format, these can interfere with the analysis, so we use regular expressions to remove them.
import re
import codecs

def preprocess_text(review):
    review = codecs.decode(review, 'unicode_escape')  # remove escape characters
    review = review[2:-1]  # strip the leading b' and trailing '
    review = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', 'URL', review)  # replace links
    review = re.sub(r'[^\x00-\x7f]', '', review)  # drop non-ASCII characters
    review = re.sub(r'@[^\s]+', 'USER', review)  # replace mentions
    review = review.lower().replace("ё", "е")
    review = re.sub(r'[^a-zA-Zа-яА-Я1-9]+', ' ', review)  # keep letters and digits only
    review = re.sub(r' +', ' ', review)  # collapse repeated spaces
    return review.strip()

df['tweet'] = [preprocess_text(review) for review in df['tweet']]
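As a quick sanity check, here is what the function does to a single stored value; the sample tweet below is made up, but it has the same shape as the byte-string representations written by stream.py.
# a made-up sample, stored the way stream.py writes it
sample = "b'RT @someuser: Windows 11 is out! https://t.co/abc123'"
print(preprocess_text(sample))
# the mention and the link are replaced by placeholders and the text is lower-cased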
After that, the data should be ready to be analyzed. Next, we create a new function using the textblob library to compute subjectivity, polarity, and an analysis label. The results are added as new columns to the previous dataframe, and from those we know whether a tweet can be considered a positive, negative, or neutral review of the Windows 11 launch. The code and result are shown below.
from textblob import TextBlob

def sentiment_analysis(data):
    def getSubjectivity(text):
        return TextBlob(text).sentiment.subjectivity
    # Create a function to get the polarity
    def getPolarity(text):
        return TextBlob(text).sentiment.polarity
    def getAnalysis(score):
        if score < 0:
            return "Negative"
        elif score == 0:
            return "Neutral"
        else:
            return "Positive"
    # Create two new columns 'Subjectivity' & 'Polarity'
    data["TextBlob_Subjectivity"] = data["tweet"].apply(getSubjectivity)
    data["TextBlob_Polarity"] = data["tweet"].apply(getPolarity)
    data["TextBlob_Analysis"] = data["TextBlob_Polarity"].apply(getAnalysis)
    return data
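To actually add the three new columns to the dataframe we loaded from SQLite, apply the function to it.
# adds the TextBlob_Subjectivity, TextBlob_Polarity, and TextBlob_Analysis columns
df = sentiment_analysis(df)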
The next code is an optional step. We count the top 20 most frequent words, which will also be displayed alongside the sentiment analysis result later.
from collections import Counter

def getMostFrequentWord(df):
    most_freq = Counter(" ".join(df["tweet"]).split()).most_common(30)
    most_freq_filtered = {}
    for i in most_freq:
        if i[0] == 'user' or i[0] == 'url' or i[0] == 'rt':
            continue
        else:
            most_freq_filtered[i[0]] = i[1]
        if len(most_freq_filtered) == 20:  # keep the 20 most frequent words
            break
    return most_freq_filtered

# Calling the function
most_freq = getMostFrequentWord(df)
most_freq
Dashboard
The last step in this project is creating a dashboard that displays the sentiment analysis results in a pie chart alongside the top 20 most frequent words. The code and result look like this.
import matplotlib.pyplot as plt
import seaborn as sns

TOPIC = "Windows 11"  # topic label used in the chart titles

f, axes = plt.subplots(1, 2, figsize=(15, 7))
# sns.set_style("white")
colors = sns.color_palette('pastel')[0:5]
# colors = sns.color_palette('bright')[0:5]
f.suptitle("{} Twitter Topic Analysis".format(TOPIC), fontsize=20)
f.tight_layout(pad=3.0)

# figure 1: sentiment pie chart
ax1 = plt.subplot2grid((1, 2), (0, 0))
ax1.title.set_text("{} Tweet Sentiment".format(TOPIC))
fig1 = plt.pie(df.groupby("TextBlob_Analysis").size(), labels=df.groupby("TextBlob_Analysis").size().index, colors=colors, autopct='%.0f%%')

# figure 2: most common words bar chart
ax2 = plt.subplot2grid((1, 2), (0, 1))
ax2.title.set_text("{} Tweet Most Common Words".format(TOPIC))
fig2 = sns.barplot(x=list(most_freq.keys()), y=list(most_freq.values()))
fig2.set(xlabel='Most Common Words', ylabel='Frequency')
for item in fig2.get_xticklabels():
    item.set_rotation(90)

plt.show()
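If you also want to keep the dashboard as an image file, f.savefig("windows11_dashboard.png", bbox_inches="tight") will write it to disk (the filename here is just an example).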
References
How to install Apache Airflow
Working with streaming Twitter data using Kafka
How to test your data with Great Expectations
Sentiment Analysis using TextBlob
Exploratory data analysis for text data: EDA using python