DEV Community

agenthustler
agenthustler

Posted on

Building a Social Media Data Pipeline with Python in 2026

Why Build a Social Media Data Pipeline?

Social media generates billions of data points daily. Whether you're tracking brand sentiment, monitoring trends, doing academic research, or building analytics products — having a reliable pipeline that collects, stores, and analyzes social data is a foundational skill.

In this guide, I'll walk through building a complete pipeline that pulls data from Bluesky, Reddit, Twitter/X, and TikTok, stores it in a structured format, and produces actionable insights.


Architecture Overview

┌─────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────┐
│  Collection  │────▶│   Storage    │────▶│  Processing  │────▶│  Output    │
│  Layer       │     │  Layer       │     │  Layer       │     │  Layer     │
├─────────────┤     ├──────────────┤     ├──────────────┤     ├────────────┤
│ • Bluesky   │     │ • SQLite     │     │ • Cleaning   │     │ • Dashbd   │
│ • Reddit    │     │ • PostgreSQL │     │ • Sentiment  │     │ • CSV      │
│ • Twitter/X │     │ • Parquet    │     │ • NER        │     │ • API      │
│ • TikTok    │     │              │     │ • Trends     │     │ • Alerts   │
└─────────────┘     └──────────────┘     └──────────────┘     └────────────┘
Enter fullscreen mode Exit fullscreen mode

Each layer is independent and connected through simple interfaces. This means you can swap storage backends, add new platforms, or change analysis methods without rewriting everything.


Layer 1: Collection

Bluesky (AT Protocol)

Bluesky is the easiest platform to scrape in 2026. The AT Protocol is open, public data requires no authentication, and there are no rate limits on public endpoints.

import httpx
from datetime import datetime, timedelta

class BlueskyCollector:
    BASE_URL = 'https://public.api.bsky.app'

    def __init__(self):
        self.client = httpx.Client(timeout=15)

    def search_posts(self, query: str, limit: int = 100) -> list[dict]:
        """Search public Bluesky posts."""
        posts = []
        cursor = None

        while len(posts) < limit:
            params = {
                'q': query,
                'limit': min(25, limit - len(posts)),
                'sort': 'latest'
            }
            if cursor:
                params['cursor'] = cursor

            resp = self.client.get(
                f'{self.BASE_URL}/xrpc/app.bsky.feed.searchPosts',
                params=params
            )
            data = resp.json()

            for post in data.get('posts', []):
                posts.append({
                    'platform': 'bluesky',
                    'id': post['uri'],
                    'author': post['author']['handle'],
                    'text': post['record']['text'],
                    'created_at': post['record']['createdAt'],
                    'likes': post.get('likeCount', 0),
                    'reposts': post.get('repostCount', 0),
                    'replies': post.get('replyCount', 0)
                })

            cursor = data.get('cursor')
            if not cursor:
                break

        return posts

    def get_user_feed(self, handle: str, limit: int = 50) -> list[dict]:
        """Get a user's recent posts."""
        resp = self.client.get(
            f'{self.BASE_URL}/xrpc/app.bsky.feed.getAuthorFeed',
            params={'actor': handle, 'limit': limit}
        )
        return [self._parse_post(item['post']) for item in resp.json().get('feed', [])]
Enter fullscreen mode Exit fullscreen mode

For production-scale Bluesky scraping with built-in pagination, proxy rotation, and data export, check out the Bluesky Scraper on Apify — it handles all the edge cases so you don't have to.

Reddit

Reddit's official API has strict rate limits (100 requests/minute for OAuth). For research-scale collection, combine the official API with public JSON endpoints:

import httpx
import time

