Denzel Kanyeki
Building a Reddit Sentiment Pipeline using Python, PostgreSQL, VADER, Airflow, Grafana, Prometheus and StatsD

Data is everywhere, but making sense of it requires collecting, cleaning, storing, and analyzing it effectively. In this blog, I’ll walk you through how I built a Reddit sentiment analysis pipeline that fetches Reddit posts, analyzes their sentiment, stores the results in PostgreSQL, and visualizes insights in Grafana, all orchestrated with Apache Airflow and containerized with Docker Compose.

If you would like to check out the repository, here is the link.

Project Details

  • Extract Reddit posts from specific subreddits (using the PRAW API).
  • Clean and analyze sentiment with VADER Sentiment Analyzer.
  • Load processed data into Postgres.
  • Orchestrate everything with Airflow DAGs.
  • Visualize insights on a Grafana Dashboard.
  • Get email notifications via SMTP when DAGs fail or succeed.
  • Monitor metrics and service health using Prometheus, StatsD, and Grafana.

The snapshot below shows the project architecture:
Architecture

Project Setup

Step 1: Extract Reddit Data

Using the praw Python library, I pull the top 50 posts from subreddits like r/kenya and r/nairobi.

import praw
from airflow.decorators import task

# CLIENT_ID and SECRET_KEY are loaded from the environment/config;
# clean_text is a text-cleaning helper defined in the project's utils.

@task
def extract_data():
    reddit = praw.Reddit(
        client_id=CLIENT_ID,
        client_secret=SECRET_KEY,
        user_agent='data pipeline by u/user'
    )

    limit = 50
    data = []
    subs = ["kenya", "nairobi"]
    for sub in subs:
        subreddit = reddit.subreddit(sub)
        # Top posts of the day, with cleaned title/selftext and 'N/A' placeholders
        for s in subreddit.top(time_filter='day', limit=limit):
            data.append({
                'id': s.id,
                'title': clean_text(s.title) if s.title else 'N/A',
                'text': clean_text(s.selftext) if s.selftext else 'N/A',
                'url': s.url,
                'score': s.score,
                'subreddit': sub,
                'time_created': s.created_utc
            })

    return data
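The clean_text helper used above lives elsewhere in the repo; here is a minimal sketch of what it might look like, assuming it strips URLs and special characters and collapses whitespace (the project's actual implementation may differ):

import re

def clean_text(text: str) -> str:
    """Strip URLs and special characters from a post, collapsing whitespace."""
    text = re.sub(r'http\S+', '', text)                  # drop links
    text = re.sub(r"[^A-Za-z0-9\s.,!?']", ' ', text)     # keep basic punctuation
    return re.sub(r'\s+', ' ', text).strip()             # collapse runs of whitespace

Keeping basic punctuation is deliberate: VADER uses punctuation and capitalization as intensity cues, so over-aggressive cleaning can weaken the scores.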

Step 2: Transform & Analyze

Posts are cleaned (special characters removed, N/A placeholders filtered out) and passed into the VADER Sentiment Analyzer, which returns:

  • pos → Positive score
  • neu → Neutral score
  • neg → Negative score
  • compound → Overall sentiment

I then classify sentiment into Positive, Neutral, or Negative.
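The get_sentiment helper referenced in the task below isn't shown in the snippet; here is a minimal sketch, assuming the conventional VADER thresholds of ±0.05 on the compound score:

def get_sentiment(compound: float) -> str:
    """Map a VADER compound score to a sentiment label."""
    if compound >= 0.05:
        return "Positive"
    elif compound <= -0.05:
        return "Negative"
    return "Neutral"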

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

@task
def analyze_text():
    """Fetch posts from Postgres, analyze their sentiment, and save the results to a separate psql table."""
    # Importing these modules inside the task keeps DAG parse time down
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    from utils.extract import get_sentiment

    engine = create_engine(DB_URL)
    analyzer = SentimentIntensityAnalyzer()

    try:
        # Load the raw posts from the bronze table
        df = pd.read_sql_table("reddit_posts", con=engine, schema="reddit")

        # Drop rows whose text is the 'N/A' placeholder
        df["text"] = df["text"].replace("N/A", np.nan)
        df.dropna(subset=["text"], inplace=True)

        # Score each post with VADER and unpack the result dict into columns
        scores = df["text"].apply(lambda x: analyzer.polarity_scores(x))
        df["compound"] = scores.apply(lambda x: x["compound"])
        df["neg"] = scores.apply(lambda x: x["neg"])
        df["neu"] = scores.apply(lambda x: x["neu"])
        df["pos"] = scores.apply(lambda x: x["pos"])
        df["sentiment"] = df["compound"].apply(get_sentiment)

        # Save the scored posts
        df.to_sql(
            "reddit_sentiment_analysis",
            con=engine,
            schema="reddit",
            if_exists="append",
            index=False,
        )
        return "Data loaded successfully!"
    except Exception as e:
        raise RuntimeError(f"Error analyzing or saving data: {e}")

Step 3: Load into PostgreSQL

The raw posts land in a reddit_posts table and the sentiment results in a reddit_sentiment_analysis table. This separation follows a medallion architecture: a bronze layer keeps the raw PRAW data untouched (preventing any data loss), a silver layer holds the cleaned, semi-structured data, and a gold layer serves as the "single source of truth" for decision-making, ready for Grafana dashboards.
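The load step itself isn't shown above; here is a minimal sketch of how the extracted posts could be appended to the bronze table with pandas and SQLAlchemy, assuming the same DB_URL and reddit schema as in the analysis task (load_posts is a hypothetical name; the columns match the dicts built in extract_data):

import pandas as pd
from sqlalchemy import create_engine

def load_posts(data: list[dict], db_url: str) -> None:
    """Append raw extracted posts to the bronze table reddit.reddit_posts."""
    engine = create_engine(db_url)
    df = pd.DataFrame(data)
    # PRAW returns created_utc as a Unix epoch; convert it to a timestamp
    df["time_created"] = pd.to_datetime(df["time_created"], unit="s")
    df.to_sql(
        "reddit_posts",
        con=engine,
        schema="reddit",
        if_exists="append",
        index=False,
    )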

Step 4: Orchestration with Airflow

Two DAGs coordinate the workflow:

  1. reddit_dag.py → Extract & store raw posts.
  2. analyze_dag.py → Waits for the first DAG via Airflow's ExternalTaskSensor, which pokes reddit_dag until it completes, then runs the sentiment analysis and sends email notifications (see the sketch after the link below).

Read more about ExternalTaskSensor in the documentation.
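Here is a minimal sketch of how the sensor could be wired up in analyze_dag.py, assuming Airflow 2.x, that both DAGs share the same daily schedule (ExternalTaskSensor matches runs on execution date by default), and that alert emails go out through the SMTP connection; the names, address, and intervals are illustrative:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    "email": ["you@example.com"],      # alert recipient (illustrative)
    "email_on_failure": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="analyze_dag",
    start_date=datetime(2025, 9, 1),
    schedule="@daily",                 # must line up with reddit_dag's schedule
    catchup=False,
    default_args=default_args,
) as dag:
    wait_for_extract = ExternalTaskSensor(
        task_id="wait_for_reddit_dag",
        external_dag_id="reddit_dag",
        external_task_id=None,         # None = wait for the whole DAG to succeed
        poke_interval=60,              # check once a minute
        timeout=60 * 60,               # fail after an hour of waiting
        mode="reschedule",             # free the worker slot between pokes
    )

    run_analysis = EmptyOperator(task_id="analyze_text")  # stand-in for the real @task
    wait_for_extract >> run_analysis

mode="reschedule" is worth highlighting: it releases the worker slot between pokes instead of holding it for the entire wait.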

Step 5: Visualize in Grafana

Grafana queries Postgres directly and visualizes metrics like sentiment trends, subreddit comparisons, and post volume. It also visualizes Airflow's own operational metrics, collected via StatsD and Prometheus.

StatsD is integrated into this project as a metrics collector for Airflow. Airflow emits metrics such as dag_processing.processes and executor.open_slots, which capture scheduler and worker health, DAG processing times, and task execution details. These metrics are forwarded to monitoring backends and visualized in Grafana dashboards for observability.

Airflow x StatsD
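Enabling StatsD emission is purely configuration; here is a minimal sketch of the relevant settings as environment variables in docker-compose.yml, assuming a statsd-exporter service listening on the default UDP port 9125 (names and values are illustrative):

# docker-compose.yml (excerpt) -- environment for the Airflow services
environment:
  AIRFLOW__METRICS__STATSD_ON: "True"
  AIRFLOW__METRICS__STATSD_HOST: statsd-exporter   # the StatsD-to-Prometheus bridge
  AIRFLOW__METRICS__STATSD_PORT: "9125"
  AIRFLOW__METRICS__STATSD_PREFIX: airflow         # prefix on every emitted metric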

Prometheus is used as the metrics storage and querying system. The Prometheus service is configured in docker-compose.yml, and prometheus.yml tells it to scrape the Airflow metrics that the Prometheus exporter exposes at its /metrics endpoint. Prometheus then makes these metrics queryable with PromQL and visualizable in Grafana.
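A minimal prometheus.yml sketch for this setup, assuming the exporter service is named statsd-exporter and serves /metrics on port 9102 (the statsd_exporter default); adjust the job name, target, and interval to match your compose file:

# prometheus.yml (excerpt)
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "airflow"
    static_configs:
      - targets: ["statsd-exporter:9102"]   # exporter's /metrics endpoint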

To add Prometheus as a data source on Grafana and visualize metrics:

1. Head over to localhost:3000 to access the Grafana service

Use admin / admin as the default login credentials; you will be prompted to change the password for security reasons.

login page

2. Adding Prometheus as a data source

On the left side, click on 'Add Data Source' and select Prometheus as the data source.

datasource page

3. Import a custom dashboard

To use a custom dashboard, click on the Import Dashboard option and paste in the contents of dashboard.json.

Import dashboard

Below is a snapshot of the metrics dashboard showing scheduler heartbeat rate, DAGs loaded, DAG processing duration, queued tasks, and the time the executor takes to run tasks.

Metrics dashboard

Insights generated from this project

The dashboard below visualizes the data generated by this project, broken down by sentiment score. Here is the link to this dashboard.

findings

Findings 2

  • 66% of the posts show positive sentiment, while 23% show negative and 7% show neutral sentiment.
  • r/Nairobi shows more positive-sentiment posts than r/Kenya, even with a higher volume of posts from users.
  • r/Nairobi has more posts than r/Kenya, with 659 posts compared to r/Kenya's 616.
  • The highest sentiment was recorded on 15 September 2025 (0.483), and the lowest on 12 September 2025 (0.213).
  • Upvote score does not track sentiment score: a lower upvote score does not imply lower sentiment, and vice versa.

Conclusion

This project highlights how well-designed data pipelines can unlock valuable insights from unstructured data sources like Reddit, while following best practices in data engineering and analytics.

Please like, comment, share widely, and follow for more data engineering content! For collaboration on projects, please email me at denzelkinyua11@gmail.com or visit any of my social media platforms linked on my GitHub page.
