A modern Apache Airflow 3.1.0 template for scraping and analyzing Reddit trending posts with PostgreSQL storage, built with Docker Compose for easy deployment.
Features
This Reddit trending posts pipeline offers:
- Airflow 3.1.0 with TaskFlow API and modern architecture
- Reddit API Integration for scraping trending posts from multiple subreddits
- PostgreSQL Database with comprehensive schema for post metadata
- Docker Compose setup for easy deployment and scaling
- Celery Executor for distributed task processing
- Redis for task queue management
- PgAdmin for database administration
- Flower for Celery monitoring
- Rate Limiting and error handling
- Configurable Subreddits via environment variables
Architecture Overview
Tech Stack
- Apache Airflow 3.1.0: Modern workflow orchestration
- PostgreSQL: Robust data storage
- Docker Compose: Container orchestration
- Celery Executor: Distributed task processing
- Redis: Message broker
- Python 3.11+: Core programming language
Pipeline Capabilities
DAG Structure
The reddit_trending_pipeline DAG includes the following tasks (a wiring sketch follows the decorator example below):
- get_config: Retrieves configuration and timezone settings
- get_subreddits: Reads subreddit list from environment variables
- fetch_subreddit: Scrapes posts from each subreddit (parallel execution)
- combine_results: Merges results from all subreddits
- persist_and_output: Saves data to PostgreSQL and outputs JSON
Key Features
import datetime
from datetime import timedelta

from airflow.decorators import dag


@dag(
    dag_id="reddit_trending_pipeline",
    description="Scrape Reddit posts from last 24 hours and save to PostgreSQL",
    start_date=datetime.datetime(2025, 10, 1),
    schedule="0 6 * * *",  # every day at 6 AM
    catchup=False,
    default_args={
        "owner": "airflow",
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["reddit", "scraping", "taskflow"],
)
def reddit_trending_pipeline():
    # Task implementations...
    ...


reddit_trending_pipeline()
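The task bodies are elided above. Below is a minimal sketch of how the five tasks from the DAG structure could be wired inside reddit_trending_pipeline(), assuming task is also imported from airflow.decorators; the signatures, placeholder bodies, and the use of dynamic task mapping (.partial/.expand) for the per-subreddit fan-out are illustrative assumptions rather than the repository's exact code.

@task
def get_config() -> dict:
    # Illustrative: cutoff timestamp and request delay consumed by fetch_subreddit
    return {"cutoff_ts": 0, "request_delay_s": 2}

@task
def get_subreddits() -> list[str]:
    # Illustrative: normally parsed from the REDDIT_SUBREDDITS env var
    return ["worldnews", "programming"]

@task
def fetch_subreddit(subreddit: str, args: dict) -> list[dict]:
    # Illustrative: scrape one subreddit and return a list of post dicts
    return []

@task
def combine_results(results: list[list[dict]]) -> list[dict]:
    # Flatten the per-subreddit lists into a single list of posts
    return [post for batch in results for post in batch]

@task
def persist_and_output(posts: list[dict]) -> None:
    # Illustrative: upsert into PostgreSQL and emit a JSON summary
    ...

args = get_config()
subs = get_subreddits()
# Dynamic task mapping: one fetch_subreddit instance per subreddit, executed in parallel
fetched = fetch_subreddit.partial(args=args).expand(subreddit=subs)
persist_and_output(combine_results(fetched))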
Smart Data Filtering
The pipeline includes intelligent filtering (a fetch-loop sketch follows the snippet below):
- Time-based: Only posts from the last 24 hours
- Quality-based: Minimum 100 upvotes threshold
- Rate limiting: 2-second delay between requests
- Media handling: Extracts images, videos, and gallery URLs
# Only last 24 hours
if created_utc <= args["cutoff_ts"]:
    continue
# Quality threshold: at least 100 upvotes
if int(p.get('ups', 0)) < 100:
    continue
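A sketch of the fetch loop that such a filter could sit in is shown below. The top.json listing endpoint, the User-Agent string, and the use of the requests library are assumptions for illustration, not necessarily what this template's fetch_subreddit task does.

import time
import requests

def fetch_top_posts(subreddit: str, args: dict) -> list[dict]:
    # Assumed endpoint: Reddit's public JSON listing of the day's top posts
    url = f"https://www.reddit.com/r/{subreddit}/top.json?t=day&limit=100"
    resp = requests.get(
        url,
        headers={"User-Agent": "airflow-reddit-pipeline/0.1"},  # placeholder UA
        timeout=30,
    )
    resp.raise_for_status()

    posts = []
    for child in resp.json()["data"]["children"]:
        p = child["data"]
        created_utc = float(p.get("created_utc", 0))
        # Time-based and quality-based filters from the snippet above
        if created_utc <= args["cutoff_ts"]:
            continue
        if int(p.get("ups", 0)) < 100:
            continue
        posts.append({
            "post_id": p.get("id"),
            "subreddit": subreddit,
            "title": p.get("title"),
            "upvotes": int(p.get("ups", 0)),
            "upvote_ratio": p.get("upvote_ratio"),
            "url": p.get("url"),
            "created_utc": created_utc,
        })

    # Rate limiting: pause before the next request is issued
    time.sleep(args.get("request_delay_s", 2))
    return posts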
Database Schema
The pipeline creates a comprehensive reddit_posts table with the following fields:
CREATE TABLE reddit_posts (
    id SERIAL PRIMARY KEY,
    post_id VARCHAR(32) UNIQUE NOT NULL,
    subreddit VARCHAR(100) NOT NULL,
    title TEXT,
    description TEXT,
    upvotes INT,
    downvotes INT,
    score INT,
    url TEXT,
    author VARCHAR(255),
    created_at TIMESTAMP WITH TIME ZONE,
    num_comments INT,
    over_18 BOOLEAN,
    media_urls TEXT,
    upvote_ratio REAL,
    scraped_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
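Because post_id carries a UNIQUE constraint, duplicate posts can be absorbed with an upsert keyed on that column. A minimal sketch, assuming psycopg2 as the driver and a reduced column set (the actual persist task may differ):

import os
import psycopg2

def upsert_post(post: dict) -> None:
    # Connection string documented under Configuration below
    dsn = os.environ.get("REDDIT_DB_CONN", "postgresql://airflow:airflow@postgres:5432/airflow")
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO reddit_posts (post_id, subreddit, title, upvotes, score, url, author)
            VALUES (%(post_id)s, %(subreddit)s, %(title)s, %(upvotes)s, %(score)s, %(url)s, %(author)s)
            ON CONFLICT (post_id) DO UPDATE
               SET upvotes = EXCLUDED.upvotes,
                   score = EXCLUDED.score,
                   title = EXCLUDED.title
            """,
            post,
        )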
Getting Started
Prerequisites
- Docker and Docker Compose
- At least 4GB RAM
- At least 2 CPU cores
- 10GB free disk space
1. Get the Template
Option A: Use This Template
git clone https://github.com/rugvedp/apache-airflow-3-template.git
cd apache-airflow-3-template
Option B: Start Fresh with Official Template
If you want to start from the official Airflow 3.1.0 template:
# Download the official Airflow 3.1.0 docker-compose.yaml
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.1.0/docker-compose.yaml'
# Create required directories
mkdir -p dags logs plugins config
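If you follow Option B, also create the .env file by hand before continuing, for example with echo -e "AIRFLOW_UID=$(id -u)" > .env as the official Docker Compose guide suggests; with Option A, the make env step below takes care of this.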
2. Initialize Environment
make env
This creates the required directories and .env file with your user ID.
3. Initialize Airflow
make init
This initializes the Airflow database and creates the default admin user (username: airflow, password: airflow).
4. Start Services
make upd
This starts all services in detached mode.
5. Access the Web Interface
- Airflow UI: http://localhost:8080 (airflow/airflow)
- PgAdmin: http://localhost:5050 (admin@admin.com/admin)
- Flower: http://localhost:5555 (optional, for Celery monitoring)
(Screenshots: Airflow Dashboard and DAG Execution View)
Configuration
Environment Variables
Create a .env file or set these environment variables:
# Required
AIRFLOW_UID=50000
# Optional Reddit Configuration
REDDIT_SUBREDDITS='["MusicIndia", "IndiaTech", "worldnews", "programming"]'
REDDIT_REQUEST_DELAY_S=2
REDDIT_DB_CONN=postgresql://airflow:airflow@postgres:5432/airflow
# Optional Airflow Configuration
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
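A minimal sketch of how these variables might be read inside the DAG; the helper name and defaults here are illustrative assumptions, not the repository's exact code:

import json
import os

def read_reddit_settings() -> dict:
    # REDDIT_SUBREDDITS is expected to be a JSON array of subreddit names
    subreddits = json.loads(os.environ.get("REDDIT_SUBREDDITS", '["worldnews", "programming"]'))
    return {
        "subreddits": subreddits,
        "request_delay_s": float(os.environ.get("REDDIT_REQUEST_DELAY_S", "2")),
        "db_conn": os.environ.get("REDDIT_DB_CONN", "postgresql://airflow:airflow@postgres:5432/airflow"),
    }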
DAG Configuration
The DAG runs daily at 6 AM (Asia/Kolkata timezone; see the timezone note after this list) and:
- Scrapes posts from the last 24 hours
- Filters posts with at least 100 upvotes
- Includes rate limiting (2-second delay between requests)
- Handles media URLs (images, videos, galleries)
- Supports conflict resolution for duplicate posts
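The cron expression by itself is timezone-naive; a standard Airflow way to pin it to Asia/Kolkata is a timezone-aware start_date via pendulum, as sketched below. How this template actually applies the timezone (for example through get_config) may differ, so treat this as an illustration.

import pendulum
from airflow.decorators import dag

@dag(
    dag_id="reddit_trending_pipeline",
    schedule="0 6 * * *",  # evaluated in the start_date's timezone
    start_date=pendulum.datetime(2025, 10, 1, tz="Asia/Kolkata"),
    catchup=False,
)
def reddit_trending_pipeline():
    ...

reddit_trending_pipeline()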
Usage
Manual DAG Execution
- Access the Airflow UI at http://localhost:8080
- Navigate to the reddit_trending_pipeline DAG
- Click "Trigger DAG" to run manually
- Monitor task execution in the Graph or Grid view
Customizing Subreddits
Set the REDDIT_SUBREDDITS environment variable:
export REDDIT_SUBREDDITS='["MusicIndia", "IndiaTech", "worldnews", "programming", "MachineLearning"]'
Database Queries
Connect to PostgreSQL via PgAdmin or directly:
-- View recent posts
SELECT subreddit, title, upvotes, created_at
FROM reddit_posts
ORDER BY created_at DESC
LIMIT 10;
-- Top posts by subreddit
SELECT subreddit, COUNT(*) as post_count, AVG(upvotes) as avg_upvotes
FROM reddit_posts
GROUP BY subreddit
ORDER BY post_count DESC;
-- Trending posts (high upvote ratio)
SELECT title, subreddit, upvotes, upvote_ratio
FROM reddit_posts
WHERE upvote_ratio > 0.8
ORDER BY upvotes DESC;
Development
Makefile Commands
make help # Show all available commands
make env # Create directories and .env file
make init # Initialize Airflow database
make up # Start services (foreground)
make upd # Start services (detached)
make down # Stop services
make restart # Restart all services
make clean # Remove containers, images, and volumes
make shell # Open bash shell in worker container
make cli ARGS="..." # Run Airflow CLI commands
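For example, make cli ARGS="dags list" runs airflow dags list inside a container and shows every registered DAG, while make cli ARGS="tasks list reddit_trending_pipeline" lists the pipeline's tasks.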
Adding New DAGs
- Place your DAG file in the dags/ directory
- The DAG will be automatically detected by the DAG processor
- Use the TaskFlow API for modern Python DAGs:
from airflow.decorators import dag, task
from datetime import datetime


@dag(
    dag_id="my_custom_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
)
def my_dag():
    @task
    def my_task():
        return "Hello World"

    my_task()


dag = my_dag()
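Once the file is saved under dags/, a quick way to exercise it without waiting for the scheduler is make cli ARGS="dags test my_custom_dag", which performs a single in-process DAG run (the exact arguments accepted can vary slightly between Airflow versions).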
Monitoring
Airflow UI
- DAGs: Overview of all DAGs and their status
- Graph: Visual representation of task dependencies
- Grid: Task run history across DAG runs (replaces the older Tree view)
- Gantt: Task duration analysis
Flower (Celery Monitoring)
- Access at http://localhost:5555
- Monitor worker status and task queues
- View task execution statistics
PgAdmin
- Access at http://localhost:5050
- Database administration and query interface
- Monitor database performance
Additional Resources
- Apache Airflow Documentation
- Airflow 3.0 Migration Guide
- Docker Compose Documentation
- PostgreSQL Documentation
- Reddit API Documentation
Conclusion
This Reddit trending posts pipeline demonstrates the power of modern Apache Airflow 3.1.0 with Docker Compose. The result is:
- Scalable: handles multiple subreddits in parallel
- Reliable: robust error handling and monitoring
- Maintainable: clean, readable code built on the TaskFlow API
- Observable: comprehensive monitoring and logging
Ready to build your own data pipeline? Clone the repository and start scraping Reddit data today!
git clone https://github.com/rugvedp/apache-airflow-3-template.git
cd apache-airflow-3-template
Have questions about building data pipelines with Airflow? Drop a comment below!
Tags
#airflow #data-engineering #python #docker #postgresql #reddit #data-pipeline #apache-airflow #data-scraping #workflow-orchestration


