DEV Community

Adi Polak

How to Build a Data Pipeline to enrich your data with PySpark and Sentiment Analysis

In this tutorial, you will learn how to enrich COVID19 tweets data with a positive sentiment score. You will leverage PySpark and Cognitive Services and learn about Augmented Analytics.

🤯 Oh, wait! What Is Augmented Analytics?

According to Gartner's report, augmented analytics is the use of technologies such as machine learning and AI to assist with data preparation and insight generation. Its main goal is to help more people to get value out of data and generate insights in an easy, conversational manner. For our example, we extract the positive sentiment score out of a tweet to help in understanding the overall sentiment towards COVID-19.
YES, This is Augmented Analytics!

What Is PySpark?

PySpark is the framework we use to work with Apache Spark from Python.
If you want to learn more about Spark, check out this bite-size series.

What Is Sentiment Analysis?

Sentiment analysis is a part of NLP (natural language processing) that combines text analytics, computational linguistics, and more to systematically study affective states and subjective information, such as tweets.

In simple words, it means understanding the sentiment of a text: is the text positive, negative, or neutral?
It can also reflect on the person who wrote it: were they happy with a situation when they wrote the tweet?

In our example, we will see how to extract a positive sentiment score out of the text of COVID-19 tweets to better understand the feelings of the people writing these tweets.

I know, feelings, right?! That's complicated!

ALRIGHT! I got you!

In this tutorial, YOU are going to leverage Azure Cognitive Services, which gives us sentiment analysis capabilities out of the box.

When working with it, we can leverage the TextAnalyticsClient client library or the REST API. Today, you will use the REST API, as it gives us more flexibility.

⏰ Tutorial O'Clock!

Prerequisites:

  • An Apache Spark environment with notebooks. This can be Databricks, or you can start a local environment with Docker by running: docker run -it -p 8888:8888 jupyter/pyspark-notebook
  • Azure free account
  • Download Kaggle COVID-19 Tweet data
  • Cognitive Services free account (check out the picture below)

[Image: Cognitive Services free account]

Step by Step Tutorial - Full Data Pipeline:

In this step-by-step tutorial, you will learn how to load the data with PySpark, create a user-defined function to connect to the Sentiment Analysis API, add the sentiment data, and save everything to Parquet format files.

You now need to extract the data and upload it to your Apache Spark environment, whether it's Databricks or a PySpark Jupyter notebook. For Databricks use this, for Jupyter use this.

In both cases, you will need the file location, so make sure to keep a note of it:
file_location = "/FileStore/tables/covid19_tweets.csv"

Step 1: Loading the data with PySpark

This is how you load the data into a PySpark DataFrame object. Spark will try to infer the schema directly from the CSV. One thing you will notice is that when working with CSV and inferring a schema, Spark often treats most columns as strings.

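# read the CSV with a header row and let Spark infer the column types
inputDF = spark.read \
  .format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("/FileStore/tables/covid19_tweets.csv")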

Step 2: Provide more Accurate Schema to our data:

Here you define the expectedSchema and later cast the data to match it. You will use StructType and StructField, which are Spark SQL data types that help you define the schema.

The withColumn function creates a new DataFrame with the desired column, according to the name and value you provide.

from pyspark.sql.types import *
from pyspark.sql.functions import *

# create expected schema
expectedSchema = StructType([
  StructField("user_name", StringType(), True),
  StructField("user_location", StringType(), True),
  StructField("user_description", StringType(), True),
  StructField("user_created", StringType(), True),
  StructField("user_followers", FloatType(), True),
  StructField("user_friends", FloatType(), True),
  StructField("user_favourites", FloatType(), True),
  StructField("user_verified", BooleanType(), True),
  StructField("date", StringType(), True),
  StructField("text", StringType(), True),
  StructField("hashtags", StringType(), True),
  StructField("source", StringType(), True),
  StructField("is_retweet", BooleanType(), True)])

Now, let's create your new DataFrame with the right schema!

Notice that you assign the new schema to inputDF, which means you will no longer have access to the old DataFrame.

# Set data types - cast the data in columns to match the schema

inputDF = inputDF \
  .withColumn("user_name", inputDF["user_name"].cast("string")) \
  .withColumn("user_location", inputDF["user_location"].cast("string")) \
  .withColumn("user_description", inputDF["user_description"].cast("string")) \
  .withColumn("user_created", inputDF["user_created"].cast("string")) \
  .withColumn("user_followers", inputDF["user_followers"].cast("float")) \
  .withColumn("user_friends", inputDF["user_friends"].cast("float")) \
  .withColumn("user_favourites", inputDF["user_favourites"].cast("float")) \
  .withColumn("user_verified", inputDF["user_verified"].cast("boolean")) \
  .withColumn("date", inputDF["date"].cast("string")) \
  .withColumn("text", inputDF["text"].cast("string")) \
  .withColumn("hashtags", inputDF["hashtags"].cast("string")) \
  .withColumn("source", inputDF["source"].cast("string")) \
  .withColumn("is_retweet", inputDF["is_retweet"].cast("boolean"))

Step 3: Connect to Sentiment Analysis With REST API

To connect to and consume the sentiment analysis service, we need to provide the sentiment analysis endpoint and access key. Both can be found in the Azure portal.

Finding the endpoint: it is available from the Overview section or from Keys and Endpoints.

[Image: finding the endpoint]

Finding the access key:
[Image: finding the access key]

After finding the key and endpoint, you need to store them someplace safe, especially in production and when working in a team. Avoid saving keys as plain text in code, as this is not safe. You might end up with hackers mining your cloud environment for bitcoins.

For Databricks, you can leverage dbutils.secrets functionality. This is how to set it up.

If you work locally with a Jupyter PySpark notebook, you can use plain text, but remember to remove it before you commit your code to a git repo.
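
For a local notebook, one alternative to pasting the key into the code is to read both values from environment variables. This is a minimal sketch and not part of the original tutorial: the variable names SENTIMENT_ENDPOINT and SENTIMENT_ACCESS_KEY and the helper load_sentiment_credentials are hypothetical, so rename them to match your setup.

```python
import os

# Hypothetical environment variable names; choose whatever fits your setup.
ENDPOINT_VAR = "SENTIMENT_ENDPOINT"
KEY_VAR = "SENTIMENT_ACCESS_KEY"

def load_sentiment_credentials(env=os.environ):
    """Return (endpoint, access_key) read from environment variables.

    Raises a KeyError naming the missing variables so a misconfigured
    notebook fails early instead of sending unauthenticated requests.
    """
    missing = [name for name in (ENDPOINT_VAR, KEY_VAR) if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    # Strip a trailing slash so the endpoint can be joined with an API path.
    return env[ENDPOINT_VAR].rstrip("/"), env[KEY_VAR]
```

With both variables exported before starting Jupyter, sentimentEndpoint, sentimentAccessKeys = load_sentiment_credentials() gives you the same two variable names the rest of this tutorial uses.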

This is how to work with dbutils, providing it the scope and key name.

In this code snippet, the scope is named mle2ebigdatakv and the keys are named sentimentEndpoint and sentimentAccessKeys.

# provide endpoint and key 
sentimentEndpoint = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentEndpoint")
sentimentAccessKeys = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentAccessKeys")

Let's build the connection itself. Sentiment analysis expects to receive a document-like object, so you will work with a Python dictionary and build a doc request with an ID. The ID has to be unique for every request.

Notice the language_api_url variable here. This is where you construct the request for Cognitive Services, asking for text analytics sentiment with version 3.0.

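import requests

# build the REST API request with language_api_url
language_api_url = sentimentEndpoint + "/text/analytics/v3.0/sentiment"
headers = {"Ocp-Apim-Subscription-Key": sentimentAccessKeys}

def constructDocRequest(text):
  # the service expects a list of documents, each with an id and a text
  docRequest = {}
  doc = {}
  doc["id"] = text
  doc["text"] = text
  docRequest["documents"] = [doc]
  # send the request and return the parsed JSON response
  response = requests.post(language_api_url, headers=headers, json=docRequest)
  return response.json()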
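
Because the ID has to be unique for every request, one option is to generate it instead of reusing the tweet text. The sketch below is a variation under stated assumptions, not the tutorial's own function: build_doc_request is a hypothetical helper, and using uuid4 for the ID is a choice made here rather than something the service requires. It only builds the request body and does not call the service.

```python
import uuid

def build_doc_request(text):
    """Build a single-document request body with a generated unique id.

    Hypothetical helper: it mirrors the {"documents": [{"id", "text"}]}
    shape used in this tutorial but does not send anything.
    """
    doc = {
        "id": str(uuid.uuid4()),  # random id, different on every call
        "text": text,
    }
    return {"documents": [doc]}

first = build_doc_request("covid19 is not scary at all")
second = build_doc_request("covid19 is not scary at all")
# Same text, different ids
print(first["documents"][0]["id"] != second["documents"][0]["id"])
```

A generated ID also stays short, whatever the length of the tweet.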

Try running it with some text. You will see that the response consists of sentiment scores for positive, neutral, and negative.

This is how a response is structured:

{
    "documents": [
        {
            "id": "1",
            "sentiment": "positive",
            "confidenceScores": {
                "positive": 1.0,
                "neutral": 0.0,
                "negative": 0.0
            },
            "sentences": [
                {
                    "sentiment": "positive",
                    "confidenceScores": {
                        "positive": 1.0,
                        "neutral": 0.0,
                        "negative": 0.0
                    },
                    "offset": 0,
                    "length": 66,
                    "text": "covid19 is not scary at all, it's actually an opportunity to thrive"
                }
            ],
            "warnings": []
        }
    ],
    "errors": [],
    "modelVersion": "2020-04-01"
}
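
To make the structure concrete, here is a small sketch that reads these fields from a response that has already been parsed into a Python dictionary. The helper name dominant_sentiment and the trimmed sample dictionary are illustrative assumptions; only the field names come from the response shown above.

```python
def dominant_sentiment(response):
    """Return (label, score) for the highest confidence score of the
    first document, or (None, 0.0) when no document came back."""
    documents = response.get("documents") or []
    if not documents:
        return None, 0.0
    scores = documents[0]["confidenceScores"]
    label = max(scores, key=scores.get)
    return label, float(scores[label])

# Trimmed copy of the sample response, already parsed into a dict.
sample = {
    "documents": [{
        "id": "1",
        "sentiment": "positive",
        "confidenceScores": {"positive": 1.0, "neutral": 0.0, "negative": 0.0},
    }],
    "errors": [],
}

print(dominant_sentiment(sample))          # ('positive', 1.0)
print(dominant_sentiment({"errors": []}))  # (None, 0.0)
```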

Let's build a Python function to extract the sentiment and register it with PySpark through the UDF (user-defined function) API.

You need to make sure you are actually getting a document back from the REST API, and also guard your function against sending an empty text to the sentiment analysis service, as that will result in an error.

This is how you connect everything together:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# extract the sentiment out of the returned json doc
def extractSentiment(doc, sentimentType):
  # no usable document came back, fall back to a neutral default
  if 'documents' not in doc or not doc['documents']:
    return 0.0
  return float(doc['documents'][0]['confidenceScores'][sentimentType])

# function for extracting the positive sentiment
def getPositiveSentiment(text):
  # skip missing or empty text, the service returns an error for it
  if text is None or not text.strip():
    return 0.0
  positive = extractSentiment(constructDocRequest(text), 'positive')
  return positive

# creating the udf function pointer; the score is a float, so declare a numeric return type
get_positive_sentiment = udf(getPositiveSentiment, DoubleType())

# create a new DF with a new column representing the positive sentiment score
enrichedDF_positiveSentiment = inputDF.withColumn('positive_sentiment', get_positive_sentiment(inputDF["text"]))

After enriching your data, it's important to save it to storage for future needs. You will save it in the Parquet format, which keeps the schema intact. Apache Parquet is a columnar storage format. Compared to row-based files such as CSV, it allows better compression and faster scanning of a subset of columns later on. With CSV, we have to read the whole file, including every column, to query only a subset of them.

This is how it's done with PySpark:

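# Save the processed data to Parquet for data scientists to explore and build ML models
# NOTE: the destination below is a placeholder, replace it with your own output location

enrichedDF_positiveSentiment.write \
  .format("parquet") \
  .save("<your-output-path>")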

You can decide on the name and structure of your data, but make sure to point out that this data now contains a new column with the positive sentiment score.

This data can later be used in various data visualization tools or for researchers.

Step 4:

There is no step 4, you are done!!!

🐞 What about Bugs?

Let's take a look at this json exception:

{'error': {'code': 'InvalidRequest',
           'innererror': {'code': 'EmptyRequest',
                          'message': 'Request body must be present.'},
           'message': 'Invalid Request.'}}

Pay attention to the error message. When you see this kind of error, it might be that you are out of quota with Cognitive Services. If you are learning about the service and trying it out, it's better to use a few samples and not the whole dataset, as you might run out of quota in the free tier, which is good for up to 5K transactions.
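
One way to stay inside the free tier while experimenting is to score only a sample of the tweets. The sketch below computes a sampling fraction from a transaction budget. sample_fraction is a hypothetical helper, and it assumes one transaction per tweet, which matches the one-document-per-request function used in this tutorial.

```python
def sample_fraction(total_rows, transaction_budget):
    """Fraction of rows to keep so that roughly `transaction_budget`
    rows are sent to the service (assumes one transaction per row)."""
    if total_rows <= 0 or transaction_budget <= 0:
        return 0.0
    return min(1.0, transaction_budget / total_rows)

# Example: keep about 500 of 100,000 tweets.
print(sample_fraction(100_000, 500))  # 0.005
```

In PySpark, the result can be passed to inputDF.sample(fraction=...) before applying the UDF. Since sample returns only an approximate number of rows, leave some headroom in the budget, or use inputDF.limit(n) when you need a hard cap.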

✅ That's it!

This tutorial walked you through how to leverage existing REST API services to enrich your data for future work and augmented analytics.

To learn more, check out the GitHub repo: covid-19-e2e-big-data-ml-system.

Happy to take your questions and follow up with you on Adi Polak - Data and AI @ Twitter 🐦.