class RedditCollector:
    def __init__(self):
        self.client = httpx.Client(
            headers={'User-Agent': 'DataPipeline/1.0'},
            timeout=15
        )
        self._last_request = 0

    def _rate_limit(self):
        """Respect Reddit's rate limits."""
        elapsed = time.time() - self._last_request
        if elapsed < 1.0:
            time.sleep(1.0 - elapsed)
        self._last_request = time.time()

    def search_subreddit(self, subreddit: str, query: str, 
                         limit: int = 100, sort: str = 'new') -> list[dict]:
        posts = []
        after = None

        while len(posts) < limit:
            self._rate_limit()
            params = {
                'q': query,
                'sort': sort,
                'limit': min(25, limit - len(posts)),
                't': 'week'
            }
            if after:
                params['after'] = after

            resp = self.client.get(
                f'https://www.reddit.com/r/{subreddit}/search.json',
                params=params
            )
            data = resp.json()['data']

            for child in data['children']:
                post = child['data']
                posts.append({
                    'platform': 'reddit',
                    'id': post['id'],
                    'subreddit': post['subreddit'],
                    'title': post['title'],
                    'text': post.get('selftext', ''),
                    'author': post['author'],
                    'score': post['score'],
                    'num_comments': post['num_comments'],
                    'created_at': datetime.fromtimestamp(post['created_utc']).isoformat(),
                    'url': f"https://reddit.com{post['permalink']}"
                })

            after = data.get('after')
            if not after:
                break

        return posts
Enter fullscreen mode Exit fullscreen mode

For heavy Reddit data collection with comment threads, user histories, and subreddit monitoring, the Reddit Scraper on Apify provides managed infrastructure with automatic retries and structured output.

Twitter/X

Twitter/X is the hardest platform to scrape in 2026. The official API is expensive ($100/month for basic access), and unofficial scraping requires careful session management.

The practical approach: use a proxy service and browser automation.

from playwright.async_api import async_playwright
import asyncio

class TwitterCollector:
    async def search_tweets(self, query: str, max_tweets: int = 50) -> list[dict]:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            )
            page = await context.new_page()

            tweets = []

            # Intercept API calls instead of parsing DOM
            async def capture_tweets(response):
                if 'SearchTimeline' in response.url:
                    try:
                        data = await response.json()
                        entries = (data.get('data', {})
                                      .get('search_by_raw_query', {})
                                      .get('search_timeline', {})
                                      .get('timeline', {})
                                      .get('instructions', [{}])[0]
                                      .get('entries', []))
                        for entry in entries:
                            tweet = self._parse_tweet_entry(entry)
                            if tweet:
                                tweets.append(tweet)
                    except Exception:
                        pass

            page.on('response', capture_tweets)

            search_url = f'https://x.com/search?q={query}&src=typed_query&f=live'
            await page.goto(search_url, wait_until='networkidle')

            # Scroll to load more
            for _ in range(5):
                await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
                await page.wait_for_timeout(2000)
                if len(tweets) >= max_tweets:
                    break

            await browser.close()
            return tweets[:max_tweets]
Enter fullscreen mode Exit fullscreen mode

Pro tip: For reliable Twitter data at scale, using a residential proxy service like ThorData significantly reduces blocks. Their rotating residential IPs mimic real user traffic patterns.

TikTok

TikTok data collection focuses on public video metadata, hashtag trends, and user profiles:

import httpx

class TikTokCollector:
    def search_hashtag(self, tag: str, count: int = 30) -> list[dict]:
        """Collect videos by hashtag using TikTok's web API."""
        # TikTok's internal API requires specific headers and signatures
        # For production use, a managed solution is more reliable
        pass  # See managed solution below
Enter fullscreen mode Exit fullscreen mode

TikTok's anti-bot measures are among the strongest. For production use, the TikTok Scraper on Apify handles signature generation, CAPTCHA solving, and session management automatically.


Layer 2: Unified Storage

All collectors output the same schema. Store everything in one place:

import sqlite3
from datetime import datetime
from contextlib import contextmanager

