Your scraper runs fine for 100 profiles. At 10,000 it crashes. At 100,000 it's a dumpster fire.
You've got timeouts. Duplicate records. Missing data. A Postgres database that takes 30 seconds to query. And a cron job that silently failed three days ago — nobody noticed until a client complained.
I've built data pipelines that process millions of social media records daily. The architecture isn't complex. But it's very different from "fetch in a loop and save to DB."
Here's the exact pipeline I use.
## The Stack
- Node.js – orchestration
- SociaVault API – social media data source
- BullMQ + Redis – job queue
- PostgreSQL – storage
- Cron – scheduling
## The Problem With "Fetch and Save"
Here's what most people start with:
```javascript
// ❌ This doesn't scale
for (const username of usernames) {
  const profile = await fetchProfile(username);
  await db.insert('profiles', profile);
}
```
Why this breaks:
- One failure kills everything — if request #5,001 fails, you lose your place
- No parallelism — sequential = slow
- No retry logic — transient errors become permanent failures
- No deduplication — run it twice, get duplicate records
- No monitoring — you don't know it's broken until someone tells you
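Even before reaching for a queue, you can get failure isolation and parallelism with `Promise.allSettled`. A minimal sketch of the idea — `fetchProfile` here is a stand-in for whatever fetch call you use, not a SociaVault function:

```javascript
// Fetch in parallel batches; one failed profile no longer kills the run.
async function fetchBatch(usernames, fetchProfile, batchSize = 10) {
  const results = { ok: [], failed: [] };
  for (let i = 0; i < usernames.length; i += batchSize) {
    const batch = usernames.slice(i, i + batchSize);
    const settled = await Promise.allSettled(batch.map(u => fetchProfile(u)));
    settled.forEach((res, j) => {
      if (res.status === 'fulfilled') results.ok.push(res.value);
      else results.failed.push({ username: batch[j], error: res.reason.message });
    });
  }
  return results;
}
```

This fixes the "one failure kills everything" and "no parallelism" problems, but not retries, dedup, or monitoring — which is why the rest of this post exists.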
## The Pipeline Architecture
```
┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│  Scheduler   │─────▶│  Job Queue   │─────▶│   Workers    │
│   (Cron)     │      │   (BullMQ)   │      │  (Fetch +    │
│              │      │              │      │  Transform)  │
└──────────────┘      └──────────────┘      └──────┬───────┘
                                                   │
                                            ┌──────▼───────┐
                                            │   Database   │
                                            │ (PostgreSQL) │
                                            └──────────────┘
```
Four stages: Schedule → Queue → Process → Store
## Step 1: The API Client
```javascript
const axios = require('axios');

const API_BASE = 'https://api.sociavault.com/v1';
const API_KEY = process.env.SOCIAVAULT_API_KEY;

const api = axios.create({
  baseURL: API_BASE,
  headers: { 'x-api-key': API_KEY },
  timeout: 15000,
});

// Wrapper with automatic retry and exponential backoff
async function fetchWithRetry(url, params = {}, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const { data } = await api.get(url, { params });
      return data;
    } catch (err) {
      if (attempt === retries) throw err;
      // Retry on rate limits, server errors, and network failures
      // (timeouts and connection errors have no `err.response` at all)
      const status = err.response?.status;
      const isRetryable = !err.response || status === 429 || status >= 500;
      if (!isRetryable) throw err;
      const delay = Math.pow(2, attempt) * 1000; // 2s, 4s, 8s, ...
      console.log(`Retry ${attempt}/${retries} for ${url} in ${delay}ms`);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}
```
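For tuning, it helps to see exactly what delays that formula produces. A quick sketch — the helper name is mine, not part of the client above:

```javascript
// The sleeps the retry wrapper performs between attempts, using the same
// 2^attempt * base formula. The final attempt throws instead of sleeping,
// so there are retries - 1 delays.
function backoffSchedule(retries, baseMs = 1000) {
  const delays = [];
  for (let attempt = 1; attempt < retries; attempt++) {
    delays.push(Math.pow(2, attempt) * baseMs);
  }
  return delays;
}

// backoffSchedule(3) → [2000, 4000]
```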
## Step 2: The Job Queue
BullMQ gives you retries, concurrency, rate limiting, and dead letter queues — for free.
```javascript
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');

// BullMQ requires maxRetriesPerRequest: null on its blocking connections
const connection = new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null });

// Create queues for each stage
const profileQueue = new Queue('fetch-profiles', { connection });
const postQueue = new Queue('fetch-posts', { connection });
const transformQueue = new Queue('transform-data', { connection });

// Add jobs in bulk
async function enqueueProfiles(usernames, platform) {
  const jobs = usernames.map(username => ({
    name: `profile:${platform}:${username}`,
    data: { username, platform },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: 1000, // Keep the last 1,000 completed jobs
      removeOnFail: 5000,     // Keep the last 5,000 failed jobs
    },
  }));
  await profileQueue.addBulk(jobs);
  console.log(`Enqueued ${jobs.length} profile fetch jobs`);
}
```
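The job-building map is pure, so it can be pulled out and unit-tested without Redis running. A sketch with a hypothetical `buildProfileJobs` helper (same shape as the map inside `enqueueProfiles`):

```javascript
// Build the job descriptors that get passed to addBulk — no queue needed,
// so this part of the pipeline is trivial to cover with unit tests.
function buildProfileJobs(usernames, platform) {
  return usernames.map(username => ({
    name: `profile:${platform}:${username}`,
    data: { username, platform },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: 1000,
      removeOnFail: 5000,
    },
  }));
}
```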
## Step 3: The Workers
Workers pull jobs from the queue, fetch data, and push results downstream.
```javascript
// Profile fetch worker
const profileWorker = new Worker('fetch-profiles', async (job) => {
  const { username, platform } = job.data;

  // Fetch profile data from SociaVault API
  const profile = await fetchWithRetry(`/${platform}/profile/${username}`);

  // Save to database
  await upsertProfile(platform, profile);

  // Enqueue post fetching as the next step
  await postQueue.add(`posts:${platform}:${username}`, {
    username,
    platform,
    followerCount: profile.followerCount,
  });

  return { username, followers: profile.followerCount };
}, {
  connection,
  concurrency: 10, // 10 jobs in parallel
  limiter: {
    max: 20,        // Max 20 jobs...
    duration: 1000, // ...per second
  },
});

// Post fetch worker
const postWorker = new Worker('fetch-posts', async (job) => {
  const { username, platform } = job.data;

  const posts = await fetchWithRetry(`/${platform}/posts/${username}`, { limit: 50 });

  // Transform and save each post
  for (const post of posts) {
    await upsertPost(platform, username, post);
  }

  return { username, postCount: posts.length };
}, {
  connection,
  concurrency: 5,
  limiter: {
    max: 10,
    duration: 1000,
  },
});
```
## Step 4: Database Upserts (Not Inserts)
Always use upserts. Running the pipeline twice should produce the same result.
```javascript
const { Pool } = require('pg');

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function upsertProfile(platform, profile) {
  await pool.query(`
    INSERT INTO profiles (platform, username, follower_count, following_count,
                          post_count, biography, profile_pic_url, updated_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
    ON CONFLICT (platform, username)
    DO UPDATE SET
      follower_count = EXCLUDED.follower_count,
      following_count = EXCLUDED.following_count,
      post_count = EXCLUDED.post_count,
      biography = EXCLUDED.biography,
      profile_pic_url = EXCLUDED.profile_pic_url,
      updated_at = NOW()
  `, [
    platform,
    profile.username,
    profile.followerCount,
    profile.followingCount,
    profile.postCount,
    profile.biography,
    profile.profilePicUrl,
  ]);
}

async function upsertPost(platform, username, post) {
  await pool.query(`
    INSERT INTO posts (platform, post_id, username, caption, like_count,
                       comment_count, share_count, created_at, updated_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
    ON CONFLICT (platform, post_id)
    DO UPDATE SET
      like_count = EXCLUDED.like_count,
      comment_count = EXCLUDED.comment_count,
      share_count = EXCLUDED.share_count,
      updated_at = NOW()
  `, [
    platform,
    post.id,
    username,
    post.caption,
    post.likeCount,
    post.commentCount,
    post.shareCount,
    new Date(post.createdAt),
  ]);
}
```
Why upserts matter: If a job fails halfway and retries, you don't get duplicates. The pipeline is idempotent.
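One prerequisite the queries above assume: `ON CONFLICT (platform, username)` and `ON CONFLICT (platform, post_id)` only work if unique constraints exist on those column pairs. A sketch of matching DDL — the column types here are my assumptions, not a schema from SociaVault:

```sql
-- The upserts depend on these UNIQUE constraints as conflict targets.
CREATE TABLE IF NOT EXISTS profiles (
  id              BIGSERIAL PRIMARY KEY,
  platform        TEXT NOT NULL,
  username        TEXT NOT NULL,
  follower_count  BIGINT,
  following_count BIGINT,
  post_count      BIGINT,
  biography       TEXT,
  profile_pic_url TEXT,
  updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE (platform, username)
);

CREATE TABLE IF NOT EXISTS posts (
  id            BIGSERIAL PRIMARY KEY,
  platform      TEXT NOT NULL,
  post_id       TEXT NOT NULL,
  username      TEXT NOT NULL,
  caption       TEXT,
  like_count    BIGINT,
  comment_count BIGINT,
  share_count   BIGINT,
  created_at    TIMESTAMPTZ,
  updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE (platform, post_id)
);
```

Without those constraints, Postgres rejects the query with "there is no unique or exclusion constraint matching the ON CONFLICT specification."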
## Step 5: The Scheduler
```javascript
const cron = require('node-cron');

// Refresh all tracked profiles daily at 2 AM
cron.schedule('0 2 * * *', async () => {
  console.log('Starting daily profile refresh...');

  const { rows } = await pool.query(
    'SELECT platform, username FROM tracked_profiles WHERE active = true'
  );
  console.log(`Refreshing ${rows.length} profiles`);

  // Group usernames by platform before enqueueing
  const byPlatform = {};
  for (const row of rows) {
    if (!byPlatform[row.platform]) byPlatform[row.platform] = [];
    byPlatform[row.platform].push(row.username);
  }

  for (const [platform, usernames] of Object.entries(byPlatform)) {
    await enqueueProfiles(usernames, platform);
  }
});

// Fetch posts for high-priority profiles every 6 hours
cron.schedule('0 */6 * * *', async () => {
  const { rows } = await pool.query(
    'SELECT platform, username FROM tracked_profiles WHERE priority = $1 AND active = true',
    ['high']
  );
  for (const row of rows) {
    await postQueue.add(`posts:${row.platform}:${row.username}`, {
      username: row.username,
      platform: row.platform,
    });
  }
});
```
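The grouping step inside the daily job is another pure candidate for extraction and testing. A sketch with a hypothetical `groupByPlatform` helper:

```javascript
// Group tracked-profile rows into { platform: [usernames] } — the same
// shape enqueueProfiles expects, testable without a database.
function groupByPlatform(rows) {
  const byPlatform = {};
  for (const row of rows) {
    (byPlatform[row.platform] ??= []).push(row.username);
  }
  return byPlatform;
}
```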
## Step 6: Monitoring
A pipeline you don't monitor is a pipeline that's broken and you don't know it.
```javascript
// Event listeners for monitoring
profileWorker.on('completed', (job, result) => {
  console.log(`✅ ${job.name} — ${result.followers} followers`);
});

profileWorker.on('failed', (job, err) => {
  console.error(`❌ ${job.name} — ${err.message}`);
});

// Health check endpoint
const express = require('express');
const app = express();

app.get('/health', async (req, res) => {
  const profileWaiting = await profileQueue.getWaitingCount();
  const profileActive = await profileQueue.getActiveCount();
  const profileFailed = await profileQueue.getFailedCount();
  const postWaiting = await postQueue.getWaitingCount();
  const postActive = await postQueue.getActiveCount();
  const postFailed = await postQueue.getFailedCount();

  res.json({
    status: 'ok',
    queues: {
      profiles: { waiting: profileWaiting, active: profileActive, failed: profileFailed },
      posts: { waiting: postWaiting, active: postActive, failed: postFailed },
    },
  });
});

app.listen(3001);
```
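One refinement worth considering: the endpoint above always reports `status: 'ok'`, even when a queue is drowning in failures. You can derive the status from the counts instead — a small sketch, with thresholds that are placeholders you'd tune for your own volumes:

```javascript
// Classify a queue's health from its counts. Thresholds are illustrative.
function queueStatus({ waiting, failed }, { maxWaiting = 10000, maxFailed = 100 } = {}) {
  if (failed > maxFailed) return 'degraded';
  if (waiting > maxWaiting) return 'backlogged';
  return 'ok';
}
```

An uptime checker pointed at `/health` can then alert on anything that isn't `'ok'`, instead of only noticing when the process dies.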
## Performance Numbers
Running this pipeline on a $20/month VPS:
| Metric | "Fetch and Save" Loop | Pipeline |
|---|---|---|
| Profiles/hour | ~500 | ~15,000 |
| Error recovery | Manual restart | Automatic retry |
| Duplicate handling | None | Upsert |
| Monitoring | None | Health endpoint |
| Failure visibility | "It stopped working" | Exact job + error |
30x throughput. Plus it fixes itself when things go wrong.
## Read the Full Guide
This is a condensed version. The full guide includes:
- Historical data tracking and change detection
- Webhook notifications for data changes
- Multi-region deployment for lower latency
- Cost optimization strategies
Read the complete guide on SociaVault →
Want the data without building the pipeline? SociaVault provides ready-to-use social media data APIs for TikTok, Instagram, YouTube, and 10+ platforms. Skip the infrastructure. Just fetch the data.
## Discussion
What's the most painful part of your data pipeline? Rate limits? Data quality? Monitoring? Let's compare war stories 👇