
Lorena

Posted on • Originally published at lorenaciutacu.com

Building a dockerized ETL pipeline for streaming tweets in Slack

One of the projects in my Data Science Bootcamp was about creating a database of tweets, along with their sentiment scores, and posting the positive tweets in a Slack channel. This pipeline had to be orchestrated with Docker Compose. The pipeline looks like this:

[Figure: schema of the ETL pipeline]

In this post, I'll show you how I set up each step.

0. Prerequisites & tech stack

Here's an overview of the apps, services, and libraries I used in this project:

Apps & Databases    Python libraries
Twitter             tweepy & vader
Slack               slackclient
MongoDB             pymongo
PostgreSQL          psycopg2-binary & sqlalchemy
Docker-Compose      -

1. Collecting tweets

To collect tweets, I used the Twitter API along with the tweepy library.

First, I created an app on Twitter and got my credentials (API key and Access Token). Then, I wrote the Python code for streaming live tweets, using tweepy with my Twitter credentials. I chose to stream the hashtag #OnThisDay (thought it would be interesting to get a daily notification of what happened years ago) and collected the tweet text and user handle.

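import config  # assumed module with the Twitter credentials; attribute names are placeholders
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener  # available in tweepy < 4.0

class TweetListener(StreamListener):
    def on_status(self, status):
        t = status._json  # raw tweet payload as a dict
        tweet = {
            'username': t['user']['screen_name'],
            'text': t['text'],
        }
        warning_log(tweet)  # logs and stores the tweet, see section 2

auth = OAuthHandler(config.API_KEY, config.API_SECRET)
auth.set_access_token(config.ACCESS_TOKEN, config.ACCESS_TOKEN_SECRET)

stream = Stream(auth=auth, listener=TweetListener())
stream.filter(track=['OnThisDay'])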
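
To make the "tweet text and user handle" step concrete, here is a minimal sketch of reducing a raw status payload to those two fields. The function name parse_tweet and the sample payload are made up for illustration. The fallback to extended_tweet.full_text is based on how the v1.1 streaming payload carries tweets longer than 140 characters, and may not be needed for every stream.

```python
import json

def parse_tweet(raw):
    """Reduce a raw status payload (JSON string or dict) to username and text."""
    t = json.loads(raw) if isinstance(raw, (str, bytes)) else raw
    # Long tweets arrive shortened in 'text'; when present, the full version
    # sits under 'extended_tweet' -> 'full_text'.
    text = t.get('extended_tweet', {}).get('full_text', t['text'])
    return {
        'username': t['user']['screen_name'],
        'text': text,
    }

# Hypothetical payload, trimmed to the fields used above
sample = '{"user": {"screen_name": "history_bot"}, "text": "#OnThisDay in 1969..."}'
print(parse_tweet(sample))
```

Keeping this logic in a small function makes it easy to test without a live Twitter connection.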

2. Storing tweets in MongoDB

After collecting the tweets, I had to store them in MongoDB, a non-relational (NoSQL) database that stores data in JSON-like documents. Since the tweet data is collected as key-value pairs (JSON format), MongoDB is a good way to store this information.

First, I had to create a MongoDB instance, set up a cluster, and create a database and a collection within it:

  1. Create a MongoDB account
  2. Set up a cluster: cloud.mongodb.com > Clusters > Create New Cluster
  3. Create a database: Cluster > Collections > Create Database
  4. Create a collection: Cluster > Collections > Database > Create Collection
  5. Create a field: Collection > Insert document > Type the field text below _id
  6. Allow access to the database: Project > Security > Network Access > IP Access List > Add your IP address.
  7. Connect to the database from your terminal: mongo "mongodb+srv://YourClusterName.mongodb.net/YourDatabaseName" --username YourUsername

Second, I wrote the Python code for storing tweets in MongoDB using the pymongo library.

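import logging

import pymongo

# The official mongo image listens on port 27017 inside the Compose network
client = pymongo.MongoClient(host='mongo_container', port=27017)
db = client.tweets_db

def warning_log(tweet):
    # Log the tweet, then store it in the 'onthisday' collection
    logging.critical('\n\nTWEET: ' + tweet['username'] + ' just tweeted: ' + tweet['text'])
    db.onthisday.insert_one(tweet)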

The host mongo_container is one of the Docker containers, explained in section 6.

3. Performing ETL job

The ETL (Extract, Transform, Load) job involves three actions: extracting tweets from MongoDB, analyzing their sentiment, and storing them into a new Postgres database. Here is the Python code for the ETL job.
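
As a rough sketch of how the three actions fit together, the function below runs one extract, transform, load pass. The name run_etl_once and the lambda stand-ins are hypothetical; they only exist so the example runs without MongoDB, VADER, or Postgres.

```python
def run_etl_once(extract, transform, load):
    """Run one extract -> transform -> load pass; return True if a tweet was loaded."""
    tweet = extract()
    if tweet is None:  # nothing has been collected yet
        return False
    score = transform(tweet)
    load(tweet, score)
    return True

# Stand-ins for the real steps, for illustration only
loaded = []
ok = run_etl_once(
    extract=lambda: {'text': 'What a great day'},
    transform=lambda tweet: 0.6,
    load=lambda tweet, score: loaded.append((tweet['text'], score)),
)
print(ok, loaded)
```

In the actual job, the three arguments would be the extract_tweets, transform_tweets, and load_tweets functions from the following subsections, called in a loop with a pause between passes.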

3.1. Extracting tweets from MongoDB

To extract the tweet texts from MongoDB, I used the pymongo library again.

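import logging
import random

# db is the pymongo database handle from section 2 (client.tweets_db)
def extract_tweets():
    # Pick one stored tweet at random; returns None if the collection is empty
    tweets = list(db.onthisday.find())
    if tweets:
        t = random.choice(tweets)
        logging.critical("Random tweet: " + t["text"])
        return t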

3.2. Transforming tweets with sentiment scores

To analyze the sentiment of the tweets, I used the VADER library, which returns (among other values) a compound sentiment score between -1 (most negative) and +1 (most positive).

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

def transform_tweets(tweet):

    tweet_text = tweet['text'].replace("\'","")

    sia = SentimentIntensityAnalyzer()
    tweet_sia = sia.polarity_scores(tweet_text)['compound']

    return tweet_sia
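
Since the goal is to post positive tweets, it can help to turn the compound score into a label. The sketch below uses the cutoffs of ±0.05 that the VADER documentation suggests as a typical convention; the function name label_sentiment is made up for this example, and the thresholds are a tunable assumption rather than something this project necessarily used.

```python
def label_sentiment(compound, threshold=0.05):
    """Map a VADER compound score in [-1, 1] to a coarse label."""
    if compound >= threshold:
        return 'positive'
    if compound <= -threshold:
        return 'negative'
    return 'neutral'

for score in (0.6, 0.0, -0.4):
    print(score, label_sentiment(score))
```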

3.3. Loading tweets into PostgreSQL

To load the tweets with their sentiment scores into a Postgres database, first you need––well, a Postgres database. I installed Postgres, then created a database and a table for tweets right from the terminal:

  1. Connect to Postgres: psql
  2. Create a database: CREATE DATABASE twitter; (or run createdb twitter from the shell)
  3. Go into the created database: \c twitter (or run psql twitter from the shell)
  4. Create a table in the database: CREATE TABLE tweets (text varchar(280), score numeric(4,3));

Then, I wrote the Python code for inserting tweets into the tweets table, using the sqlalchemy library.

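import logging

from sqlalchemy import text
import config

engine = config.PG_ENGINE

def load_tweets(tweet, sentiment):
    # Bound parameters keep quotes in the tweet text from breaking the INSERT
    insert_query = text("INSERT INTO tweets VALUES (:text, :score);")
    with engine.begin() as conn:  # commits on success
        conn.execute(insert_query, {"text": tweet["text"], "score": sentiment})
    logging.critical(f'Tweet {tweet["text"]} loaded into Postgres.')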
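
Tweets often contain apostrophes, which is why the transform step strips them from the text. Another way to keep quotes from breaking an INSERT is to pass the values as bound parameters. Here is a small self-contained sketch of that idea; it uses the standard library's sqlite3 with an in-memory database purely as a stand-in for Postgres, so the placeholder syntax (? here) differs from psycopg2 (%s) and SQLAlchemy's text() (:name).

```python
import sqlite3

# In-memory SQLite database standing in for the Postgres 'twitter' database
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE tweets (text varchar(280), score numeric(4,3))')

def load_tweet(conn, text, score):
    # The driver handles quoting, so the apostrophe needs no special treatment
    conn.execute('INSERT INTO tweets VALUES (?, ?)', (text, score))
    conn.commit()

load_tweet(conn, "It's #OnThisDay again", 0.421)
rows = conn.execute('SELECT text, score FROM tweets').fetchall()
print(rows)
```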

4. Extracting tweets from Postgres

With a database of tweets and their sentiment scores in place, I had to select the tweets that would be sent to Slack. The query below picks the tweet with the highest score.

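from sqlalchemy import text

# engine is config.PG_ENGINE; the column is named score; assumes at least one row
with engine.connect() as conn:
    msg, score = conn.execute(text(
        "SELECT text, score FROM tweets ORDER BY score DESC LIMIT 1")).fetchone()
output = f'NEW TWEET! {msg} \nSentiment score: {score}'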

5. Posting tweets with a Slackbot

The last step in the pipeline is posting tweets in a Slack channel. To do this, first I created a Slackbot.

Then, I wrote the Python code for posting tweets in a Slack channel, including the code from the previous step:

import logging
import time

import requests
from sqlalchemy import text
import config

engine = config.PG_ENGINE
webhook_url = config.WEBHOOK_SLACK

while True:
    # The tweets table holds only text and score, so the message omits the username
    with engine.connect() as conn:
        row = conn.execute(text(
            "SELECT text, score FROM tweets ORDER BY score DESC LIMIT 1")).fetchone()
    if row:  # the table may still be empty right after startup
        msg, score = row
        logging.critical(f"\n\nPositive tweet:\n{msg}\n")
        output = f'NEW TWEET! {msg} \nSentiment score: {score}'
        requests.post(url=webhook_url, json={'text': output})

    time.sleep(30)

And 🎉 –– here's the tweet that was posted in Slack:

[Figure: tweet posted by the Slackbot]
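
For reference, a Slack incoming webhook expects a JSON body with a text field. The sketch below separates building that payload from sending it, which makes the message format easy to check without touching Slack. The function names are made up for this example, and it swaps in the standard library's urllib in place of requests so it has no third-party dependencies.

```python
import json
import urllib.request

def build_payload(tweet_text, score):
    """Build the JSON body for a Slack incoming webhook."""
    return {'text': f'NEW TWEET! {tweet_text}\nSentiment score: {score:.3f}'}

def post_to_slack(webhook_url, payload):
    """POST the payload to the webhook and return the HTTP status code."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode('utf-8'),  # a body makes this a POST
        headers={'Content-Type': 'application/json'},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status

payload = build_payload('Happy #OnThisDay', 0.8316)
print(json.dumps(payload))
# post_to_slack(your_webhook_url, payload)  # needs a real webhook URL
```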

6. Creating the Docker Compose pipeline

The final touch of this project is orchestration. The individual Python scripts for each step work when you run them manually, but the goal is to run this pipeline from beginning to end with only one command. This is where Docker Compose comes in.

Compose is a tool for defining and running multi-container Docker applications. With Compose, you use a YAML file to configure your application’s services. Then, with a single command, you create and start all the services from your configuration.

Each of the five previous steps (or the rectangles in my messy schema) represents a Docker container, so in my docker_compose.yml file I had five containers (services): tweet_container, postgres_container, mongo_container, etl_container, and slackbot_container.

For the two database containers, I used Docker images, since they didn't depend on custom code stored in my project folders. For the other three containers, I referenced the respective code location (build) and their dependencies (depends_on). For example, the tweet collector depends on postgres and mongo, since the tweets are stored in these databases.

I also used Docker volumes to keep the data when the containers are stopped (data persistence).

version: '3'

services:
  tweet_container:
    build: tweet_collector/
    depends_on:
      - postgres_container
      - mongo_container
    volumes:
      - ./tweet_collector/:/app

  postgres_container:
    build: postgresdb
    image: postgres:13.0
    ports:
      - 5555:5432
    environment:
      - POSTGRES_USER=your_user
      - POSTGRES_PASSWORD=your_password

  mongo_container:
    build: mongodb
    image: mongo
    ports:
      - 27018:27017  # host port 27018 -> mongod's default port 27017 in the container
    volumes:
      - ./mongodb:/app

  etl_container:
    build: etl_job/
    depends_on:
      - postgres_container
      - mongo_container
    volumes:
      - ./etl_job/:/app

  slackbot_container:
    build: slackbot/
    depends_on:
      - mongo_container
      - postgres_container
    volumes:
      - ./slackbot/:/app
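
One thing to keep in mind: a plain depends_on list only controls the order in which containers are started; it does not wait until the database inside is ready to accept connections. A possible workaround, sketched here under the assumption that the Python services can simply retry, is a small helper that waits for a TCP port before connecting. The name wait_for_port and its defaults are hypothetical.

```python
import socket
import time

def wait_for_port(host, port, timeout=30.0, interval=1.0):
    """Return True once host:port accepts TCP connections, False after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:  # refused, unreachable, or timed out
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)

# Example: inside the Compose network, Postgres listens on its container port
# if not wait_for_port('postgres_container', 5432):
#     raise SystemExit('Postgres did not come up in time')
```

An open port is only a rough readiness signal, but it is usually enough to avoid a crash on the very first connection attempt.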

Finally, here are some of the CLI commands I used for managing the Docker containers (you can find more in the Docker docs):

  • docker images to list all the used images (postgres and mongo)
  • docker ps -a to list all my containers
  • docker run -v to mount volumes
  • docker build to build an image from a Docker file
  • docker run to run the containers

And that was it: my very first dockerized ETL pipeline –– a week's work and a few hours writing packed in a 6-minute blog post.
