DEV Community

Cover image for Building a Real-Time Data Pipeline from Shopify to Meta's Marketing API
Alex Toska
Alex Toska

Posted on

Building a Real-Time Data Pipeline from Shopify to Meta's Marketing API

Building a Real-Time Data Pipeline from Shopify to Meta’s Marketing API

I spent the last few months building Audience+ — a tool that syncs Shopify customer data to Meta’s advertising platform in real time.

Below is a clear, accessible technical breakdown of how it works, the challenges we solved, and concrete code patterns that may help if you’re building something similar.


The Problem We’re Solving

Meta’s browser-based tracking is fundamentally broken.

  • iOS 14.5 App Tracking Transparency hides ~75% of iPhone users
  • The Meta Pixel only retains data for 180 days
  • Meta optimizes on 30–40% of real conversion data for most stores

The solution: send first-party customer and purchase data directly from Shopify to Meta using server-side APIs.


Architecture Overview

┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ Shopify Store │──▶│  Audience+ API│──▶│   Meta API    │
│  (Webhooks)   │   │ (Processing)  │   │               │
└───────────────┘   └───────────────┘   └───────────────┘
                         │
                         ▼
                  ┌───────────────┐
                  │  PostgreSQL   │
                  │ (Customer DB) │
                  └───────────────┘
Enter fullscreen mode Exit fullscreen mode

Tech Stack

  • Framework: Next.js 15 (App Router)
  • Language: TypeScript
  • API Layer: tRPC
  • Database: PostgreSQL (Neon serverless)
  • ORM: Prisma
  • Authentication: Better-Auth + Shopify OAuth
  • Hosting: Vercel

Shopify Webhook Integration

Shopify sends webhooks for customer and order lifecycle events.

We verify each request using HMAC signatures before processing to ensure authenticity.

// app/api/webhooks/shopify/route.ts
import { NextRequest, NextResponse } from 'next/server';
import crypto from 'crypto';

export async function POST(req: NextRequest) {
  const body = await req.text();
  const hmac = req.headers.get('x-shopify-hmac-sha256');

  if (!verifyShopifyWebhook(body, hmac)) {
    return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
  }

  const topic = req.headers.get('x-shopify-topic');
  const payload = JSON.parse(body);

  switch (topic) {
    case 'orders/create':
      await handleNewOrder(payload);
      break;
    case 'customers/create':
      await handleNewCustomer(payload);
      break;
    case 'customers/update':
      await handleCustomerUpdate(payload);
      break;
  }

  return NextResponse.json({ received: true });
}

function verifyShopifyWebhook(body: string, hmac: string | null): boolean {
  if (!hmac) return false;

  const hash = crypto
    .createHmac('sha256', process.env.SHOPIFY_WEBHOOK_SECRET!)
    .update(body, 'utf8')
    .digest('base64');

  return crypto.timingSafeEqual(Buffer.from(hash), Buffer.from(hmac));
}
Enter fullscreen mode Exit fullscreen mode

Customer Data Normalization

Customer data is normalized and hashed to meet Meta’s requirements
(lowercase, trimmed, SHA-256).

// lib/customer-processor.ts
interface ShopifyCustomer {
  id: number;
  email: string;
  phone?: string;
  first_name?: string;
  last_name?: string;
  orders_count: number;
  total_spent: string;
  created_at: string;
}

interface MetaUserData {
  em?: string;
  ph?: string;
  fn?: string;
  ln?: string;
  external_id?: string;
}

function processCustomerForMeta(customer: ShopifyCustomer): MetaUserData {
  const userData: MetaUserData = {};

  if (customer.email) {
    userData.em = hashForMeta(customer.email.toLowerCase().trim());
  }

  if (customer.phone) {
    userData.ph = hashForMeta(normalizePhone(customer.phone));
  }

  if (customer.first_name) {
    userData.fn = hashForMeta(customer.first_name.toLowerCase().trim());
  }

  if (customer.last_name) {
    userData.ln = hashForMeta(customer.last_name.toLowerCase().trim());
  }

  userData.external_id = hashForMeta(customer.id.toString());

  return userData;
}

function hashForMeta(value: string): string {
  return crypto.createHash('sha256').update(value).digest('hex');
}
Enter fullscreen mode Exit fullscreen mode

