Data is everywhere, but making sense of it requires collecting, cleaning, storing, and analyzing it effectively. In this blog, I’ll walk you through how I built a Reddit Sentiment Analysis pipeline that fetches Reddit posts, analyzes their sentiment, stores the results in PostgreSQL, and visualizes insights in Grafana, all orchestrated with Apache Airflow and containerized using Docker Compose.
If you would like to check out the repository, here is the link.
Project Details
- Extract Reddit posts from specific subreddits (using the `praw` API).
- Clean and analyze sentiment with the VADER Sentiment Analyzer.
- Load processed data into Postgres.
- Orchestrate everything with Airflow DAGs.
- Visualize insights on a Grafana dashboard.
- Get email notifications when DAGs fail or succeed via SMTP.
- Monitor metrics and service health using Prometheus, StatsD, and Grafana.
The snapshot below shows the project architecture:
Project Setup
Step 1: Extract Reddit Data
Using the `praw` Python library, I pull the top 50 posts from subreddits like r/kenya and r/nairobi.
```python
@task
def extract_data():
    reddit = praw.Reddit(
        client_id=CLIENT_ID,
        client_secret=SECRET_KEY,
        user_agent='data pipeline by u/user'
    )
    limit = 50
    data = []
    subs = ["kenya", "nairobi"]
    for sub in subs:
        subreddit = reddit.subreddit(sub)
        for s in subreddit.top(time_filter='day', limit=limit):
            data.append({
                'id': s.id,
                'title': clean_text(s.title) if s.title else 'N/A',
                'text': clean_text(s.selftext) if s.selftext else 'N/A',
                'url': s.url,
                'score': s.score,
                'subreddit': sub,
                'time_created': s.created_utc
            })
    return data
```
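The `clean_text` helper used above lives in the repo's utils module. Here is a minimal sketch of what such a cleaner might look like; the regex rules are assumptions, and the actual implementation in the repo may differ:

```python
import re

# Hypothetical sketch of the clean_text helper called in extract_data();
# the real version lives in the repo's utils module.
def clean_text(text: str) -> str:
    """Strip URLs, special characters, and extra whitespace from a post."""
    text = re.sub(r"http\S+", "", text)                # drop links
    text = re.sub(r"[^A-Za-z0-9\s.,!?']", " ", text)   # drop special characters
    return re.sub(r"\s+", " ", text).strip()           # collapse whitespace
```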
Step 2: Transform & Analyze
Posts are cleaned (special characters removed, `N/A` entries filtered out) and passed into the VADER Sentiment Analyzer, which returns:
- `pos` → Positive score
- `neu` → Neutral score
- `neg` → Negative score
- `compound` → Overall sentiment
I then classify sentiment into Positive, Neutral, or Negative.
```python
@task
def analyze_text():
    """Fetch posts from Postgres, analyze their sentiment, and save the results back to a different Postgres table."""
    # Importing these modules inside the task keeps DAG file parsing fast
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    from utils.extract import get_sentiment

    engine = create_engine(DB_URL)
    analyzer = SentimentIntensityAnalyzer()
    try:
        # Load from DB
        df = pd.read_sql_table("reddit_posts", con=engine, schema="reddit")
        # Drop rows whose text is the N/A placeholder
        df["text"] = df["text"].replace("N/A", np.nan)
        df.dropna(subset=["text"], inplace=True)
        # Apply sentiment
        scores = df["text"].apply(lambda x: analyzer.polarity_scores(x))
        df["compound"] = scores.apply(lambda x: x["compound"])
        df["neg"] = scores.apply(lambda x: x["neg"])
        df["neu"] = scores.apply(lambda x: x["neu"])
        df["pos"] = scores.apply(lambda x: x["pos"])
        df["sentiment"] = df["compound"].apply(get_sentiment)
        # Save results
        df.to_sql(
            "reddit_sentiment_analysis",
            con=engine,
            schema="reddit",
            if_exists="append",
            index=False,
        )
        return "Data loaded successfully!"
    except Exception as e:
        raise RuntimeError(f"Error analyzing or saving data: {e}")
```
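The `get_sentiment` helper imported above maps the compound score to one of the three labels. A minimal sketch, assuming VADER's commonly recommended ±0.05 cutoffs (the repo's version may use different thresholds):

```python
# Hypothetical sketch of get_sentiment from utils.extract; the thresholds
# follow VADER's commonly recommended cutoffs and are an assumption here.
def get_sentiment(compound: float) -> str:
    """Map a VADER compound score to a sentiment label."""
    if compound >= 0.05:
        return "Positive"
    if compound <= -0.05:
        return "Negative"
    return "Neutral"
```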
Step 3: Load into PostgreSQL
The raw posts are stored in a reddit_posts
table, and the processed sentiment results in a reddit_sentiment_analysis
table. This separation of concerns follows a medallion architecture: a bronze layer holds the raw, untransformed data from PRAW (preventing any data loss), a silver layer holds semi-structured data, and a gold layer serves as the "single source of truth" for decision-making, ready for dashboards in Grafana.
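For completeness, here is a minimal sketch of how the bronze-layer load might look, assuming the records returned by `extract_data()` are written to `reddit.reddit_posts` with pandas and SQLAlchemy. The task name and `DB_URL` value are illustrative, not the repo's actual code:

```python
import pandas as pd
from airflow.decorators import task
from sqlalchemy import create_engine

# Illustrative connection string; the real one comes from the project's config
DB_URL = "postgresql+psycopg2://airflow:airflow@postgres:5432/airflow"

@task
def load_data(data):
    """Write the raw extracted posts into the bronze-layer table."""
    engine = create_engine(DB_URL)
    df = pd.DataFrame(data)
    df.to_sql(
        "reddit_posts",
        con=engine,
        schema="reddit",
        if_exists="append",
        index=False,
    )
```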
Step 4: Orchestration with Airflow
Two DAGs coordinate the workflow:
- `reddit_dag.py` → Extracts & stores raw posts.
- `analyze_dag.py` → Waits for the first DAG via Airflow's `ExternalTaskSensor`, which pokes `reddit_dag` to check whether it has completed, then runs the sentiment analysis and sends email notifications.

Read more about `ExternalTaskSensor` in the documentation.
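Here is a minimal sketch of how the sensor might look inside `analyze_dag.py`. The DAG ids match the post, but the schedule and timings are assumptions; note that `ExternalTaskSensor` matches runs by logical date, so both DAGs must share a schedule (or you supply `execution_delta`):

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="analyze_dag",
    start_date=datetime(2025, 9, 1),
    schedule="@daily",   # assumed; must line up with reddit_dag's schedule
    catchup=False,
) as dag:
    wait_for_posts = ExternalTaskSensor(
        task_id="wait_for_reddit_dag",
        external_dag_id="reddit_dag",  # the extraction DAG
        external_task_id=None,         # None waits for the whole DAG run
        poke_interval=60,              # poke every minute
        mode="reschedule",             # free the worker slot between pokes
    )
```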
Step 5: Visualize in Grafana
Grafana queries Postgres directly and visualizes metrics like sentiment trends, subreddit comparisons, and post volume. It also visualizes Airflow metrics collected via StatsD and Prometheus.
StatsD is integrated into this project as a metrics collector for Airflow. Airflow emits metrics such as `dag_processing.processes` and `executor.open_slots`, which capture scheduler and worker health, DAG processing times, and task execution details. These metrics are forwarded to monitoring backends and visualized in Grafana dashboards for observability.
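Turning this on is a matter of Airflow's metrics settings. A sketch of the relevant environment variables as they might appear under the Airflow services in `docker-compose.yml`; the exporter's service name and port are assumptions based on the usual statsd-exporter defaults:

```yaml
environment:
  AIRFLOW__METRICS__STATSD_ON: "True"
  AIRFLOW__METRICS__STATSD_HOST: statsd-exporter  # assumed service name
  AIRFLOW__METRICS__STATSD_PORT: "9125"           # statsd-exporter's default UDP port
  AIRFLOW__METRICS__STATSD_PREFIX: airflow
```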
Prometheus is used as the metrics storage and querying system. The Prometheus service is configured in the `docker-compose.yml` and `prometheus.yml` files and scrapes the Airflow metrics exposed at the `/metrics` endpoint via a Prometheus exporter. Prometheus then makes these metrics queryable using PromQL and visualizable in Grafana.
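A minimal sketch of the corresponding scrape job in `prometheus.yml`, assuming a statsd-exporter service exposing `/metrics` on its default port 9102 (the job name and target are assumptions):

```yaml
scrape_configs:
  - job_name: airflow                      # label for the Airflow metrics job
    static_configs:
      - targets: ["statsd-exporter:9102"]  # /metrics endpoint of the exporter
```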
To add Prometheus as a data source in Grafana and visualize metrics:
1. Head over to `localhost:3000` to access the Grafana service. Use `admin` & `admin` as your login details, and you will be prompted to change your password for security reasons.
2. Add Prometheus as a data source. On the left side, click on 'Add Data Source' and select Prometheus as the data source.
3. Import a custom dashboard. To use a custom dashboard, click on the Import Dashboard option and paste the contents of `dashboard.json`.
Below is a snapshot of the metrics dashboard showing scheduler heartbeat rate, DAGs loaded, DAG processing duration, queued tasks, and the time the executor takes to run tasks.
Insights generated from this project
The dashboard below visualizes the data generated from this project based on sentiment scores. Here is the link to this dashboard.
- 66% of the posts show positive sentiment, 23% negative, and 7% neutral.
- r/Nairobi shows more positive-sentiment posts than r/Kenya, alongside a higher volume of posts from users.
- r/Nairobi has more posts than r/Kenya, with 659 posts compared to r/Kenya's 616.
- The highest sentiment was recorded on 15 September 2025 (0.483), and the lowest on 12 September 2025 (0.213).
- Upvote score does not correlate with sentiment score: a low upvote count does not imply negative sentiment, and vice versa.
Conclusion
This project highlights how well-designed data pipelines can unlock valuable insights from unstructured data sources like Reddit, while following best practices in data engineering and analytics.
Please like, comment, share widely, and follow for more data engineering content! For collaboration on projects, please email me at denzelkinyua11@gmail.com or visit any of my social media platforms linked on my GitHub page.