class SocialDataStore:
    def __init__(self, db_path: str = 'social_data.db'):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS posts (
                    id TEXT PRIMARY KEY,
                    platform TEXT NOT NULL,
                    author TEXT,
                    text TEXT,
                    title TEXT,
                    created_at TEXT,
                    likes INTEGER DEFAULT 0,
                    reposts INTEGER DEFAULT 0,
                    replies INTEGER DEFAULT 0,
                    score INTEGER DEFAULT 0,
                    url TEXT,
                    metadata TEXT,
                    collected_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_platform_date 
                ON posts(platform, created_at)
            ''')

    @contextmanager
    def _connect(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def insert_posts(self, posts: list[dict]) -> int:
        """Insert posts, skip duplicates."""
        inserted = 0
        with self._connect() as conn:
            for post in posts:
                try:
                    conn.execute(
                        '''INSERT OR IGNORE INTO posts 
                           (id, platform, author, text, title, created_at, 
                            likes, reposts, replies, score, url)
                           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                        (post['id'], post['platform'], post.get('author'),
                         post.get('text'), post.get('title'),
                         post.get('created_at'), post.get('likes', 0),
                         post.get('reposts', 0), post.get('replies', 0),
                         post.get('score', 0), post.get('url'))
                    )
                    inserted += 1
                except sqlite3.IntegrityError:
                    continue
        return inserted

    def query(self, platform: str = None, since: str = None, 
              search: str = None) -> list[dict]:
        """Flexible querying."""
        sql = 'SELECT * FROM posts WHERE 1=1'
        params = []

        if platform:
            sql += ' AND platform = ?'
            params.append(platform)
        if since:
            sql += ' AND created_at >= ?'
            params.append(since)
        if search:
            sql += ' AND (text LIKE ? OR title LIKE ?)'
            params.extend([f'%{search}%', f'%{search}%'])

        sql += ' ORDER BY created_at DESC'

        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
Enter fullscreen mode Exit fullscreen mode

For larger datasets, swap SQLite for PostgreSQL or write directly to Parquet files:

import pandas as pd

def export_to_parquet(store: SocialDataStore, output_path: str):
    posts = store.query()
    df = pd.DataFrame(posts)
    df['created_at'] = pd.to_datetime(df['created_at'])
    df.to_parquet(output_path, index=False, engine='pyarrow')
Enter fullscreen mode Exit fullscreen mode

Layer 3: Processing & Analysis

Sentiment Analysis

from transformers import pipeline

class SentimentAnalyzer:
    def __init__(self):
        self.classifier = pipeline(
            'sentiment-analysis',
            model='cardiffnlp/twitter-roberta-base-sentiment-latest',
            device='cpu'
        )

    def analyze_batch(self, texts: list[str]) -> list[dict]:
        """Batch sentiment analysis."""
        results = self.classifier(texts, batch_size=32, truncation=True)
        return [{
            'label': r['label'],
            'score': round(r['score'], 4)
        } for r in results]

# Usage
analyzer = SentimentAnalyzer()
store = SocialDataStore()

posts = store.query(platform='bluesky', search='python')
texts = [p['text'] for p in posts if p['text']]
sentiments = analyzer.analyze_batch(texts)

for post, sentiment in zip(posts, sentiments):
    print(f"{sentiment['label']} ({sentiment['score']}) | {post['text'][:80]}")
Enter fullscreen mode Exit fullscreen mode

Trend Detection

from collections import Counter
import re

def extract_trending_topics(posts: list[dict], min_count: int = 3) -> list[tuple]:
    """Find trending hashtags and keywords."""
    hashtags = Counter()
    keywords = Counter()

    stop_words = {'the', 'is', 'at', 'which', 'on', 'a', 'an', 'and', 'or', 'but',
                  'in', 'with', 'to', 'for', 'of', 'not', 'no', 'can', 'had', 'have',
                  'was', 'were', 'this', 'that', 'it', 'from', 'as', 'be', 'by', 'are'}

    for post in posts:
        text = post.get('text', '')

        # Extract hashtags
        for tag in re.findall(r'#(\w+)', text):
            hashtags[tag.lower()] += 1

        # Extract significant words
        words = re.findall(r'\b[a-z]{4,}\b', text.lower())
        for word in words:
            if word not in stop_words:
                keywords[word] += 1

    trending = [(tag, count) for tag, count in hashtags.most_common(20) 
                if count >= min_count]

    return trending
Enter fullscreen mode Exit fullscreen mode

