Why Build a Social Media Data Pipeline?
Social media generates billions of data points daily. Whether you're tracking brand sentiment, monitoring trends, doing academic research, or building analytics products — having a reliable pipeline that collects, stores, and analyzes social data is a foundational skill.
In this guide, I'll walk through building a complete pipeline that pulls data from Bluesky, Reddit, Twitter/X, and TikTok, stores it in a structured format, and produces actionable insights.
Architecture Overview
┌─────────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────┐
│ Collection │────▶│ Storage │────▶│ Processing │────▶│ Output │
│ Layer │ │ Layer │ │ Layer │ │ Layer │
├─────────────┤ ├──────────────┤ ├──────────────┤ ├────────────┤
│ • Bluesky │ │ • SQLite │ │ • Cleaning │ │ • Dashbd │
│ • Reddit │ │ • PostgreSQL │ │ • Sentiment │ │ • CSV │
│ • Twitter/X │ │ • Parquet │ │ • NER │ │ • API │
│ • TikTok │ │ │ │ • Trends │ │ • Alerts │
└─────────────┘ └──────────────┘ └──────────────┘ └────────────┘
Each layer is independent and connected through simple interfaces. This means you can swap storage backends, add new platforms, or change analysis methods without rewriting everything.
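The "simple interface" between layers can be as thin as a shared schema plus a structural type every collector satisfies. A minimal sketch (the `Collector` protocol and `normalize` helper are names of my own choosing, not part of the code below):

```python
from typing import Protocol

class Collector(Protocol):
    """Anything with a search method that returns normalized post dicts."""
    def search_posts(self, query: str, limit: int = 100) -> list[dict]: ...

def normalize(platform: str, raw: dict, **overrides) -> dict:
    """Map a platform-specific record onto the pipeline's shared schema,
    dropping unknown fields and defaulting missing ones."""
    post = {
        'platform': platform,
        'id': None, 'author': None, 'text': '', 'title': None,
        'created_at': None, 'likes': 0, 'reposts': 0,
        'replies': 0, 'score': 0, 'url': None,
    }
    post.update({k: v for k, v in raw.items() if k in post})
    post.update(overrides)
    return post
```

Because every collector emits this shape, the storage layer never needs to know which platform a record came from.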
Layer 1: Collection
Bluesky (AT Protocol)
Bluesky is the easiest platform to scrape in 2026. The AT Protocol is open, public data requires no authentication, and the rate limits on public endpoints are generous.
import httpx
from datetime import datetime, timedelta

class BlueskyCollector:
    BASE_URL = 'https://public.api.bsky.app'

    def __init__(self):
        self.client = httpx.Client(timeout=15)

    def _parse_post(self, post: dict) -> dict:
        """Map an AT Protocol post view onto the pipeline's shared schema."""
        return {
            'platform': 'bluesky',
            'id': post['uri'],
            'author': post['author']['handle'],
            'text': post['record']['text'],
            'created_at': post['record']['createdAt'],
            'likes': post.get('likeCount', 0),
            'reposts': post.get('repostCount', 0),
            'replies': post.get('replyCount', 0)
        }

    def search_posts(self, query: str, limit: int = 100) -> list[dict]:
        """Search public Bluesky posts."""
        posts = []
        cursor = None
        while len(posts) < limit:
            params = {
                'q': query,
                'limit': min(25, limit - len(posts)),
                'sort': 'latest'
            }
            if cursor:
                params['cursor'] = cursor
            resp = self.client.get(
                f'{self.BASE_URL}/xrpc/app.bsky.feed.searchPosts',
                params=params
            )
            resp.raise_for_status()
            data = resp.json()
            posts.extend(self._parse_post(p) for p in data.get('posts', []))
            cursor = data.get('cursor')
            if not cursor:
                break
        return posts

    def get_user_feed(self, handle: str, limit: int = 50) -> list[dict]:
        """Get a user's recent posts."""
        resp = self.client.get(
            f'{self.BASE_URL}/xrpc/app.bsky.feed.getAuthorFeed',
            params={'actor': handle, 'limit': limit}
        )
        resp.raise_for_status()
        return [self._parse_post(item['post']) for item in resp.json().get('feed', [])]
For production-scale Bluesky scraping with built-in pagination, proxy rotation, and data export, check out the Bluesky Scraper on Apify — it handles all the edge cases so you don't have to.
Reddit
Reddit's official API has strict rate limits (100 requests per minute with OAuth). For research-scale collection, combine the official API with the public JSON endpoints:
import httpx
import time
from datetime import datetime

class RedditCollector:
    def __init__(self):
        self.client = httpx.Client(
            headers={'User-Agent': 'DataPipeline/1.0'},
            timeout=15
        )
        self._last_request = 0.0

    def _rate_limit(self):
        """Keep to roughly one request per second to respect Reddit's limits."""
        elapsed = time.time() - self._last_request
        if elapsed < 1.0:
            time.sleep(1.0 - elapsed)
        self._last_request = time.time()

    def search_subreddit(self, subreddit: str, query: str,
                         limit: int = 100, sort: str = 'new') -> list[dict]:
        posts = []
        after = None
        while len(posts) < limit:
            self._rate_limit()
            params = {
                'q': query,
                'sort': sort,
                'limit': min(25, limit - len(posts)),
                't': 'week'
            }
            if after:
                params['after'] = after
            resp = self.client.get(
                f'https://www.reddit.com/r/{subreddit}/search.json',
                params=params
            )
            resp.raise_for_status()
            data = resp.json()['data']
            for child in data['children']:
                post = child['data']
                posts.append({
                    'platform': 'reddit',
                    'id': post['id'],
                    'subreddit': post['subreddit'],
                    'title': post['title'],
                    'text': post.get('selftext', ''),
                    'author': post['author'],
                    'score': post['score'],
                    'num_comments': post['num_comments'],
                    'created_at': datetime.fromtimestamp(post['created_utc']).isoformat(),
                    'url': f"https://reddit.com{post['permalink']}"
                })
            after = data.get('after')
            if not after:
                break
        return posts
For heavy Reddit data collection with comment threads, user histories, and subreddit monitoring, the Reddit Scraper on Apify provides managed infrastructure with automatic retries and structured output.
Twitter/X
Twitter/X is the hardest platform to scrape in 2026. The official API is expensive ($100/month for basic access), and unofficial scraping requires careful session management.
The practical approach: use a proxy service and browser automation.
from playwright.async_api import async_playwright

class TwitterCollector:
    @staticmethod
    def _parse_tweet_entry(entry: dict) -> dict | None:
        """Pull a tweet out of a search-timeline entry. The GraphQL payload
        shape shifts often, so treat this as best-effort."""
        try:
            result = entry['content']['itemContent']['tweet_results']['result']
            legacy = result['legacy']
            return {
                'platform': 'twitter',
                'id': legacy['id_str'],
                'author': result['core']['user_results']['result']['legacy']['screen_name'],
                'text': legacy['full_text'],
                'created_at': legacy['created_at'],
                'likes': legacy.get('favorite_count', 0),
                'reposts': legacy.get('retweet_count', 0),
                'replies': legacy.get('reply_count', 0)
            }
        except (KeyError, TypeError):
            return None

    async def search_tweets(self, query: str, max_tweets: int = 50) -> list[dict]:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            )
            page = await context.new_page()
            tweets = []

            # Intercept API calls instead of parsing the DOM
            async def capture_tweets(response):
                if 'SearchTimeline' in response.url:
                    try:
                        data = await response.json()
                        entries = (data.get('data', {})
                                   .get('search_by_raw_query', {})
                                   .get('search_timeline', {})
                                   .get('timeline', {})
                                   .get('instructions', [{}])[0]
                                   .get('entries', []))
                        for entry in entries:
                            tweet = self._parse_tweet_entry(entry)
                            if tweet:
                                tweets.append(tweet)
                    except Exception:
                        pass

            page.on('response', capture_tweets)
            search_url = f'https://x.com/search?q={query}&src=typed_query&f=live'
            await page.goto(search_url, wait_until='networkidle')

            # Scroll to load more
            for _ in range(5):
                await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
                await page.wait_for_timeout(2000)
                if len(tweets) >= max_tweets:
                    break

            await browser.close()
            return tweets[:max_tweets]
Pro tip: For reliable Twitter data at scale, using a residential proxy service like ThorData significantly reduces blocks. Their rotating residential IPs mimic real user traffic patterns.
TikTok
TikTok data collection focuses on public video metadata, hashtag trends, and user profiles:
import httpx

class TikTokCollector:
    def search_hashtag(self, tag: str, count: int = 30) -> list[dict]:
        """Collect videos by hashtag using TikTok's web API."""
        # TikTok's internal API requires specific headers and signatures.
        # For production use, a managed solution is more reliable.
        raise NotImplementedError('See the managed solution below')
TikTok's anti-bot measures are among the strongest. For production use, the TikTok Scraper on Apify handles signature generation, CAPTCHA solving, and session management automatically.
Layer 2: Unified Storage
All collectors output the same schema. Store everything in one place:
import sqlite3
from contextlib import contextmanager

class SocialDataStore:
    def __init__(self, db_path: str = 'social_data.db'):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS posts (
                    id TEXT PRIMARY KEY,
                    platform TEXT NOT NULL,
                    author TEXT,
                    text TEXT,
                    title TEXT,
                    created_at TEXT,
                    likes INTEGER DEFAULT 0,
                    reposts INTEGER DEFAULT 0,
                    replies INTEGER DEFAULT 0,
                    score INTEGER DEFAULT 0,
                    url TEXT,
                    metadata TEXT,
                    collected_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_platform_date
                ON posts(platform, created_at)
            ''')

    @contextmanager
    def _connect(self):
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def insert_posts(self, posts: list[dict]) -> int:
        """Insert posts, skipping duplicates; return the number actually inserted."""
        inserted = 0
        with self._connect() as conn:
            for post in posts:
                cur = conn.execute(
                    '''INSERT OR IGNORE INTO posts
                       (id, platform, author, text, title, created_at,
                        likes, reposts, replies, score, url)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                    (post['id'], post['platform'], post.get('author'),
                     post.get('text'), post.get('title'),
                     post.get('created_at'), post.get('likes', 0),
                     post.get('reposts', 0), post.get('replies', 0),
                     post.get('score', 0), post.get('url'))
                )
                # INSERT OR IGNORE swallows duplicates silently rather than
                # raising IntegrityError, so count affected rows instead.
                inserted += cur.rowcount
        return inserted

    def query(self, platform: str | None = None, since: str | None = None,
              search: str | None = None) -> list[dict]:
        """Flexible querying."""
        sql = 'SELECT * FROM posts WHERE 1=1'
        params = []
        if platform:
            sql += ' AND platform = ?'
            params.append(platform)
        if since:
            sql += ' AND created_at >= ?'
            params.append(since)
        if search:
            sql += ' AND (text LIKE ? OR title LIKE ?)'
            params.extend([f'%{search}%', f'%{search}%'])
        sql += ' ORDER BY created_at DESC'
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
For larger datasets, swap SQLite for PostgreSQL or write directly to Parquet files:
import pandas as pd

def export_to_parquet(store: SocialDataStore, output_path: str):
    posts = store.query()
    df = pd.DataFrame(posts)
    # utc=True because the platforms mix naive and timezone-aware timestamps
    df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
    df.to_parquet(output_path, index=False, engine='pyarrow')
Layer 3: Processing & Analysis
Sentiment Analysis
from transformers import pipeline

class SentimentAnalyzer:
    def __init__(self):
        self.classifier = pipeline(
            'sentiment-analysis',
            model='cardiffnlp/twitter-roberta-base-sentiment-latest',
            device='cpu'
        )

    def analyze_batch(self, texts: list[str]) -> list[dict]:
        """Batch sentiment analysis."""
        results = self.classifier(texts, batch_size=32, truncation=True)
        return [{
            'label': r['label'],
            'score': round(r['score'], 4)
        } for r in results]

# Usage — filter out empty posts first so posts and sentiments stay aligned
analyzer = SentimentAnalyzer()
store = SocialDataStore()
posts = [p for p in store.query(platform='bluesky', search='python') if p['text']]
texts = [p['text'] for p in posts]
sentiments = analyzer.analyze_batch(texts)
for post, sentiment in zip(posts, sentiments):
    print(f"{sentiment['label']} ({sentiment['score']}) | {post['text'][:80]}")
Trend Detection
from collections import Counter
import re

def extract_trending_topics(posts: list[dict], min_count: int = 3) -> list[tuple]:
    """Find trending hashtags, falling back to plain keywords when posts
    use few hashtags."""
    hashtags = Counter()
    keywords = Counter()
    stop_words = {'the', 'is', 'at', 'which', 'on', 'a', 'an', 'and', 'or', 'but',
                  'in', 'with', 'to', 'for', 'of', 'not', 'no', 'can', 'had', 'have',
                  'was', 'were', 'this', 'that', 'it', 'from', 'as', 'be', 'by', 'are'}
    for post in posts:
        text = post.get('text') or ''
        # Extract hashtags
        for tag in re.findall(r'#(\w+)', text):
            hashtags[tag.lower()] += 1
        # Extract significant words (four letters or more, not stop words)
        for word in re.findall(r'\b[a-z]{4,}\b', text.lower()):
            if word not in stop_words:
                keywords[word] += 1
    source = hashtags if hashtags else keywords
    return [(term, count) for term, count in source.most_common(20)
            if count >= min_count]
Layer 4: Visualization & Output
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime, timedelta

def plot_platform_activity(store: SocialDataStore, days: int = 7):
    """Visualize posting activity across platforms."""
    posts = store.query(since=(datetime.now() - timedelta(days=days)).isoformat())
    df = pd.DataFrame(posts)
    df['created_at'] = pd.to_datetime(df['created_at'], utc=True)
    df['date'] = df['created_at'].dt.date
    activity = df.groupby(['date', 'platform']).size().unstack(fill_value=0)

    fig, axes = plt.subplots(2, 1, figsize=(12, 8))

    # Activity over time (trim the color list to however many platforms we have)
    colors = ['#0085FF', '#FF4500', '#1DA1F2', '#ff0050'][:activity.shape[1]]
    activity.plot(kind='bar', stacked=True, ax=axes[0], color=colors)
    axes[0].set_title('Posts Collected by Platform')
    axes[0].set_ylabel('Post Count')

    # Engagement comparison
    engagement = df.groupby('platform').agg({
        'likes': 'mean', 'replies': 'mean', 'reposts': 'mean'
    }).round(1)
    engagement.plot(kind='barh', ax=axes[1])
    axes[1].set_title('Average Engagement by Platform')

    plt.tight_layout()
    plt.savefig('pipeline_report.png', dpi=150)
    print('Report saved to pipeline_report.png')
Putting It All Together
import asyncio
from datetime import datetime

async def run_pipeline(query: str):
    """Run the complete pipeline."""
    store = SocialDataStore()

    # Collect from the platforms that need no session management
    bluesky = BlueskyCollector()
    reddit = RedditCollector()
    print(f"[{datetime.now()}] Starting collection for: {query}")

    # Bluesky + Reddit (synchronous collectors; await TwitterCollector here
    # once you have proxies in place)
    bsky_posts = bluesky.search_posts(query, limit=100)
    reddit_posts = reddit.search_subreddit('all', query, limit=100)

    # Store results
    total = store.insert_posts(bsky_posts + reddit_posts)
    print(f"Collected and stored {total} new posts")

    # Analyze
    all_posts = store.query(search=query)
    trends = extract_trending_topics(all_posts)
    print("\nTrending topics:")
    for tag, count in trends[:10]:
        print(f"  #{tag}: {count} mentions")

    # Visualize
    plot_platform_activity(store)

if __name__ == '__main__':
    asyncio.run(run_pipeline('artificial intelligence'))
Production Considerations
Proxy Rotation
Scraping multiple platforms at scale requires rotating proxies. Without them, you'll get blocked within hours.
ScraperAPI provides a simple API wrapper that handles proxy rotation, CAPTCHAs, and retries:
import httpx

SCRAPERAPI_KEY = 'your_key'

def fetch_with_proxy(url: str) -> str:
    # Pass the target URL as a query parameter so httpx encodes it properly
    resp = httpx.get(
        'http://api.scraperapi.com',
        params={'api_key': SCRAPERAPI_KEY, 'url': url},
        timeout=30
    )
    resp.raise_for_status()
    return resp.text
For raw residential proxies with more control, ThorData offers rotating residential IPs across 190+ countries.
Scheduling
# crontab: Run pipeline every 6 hours
# 0 */6 * * * cd /path/to/pipeline && python3 run_pipeline.py >> logs/pipeline.log 2>&1
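If cron isn't available (Windows, or a minimal container without crond), a small in-process scheduler covers the same need — a sketch that aligns runs to six-hour boundaries like `0 */6` does:

```python
import time

INTERVAL = 6 * 60 * 60  # six hours, matching the crontab entry above

def seconds_until_next_run(now: float, interval: int = INTERVAL) -> float:
    """Seconds until the next interval boundary (cron-style alignment)."""
    return interval - (now % interval)

def run_forever(job, interval: int = INTERVAL):
    """Sleep until each boundary, then run the job."""
    while True:
        time.sleep(seconds_until_next_run(time.time(), interval))
        job()
```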
Managed Scraping
If you don't want to maintain scraping infrastructure, managed actors handle everything:
- Bluesky Scraper — Open AT Protocol, no auth needed
- Reddit Scraper — Handles rate limits and pagination
- TikTok Scraper — Signature generation and CAPTCHA solving
These export directly to JSON, CSV, or connect to your storage via webhooks.
What's Next
This pipeline is a starting point. From here you can:
- Add real-time streaming — Use WebSockets for Bluesky's firehose
- Build alerting — Notify when sentiment drops or a topic spikes
- Train custom models — Fine-tune classifiers on your domain data
- Create dashboards — Connect to Grafana or build a Streamlit app
The modular architecture means each improvement is isolated. Add platforms, swap models, change storage — the pipeline keeps running.
What platforms are you collecting data from? Share your pipeline architecture in the comments.