DEV Community

agenthustler

Building a Reddit Keyword Monitor: Track Mentions Across Subreddits in Python (2026 Guide)

Reddit's API changes in 2023-2025 broke countless scraping workflows overnight. Rate limits got stricter, OAuth requirements tightened, and many developers found their data pipelines suddenly dry. But Reddit data is more valuable than ever — market research, sentiment analysis, competitor monitoring, and trend detection all depend on it.

This guide covers what actually works in 2026 for scraping Reddit at scale, with real code, practical rate-limit strategies, and alternative approaches when direct scraping hits a wall.

What Changed: Reddit API Timeline

Here is the quick history:

  • June 2023: Reddit announced paid API access, killing most third-party apps
  • April 2024: Free API tier limited to 100 requests/minute for OAuth apps
  • October 2024: Additional restrictions on bulk data endpoints (/comments, /search)
  • March 2025: Reddit started actively fingerprinting and blocking automated requests more aggressively
  • 2026 (now): The free tier still exists but is heavily monitored. Commercial use requires paid access or alternative approaches.

The bottom line: Reddit's official API still works for small-scale projects, but anything at scale needs a more robust strategy.

Method 1: Reddit Official API (PRAW)

The official API via PRAW is still the cleanest approach for moderate volumes.

Setup

pip install praw

import praw
import time
from datetime import datetime

reddit = praw.Reddit(
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
    user_agent="keyword-monitor/1.0 by u/yourusername"
)

def monitor_subreddit(subreddit_name, keywords, limit=100):
    """Monitor a subreddit for specific keywords in new posts."""
    subreddit = reddit.subreddit(subreddit_name)
    matches = []

    for submission in subreddit.new(limit=limit):
        title_lower = submission.title.lower()
        selftext_lower = (submission.selftext or "").lower()

        for keyword in keywords:
            if keyword.lower() in title_lower or keyword.lower() in selftext_lower:
                matches.append({
                    "title": submission.title,
                    "url": f"https://reddit.com{submission.permalink}",
                    "score": submission.score,
                    "num_comments": submission.num_comments,
                    "created": datetime.fromtimestamp(
                        submission.created_utc
                    ).isoformat(),
                    "matched_keyword": keyword,
                    "subreddit": subreddit_name
                })
                break

    return matches

# Monitor multiple subreddits for product mentions
keywords = ["your-product", "competitor-name", "industry-term"]
subreddits = ["startups", "SaaS", "webdev", "programming"]

all_matches = []
for sub in subreddits:
    results = monitor_subreddit(sub, keywords)
    all_matches.extend(results)
    time.sleep(2)  # Be nice to the API

print(f"Found {len(all_matches)} keyword matches")
for match in all_matches:
    print(f"  [{match['subreddit']}] {match['title'][:80]}")
    print(f"    Score: {match['score']} | Comments: {match['num_comments']}")
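One request-saving option: Reddit treats subreddit names joined with `+` (e.g. `r/startups+SaaS`) as a single combined listing, and PRAW passes that string straight through `reddit.subreddit(...)`. A small sketch; the `combine_subreddits` helper is mine, not part of PRAW:

```python
def combine_subreddits(names):
    """Build Reddit's combined-listing name, e.g. 'startups+SaaS+webdev'."""
    return "+".join(names)

# One PRAW call instead of one per subreddit (assumes the `reddit`
# client from the setup above). Each submission still knows where it
# came from via submission.subreddit:
#
# for submission in reddit.subreddit(combine_subreddits(subreddits)).new(limit=100):
#     ...

print(combine_subreddits(["startups", "SaaS", "webdev"]))  # startups+SaaS+webdev
```

This cuts the loop over `subreddits` down to a single listing request per polling cycle.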

Rate Limit Reality

PRAW handles rate limiting internally, but here is what you are actually working with:

Tier              Requests/min   Monthly cost   Best for
Free (OAuth)      100            $0             Small projects, under 10K posts/day
Free (no OAuth)   10             $0             Testing only
Paid tier         1,000+         Varies         Production monitoring

The catch: Even at 100 req/min, you can pull 50K-100K posts per day if you are efficient. But Reddit tracks usage patterns, and sustained high-volume scraping will eventually get your app flagged.
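PRAW throttles itself, but if you mix in non-PRAW traffic (the scrapers below, webhooks, ad-hoc requests) a small client-side cap keeps the combined volume under your tier's limit. A minimal sketch of a rolling-minute budget; this is my helper, not PRAW's internal mechanism:

```python
import time

class MinuteBudget:
    """Cap requests per rolling 60-second window (client-side, best effort)."""

    def __init__(self, max_per_min, clock=time.monotonic):
        self.max_per_min = max_per_min
        self.clock = clock          # injectable for testing
        self.stamps = []            # send times within the last minute

    def wait_time(self):
        """Seconds to wait before the next request fits the budget (0 = go now)."""
        now = self.clock()
        self.stamps = [t for t in self.stamps if now - t < 60]
        if len(self.stamps) < self.max_per_min:
            return 0.0
        return 60 - (now - self.stamps[0])

    def record(self):
        """Call once per request actually sent."""
        self.stamps.append(self.clock())

# Usage before each request:
# budget = MinuteBudget(100)   # free OAuth tier
# time.sleep(budget.wait_time())
# budget.record()
```

Keeping the cap well below the official limit also leaves headroom for retries.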

Method 2: Direct HTTP Scraping (No API)

When you need data the API does not expose well — like full comment trees, deleted-content context, or search results beyond the API's 1,000-result cap — direct scraping is an option.

import requests
import time
import random
from dataclasses import dataclass, field

@dataclass
class RedditScraper:
    """Scrape Reddit without using the official API."""

    base_url: str = "https://www.reddit.com"
    session: requests.Session = field(default_factory=requests.Session)
    min_delay: float = 3.0
    max_delay: float = 7.0

    def __post_init__(self):
        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
        })

    def _respectful_delay(self):
        """Random delay between requests to avoid detection."""
        delay = random.uniform(self.min_delay, self.max_delay)
        time.sleep(delay)

    def get_json(self, path, params=None):
        """Fetch a Reddit page as JSON by appending .json."""
        url = f"{self.base_url}/{path.strip('/')}.json"
        self._respectful_delay()

        try:
            resp = self.session.get(url, params=params, timeout=15)

            if resp.status_code == 429:
                retry_after = int(
                    resp.headers.get("Retry-After", 60)
                )
                print(f"Rate limited. Waiting {retry_after}s...")
                time.sleep(retry_after)
                return self.get_json(path, params)

            resp.raise_for_status()
            return resp.json()

        except requests.RequestException as e:
            print(f"Request failed for {path}: {e}")
            return None

    def search_subreddit(self, subreddit, query, sort="new", limit=25):
        """Search within a specific subreddit."""
        params = {
            "q": query,
            "sort": sort,
            "limit": min(limit, 100),
            "restrict_sr": "on",
            "type": "link"
        }
        data = self.get_json(f"r/{subreddit}/search", params)

        if not data or "data" not in data:
            return []

        posts = []
        for child in data["data"].get("children", []):
            post = child["data"]
            posts.append({
                "id": post["id"],
                "title": post["title"],
                "author": post.get("author", "[deleted]"),
                "score": post["score"],
                "url": post.get("url", ""),
                "permalink": f"https://reddit.com{post['permalink']}",
                "created_utc": post["created_utc"],
                "num_comments": post["num_comments"],
                "selftext": post.get("selftext", "")[:500],
            })

        return posts

    def get_comments(self, subreddit, post_id, depth=5):
        """Get all comments for a specific post."""
        params = {"depth": depth, "limit": 500}
        data = self.get_json(
            f"r/{subreddit}/comments/{post_id}", params
        )

        if not data or len(data) < 2:
            return []

        return self._parse_comment_tree(
            data[1]["data"]["children"]
        )

    def _parse_comment_tree(self, children, depth=0):
        """Recursively parse nested comment structure."""
        comments = []
        for child in children:
            if child["kind"] != "t1":
                continue
            c = child["data"]
            comment = {
                "author": c.get("author", "[deleted]"),
                "body": c.get("body", ""),
                "score": c.get("score", 0),
                "depth": depth,
                "id": c["id"],
            }
            comments.append(comment)

            if c.get("replies") and isinstance(c["replies"], dict):
                replies = c["replies"]["data"]["children"]
                comments.extend(
                    self._parse_comment_tree(replies, depth + 1)
                )

        return comments


# Usage example
scraper = RedditScraper(min_delay=4.0, max_delay=8.0)

results = scraper.search_subreddit(
    "webdev", "scraping API", sort="new", limit=10
)
for post in results:
    print(f"[{post['score']:>4}] {post['title'][:70]}")

    if post["num_comments"] > 20:
        comments = scraper.get_comments("webdev", post["id"])
        print(f"       -> {len(comments)} comments fetched")
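The `.json` listings cap out at 100 items per request; going deeper means feeding the `after` cursor from each response back as a query parameter. Reddit includes that cursor in the same listing envelope `search_subreddit` already parses; a sketch of the extraction step:

```python
def extract_page(listing):
    """Split a Reddit listing payload into post dicts and the next-page cursor."""
    data = listing.get("data", {})
    posts = [child["data"] for child in data.get("children", [])
             if child.get("kind") == "t3"]  # t3 = link/self post
    return posts, data.get("after")        # cursor is None on the last page

# Paginate by passing the cursor back, e.g.:
#   data = scraper.get_json(f"r/{sub}/new", {"limit": 100, "after": cursor})

# Shape check against a synthetic listing:
listing = {"data": {"children": [{"kind": "t3", "data": {"id": "abc123"}}],
                    "after": "t3_abc123"}}
posts, cursor = extract_page(listing)
print(posts[0]["id"], cursor)  # abc123 t3_abc123
```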

Why This Breaks (And How to Handle It)

Direct scraping will eventually hit walls:

  1. CAPTCHAs: Reddit serves CAPTCHAs after sustained automated access
  2. IP blocks: Repeated requests from the same IP get throttled, then blocked
  3. Fingerprinting: Reddit detects headless browsers and simple HTTP clients
  4. Legal gray area: Reddit ToS prohibits scraping; enforcement varies

Mitigation strategies:

import requests

# Rotating proxies (you need a proxy service)
PROXIES = [
    "http://proxy1:port",
    "http://proxy2:port",
    "http://proxy3:port",
]

def get_with_proxy_rotation(url, proxy_list):
    """Try proxies in rotation until one works."""
    for proxy in proxy_list:
        try:
            resp = requests.get(
                url,
                proxies={"http": proxy, "https": proxy},
                timeout=10
            )
            if resp.status_code == 200:
                return resp
        except requests.RequestException:
            continue
    return None
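When every proxy in a round fails, retrying the round immediately just burns the pool. A common complement is exponential backoff with full jitter between rounds; a sketch (the base and cap values are arbitrary starting points, not Reddit-specific):

```python
import random

def backoff_delays(rounds, base=2.0, cap=120.0):
    """Full-jitter backoff: each delay is uniform in [0, min(cap, base * 2**n)]."""
    return [random.uniform(0, min(cap, base * 2 ** n)) for n in range(rounds)]

# Between failed proxy rounds:
# for delay in backoff_delays(5):
#     resp = get_with_proxy_rotation(url, PROXIES)
#     if resp:
#         break
#     time.sleep(delay)

for n, d in enumerate(backoff_delays(5)):
    print(f"round {n}: wait up to {d:.1f}s")
```

Jitter matters here: fixed delays from many workers synchronize into traffic spikes that are easy to fingerprint.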

But maintaining proxy rotation, CAPTCHA solving, and anti-fingerprinting is a full-time job. For production use cases, dedicated scraping infrastructure makes more sense.

Method 3: Dedicated Scraping APIs and Services

If you need reliable, high-volume Reddit data without maintaining scraping infrastructure, several services handle the hard parts:

Option A: Apify Reddit Scrapers

Apify has pre-built Reddit scraping actors that handle proxies, rate limits, and anti-bot measures:

from apify_client import ApifyClient

client = ApifyClient("your_apify_token")

# Run a Reddit scraper actor
run = client.actor("cryptosignals/reddit-scraper").call(
    run_input={
        "subreddits": ["startups", "SaaS", "webdev"],
        "searchKeywords": ["web scraping", "data extraction"],
        "maxResults": 500,
        "includeComments": True,
    }
)

# Fetch results from the dataset
dataset = client.dataset(run["defaultDatasetId"])
for item in dataset.iterate_items():
    print(f"{item['title']} - {item['score']} upvotes")

The advantage here is you pay for infrastructure someone else maintains. No proxy management, no CAPTCHA solving, no rate limit cat-and-mouse.

Option B: Arctic Shift (Historical Data)

For historical Reddit data, Arctic Shift (the successor to Pushshift) provides bulk access:

import requests

def search_arctic_shift(
    query, subreddit=None, after=None, before=None, size=100
):
    """Search Reddit historical data via Arctic Shift API."""
    params = {"q": query, "size": size}
    if subreddit:
        params["subreddit"] = subreddit
    if after:
        params["after"] = after
    if before:
        params["before"] = before

    resp = requests.get(
        "https://arctic-shift.photon-reddit.com/api/posts/search",
        params=params,
        timeout=30
    )
    return resp.json().get("data", [])

# Find all posts about "web scraping" in r/Python from 2025
posts = search_arctic_shift(
    query="web scraping",
    subreddit="Python",
    after="2025-01-01",
    before="2025-12-31",
    size=500
)
print(f"Found {len(posts)} historical posts")
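Single queries against bulk archives tend to truncate, so a practical pattern is to slice the date range into windows and call `search_arctic_shift` once per window. A sketch using calendar months (the window size is my choice, not an Arctic Shift requirement):

```python
from datetime import date

def monthly_windows(year):
    """Yield (after, before) ISO-date pairs covering each month of a year."""
    for month in range(1, 13):
        start = date(year, month, 1)
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        yield start.isoformat(), end.isoformat()

# all_posts = []
# for after, before in monthly_windows(2025):
#     all_posts += search_arctic_shift("web scraping", subreddit="Python",
#                                      after=after, before=before)

print(list(monthly_windows(2025))[0])   # ('2025-01-01', '2025-02-01')
```

If a busy subreddit overflows even a monthly window, halve the window and retry.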

Building a Complete Keyword Monitor

Here is a fuller example that combines the Method 2 scraper with deduplication and email alerting:

import json
import time
import smtplib
from email.mime.text import MIMEText
from pathlib import Path
from datetime import datetime


class RedditKeywordMonitor:
    """Monitor Reddit for keyword mentions with dedup and alerting."""

    def __init__(self, keywords, subreddits, seen_file="seen_posts.json"):
        self.keywords = [kw.lower() for kw in keywords]
        self.subreddits = subreddits
        self.seen_file = Path(seen_file)
        self.seen_ids = self._load_seen()

    def _load_seen(self):
        if self.seen_file.exists():
            return set(json.loads(self.seen_file.read_text()))
        return set()

    def _save_seen(self):
        self.seen_file.write_text(json.dumps(list(self.seen_ids)))

    def _matches_keywords(self, text):
        text_lower = text.lower()
        return [kw for kw in self.keywords if kw in text_lower]

    def check_new_posts(self, scraper):
        """Check all subreddits for new keyword matches."""
        new_matches = []

        for subreddit in self.subreddits:
            for keyword in self.keywords:
                posts = scraper.search_subreddit(
                    subreddit, keyword, sort="new", limit=25
                )

                for post in posts:
                    if post["id"] in self.seen_ids:
                        continue

                    matched = self._matches_keywords(
                        f"{post['title']} {post.get('selftext', '')}"
                    )
                    if matched:
                        post["matched_keywords"] = matched
                        post["found_in"] = subreddit
                        post["found_at"] = datetime.now().isoformat()
                        new_matches.append(post)
                        self.seen_ids.add(post["id"])

        self._save_seen()
        return new_matches

    def send_alert(self, matches, smtp_config):
        """Send email alert for new matches."""
        if not matches:
            return

        body = f"Found {len(matches)} new Reddit mentions:\n\n"
        for m in matches:
            body += f"Subreddit: r/{m['found_in']}\n"
            body += f"Title: {m['title']}\n"
            body += f"Keywords: {', '.join(m['matched_keywords'])}\n"
            body += f"Score: {m['score']} | Comments: {m['num_comments']}\n"
            body += f"URL: {m['permalink']}\n\n"

        msg = MIMEText(body)
        msg["Subject"] = f"Reddit Alert: {len(matches)} new mentions"
        msg["From"] = smtp_config["from"]
        msg["To"] = smtp_config["to"]

        with smtplib.SMTP(smtp_config["host"], smtp_config["port"]) as s:
            s.starttls()
            s.login(smtp_config["user"], smtp_config["password"])
            s.send_message(msg)


# Run the monitor on a schedule
monitor = RedditKeywordMonitor(
    keywords=["your-brand", "competitor-x", "industry-trend"],
    subreddits=[
        "startups", "SaaS", "webdev",
        "programming", "Entrepreneur"
    ]
)

scraper = RedditScraper(min_delay=5.0, max_delay=10.0)

# Check every 30 minutes (run via cron or systemd timer)
new_matches = monitor.check_new_posts(scraper)

if new_matches:
    print(f"Found {len(new_matches)} new mentions!")
    for m in new_matches:
        print(f"  r/{m['found_in']}: {m['title'][:60]}")
else:
    print("No new mentions.")
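One caveat on `_matches_keywords`: plain substring matching over-fires, e.g. the keyword `java` matches every `javascript` post. If that gets noisy, a word-boundary regex variant is a drop-in replacement (sketch):

```python
import re

def matches_keywords_strict(text, keywords):
    """Return the keywords that appear as whole words, case-insensitively."""
    return [kw for kw in keywords
            if re.search(rf"\b{re.escape(kw)}\b", text, re.IGNORECASE)]

print(matches_keywords_strict("I love JavaScript", ["java"]))  # []
print(matches_keywords_strict("Java 21 released", ["java"]))   # ['java']
```

`re.escape` keeps keywords like `competitor-x` from being read as regex syntax.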

Deploying as a Persistent Service

To run this continuously, you have several options:

  1. Cron job (simplest): Run every 30 minutes via crontab
  2. Systemd timer (Linux): More reliable than cron, with logging
  3. Cloud function: AWS Lambda + EventBridge, or similar
  4. Apify scheduled actor: If using Apify, just schedule the actor run
# Crontab entry - run every 30 minutes
*/30 * * * * cd /path/to/project && python3 monitor.py >> /var/log/reddit-monitor.log 2>&1
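For option 2, a matching service/timer pair might look like this (unit names and paths are placeholders):

```ini
# /etc/systemd/system/reddit-monitor.service
[Unit]
Description=Reddit keyword monitor

[Service]
Type=oneshot
WorkingDirectory=/path/to/project
ExecStart=/usr/bin/python3 monitor.py

# /etc/systemd/system/reddit-monitor.timer
[Unit]
Description=Run the Reddit keyword monitor every 30 minutes

[Timer]
OnBootSec=5min
OnUnitActiveSec=30min

[Install]
WantedBy=timers.target
```

Enable it with `systemctl enable --now reddit-monitor.timer`; output then lands in the journal (`journalctl -u reddit-monitor.service`) instead of a hand-rolled log file.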

Key Takeaways

  1. Start with the official API (PRAW). It is clean, documented, and sufficient for 90% of use cases.
  2. Direct scraping works but requires constant maintenance. Budget 20% of your time for anti-bot countermeasures.
  3. For production, use dedicated services. The time saved on proxy management and CAPTCHA solving pays for itself quickly.
  4. Respect rate limits. Reddit actively monitors scraping patterns. Getting your IP range blocked is not worth the extra speed.
  5. Deduplicate aggressively. When monitoring multiple subreddits, the same post often appears in search results across different queries.

The right approach depends on your scale: hobbyist project -> PRAW, startup monitoring tool -> direct scraping with proxies, enterprise data pipeline -> dedicated scraping API.


Building something with Reddit data? Share your use case in the comments.
