Rugved

Building a Scalable Reddit Data Pipeline with Apache Airflow 3.1.0

A modern Apache Airflow 3.1.0 template for scraping and analyzing Reddit trending posts with PostgreSQL storage, built with Docker Compose for easy deployment.

🚀 Features

This Reddit trending posts pipeline offers:

  • Airflow 3.1.0 with TaskFlow API and modern architecture
  • Reddit API Integration for scraping trending posts from multiple subreddits
  • PostgreSQL Database with comprehensive schema for post metadata
  • Docker Compose setup for easy deployment and scaling
  • Celery Executor for distributed task processing
  • Redis for task queue management
  • PgAdmin for database administration
  • Flower for Celery monitoring
  • Rate Limiting and error handling
  • Configurable Subreddits via environment variables

๐Ÿ—๏ธ Architecture Overview

(Architecture diagram)

๐Ÿ› ๏ธ Tech Stack

  • Apache Airflow 3.1.0: Modern workflow orchestration
  • PostgreSQL: Robust data storage
  • Docker Compose: Container orchestration
  • Celery Executor: Distributed task processing
  • Redis: Message broker
  • Python 3.11+: Core programming language

📊 Pipeline Capabilities

DAG Structure

The reddit_trending_pipeline DAG includes:

  1. get_config: Retrieves configuration and timezone settings
  2. get_subreddits: Reads subreddit list from environment variables
  3. fetch_subreddit: Scrapes posts from each subreddit (parallel execution)
  4. combine_results: Merges results from all subreddits
  5. persist_and_output: Saves data to PostgreSQL and outputs JSON
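The fan-in step can be sketched in plain Python. The helper below is a hypothetical stand-in for `combine_results` that merges the per-subreddit lists and dedupes on `post_id` (an assumption about the task's contract, not the repo's exact code):

```python
from typing import Dict, List


def combine_results(per_subreddit: List[List[Dict]]) -> List[Dict]:
    """Merge per-subreddit post lists, keeping the first copy of each post_id.

    The same post can surface in more than one listing, so we dedupe
    on Reddit's stable post id before persisting.
    """
    seen = set()
    combined = []
    for posts in per_subreddit:
        for post in posts:
            pid = post["post_id"]
            if pid in seen:
                continue
            seen.add(pid)
            combined.append(post)
    # Sort hottest-first so the downstream JSON output is deterministic
    combined.sort(key=lambda p: p.get("upvotes", 0), reverse=True)
    return combined
```

Because each `fetch_subreddit` task returns an independent list, this step is the only place ordering and uniqueness need to be enforced.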

Key Features

import datetime
from datetime import timedelta

from airflow.decorators import dag, task

@dag(
    dag_id="reddit_trending_pipeline",
    description="Scrape Reddit posts from the last 24 hours and save to PostgreSQL",
    start_date=datetime.datetime(2025, 10, 1),
    schedule="0 6 * * *",  # every day at 6 AM
    catchup=False,
    default_args={
        "owner": "airflow",
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["reddit", "scraping", "taskflow"],
)
def reddit_trending_pipeline():
    # Task implementations...
    ...

Smart Data Filtering

The pipeline includes intelligent filtering:

  • Time-based: Only posts from the last 24 hours
  • Quality-based: Minimum 100 upvotes threshold
  • Rate limiting: 2-second delay between requests
  • Media handling: Extracts images, videos, and gallery URLs
# Keep only posts created within the last 24 hours
if created_utc <= args["cutoff_ts"]:
    continue  # older than the cutoff timestamp
if int(p.get('ups', 0)) < 100:
    continue  # below the minimum upvote threshold

๐Ÿ—„๏ธ Database Schema

The pipeline creates a comprehensive reddit_posts table with the following fields:

CREATE TABLE reddit_posts (
    id SERIAL PRIMARY KEY,
    post_id VARCHAR(32) UNIQUE NOT NULL,
    subreddit VARCHAR(100) NOT NULL,
    title TEXT,
    description TEXT,
    upvotes INT,
    downvotes INT,
    score INT,
    url TEXT,
    author VARCHAR(255),
    created_at TIMESTAMP WITH TIME ZONE,
    num_comments INT,
    over_18 BOOLEAN,
    media_urls TEXT,
    upvote_ratio REAL,
    scraped_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
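The "conflict resolution for duplicate posts" mentioned later most plausibly maps to a PostgreSQL `ON CONFLICT` upsert on the `post_id` unique key. Here is a hedged sketch that builds such a statement from the schema's column list (the repo's actual persistence code may differ):

```python
# Columns from the reddit_posts schema, excluding the SERIAL id
# and the defaulted scraped_at timestamp.
COLUMNS = [
    "post_id", "subreddit", "title", "description", "upvotes", "downvotes",
    "score", "url", "author", "created_at", "num_comments", "over_18",
    "media_urls", "upvote_ratio",
]


def build_upsert_sql() -> str:
    """Build a parameterized upsert: insert a post, or refresh its
    counters if the same post_id was already scraped on a prior run."""
    placeholders = ", ".join(["%s"] * len(COLUMNS))
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}"
        for c in ("upvotes", "downvotes", "score", "num_comments", "upvote_ratio")
    )
    return (
        f"INSERT INTO reddit_posts ({', '.join(COLUMNS)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT (post_id) DO UPDATE SET {updates}, scraped_at = NOW()"
    )
```

The statement can then be executed per post with a psycopg2 cursor, passing the 14 values as a tuple.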

🚀 Getting Started

Prerequisites

  • Docker and Docker Compose
  • At least 4GB RAM
  • At least 2 CPU cores
  • 10GB free disk space

1. Get the Code

Option A: Use This Template

git clone https://github.com/rugvedp/apache-airflow-3-template.git
cd apache-airflow-3-template

Option B: Start Fresh with Official Template

If you want to start from the official Airflow 3.1.0 template:

# Download the official Airflow 3.1.0 docker-compose.yaml
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.1.0/docker-compose.yaml'

# Create required directories
mkdir -p dags logs plugins config

2. Initialize Environment

make env

This creates the required directories and .env file with your user ID.

3. Initialize Airflow

make init

This initializes the Airflow database and creates the default admin user (username: airflow, password: airflow).

4. Start Services

make upd

This starts all services in detached mode.

5. Access the Web Interface

Open http://localhost:8080 and log in with the default credentials (airflow / airflow).

(Screenshots: Airflow dashboard, DAG execution view, DAG test execution)

โš™๏ธ Configuration

Environment Variables

Create a .env file or set these environment variables:

# Required
AIRFLOW_UID=50000

# Optional Reddit Configuration
REDDIT_SUBREDDITS='["MusicIndia", "IndiaTech", "worldnews", "programming"]'
REDDIT_REQUEST_DELAY_S=2
REDDIT_DB_CONN=postgresql://airflow:airflow@postgres:5432/airflow

# Optional Airflow Configuration
_AIRFLOW_WWW_USER_USERNAME=airflow
_AIRFLOW_WWW_USER_PASSWORD=airflow
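Inside the DAG, the JSON-encoded subreddit list can be read back with a small helper. This is a sketch (the fallback list is an assumption, not the repo's default):

```python
import json
import os

DEFAULT_SUBREDDITS = ["worldnews", "programming"]  # assumed fallback


def get_subreddits(env=None) -> list[str]:
    """Parse REDDIT_SUBREDDITS (a JSON array of names) from the
    environment, falling back to a default list when unset or malformed."""
    env = os.environ if env is None else env
    raw = env.get("REDDIT_SUBREDDITS", "")
    try:
        subs = json.loads(raw)
    except json.JSONDecodeError:
        return DEFAULT_SUBREDDITS
    if not isinstance(subs, list) or not all(isinstance(s, str) for s in subs):
        return DEFAULT_SUBREDDITS
    return subs
```

Accepting an `env` mapping keeps the parser unit-testable without touching the real environment.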

DAG Configuration

The DAG runs daily at 6 AM (Asia/Kolkata timezone) and:

  • Scrapes posts from the last 24 hours
  • Filters posts with at least 100 upvotes
  • Includes rate limiting (2-second delay between requests)
  • Handles media URLs (images, videos, galleries)
  • Supports conflict resolution for duplicate posts
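The rate-limited fetch loop over subreddits can be sketched as below; `fetch_posts` and `sleep` are injected so the delay logic stays testable, and the 2-second default matches `REDDIT_REQUEST_DELAY_S` (names are hypothetical):

```python
import time
from typing import Callable, Dict, List


def fetch_all(
    subreddits: List[str],
    fetch_posts: Callable[[str], List[Dict]],
    delay_s: float = 2.0,
    sleep=time.sleep,
) -> Dict[str, List[Dict]]:
    """Fetch each subreddit in turn, pausing delay_s seconds between
    requests to stay polite toward Reddit's API."""
    results: Dict[str, List[Dict]] = {}
    for i, sub in enumerate(subreddits):
        if i > 0:
            sleep(delay_s)  # no delay needed before the first request
        try:
            results[sub] = fetch_posts(sub)
        except Exception:
            results[sub] = []  # one failing subreddit shouldn't kill the run
    return results
```

In the actual DAG the per-subreddit fetches run as parallel mapped tasks, so a per-worker delay like this is one simple way to throttle; a shared rate limiter would be stricter.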

🎯 Usage

Manual DAG Execution

  1. Access the Airflow UI at http://localhost:8080
  2. Navigate to the reddit_trending_pipeline DAG
  3. Click "Trigger DAG" to run manually
  4. Monitor task execution in the Graph or Tree view

Customizing Subreddits

Set the REDDIT_SUBREDDITS environment variable:

export REDDIT_SUBREDDITS='["MusicIndia", "IndiaTech", "worldnews", "programming", "MachineLearning"]'

Database Queries

Connect to PostgreSQL via PgAdmin or directly:

-- View recent posts
SELECT subreddit, title, upvotes, created_at 
FROM reddit_posts 
ORDER BY created_at DESC 
LIMIT 10;

-- Top posts by subreddit
SELECT subreddit, COUNT(*) as post_count, AVG(upvotes) as avg_upvotes
FROM reddit_posts 
GROUP BY subreddit 
ORDER BY post_count DESC;

-- Trending posts (high upvote ratio)
SELECT title, subreddit, upvotes, upvote_ratio
FROM reddit_posts 
WHERE upvote_ratio > 0.8 
ORDER BY upvotes DESC;

๐Ÿ› ๏ธ Development

Makefile Commands

make help          # Show all available commands
make env           # Create directories and .env file
make init          # Initialize Airflow database
make up            # Start services (foreground)
make upd           # Start services (detached)
make down          # Stop services
make restart       # Restart all services
make clean         # Remove containers, images, and volumes
make shell         # Open bash shell in worker container
make cli ARGS="..." # Run Airflow CLI commands

Adding New DAGs

  1. Place your DAG file in the dags/ directory
  2. The DAG will be automatically detected by the DAG processor
  3. Use the TaskFlow API for modern Python DAGs:
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="my_custom_dag",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
)
def my_dag():
    @task
    def my_task():
        return "Hello World"

    my_task()

dag = my_dag()

📈 Monitoring

Airflow UI

  • DAGs: Overview of all DAGs and their status
  • Graph: Visual representation of task dependencies
  • Tree: Timeline view of task executions
  • Gantt: Task duration analysis

Flower (Celery Monitoring)

  • Access at http://localhost:5555 (the default port in the official Airflow Compose setup)
  • Inspect Celery workers, queues, and task throughput
PgAdmin

  • Access at http://localhost:5050
  • Database administration and query interface
  • Monitor database performance

📚 Additional Resources

  • Apache Airflow documentation: https://airflow.apache.org/docs/apache-airflow/3.1.0/
  • Template repository: https://github.com/rugvedp/apache-airflow-3-template
🎉 Conclusion

This Reddit trending posts pipeline demonstrates the power of modern Apache Airflow 3.1.0 with Docker Compose. It is:

  • Scalable: handles multiple subreddits in parallel
  • Reliable: robust error handling and rate limiting
  • Maintainable: clean, readable code built on the TaskFlow API
  • Observable: comprehensive monitoring via the Airflow UI, Flower, and PgAdmin

Ready to build your own data pipeline? Clone the repository and start scraping Reddit data today!

git clone https://github.com/rugvedp/apache-airflow-3-template.git
cd apache-airflow-3-template

Have questions about building data pipelines with Airflow? Drop a comment below! 🚀

Tags

#airflow #data-engineering #python #docker #postgresql #reddit #data-pipeline #apache-airflow #data-scraping #workflow-orchestration
