Peter

Build a TypeScript Price Monitoring Pipeline with @agorio/sdk

Price monitoring is one of the most practical applications of AI commerce agents. Instead of manually checking prices across different stores, you can automate the entire pipeline: product discovery, data extraction, historical tracking, and alert triggers.

In this guide, we'll build a complete price monitoring system using @agorio/sdk — the open-source toolkit that implements both UCP (Universal Commerce Protocol) and ACP (Agentic Commerce Protocol) for AI shopping agents.

What We're Building

A price monitoring pipeline with these components:

  1. Product Discovery — Find products across UCP-enabled merchants
  2. Structured Extraction — Get clean price, title, availability data
  3. Historical Snapshots — Track price changes over time
  4. Diff Detection — Alert when prices change significantly
  5. Retry Logic — Handle network failures gracefully

Why Agorio for Price Monitoring?

Traditional web scraping requires site-specific selectors that break constantly. Agorio uses standardized commerce protocols:

  • UCP merchants expose structured product data at /.well-known/ucp
  • ACP merchants provide normalized checkout APIs
  • Auto-detection — one client handles both protocols seamlessly

Over 4.8 million Shopify stores are UCP-discoverable, and 1.5 million Stripe merchants support ACP. Together, the two protocols reach a large share of online stores.
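
To make the discovery path concrete, here is a minimal sketch of probing a domain for /.well-known/ucp. The helper names (`ucpDiscoveryUrl`, `hasUcpEndpoint`) and the `FetchLike` type are hypothetical and not part of @agorio/sdk. The sketch only checks for a 2xx status and does not assume anything about the response body.

```typescript
// Minimal fetch-like signature so the probe can be exercised without a network.
// (Hypothetical type, not part of @agorio/sdk.)
type FetchLike = (url: string) => Promise<{ ok: boolean; status: number }>;

// Build the discovery URL for a bare domain or a full origin.
function ucpDiscoveryUrl(domainOrOrigin: string): string {
  const origin = /^https?:\/\//i.test(domainOrOrigin)
    ? domainOrOrigin
    : `https://${domainOrOrigin}`;
  return new URL('/.well-known/ucp', origin).toString();
}

// Resolves to true when the endpoint answers with a 2xx status.
// The body is not inspected because its schema is not covered in this guide.
async function hasUcpEndpoint(domain: string, fetchImpl: FetchLike): Promise<boolean> {
  try {
    const res = await fetchImpl(ucpDiscoveryUrl(domain));
    return res.ok;
  } catch {
    return false; // invalid domain, DNS failure, timeout, TLS error, ...
  }
}
```

On Node 18 or later, the global `fetch` can be passed as `fetchImpl`, for example `await hasUcpEndpoint('shop.example.com', fetch)`.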

Setup

npm install @agorio/sdk

We'll start with a mock merchant for testing, then show real merchant integration:

import { 
  ShoppingAgent, 
  MockMerchant, 
  GeminiAdapter 
} from '@agorio/sdk';

// Start a test merchant
const merchant = new MockMerchant({ port: 3456 });
await merchant.start();

console.log(`Mock merchant running at: ${merchant.baseUrl}`);
// Output: Mock merchant running at: http://localhost:3456

1. Product Schema Definition

First, define the structure for tracked products:

interface ProductSnapshot {
  productId: string;
  title: string;
  price: {
    amount: number;     // In cents: $29.99 = 2999
    currency: string;   // ISO 4217: "USD", "EUR", etc.
  };
  availability: 'in_stock' | 'out_of_stock' | 'limited' | 'preorder';
  url: string;
  merchantDomain: string;
  timestamp: string;    // ISO 8601

  // Optional fields for richer monitoring
  description?: string;
  imageUrl?: string;
  reviewCount?: number;
  averageRating?: number;
}

interface PriceHistory {
  productId: string;
  snapshots: ProductSnapshot[];
  lastChecked: string;
  alertThreshold: number; // Alert if price changes by this % 
}
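
Storing amounts as integer cents avoids floating-point drift when comparing prices. As a rough sketch of the conversion in both directions, assuming a two-decimal currency such as USD or EUR (the helper names `toCents` and `formatCents` are made up for this example):

```typescript
// Convert a decimal price (number, or a string like "1,299.50") to integer cents.
// Assumes a two-decimal currency; currencies such as JPY need different handling.
function toCents(price: number | string): number {
  const value = typeof price === 'string' ? Number(price.replace(/,/g, '')) : price;
  if (!Number.isFinite(value) || value < 0) {
    throw new RangeError(`Invalid price: ${String(price)}`);
  }
  return Math.round(value * 100);
}

// Render cents for logs and alerts, always with two decimals.
function formatCents(cents: number, currency = 'USD'): string {
  return `${(cents / 100).toFixed(2)} ${currency}`;
}
```

For example, `toCents(29.99)` gives `2999`, and `formatCents(2990)` gives `"29.90 USD"` rather than the `29.9` that plain division would print.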

2. Price Extraction Engine

The core extractor uses Agorio's agent tools to get structured data:

class PriceExtractor {
  private agent: ShoppingAgent;

  constructor(apiKey: string) {
    this.agent = new ShoppingAgent({
      llm: new GeminiAdapter({ apiKey }),
      verbose: false, // Set true for debugging
    });
  }

  async extractProduct(
    merchantDomain: string, 
    searchQuery: string
  ): Promise<ProductSnapshot | null> {
    try {
      // Run the shopping agent with extraction prompt
      const result = await this.agent.run(`
        Go to ${merchantDomain} and search for "${searchQuery}".
        Find the first available product and extract:
        - Product ID
        - Exact title
        - Current price (amount and currency)
        - Availability status
        - Product URL if available

        Return just the structured data, no purchase.
      `);

      // Parse the agent's response into our schema
      const extracted = this.parseAgentResponse(result.answer);

      return {
        productId: extracted.id,
        title: extracted.title,
        price: {
          amount: Math.round(extracted.price * 100), // Convert to cents
          currency: extracted.currency || 'USD',
        },
        availability: this.normalizeAvailability(extracted.availability),
        url: extracted.url || `${merchantDomain}/search?q=${encodeURIComponent(searchQuery)}`,
        merchantDomain,
        timestamp: new Date().toISOString(),
        description: extracted.description,
        imageUrl: extracted.imageUrl,
      };

    } catch (error) {
      console.error(`Extraction failed for ${merchantDomain}:`, error);
      return null;
    }
  }

  private parseAgentResponse(response: string): any {
    // The agent returns natural language, extract structured data
    // In production, you'd use the agent's tool calls or structured output
    const priceMatch = response.match(/price[:\s]*\$?([0-9,]+\.?[0-9]*)/i);
    const titleMatch = response.match(/title[:\s]*([^\n]+)/i);
    // \b keeps "id" from matching inside words such as "provided"
    const idMatch = response.match(/\bid\b[:\s]*([^\s\n]+)/i);

    return {
      id: idMatch?.[1] || `product_${Date.now()}`,
      title: titleMatch?.[1]?.trim() || 'Unknown Product',
      // Strip every thousands separator, not just the first one
      price: priceMatch ? parseFloat(priceMatch[1].replace(/,/g, '')) : 0,
      currency: 'USD',
      availability: 'in_stock',
    };
  }

  private normalizeAvailability(status: string): ProductSnapshot['availability'] {
    const normalized = status.toLowerCase();
    if (normalized.includes('out') || normalized.includes('sold')) return 'out_of_stock';
    if (normalized.includes('limited') || normalized.includes('few')) return 'limited';
    if (normalized.includes('preorder') || normalized.includes('pre-order')) return 'preorder';
    return 'in_stock';
  }
}
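
The regex parser above is a stand-in, as its comment says. One alternative, sketched here under the assumption that the prompt asks the agent to reply with a single JSON object, is to parse and validate that object and return null when anything is missing. The field names and the `parseProductJson` helper are hypothetical, not an @agorio/sdk API.

```typescript
interface ExtractedProduct {
  id: string;
  title: string;
  price: number; // decimal units, e.g. 29.99
  currency: string;
  availability: string;
}

// Take the text between the first "{" and the last "}" of a reply, parse it,
// and validate the fields we rely on. Returns null instead of guessing.
function parseProductJson(reply: string): ExtractedProduct | null {
  const start = reply.indexOf('{');
  const end = reply.lastIndexOf('}');
  if (start === -1 || end <= start) return null;

  let data: unknown;
  try {
    data = JSON.parse(reply.slice(start, end + 1));
  } catch {
    return null; // not valid JSON
  }
  if (typeof data !== 'object' || data === null) return null;

  const { id, title, price, currency, availability } = data as Record<string, unknown>;
  if (typeof id !== 'string' || id === '') return null;
  if (typeof title !== 'string' || title === '') return null;
  if (typeof price !== 'number' || !Number.isFinite(price) || price < 0) return null;

  return {
    id,
    title,
    price,
    // Same defaults the regex parser falls back to
    currency: typeof currency === 'string' ? currency : 'USD',
    availability: typeof availability === 'string' ? availability : 'in_stock',
  };
}
```

Returning null for a missing ID, rather than inventing one from the current time, keeps each product's history in a single file across runs.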

3. Historical Tracking with Retry Logic

Add persistence and failure handling:

import { promises as fs } from 'fs';
import path from 'path';

class PriceTracker {
  private extractor: PriceExtractor;
  private dataDir: string;

  constructor(apiKey: string, dataDir = './price-data') {
    this.extractor = new PriceExtractor(apiKey);
    this.dataDir = dataDir;
  }

  async track(
    merchantDomain: string, 
    searchQuery: string,
    options: {
      alertThreshold?: number; // % change to trigger alert
      maxRetries?: number;
      retryDelay?: number; // ms
    } = {}
  ): Promise<ProductSnapshot | null> {
    const { alertThreshold = 10, maxRetries = 3, retryDelay = 2000 } = options;

    let lastError: Error | null = null;

    // Retry logic with exponential backoff
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        console.log(`[Attempt ${attempt}/${maxRetries}] Tracking ${searchQuery} on ${merchantDomain}`);

        const snapshot = await this.extractor.extractProduct(merchantDomain, searchQuery);
        if (!snapshot) {
          throw new Error('Failed to extract product data');
        }

        // Load existing history
        const history = await this.loadHistory(snapshot.productId);

        // Add new snapshot
        history.snapshots.push(snapshot);
        history.lastChecked = snapshot.timestamp;

        // Check for price changes
        const priceChanged = this.detectPriceChange(history, alertThreshold);
        if (priceChanged) {
          this.triggerAlert(history, priceChanged);
        }

        // Save updated history
        await this.saveHistory(history);

        console.log(`✅ Tracked ${snapshot.title}: $${(snapshot.price.amount / 100).toFixed(2)}`);
        return snapshot;

      } catch (error) {
        lastError = error as Error;
        // `error` is `unknown` under strict TypeScript, so log via the typed copy
        console.error(`❌ Attempt ${attempt} failed:`, lastError.message);

        if (attempt < maxRetries) {
          const delay = retryDelay * Math.pow(2, attempt - 1); // Exponential backoff
          console.log(`⏳ Retrying in ${delay}ms...`);
          await this.sleep(delay);
        }
      }
    }

    console.error(`🚨 All ${maxRetries} attempts failed. Last error:`, lastError?.message);
    return null;
  }

  private async loadHistory(productId: string): Promise<PriceHistory> {
    const filePath = path.join(this.dataDir, `${productId}.json`);

    try {
      await fs.mkdir(this.dataDir, { recursive: true });
      const data = await fs.readFile(filePath, 'utf-8');
      return JSON.parse(data);
    } catch {
      // File doesn't exist, create new history
      return {
        productId,
        snapshots: [],
        lastChecked: new Date().toISOString(),
        alertThreshold: 10,
      };
    }
  }

  private async saveHistory(history: PriceHistory): Promise<void> {
    const filePath = path.join(this.dataDir, `${history.productId}.json`);
    await fs.writeFile(filePath, JSON.stringify(history, null, 2));
  }

  private detectPriceChange(history: PriceHistory, threshold: number) {
    if (history.snapshots.length < 2) return null;

    const current = history.snapshots[history.snapshots.length - 1];
    const previous = history.snapshots[history.snapshots.length - 2];

    const currentPrice = current.price.amount;
    const previousPrice = previous.price.amount;

    if (previousPrice === 0) return null; // Avoid division by zero

    const changePercent = ((currentPrice - previousPrice) / previousPrice) * 100;

    if (Math.abs(changePercent) >= threshold) {
      return {
        changePercent: changePercent.toFixed(2),
        previousPrice,
        currentPrice,
        direction: changePercent > 0 ? 'increased' : 'decreased',
        current,
        previous,
      };
    }

    return null;
  }

  private triggerAlert(history: PriceHistory, change: any) {
    console.log(`
🔔 PRICE ALERT for ${change.current.title}
📈 Price ${change.direction} by ${Math.abs(change.changePercent)}%
💰 Was: $${(change.previousPrice / 100).toFixed(2)} → Now: $${(change.currentPrice / 100).toFixed(2)}
🏪 Store: ${change.current.merchantDomain}
⏰ Time: ${new Date(change.current.timestamp).toLocaleString()}
    `.trim());

    // In production, send email, SMS, or webhook here
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
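
The retry loop in `track()` can also be pulled out into a reusable helper. The sketch below follows the same schedule (`retryDelay * 2^(attempt - 1)`, so 2000 ms and then 4000 ms with the defaults) and takes an injectable `sleep` so it can be tested without waiting. The names `withRetry` and `backoffDelay` are made up for this example.

```typescript
interface RetryOptions {
  maxRetries?: number; // total attempts, matching how track() uses it
  retryDelay?: number; // base delay in ms
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

// Delay to wait after a failed attempt (1-based) before trying again.
function backoffDelay(attempt: number, baseMs: number): number {
  return baseMs * Math.pow(2, attempt - 1);
}

async function withRetry<T>(
  task: (attempt: number) => Promise<T>,
  options: RetryOptions = {},
): Promise<T> {
  const {
    maxRetries = 3,
    retryDelay = 2000,
    sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms)),
  } = options;

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      lastError = error;
      // No wait after the final attempt
      if (attempt < maxRetries) {
        await sleep(backoffDelay(attempt, retryDelay));
      }
    }
  }
  // Rethrow the last failure so the caller decides how to report it
  throw lastError instanceof Error ? lastError : new Error(String(lastError));
}
```

Unlike `track()`, this helper rethrows after the last attempt instead of returning null, which lets callers tell a failed run apart from an empty result.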

4. Monitoring Multiple Products

Scale to multiple products and merchants:

class PriceMonitor {
  private tracker: PriceTracker;
  private watchlist: Array<{
    merchantDomain: string;
    searchQuery: string;
    alertThreshold: number;
  }> = [];

  constructor(apiKey: string) {
    this.tracker = new PriceTracker(apiKey);
  }

  addWatch(merchantDomain: string, searchQuery: string, alertThreshold = 10) {
    this.watchlist.push({ merchantDomain, searchQuery, alertThreshold });
    console.log(`Added to watchlist: "${searchQuery}" on ${merchantDomain}`);
  }

  async runFullCheck(): Promise<void> {
    console.log(`🏃 Running full price check on ${this.watchlist.length} items...`);

    const results = await Promise.allSettled(
      this.watchlist.map(item => 
        this.tracker.track(item.merchantDomain, item.searchQuery, {
          alertThreshold: item.alertThreshold,
        })
      )
    );

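    // track() resolves to null on failure instead of rejecting, so check the value too
    const successful = results.filter(
      r => r.status === 'fulfilled' && r.value !== null
    ).length;
    const failed = results.length - successful;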

    console.log(`✅ Check complete: ${successful} successful, ${failed} failed`);
  }

  async startScheduled(intervalMinutes = 60): Promise<void> {
    console.log(`⏰ Starting scheduled monitoring every ${intervalMinutes} minutes`);

    // Run immediately
    await this.runFullCheck();

    // Then run on interval
    setInterval(() => {
      this.runFullCheck().catch(console.error);
    }, intervalMinutes * 60 * 1000);
  }
}
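
`runFullCheck()` starts every watchlist item at once, which can run into LLM or merchant rate limits as the list grows. A rough sketch of capping the number of checks in flight is shown below. The `allSettledWithLimit` helper and the `Settled` type are hypothetical names for this example. The result shape mirrors `Promise.allSettled`.

```typescript
type Settled<R> =
  | { status: 'fulfilled'; value: R }
  | { status: 'rejected'; reason: unknown };

// Run `worker` over `items` with at most `limit` calls in flight.
// Results keep the input order.
async function allSettledWithLimit<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T) => Promise<R>,
): Promise<Settled<R>[]> {
  const results: Settled<R>[] = new Array(items.length);
  let next = 0;

  // Each lane pulls the next unclaimed index until the list is exhausted.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++; // no await between the check and the increment
      try {
        results[index] = { status: 'fulfilled', value: await worker(items[index] as T) };
      } catch (reason) {
        results[index] = { status: 'rejected', reason };
      }
    }
  }

  const laneCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
```

Inside `runFullCheck()`, this could stand in for the `Promise.allSettled(...)` call, for example `allSettledWithLimit(this.watchlist, 3, item => this.tracker.track(...))`.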

5. Complete Working Example

Put it all together. The example below runs against the mock merchant:

async function main() {
  const apiKey = process.env.GEMINI_API_KEY;
  if (!apiKey) {
    throw new Error('Set GEMINI_API_KEY environment variable');
  }

  // For testing, start with MockMerchant
  const merchant = new MockMerchant();
  await merchant.start();

  try {
    const monitor = new PriceMonitor(apiKey);

    // Add products to monitor
    monitor.addWatch(merchant.domain, 'wireless headphones', 15); // 15% threshold
    monitor.addWatch(merchant.domain, 'laptop stand', 10);
    monitor.addWatch(merchant.domain, 'mechanical keyboard', 20);

    // Run a single check
    await monitor.runFullCheck();

    // For continuous monitoring:
    // await monitor.startScheduled(30); // Every 30 minutes

  } finally {
    await merchant.stop();
  }
}

// Run with error handling
main().catch(console.error);

6. Production Deployment

For production monitoring, consider these enhancements:

Database Storage

Replace JSON files with PostgreSQL or MongoDB:

// Example with PostgreSQL
import { Pool } from 'pg';

class DatabasePriceTracker extends PriceTracker {
  private db: Pool;

  constructor(apiKey: string, connectionString: string) {
    super(apiKey);
    this.db = new Pool({ connectionString });
  }

  async saveSnapshot(snapshot: ProductSnapshot): Promise<void> {
    await this.db.query(`
      INSERT INTO price_snapshots 
      (product_id, title, price_amount, currency, availability, timestamp, merchant_domain)
      VALUES ($1, $2, $3, $4, $5, $6, $7)
    `, [
      snapshot.productId,
      snapshot.title, 
      snapshot.price.amount,
      snapshot.price.currency,
      snapshot.availability,
      snapshot.timestamp,
      snapshot.merchantDomain
    ]);
  }
}

Real Merchant Discovery

Replace mock merchant with real UCP discovery:

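// Assumption: UcpClient is exported from the package root; check the SDK's exports
import { UcpClient } from '@agorio/sdk';

async function discoverRealMerchants(): Promise<string[]> {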
  const domains = [
    'shop.example.com',
    'store.anothersite.com',
    // Add UCP-enabled merchant domains
  ];

  const ucpClient = new UcpClient();
  const validDomains: string[] = [];

  for (const domain of domains) {
    try {
      await ucpClient.discover(domain);
      validDomains.push(domain);
      console.log(`✅ UCP enabled: ${domain}`);
    } catch {
      console.log(`❌ UCP not found: ${domain}`);
    }
  }

  return validDomains;
}

Error Recovery & Alerts

Add webhook notifications:

async function sendSlackAlert(change: any) {
  const webhook = process.env.SLACK_WEBHOOK_URL;
  if (!webhook) return;

  // Uses the global fetch available in Node 18+
  const res = await fetch(webhook, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      text: `🔔 Price Alert: ${change.current.title} ${change.direction} by ${change.changePercent}%`,
      attachments: [{
        color: change.direction === 'decreased' ? 'good' : 'warning',
        fields: [
          { title: 'Product', value: change.current.title, short: true },
          { title: 'Store', value: change.current.merchantDomain, short: true },
          { title: 'Previous', value: `$${(change.previousPrice / 100).toFixed(2)}`, short: true },
          { title: 'Current', value: `$${(change.currentPrice / 100).toFixed(2)}`, short: true },
        ]
      }]
    })
  });

  // Surface delivery failures instead of silently dropping the alert
  if (!res.ok) {
    console.error(`Slack webhook returned HTTP ${res.status}`);
  }
}
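
For the error-recovery half of this section, one option is to make alert delivery fail-safe, so a slow or broken webhook cannot abort a monitoring run. Here is a minimal sketch with a timeout and an injectable POST function. `postAlert` and `PostFn` are hypothetical names, and the 5-second default is an arbitrary choice.

```typescript
type PostFn = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string; signal?: AbortSignal },
) => Promise<{ ok: boolean; status: number }>;

// Post a JSON payload. Resolves to true on a 2xx response and false on any
// failure (network error, timeout, non-2xx status). Never throws.
async function postAlert(
  url: string,
  payload: unknown,
  post: PostFn,
  timeoutMs = 5000,
): Promise<boolean> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const res = await post(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
      signal: controller.signal, // lets a fetch-based `post` cancel on timeout
    });
    return res.ok;
  } catch {
    return false;
  } finally {
    clearTimeout(timer);
  }
}
```

With Node 18 or later, the global `fetch` can be passed as `post`. The boolean result makes it easy to log or queue failed alerts for a later attempt.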

Why This Approach Works

Protocol-Based > Screen Scraping: UCP/ACP provide structured APIs that don't break when sites redesign.

Agent Intelligence: The LLM adapter handles natural language search queries and adapts to different merchant layouts automatically.

Multi-Protocol: Auto-detection between UCP and ACP means maximum merchant coverage without code changes.

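Type Safety: TypeScript types catch many mistakes at compile time, before they reach a production monitoring system. The `any` used in the parsing and alert helpers above is the main gap left to close.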

Next Steps

  • Scale Testing: Use MockMerchant with different product catalogs
  • Add More LLMs: Try Claude, OpenAI, or local Ollama models
  • Visual Monitoring: Add screenshot capture for manual verification
  • Market Analysis: Track competitor pricing across multiple merchants
  • Inventory Alerts: Monitor stock levels, not just prices

The complete code is available as examples in the @agorio/sdk repository. The SDK handles the protocol complexity so you can focus on business logic.


Found this useful? Star @agorio/sdk on GitHub and let me know what you build with agentic commerce!