Mox Loop

SellerSprite Alternative: Building a Cost-Effective Amazon Data Pipeline with Pangolinfo API

TL;DR

We migrated from SellerSprite ($150K/year) to the Pangolinfo API ($70K/year) for our e-commerce data needs and built a custom data pipeline with Python, PostgreSQL, and Redis. The result: a 53% cost reduction, complete data ownership, and 3x faster insights.

Tech Stack: Python 3.10, FastAPI, PostgreSQL, Redis, Apache Airflow, Docker

Time to Implement: 12 weeks

Team Size: 2 backend engineers

The Problem: SaaS Tools Don't Scale

When you're managing 300+ Amazon SKUs and need to integrate e-commerce data with your ERP, CRM, and BI systems, traditional SaaS tools like SellerSprite hit hard limits:

  1. Cost Explosion: $30K/year per license × 5 users = $150K
  2. Data Silos: Export limits force manual copy-paste workflows
  3. No Customization: Can't build proprietary analytics or ML models
  4. Vendor Lock-in: Pricing increases, feature changes, service disruptions

The Solution: API-First Data Infrastructure

After evaluating several SellerSprite alternatives, we chose Pangolinfo API for its:

  • Pay-per-call pricing: Only pay for what you use
  • Complete data access: JSON/HTML/Markdown formats
  • High accuracy: 98% success rate on Sponsored Products (SP) ad placements
  • Massive scale: Millions of pages per day
  • Multi-platform: Amazon, Walmart, Shopify, Shopee, eBay

Architecture Overview

┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│   Airflow   │─────▶│  API Client  │─────▶│   Redis     │
│  Scheduler  │      │   (Python)   │      │   (Queue)   │
└─────────────┘      └──────────────┘      └─────────────┘
                            │                      │
                            ▼                      ▼
                     ┌──────────────┐      ┌─────────────┐
                     │ Rate Limiter │      │  Workers    │
                     └──────────────┘      └─────────────┘
                                                  │
                                                  ▼
                                           ┌─────────────┐
                                           │ PostgreSQL  │
                                           └─────────────┘
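
The hand-off between the API client and the workers in the diagram can be sketched as a Redis list used as a FIFO queue. This is a minimal sketch under assumptions: the queue name `asin_queue` and both function names are hypothetical, and the post does not show the actual worker code. The functions accept any client object that exposes redis-py's `lpush` and `rpop`.

```python
import json

QUEUE_KEY = "asin_queue"  # hypothetical queue name, not from the original pipeline


def enqueue_asins(r, asins, marketplace="US"):
    """Producer side: push one JSON job per ASIN onto the head of the list."""
    for asin in asins:
        r.lpush(QUEUE_KEY, json.dumps({"asin": asin, "marketplace": marketplace}))


def drain_queue(r, handle_job):
    """Worker side: pop jobs from the tail until the list is empty.

    Returns the number of jobs handled. LPUSH plus RPOP gives FIFO order.
    """
    handled = 0
    while True:
        raw = r.rpop(QUEUE_KEY)
        if raw is None:  # queue is empty
            break
        handle_job(json.loads(raw))
        handled += 1
    return handled
```

A long-running worker would more likely block on the queue instead of polling until empty; the loop above only shows the ordering and the job format.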

Implementation: Step-by-Step

1. API Client Setup

import os
import time
from functools import wraps
from typing import Dict, Optional

import requests

class PangolinfoClient:
    """Pangolinfo API client with retry logic and rate limiting"""

    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = "https://api.pangolinfo.com/v1"
        self.session = requests.Session()

    def _retry_with_backoff(max_retries=3):
        """Exponential backoff retry decorator"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries):
                    try:
                        return func(*args, **kwargs)
                    except requests.exceptions.RequestException as e:
                        if attempt == max_retries - 1:
                            raise
                        wait_time = 2 ** attempt
                        print(f"Retry {attempt + 1}/{max_retries}, waiting {wait_time}s...")
                        time.sleep(wait_time)
            return wrapper
        return decorator

    @_retry_with_backoff(max_retries=3)
    def get_product(self, asin: str, marketplace: str = "US") -> Optional[Dict]:
        """Fetch Amazon product details"""
        endpoint = f"{self.base_url}/amazon/product"

        headers = {
            "X-API-Key": self.api_key,
            "X-API-Secret": self.api_secret,
            "Content-Type": "application/json"
        }

        payload = {
            "asin": asin,
            "marketplace": marketplace,
            "format": "json",
            "fields": [
                "title", "price", "rating", "reviews_count",
                "availability", "images", "variants"
            ]
        }

        response = self.session.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()

# Usage
client = PangolinfoClient(
    api_key=os.getenv("PANGOLINFO_API_KEY"),
    api_secret=os.getenv("PANGOLINFO_API_SECRET")
)

product = client.get_product("B08N5WRWNW")
print(f"Product: {product['title']}, Price: ${product['price']}")
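
The retry decorator above waits `2 ** attempt` seconds between attempts. The sketch below isolates that delay calculation and adds two optional extras that are not in the original client: an upper cap and "full jitter" (a random fraction of the computed delay), which can help when many workers retry at the same moment. The function name and its parameters are hypothetical.

```python
import random


def backoff_delay(attempt, base=1.0, cap=30.0, jitter=False, rng=random.random):
    """Return the wait in seconds before retry number `attempt` (0-based).

    Without jitter this is base * 2**attempt, limited to `cap`.
    With jitter the result is scaled by a random factor in [0, 1).
    """
    delay = min(cap, base * (2 ** attempt))
    return delay * rng() if jitter else delay


# Delays for the first three retries with the defaults
print([backoff_delay(n) for n in range(3)])  # [1.0, 2.0, 4.0]
```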

2. Database Schema

-- PostgreSQL schema for product data
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    asin VARCHAR(10) UNIQUE NOT NULL,
    title TEXT,
    price DECIMAL(10, 2),
    rating DECIMAL(3, 2),
    reviews_count INTEGER,
    availability VARCHAR(50),
    images JSONB,
    variants JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

-- No separate index on asin: the UNIQUE constraint above already creates one
CREATE INDEX idx_updated_at ON products(updated_at);

-- Reviews table
CREATE TABLE reviews (
    id SERIAL PRIMARY KEY,
    asin VARCHAR(10) NOT NULL,
    review_id VARCHAR(50) UNIQUE NOT NULL,
    rating INTEGER,
    title TEXT,
    content TEXT,
    author VARCHAR(255),
    verified_purchase BOOLEAN,
    review_date DATE,
    helpful_count INTEGER,
    created_at TIMESTAMP DEFAULT NOW(),
    FOREIGN KEY (asin) REFERENCES products(asin)
);

CREATE INDEX idx_reviews_asin ON reviews(asin);
CREATE INDEX idx_review_date ON reviews(review_date);

3. Data Pipeline with Airflow

import json
import os
from datetime import datetime, timedelta

import psycopg2
from airflow import DAG
from airflow.operators.python import PythonOperator

from pangolinfo_client import PangolinfoClient

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'amazon_product_sync',
    default_args=default_args,
    description='Sync Amazon product data via Pangolinfo API',
    schedule_interval='0 */6 * * *',  # Every 6 hours (on Airflow 2.4+ the preferred argument is schedule=)
    catchup=False
)

def fetch_and_store_products(**context):
    """Fetch products and store in database"""
    client = PangolinfoClient(
        api_key=os.getenv("PANGOLINFO_API_KEY"),
        api_secret=os.getenv("PANGOLINFO_API_SECRET")
    )

    # Get ASIN list from database
    conn = psycopg2.connect(os.getenv("DATABASE_URL"))
    cursor = conn.cursor()
    cursor.execute("SELECT DISTINCT asin FROM products")
    asin_list = [row[0] for row in cursor.fetchall()]

    # Fetch and update
    for asin in asin_list:
        try:
            data = client.get_product(asin)

            cursor.execute("""
                INSERT INTO products (asin, title, price, rating, reviews_count, availability, images, variants)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (asin) DO UPDATE SET
                    title = EXCLUDED.title,
                    price = EXCLUDED.price,
                    rating = EXCLUDED.rating,
                    reviews_count = EXCLUDED.reviews_count,
                    availability = EXCLUDED.availability,
                    images = EXCLUDED.images,
                    variants = EXCLUDED.variants,
                    updated_at = NOW()
            """, (
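                asin,  # use the requested ASIN; "asin" is not among the requested fields
                data['title'],
                data['price'],
                data['rating'],
                data['reviews_count'],
                data['availability'],
                json.dumps(data['images']),
                json.dumps(data['variants'])
            ))
            conn.commit()

        except Exception as e:
            # Roll back so a failed statement does not leave the transaction
            # aborted for every ASIN that follows
            conn.rollback()
            print(f"Error processing {asin}: {e}")
            continue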

    cursor.close()
    conn.close()

fetch_task = PythonOperator(
    task_id='fetch_and_store_products',
    python_callable=fetch_and_store_products,
    dag=dag
)
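
The upsert above indexes the response with `data['title']` and friends, so a single missing field raises `KeyError` and skips the ASIN. One way to soften that, sketched here with a hypothetical helper `product_row`, is to build the parameter tuple with `.get()` so missing fields are stored as NULL. Whether the API ever omits fields is an assumption; the post does not say.

```python
import json

# Scalar columns in the same order as the INSERT statement
PRODUCT_COLUMNS = ("title", "price", "rating", "reviews_count", "availability")


def product_row(asin, data):
    """Build the 8-value parameter tuple for the products upsert.

    Missing scalar fields become None (NULL); missing images or
    variants become an empty JSON array.
    """
    scalars = tuple(data.get(col) for col in PRODUCT_COLUMNS)
    images = json.dumps(data.get("images") or [])
    variants = json.dumps(data.get("variants") or [])
    return (asin, *scalars, images, variants)
```

The tuple can then be passed as the second argument to `cursor.execute` in place of the hand-written one.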

4. Caching Layer with Redis

import hashlib
import json
import os

import redis

from pangolinfo_client import PangolinfoClient

class CachedPangolinfoClient(PangolinfoClient):
    """API client with Redis caching"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis = redis.Redis(
            host=os.getenv("REDIS_HOST", "localhost"),
            port=int(os.getenv("REDIS_PORT", 6379)),
            decode_responses=True
        )

    def _cache_key(self, method: str, *args) -> str:
        """Generate cache key"""
        key_data = f"{method}:{':'.join(map(str, args))}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get_product(self, asin: str, marketplace: str = "US", ttl: int = 3600):
        """Get product with caching (default 1 hour TTL)"""
        cache_key = self._cache_key("get_product", asin, marketplace)

        # Try cache first
        cached = self.redis.get(cache_key)
        if cached:
            print(f"Cache hit for {asin}")
            return json.loads(cached)

        # Cache miss - fetch from API
        print(f"Cache miss for {asin}, fetching from API")
        data = super().get_product(asin, marketplace)

        # Store in cache
        if data:
            self.redis.setex(cache_key, ttl, json.dumps(data))

        return data

# Usage
cached_client = CachedPangolinfoClient(
    api_key=os.getenv("PANGOLINFO_API_KEY"),
    api_secret=os.getenv("PANGOLINFO_API_SECRET")
)

# First call - API request
product1 = cached_client.get_product("B08N5WRWNW")  # Cache miss

# Second call - from cache
product2 = cached_client.get_product("B08N5WRWNW")  # Cache hit
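
To know whether the cache is paying for itself, it helps to count hits and misses. The following is a small sketch of that bookkeeping, not the code we run in production: `CacheStats` and `get_or_fetch` are hypothetical names, and a plain `dict` stands in for Redis so the example runs without a server (it has no TTL).

```python
class CacheStats:
    """Counts cache hits and misses and derives the hit rate."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record(self, hit):
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


def get_or_fetch(cache, key, fetch, stats):
    """Cache-aside lookup against a dict-like cache, recording hit or miss."""
    if key in cache:
        stats.record(hit=True)
        return cache[key]
    stats.record(hit=False)
    value = fetch()  # only called on a miss
    cache[key] = value
    return value
```

The same counters could be incremented inside `CachedPangolinfoClient.get_product` where it currently prints "Cache hit" and "Cache miss".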

5. Rate Limiting

import os

from ratelimit import limits, sleep_and_retry

# CachedPangolinfoClient is the class defined in the caching step above

class RateLimitedClient(CachedPangolinfoClient):
    """API client with rate limiting"""

    @sleep_and_retry
    @limits(calls=100, period=60)  # 100 calls per minute
    def get_product(self, *args, **kwargs):
        return super().get_product(*args, **kwargs)

    @sleep_and_retry
    @limits(calls=50, period=60)  # 50 calls per minute for reviews
    def get_reviews(self, asin: str, max_pages: int = 10):
        """Fetch product reviews with rate limiting"""
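        # Placeholder: the reviews endpoint is not covered in this post
        raise NotImplementedError("reviews endpoint not implemented in this example")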

# Usage
client = RateLimitedClient(
    api_key=os.getenv("PANGOLINFO_API_KEY"),
    api_secret=os.getenv("PANGOLINFO_API_SECRET")
)
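
For readers who want to see what a limiter does under the hood, here is a minimal sliding-window sketch in plain Python. It is an illustration of the idea, not a description of how the `ratelimit` package is implemented; the class name and the injectable `clock` parameter are assumptions made so the behaviour can be tested without sleeping.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `calls` acquisitions within any `period` seconds."""

    def __init__(self, calls, period, clock=time.monotonic):
        self.calls = calls
        self.period = period
        self.clock = clock
        self._stamps = deque()  # timestamps of recent acquisitions

    def wait_time(self):
        """Seconds until the next call is allowed (0.0 if allowed now)."""
        now = self.clock()
        # Drop timestamps that have left the window
        while self._stamps and now - self._stamps[0] >= self.period:
            self._stamps.popleft()
        if len(self._stamps) < self.calls:
            return 0.0
        return self.period - (now - self._stamps[0])

    def try_acquire(self):
        """Record a call and return True if allowed, else return False."""
        if self.wait_time() > 0:
            return False
        self._stamps.append(self.clock())
        return True
```

A caller that prefers to block, as `sleep_and_retry` does, can sleep for `wait_time()` and then call `try_acquire()` again.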

Cost Optimization Strategies

1. Smart Scheduling

# Different update frequencies for different data types
update_schedule = {
    "price": "1h",        # Price changes frequently
    "inventory": "2h",    # Stock levels
    "rating": "6h",       # Ratings change slowly
    "reviews": "24h",     # Reviews once daily
    "description": "7d"   # Static content weekly
}
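
The schedule dictionary only becomes useful once something turns strings like "6h" into durations and decides what is due. A rough sketch of that step follows; `parse_interval` and `due_fields` are hypothetical helpers, and the supported suffixes (`m`, `h`, `d`) are an assumption based on the values shown above.

```python
from datetime import datetime, timedelta

UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_interval(text):
    """Convert a string such as '6h' or '7d' into a timedelta."""
    value, unit = int(text[:-1]), text[-1]
    return timedelta(**{UNITS[unit]: value})


def due_fields(schedule, last_run, now):
    """Return the data types whose interval has elapsed since their last run.

    `last_run` maps data type to the datetime of its last refresh;
    types that have never been refreshed are always due.
    """
    due = []
    for field, interval in schedule.items():
        previous = last_run.get(field)
        if previous is None or now - previous >= parse_interval(interval):
            due.append(field)
    return due
```

Calling `due_fields(update_schedule, last_run, datetime.now())` at the start of each run tells the task which data types to refresh this time.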

2. Incremental Updates

def incremental_sync(cursor, last_sync_time):
    """Return ASINs whose stored data is older than last_sync_time (or never synced)"""
    cursor.execute("""
        SELECT asin FROM products
        WHERE updated_at < %s
        OR updated_at IS NULL
    """, (last_sync_time,))
    return [row[0] for row in cursor.fetchall()]
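
To make the selection rule concrete, here is a self-contained run of the same query shape. Two loud assumptions: `sqlite3` stands in for PostgreSQL so the example works without a database server (note the `?` placeholders instead of `%s`), and the three ASINs are made-up sample data.

```python
import sqlite3
from datetime import datetime, timedelta


def stale_asins(conn, cutoff):
    """Return ASINs last updated before `cutoff`, plus those never synced."""
    rows = conn.execute(
        "SELECT asin FROM products "
        "WHERE updated_at < ? OR updated_at IS NULL ORDER BY asin",
        (cutoff.isoformat(sep=" "),),
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (asin TEXT PRIMARY KEY, updated_at TEXT)")
now = datetime(2026, 1, 1, 12, 0, 0)
conn.executemany(
    "INSERT INTO products VALUES (?, ?)",
    [
        ("B000000001", (now - timedelta(hours=8)).isoformat(sep=" ")),  # stale
        ("B000000002", (now - timedelta(hours=1)).isoformat(sep=" ")),  # fresh
        ("B000000003", None),  # never synced
    ],
)

# Everything older than six hours needs a refresh
print(stale_asins(conn, now - timedelta(hours=6)))  # ['B000000001', 'B000000003']
```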

3. Batch Requests

def batch_fetch(client, asin_list, batch_size=50):
    """Yield product data in batches of batch_size (one API call per ASIN)"""
    for i in range(0, len(asin_list), batch_size):
        batch = asin_list[i:i + batch_size]
        # Each ASIN is still a separate request; batching only groups the results
        yield [client.get_product(asin) for asin in batch]
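
One thing the batching loop does not handle is a failure in the middle of a batch: a single exception loses the whole list. A possible variation, sketched with hypothetical helpers `chunked` and `fetch_batch`, collects results and errors per ASIN so the rest of the batch still completes. `fetch` is any callable that takes an ASIN, for example `client.get_product`.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_batch(fetch, asins):
    """Call fetch(asin) for every ASIN and keep going on errors.

    Returns (results, errors): results maps ASIN to the fetched data,
    errors maps ASIN to the error message.
    """
    results, errors = {}, {}
    for asin in asins:
        try:
            results[asin] = fetch(asin)
        except Exception as exc:  # isolate the failure to this ASIN
            errors[asin] = str(exc)
    return results, errors
```

The `errors` dictionary gives a natural list of ASINs to retry on the next run.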

Results & Metrics

After 6 months of operation:

  • Cost Reduction: 53% ($150K → $70K annually)
  • API Call Optimization: 40% reduction through caching
  • Data Freshness: <5 minutes (vs 24 hours with SellerSprite)
  • System Uptime: 99.8%
  • Cache Hit Rate: 82%
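
For anyone checking the headline number, the arithmetic behind the 53% figure is reproduced below from the costs quoted in this post. The helper name `percent_reduction` is just for illustration.

```python
def percent_reduction(before, after):
    """Percentage by which `after` is lower than `before`."""
    return (before - after) / before * 100


saas_cost = 30_000 * 5  # $30K per license x 5 users
api_cost = 70_000       # annual cost after the migration

print(saas_cost)                                         # 150000
print(round(percent_reduction(saas_cost, api_cost), 1))  # 53.3
```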

Lessons Learned

  1. Start with POC: Validate API quality before full migration
  2. Implement Caching Early: Reduced our API costs by 40%
  3. Monitor Everything: Set up alerts for cost, errors, and latency
  4. Plan for Failures: Retry logic and circuit breakers are essential
  5. Document Well: Future team members will thank you
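
Retry logic is shown earlier in the post, but a circuit breaker is not, so here is a minimal sketch of the idea: after a number of consecutive failures, stop calling the API for a cool-down period instead of hammering it. The class names, thresholds, and the injectable `clock` are all assumptions for illustration, not our production code.

```python
import time


class CircuitOpenError(RuntimeError):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_after=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0      # consecutive failures so far
        self.opened_at = None  # time the circuit opened, or None if closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; skipping call")
            # Cool-down elapsed: let one trial call through (half-open)
            self.opened_at = None
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0  # any success closes the circuit fully
        return result
```

Usage would look like `breaker.call(client.get_product, asin)`, with `CircuitOpenError` caught by the caller and treated as "try again later".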

When to Choose API vs SaaS

Choose API (Pangolinfo) if:

  • You have 2+ developers on the team
  • You manage 50+ SKUs with high data volume
  • You need custom analytics or ML models
  • You want to integrate with existing systems

Stick with SaaS (SellerSprite) if:

  • You have a small team without technical resources
  • You manage fewer than 50 SKUs with basic needs
  • You need an immediate solution without development time

Conclusion

For teams with technical capabilities, building a custom data pipeline with Pangolinfo API delivers superior ROI compared to traditional SaaS tools. The initial development investment pays off through lower ongoing costs, complete data ownership, and unlimited customization potential.


Tags: #python #api #ecommerce #dataengineering #amazon #postgresql #redis #airflow
