Olamide Olaniyan
Build a Social Media Data Pipeline That Actually Scales

Your scraper runs fine for 100 profiles. At 10,000 it crashes. At 100,000 it's a dumpster fire.

You've got timeouts. Duplicate records. Missing data. A Postgres database that takes 30 seconds to query. And a cron job that silently failed three days ago — nobody noticed until a client complained.

I've built data pipelines that process millions of social media records daily. The architecture isn't complex. But it's very different from "fetch in a loop and save to DB."

Here's the exact pipeline I use.

The Stack

  • Node.js – orchestration
  • SociaVault API – social media data source
  • BullMQ + Redis – job queue
  • PostgreSQL – storage
  • Cron – scheduling

The Problem With "Fetch and Save"

Here's what most people start with:

// ❌ This doesn't scale
for (const username of usernames) {
  const profile = await fetchProfile(username);
  await db.insert('profiles', profile);
}

Why this breaks:

  1. One failure kills everything — if request #5,001 fails, you lose your place
  2. No parallelism — sequential = slow
  3. No retry logic — transient errors become permanent failures
  4. No deduplication — run it twice, get duplicate records
  5. No monitoring — you don't know it's broken until someone tells you
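Before jumping to a full queue, it's worth seeing the half-step most people try next: batched parallelism with per-item error isolation. This sketch (the `fetchProfile` function is assumed, as in the loop above) fixes problems #1 and #2, but still has no retries, no persistence, and no monitoring, which is why the rest of this article exists.

```javascript
// A slightly better loop: batched parallelism with per-item error isolation.
// Still no retries, persistence, or monitoring -- just the first step up.
async function fetchInBatches(usernames, fetchProfile, batchSize = 10) {
  const results = { ok: [], failed: [] };
  for (let i = 0; i < usernames.length; i += batchSize) {
    const batch = usernames.slice(i, i + batchSize);
    // allSettled: one failed request no longer kills the whole run
    const settled = await Promise.allSettled(batch.map((u) => fetchProfile(u)));
    settled.forEach((s, j) => {
      if (s.status === 'fulfilled') results.ok.push(s.value);
      else results.failed.push({ username: batch[j], error: s.reason });
    });
  }
  return results;
}
```

Failures are collected instead of thrown, so you at least know *which* usernames need a second pass.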

The Pipeline Architecture

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│   Scheduler  │────▶│  Job Queue   │────▶│   Workers    │
│   (Cron)     │     │  (BullMQ)    │     │  (Fetch +    │
│              │     │              │     │  Transform)  │
└──────────────┘     └──────────────┘     └──────┬───────┘
                                                 │
                                          ┌──────▼───────┐
                                          │   Database   │
                                          │ (PostgreSQL) │
                                          └──────────────┘

Four stages: Schedule → Queue → Process → Store

Step 1: The API Client

const axios = require('axios');

const API_BASE = 'https://api.sociavault.com/v1';
const API_KEY = process.env.SOCIAVAULT_API_KEY;

const api = axios.create({
  baseURL: API_BASE,
  headers: { 'x-api-key': API_KEY },
  timeout: 15000,
});

// Wrapper with automatic retry
async function fetchWithRetry(url, params = {}, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const { data } = await api.get(url, { params });
      return data;
    } catch (err) {
      if (attempt === retries) throw err;

      const isRetryable = err.response?.status === 429 || err.response?.status >= 500;
      if (!isRetryable) throw err;

      const delay = Math.pow(2, attempt) * 1000;
      console.log(`Retry ${attempt}/${retries} for ${url} in ${delay}ms`);
      await new Promise(r => setTimeout(r, delay));
    }
  }
}
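One optional refinement to the client above: the fixed `2^attempt * 1000` delay means every worker that hit a 429 at the same moment retries at the same moment, too. Adding full jitter spreads them out. A small helper (the base and cap values here are arbitrary):

```javascript
// Full-jitter backoff: pick a random delay in [0, cap), where cap grows
// exponentially with the attempt number but never exceeds maxMs.
function backoffWithJitter(attempt, baseMs = 1000, maxMs = 30000) {
  const cap = Math.min(maxMs, Math.pow(2, attempt) * baseMs);
  return Math.floor(Math.random() * cap);
}

// Drop-in for the delay line in fetchWithRetry:
// const delay = backoffWithJitter(attempt);
```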

Step 2: The Job Queue

BullMQ gives you retries, concurrency control, rate limiting, and a queryable record of every failed job — for free.

const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');

const connection = new Redis(process.env.REDIS_URL);

// Create queues for each stage
const profileQueue = new Queue('fetch-profiles', { connection });
const postQueue = new Queue('fetch-posts', { connection });
const transformQueue = new Queue('transform-data', { connection });

// Add jobs in bulk
async function enqueueProfiles(usernames, platform) {
  const jobs = usernames.map(username => ({
    name: `profile:${platform}:${username}`,
    data: { username, platform },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: 1000, // Keep last 1000 completed jobs
      removeOnFail: 5000,     // Keep last 5000 failed jobs
    },
  }));

  await profileQueue.addBulk(jobs);
  console.log(`Enqueued ${jobs.length} profile fetch jobs`);
}
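If your tracked-profile list grows to hundreds of thousands, a single `addBulk` call builds one very large Redis command. A simple chunking helper avoids that (the 500 chunk size is an arbitrary choice, not a BullMQ limit):

```javascript
// Split a large job array into fixed-size chunks so each addBulk call
// stays a reasonable size.
function chunk(items, size = 500) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Usage with the queue above:
// for (const batch of chunk(jobs)) await profileQueue.addBulk(batch);
```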

Step 3: The Workers

Workers pull jobs from the queue, fetch data, and push results downstream.

// Profile fetch worker
const profileWorker = new Worker('fetch-profiles', async (job) => {
  const { username, platform } = job.data;

  // Fetch profile data from SociaVault API
  const profile = await fetchWithRetry(`/${platform}/profile/${username}`);

  // Save to database
  await upsertProfile(platform, profile);

  // Enqueue post fetching as the next step
  await postQueue.add(`posts:${platform}:${username}`, {
    username,
    platform,
    followerCount: profile.followerCount,
  });

  return { username, followers: profile.followerCount };
}, {
  connection,
  concurrency: 10,       // 10 parallel jobs
  limiter: {
    max: 20,             // Max 20 jobs
    duration: 1000,      // Per second
  },
});

// Post fetch worker
const postWorker = new Worker('fetch-posts', async (job) => {
  const { username, platform } = job.data;

  const posts = await fetchWithRetry(`/${platform}/posts/${username}`, { limit: 50 });

  // Transform and save each post
  for (const post of posts) {
    await upsertPost(platform, username, post);
  }

  return { username, postCount: posts.length };
}, {
  connection,
  concurrency: 5,
  limiter: {
    max: 10,
    duration: 1000,
  },
});
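One thing worth wiring up alongside the workers: graceful shutdown. BullMQ's `Worker#close()` waits for active jobs to finish, so a deploy doesn't kill a job mid-write. A minimal sketch (the `shutdown` helper is mine, not part of BullMQ):

```javascript
// Close each worker/connection in order; each close() resolves only after
// its in-flight work is done.
async function shutdown(closables) {
  for (const c of closables) {
    await c.close();
  }
}

// Wiring, using the workers defined above:
// process.on('SIGTERM', async () => {
//   await shutdown([profileWorker, postWorker]);
//   process.exit(0);
// });
```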

Step 4: Database Upserts (Not Inserts)

Always use upserts. Running the pipeline twice should produce the same result.

const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function upsertProfile(platform, profile) {
  await pool.query(`
    INSERT INTO profiles (platform, username, follower_count, following_count, 
                          post_count, biography, profile_pic_url, updated_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
    ON CONFLICT (platform, username)
    DO UPDATE SET
      follower_count = EXCLUDED.follower_count,
      following_count = EXCLUDED.following_count,
      post_count = EXCLUDED.post_count,
      biography = EXCLUDED.biography,
      profile_pic_url = EXCLUDED.profile_pic_url,
      updated_at = NOW()
  `, [
    platform,
    profile.username,
    profile.followerCount,
    profile.followingCount,
    profile.postCount,
    profile.biography,
    profile.profilePicUrl,
  ]);
}

async function upsertPost(platform, username, post) {
  await pool.query(`
    INSERT INTO posts (platform, post_id, username, caption, like_count, 
                       comment_count, share_count, created_at, updated_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
    ON CONFLICT (platform, post_id)
    DO UPDATE SET
      like_count = EXCLUDED.like_count,
      comment_count = EXCLUDED.comment_count,
      share_count = EXCLUDED.share_count,
      updated_at = NOW()
  `, [
    platform,
    post.id,
    username,
    post.caption,
    post.likeCount,
    post.commentCount,
    post.shareCount,
    new Date(post.createdAt),
  ]);
}

Why upserts matter: If a job fails halfway and retries, you don't get duplicates. The pipeline is idempotent.
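Note that `ON CONFLICT (platform, username)` and `ON CONFLICT (platform, post_id)` only work if matching unique constraints exist. Here's one possible schema that satisfies them; the column types are my assumptions, so adjust them to your data:

```javascript
// Assumed schema for the upserts above. The UNIQUE constraints are what make
// the ON CONFLICT targets valid.
const CREATE_PROFILES = `
  CREATE TABLE IF NOT EXISTS profiles (
    id              BIGSERIAL PRIMARY KEY,
    platform        TEXT NOT NULL,
    username        TEXT NOT NULL,
    follower_count  BIGINT,
    following_count BIGINT,
    post_count      BIGINT,
    biography       TEXT,
    profile_pic_url TEXT,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (platform, username)
  )`;

const CREATE_POSTS = `
  CREATE TABLE IF NOT EXISTS posts (
    id            BIGSERIAL PRIMARY KEY,
    platform      TEXT NOT NULL,
    post_id       TEXT NOT NULL,
    username      TEXT NOT NULL,
    caption       TEXT,
    like_count    BIGINT,
    comment_count BIGINT,
    share_count   BIGINT,
    created_at    TIMESTAMPTZ,
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (platform, post_id)
  )`;

// Run once at startup or in a migration:
// await pool.query(CREATE_PROFILES);
// await pool.query(CREATE_POSTS);
```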

Step 5: The Scheduler

const cron = require('node-cron');

// Refresh all tracked profiles daily at 2 AM
cron.schedule('0 2 * * *', async () => {
  console.log('Starting daily profile refresh...');

  const { rows } = await pool.query(
    'SELECT platform, username FROM tracked_profiles WHERE active = true'
  );

  console.log(`Refreshing ${rows.length} profiles`);

  const byPlatform = {};
  for (const row of rows) {
    if (!byPlatform[row.platform]) byPlatform[row.platform] = [];
    byPlatform[row.platform].push(row.username);
  }

  for (const [platform, usernames] of Object.entries(byPlatform)) {
    await enqueueProfiles(usernames, platform);
  }
});

// Fetch posts for high-priority profiles every 6 hours
cron.schedule('0 */6 * * *', async () => {
  const { rows } = await pool.query(
    'SELECT platform, username FROM tracked_profiles WHERE priority = $1 AND active = true',
    ['high']
  );

  for (const row of rows) {
    await postQueue.add(`posts:${row.platform}:${row.username}`, {
      username: row.username,
      platform: row.platform,
    });
  }
});
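One gotcha with cron scheduling: if a refresh takes longer than the interval, the next tick starts a second run on top of the first. A small guard (my own sketch, not a node-cron feature) skips a tick while the previous run is still going:

```javascript
// Wrap an async task so overlapping invocations are skipped instead of
// running concurrently.
function nonOverlapping(fn) {
  let running = false;
  return async (...args) => {
    if (running) return 'skipped';
    running = true;
    try {
      return await fn(...args);
    } finally {
      running = false;
    }
  };
}

// Usage: cron.schedule('0 2 * * *', nonOverlapping(refreshAllProfiles));
```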

Step 6: Monitoring

A pipeline you don't monitor is a pipeline that's broken and you don't know it.

// Event listeners for monitoring
profileWorker.on('completed', (job, result) => {
  console.log(`✅ ${job.name}: ${result.followers} followers`);
});

profileWorker.on('failed', (job, err) => {
  console.error(`❌ ${job.name}: ${err.message}`);
});

// Health check endpoint
const express = require('express');
const app = express();

app.get('/health', async (req, res) => {
  const profileWaiting = await profileQueue.getWaitingCount();
  const profileActive = await profileQueue.getActiveCount();
  const profileFailed = await profileQueue.getFailedCount();

  const postWaiting = await postQueue.getWaitingCount();
  const postActive = await postQueue.getActiveCount();
  const postFailed = await postQueue.getFailedCount();

  res.json({
    status: 'ok',
    queues: {
      profiles: { waiting: profileWaiting, active: profileActive, failed: profileFailed },
      posts: { waiting: postWaiting, active: postActive, failed: postFailed },
    },
  });
});

app.listen(3001);
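The endpoint above always reports `status: 'ok'`, even with a mountain of failed jobs. A tiny classifier turns the counts into a real status your alerting can act on (the thresholds here are made up; tune them to your volume):

```javascript
// Derive a health status from queue counts instead of always returning 'ok'.
// Thresholds are arbitrary defaults.
function queueStatus({ waiting, failed }, { maxWaiting = 10000, maxFailed = 100 } = {}) {
  if (failed > maxFailed) return 'unhealthy';
  if (waiting > maxWaiting) return 'degraded';
  return 'ok';
}

// In the /health handler:
// res.json({ status: queueStatus({ waiting: profileWaiting, failed: profileFailed }), ... });
```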

Performance Numbers

Running this pipeline on a $20/month VPS:

| Metric | "Fetch and Save" Loop | Pipeline |
| --- | --- | --- |
| Profiles/hour | ~500 | ~15,000 |
| Error recovery | Manual restart | Automatic retry |
| Duplicate handling | None | Upsert |
| Monitoring | None | Health endpoint |
| Failure visibility | "It stopped working" | Exact job + error |

30x throughput. Plus it fixes itself when things go wrong.

Read the Full Guide

This is a condensed version. The full guide includes:

  • Historical data tracking and change detection
  • Webhook notifications for data changes
  • Multi-region deployment for lower latency
  • Cost optimization strategies

Read the complete guide on SociaVault →


Want the data without building the pipeline? SociaVault provides ready-to-use social media data APIs for TikTok, Instagram, YouTube, and 10+ platforms. Skip the infrastructure. Just fetch the data.

Discussion

What's the most painful part of your data pipeline? Rate limits? Data quality? Monitoring? Let's compare war stories 👇

#webdev #api #nodejs #database #backend