Meta Marketing API Integration

We integrate with two Meta APIs:

  1. Custom Audiences API — for syncing customer lists
  2. Conversions API — for real-time server-side events

Custom Audience Sync

// lib/meta-audience-sync.ts
const META_API_VERSION = 'v18.0';

async function addUsersToAudience(
  audienceId: string,
  users: AudienceUser[],
  accessToken: string
): Promise<void> {
  const BATCH_SIZE = 10_000;
  const batches = chunk(users, BATCH_SIZE);

  for (const batch of batches) {
    const payload = {
      schema: ['EMAIL', 'PHONE', 'FN', 'LN'],
      data: batch.map(user => [
        user.email ? hashForMeta(user.email.toLowerCase()) : '',
        user.phone ? hashForMeta(normalizePhone(user.phone)) : '',
        user.firstName ? hashForMeta(user.firstName.toLowerCase()) : '',
        user.lastName ? hashForMeta(user.lastName.toLowerCase()) : '',
      ]),
    };

    const response = await fetch(
      `https://graph.facebook.com/${META_API_VERSION}/${audienceId}/users`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ payload, access_token: accessToken }),
      }
    );

    if (!response.ok) {
      throw new Error(`Meta API error`);
    }

    await sleep(1000);
  }
}
Enter fullscreen mode Exit fullscreen mode

Sending Purchase Events (Conversions API)

// lib/meta-conversions.ts
async function sendPurchaseEvent(
  pixelId: string,
  customer: ProcessedCustomer,
  order: ShopifyOrder,
  accessToken: string
) {
  const event = {
    event_name: 'Purchase',
    event_time: Math.floor(new Date(order.created_at).getTime() / 1000),
    event_id: `order_${order.id}`,
    action_source: 'website',
    user_data: {
      em: customer.hashedEmail,
      ph: customer.hashedPhone,
      client_ip_address: order.client_details?.browser_ip,
      client_user_agent: order.client_details?.user_agent,
    },
    custom_data: {
      value: parseFloat(order.total_price),
      currency: order.currency,
      content_ids: order.line_items.map(i => i.product_id.toString()),
      content_type: 'product',
      num_items: order.line_items.reduce((s, i) => s + i.quantity, 0),
    },
  };

  await fetch(
    `https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ data: [event], access_token: accessToken }),
    }
  );
}
Enter fullscreen mode Exit fullscreen mode

Audience Segmentation

Customers are automatically classified into segments that stay in sync with Meta.

enum CustomerSegment {
  NEW = 'new',
  ENGAGED = 'engaged',
  EXISTING = 'existing',
}

function classifyCustomer(customer: CustomerWithOrders): CustomerSegment {
  if (customer.orders_count === 0) return CustomerSegment.ENGAGED;
  if (customer.orders_count >= 1) return CustomerSegment.EXISTING;
  return CustomerSegment.NEW;
}
Enter fullscreen mode Exit fullscreen mode

Key Challenges & Solutions

Rate Limiting

Meta enforces strict limits. We use exponential backoff retries.

async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3
): Promise<T> {
  let lastError: Error;

  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error as Error;
      await sleep(2 ** i * 1000);
    }
  }

  throw lastError!;
}
Enter fullscreen mode Exit fullscreen mode

Webhook Idempotency

Prevents duplicate processing from retries or out-of-order delivery.

async function handleWebhookIdempotently(
  webhookId: string,
  handler: () => Promise<void>
) {
  const existing = await prisma.processedWebhook.findUnique({
    where: { id: webhookId },
  });

  if (existing) return;

  await handler();

  await prisma.processedWebhook.create({
    data: { id: webhookId, processedAt: new Date() },
  });
}
Enter fullscreen mode Exit fullscreen mode

Results

Stores using Audience+ typically see:

  • 50–100% more conversions visible to Meta
  • 10–20% ROAS improvement
  • Correct exclusions and retargeting for the first time

What I’d Do Differently

  1. Start with the Conversions API first
  2. Add monitoring earlier
  3. Use queues from day one instead of synchronous processing

Try It Out

If you want this without building it yourself, check out
👉 https://www.audience-plus.com

10-minute setup. Fully automated.


Have questions about the implementation? Drop them in the comments.

Top comments (0)