Layer 4: Visualization & Output

import matplotlib.pyplot as plt
import pandas as pd

def plot_platform_activity(store: SocialDataStore, days: int = 7):
    """Visualize posting activity across platforms."""
    posts = store.query(since=(datetime.now() - timedelta(days=days)).isoformat())
    df = pd.DataFrame(posts)
    df['created_at'] = pd.to_datetime(df['created_at'])
    df['date'] = df['created_at'].dt.date

    activity = df.groupby(['date', 'platform']).size().unstack(fill_value=0)

    fig, axes = plt.subplots(2, 1, figsize=(12, 8))

    # Activity over time
    activity.plot(kind='bar', stacked=True, ax=axes[0], 
                  color=['#0085FF', '#FF4500', '#1DA1F2', '#ff0050'])
    axes[0].set_title('Posts Collected by Platform')
    axes[0].set_ylabel('Post Count')

    # Engagement comparison
    engagement = df.groupby('platform').agg({
        'likes': 'mean', 'replies': 'mean', 'reposts': 'mean'
    }).round(1)
    engagement.plot(kind='barh', ax=axes[1])
    axes[1].set_title('Average Engagement by Platform')

    plt.tight_layout()
    plt.savefig('pipeline_report.png', dpi=150)
    print('Report saved to pipeline_report.png')
Enter fullscreen mode Exit fullscreen mode

Putting It All Together

import asyncio
from datetime import datetime

async def run_pipeline(query: str):
    """Run the complete pipeline."""
    store = SocialDataStore()

    # Collect from all platforms
    bluesky = BlueskyCollector()
    reddit = RedditCollector()

    print(f"[{datetime.now()}] Starting collection for: {query}")

    # Bluesky + Reddit (synchronous collectors)
    bsky_posts = bluesky.search_posts(query, limit=100)
    reddit_posts = reddit.search_subreddit('all', query, limit=100)

    # Store results
    total = store.insert_posts(bsky_posts + reddit_posts)
    print(f"Collected and stored {total} new posts")

    # Analyze
    all_posts = store.query(search=query)
    trends = extract_trending_topics(all_posts)
    print(f"\nTrending topics:")
    for tag, count in trends[:10]:
        print(f"  #{tag}: {count} mentions")

    # Visualize
    plot_platform_activity(store)

if __name__ == '__main__':
    asyncio.run(run_pipeline('artificial intelligence'))
Enter fullscreen mode Exit fullscreen mode

Production Considerations

Proxy Rotation

Scraping multiple platforms at scale requires rotating proxies. Without them, you'll get blocked within hours.

ScraperAPI provides a simple API wrapper that handles proxy rotation, CAPTCHAs, and retries:

import httpx

SCRAPERAPI_KEY = 'your_key'

def fetch_with_proxy(url: str) -> str:
    proxy_url = f'http://api.scraperapi.com?api_key={SCRAPERAPI_KEY}&url={url}'
    resp = httpx.get(proxy_url, timeout=30)
    return resp.text
Enter fullscreen mode Exit fullscreen mode

For raw residential proxies with more control, ThorData offers rotating residential IPs across 190+ countries.

Scheduling

# crontab: Run pipeline every 6 hours
# 0 */6 * * * cd /path/to/pipeline && python3 run_pipeline.py >> logs/pipeline.log 2>&1
Enter fullscreen mode Exit fullscreen mode

Managed Scraping

If you don't want to maintain scraping infrastructure, managed actors handle everything:

These export directly to JSON, CSV, or connect to your storage via webhooks.


What's Next

This pipeline is a starting point. From here you can:

  1. Add real-time streaming — Use WebSockets for Bluesky's firehose
  2. Build alerting — Notify when sentiment drops or a topic spikes
  3. Train custom models — Fine-tune classifiers on your domain data
  4. Create dashboards — Connect to Grafana or build a Streamlit app

The modular architecture means each improvement is isolated. Add platforms, swap models, change storage — the pipeline keeps running.


What platforms are you collecting data from? Share your pipeline architecture in the comments.

Top comments (0)