Build a TypeScript Price Monitoring Pipeline with @agorio/sdk
Price monitoring is one of the most practical applications of AI commerce agents. Instead of manually checking prices across different stores, you can automate the entire pipeline: product discovery, data extraction, historical tracking, and alert triggers.
In this guide, we'll build a complete price monitoring system using @agorio/sdk — the open-source toolkit that implements both UCP (Universal Commerce Protocol) and ACP (Agentic Commerce Protocol) for AI shopping agents.
What We're Building
A price monitoring pipeline with these components:
- Product Discovery — Find products across UCP-enabled merchants
- Structured Extraction — Get clean price, title, availability data
- Historical Snapshots — Track price changes over time
- Diff Detection — Alert when prices change significantly
- Retry Logic — Handle network failures gracefully
Why Agorio for Price Monitoring?
Traditional web scraping requires site-specific selectors that break constantly. Agorio uses standardized commerce protocols:
-
UCP merchants expose structured product data at
/.well-known/ucp - ACP merchants provide normalized checkout APIs
- Auto-detection — one client handles both protocols seamlessly
Over 4.8 million Shopify stores are UCP-discoverable. 1.5 million Stripe merchants support ACP. This covers most online commerce.
Setup
npm install @agorio/sdk
We'll start with a mock merchant for testing, then show real merchant integration:
import {
ShoppingAgent,
MockMerchant,
GeminiAdapter
} from '@agorio/sdk';
// Start a test merchant
const merchant = new MockMerchant({ port: 3456 });
await merchant.start();
console.log(`Mock merchant running at: ${merchant.baseUrl}`);
// Output: Mock merchant running at: http://localhost:3456
1. Product Schema Definition
First, define the structure for tracked products:
interface ProductSnapshot {
productId: string;
title: string;
price: {
amount: number; // In cents: $29.99 = 2999
currency: string; // ISO 4217: "USD", "EUR", etc.
};
availability: 'in_stock' | 'out_of_stock' | 'limited' | 'preorder';
url: string;
merchantDomain: string;
timestamp: string; // ISO 8601
// Optional fields for richer monitoring
description?: string;
imageUrl?: string;
reviewCount?: number;
averageRating?: number;
}
interface PriceHistory {
productId: string;
snapshots: ProductSnapshot[];
lastChecked: string;
alertThreshold: number; // Alert if price changes by this %
}
2. Price Extraction Engine
The core extractor uses Agorio's agent tools to get structured data:
class PriceExtractor {
private agent: ShoppingAgent;
constructor(apiKey: string) {
this.agent = new ShoppingAgent({
llm: new GeminiAdapter({ apiKey }),
verbose: false, // Set true for debugging
});
}
async extractProduct(
merchantDomain: string,
searchQuery: string
): Promise<ProductSnapshot | null> {
try {
// Run the shopping agent with extraction prompt
const result = await this.agent.run(`
Go to ${merchantDomain} and search for "${searchQuery}".
Find the first available product and extract:
- Product ID
- Exact title
- Current price (amount and currency)
- Availability status
- Product URL if available
Return just the structured data, no purchase.
`);
// Parse the agent's response into our schema
const extracted = this.parseAgentResponse(result.answer);
return {
productId: extracted.id,
title: extracted.title,
price: {
amount: Math.round(extracted.price * 100), // Convert to cents
currency: extracted.currency || 'USD',
},
availability: this.normalizeAvailability(extracted.availability),
url: extracted.url || `${merchantDomain}/search?q=${encodeURIComponent(searchQuery)}`,
merchantDomain,
timestamp: new Date().toISOString(),
description: extracted.description,
imageUrl: extracted.imageUrl,
};
} catch (error) {
console.error(`Extraction failed for ${merchantDomain}:`, error);
return null;
}
}
private parseAgentResponse(response: string): any {
// The agent returns natural language, extract structured data
// In production, you'd use the agent's tool calls or structured output
const priceMatch = response.match(/price[:\s]*\$?([0-9,]+\.?[0-9]*)/i);
const titleMatch = response.match(/title[:\s]*([^\n]+)/i);
const idMatch = response.match(/id[:\s]*([^\s\n]+)/i);
return {
id: idMatch?.[1] || `product_${Date.now()}`,
title: titleMatch?.[1]?.trim() || 'Unknown Product',
price: priceMatch ? parseFloat(priceMatch[1].replace(',', '')) : 0,
currency: 'USD',
availability: 'in_stock',
};
}
private normalizeAvailability(status: string): ProductSnapshot['availability'] {
const normalized = status.toLowerCase();
if (normalized.includes('out') || normalized.includes('sold')) return 'out_of_stock';
if (normalized.includes('limited') || normalized.includes('few')) return 'limited';
if (normalized.includes('preorder') || normalized.includes('pre-order')) return 'preorder';
return 'in_stock';
}
}
3. Historical Tracking with Retry Logic
Add persistence and failure handling:
import { promises as fs } from 'fs';
import path from 'path';
class PriceTracker {
private extractor: PriceExtractor;
private dataDir: string;
constructor(apiKey: string, dataDir = './price-data') {
this.extractor = new PriceExtractor(apiKey);
this.dataDir = dataDir;
}
async track(
merchantDomain: string,
searchQuery: string,
options: {
alertThreshold?: number; // % change to trigger alert
maxRetries?: number;
retryDelay?: number; // ms
} = {}
): Promise<ProductSnapshot | null> {
const { alertThreshold = 10, maxRetries = 3, retryDelay = 2000 } = options;
let lastError: Error | null = null;
// Retry logic with exponential backoff
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
console.log(`[Attempt ${attempt}/${maxRetries}] Tracking ${searchQuery} on ${merchantDomain}`);
const snapshot = await this.extractor.extractProduct(merchantDomain, searchQuery);
if (!snapshot) {
throw new Error('Failed to extract product data');
}
// Load existing history
const history = await this.loadHistory(snapshot.productId);
// Add new snapshot
history.snapshots.push(snapshot);
history.lastChecked = snapshot.timestamp;
// Check for price changes
const priceChanged = this.detectPriceChange(history, alertThreshold);
if (priceChanged) {
this.triggerAlert(history, priceChanged);
}
// Save updated history
await this.saveHistory(history);
console.log(`✅ Tracked ${snapshot.title}: $${snapshot.price.amount / 100}`);
return snapshot;
} catch (error) {
lastError = error as Error;
console.error(`❌ Attempt ${attempt} failed:`, error.message);
if (attempt < maxRetries) {
const delay = retryDelay * Math.pow(2, attempt - 1); // Exponential backoff
console.log(`⏳ Retrying in ${delay}ms...`);
await this.sleep(delay);
}
}
}
console.error(`🚨 All ${maxRetries} attempts failed. Last error:`, lastError?.message);
return null;
}
private async loadHistory(productId: string): Promise<PriceHistory> {
const filePath = path.join(this.dataDir, `${productId}.json`);
try {
await fs.mkdir(this.dataDir, { recursive: true });
const data = await fs.readFile(filePath, 'utf-8');
return JSON.parse(data);
} catch {
// File doesn't exist, create new history
return {
productId,
snapshots: [],
lastChecked: new Date().toISOString(),
alertThreshold: 10,
};
}
}
private async saveHistory(history: PriceHistory): Promise<void> {
const filePath = path.join(this.dataDir, `${history.productId}.json`);
await fs.writeFile(filePath, JSON.stringify(history, null, 2));
}
private detectPriceChange(history: PriceHistory, threshold: number) {
if (history.snapshots.length < 2) return null;
const current = history.snapshots[history.snapshots.length - 1];
const previous = history.snapshots[history.snapshots.length - 2];
const currentPrice = current.price.amount;
const previousPrice = previous.price.amount;
if (previousPrice === 0) return null; // Avoid division by zero
const changePercent = ((currentPrice - previousPrice) / previousPrice) * 100;
if (Math.abs(changePercent) >= threshold) {
return {
changePercent: changePercent.toFixed(2),
previousPrice,
currentPrice,
direction: changePercent > 0 ? 'increased' : 'decreased',
current,
previous,
};
}
return null;
}
private triggerAlert(history: PriceHistory, change: any) {
console.log(`
🔔 PRICE ALERT for ${change.current.title}
📈 Price ${change.direction} by ${Math.abs(change.changePercent)}%
💰 Was: $${change.previousPrice / 100} → Now: $${change.currentPrice / 100}
🏪 Store: ${change.current.merchantDomain}
⏰ Time: ${new Date(change.current.timestamp).toLocaleString()}
`.trim());
// In production, send email, SMS, or webhook here
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
4. Monitoring Multiple Products
Scale to multiple products and merchants:
class PriceMonitor {
private tracker: PriceTracker;
private watchlist: Array<{
merchantDomain: string;
searchQuery: string;
alertThreshold: number;
}> = [];
constructor(apiKey: string) {
this.tracker = new PriceTracker(apiKey);
}
addWatch(merchantDomain: string, searchQuery: string, alertThreshold = 10) {
this.watchlist.push({ merchantDomain, searchQuery, alertThreshold });
console.log(`Added to watchlist: "${searchQuery}" on ${merchantDomain}`);
}
async runFullCheck(): Promise<void> {
console.log(`🏃 Running full price check on ${this.watchlist.length} items...`);
const results = await Promise.allSettled(
this.watchlist.map(item =>
this.tracker.track(item.merchantDomain, item.searchQuery, {
alertThreshold: item.alertThreshold,
})
)
);
const successful = results.filter(r => r.status === 'fulfilled').length;
const failed = results.length - successful;
console.log(`✅ Check complete: ${successful} successful, ${failed} failed`);
}
async startScheduled(intervalMinutes = 60): Promise<void> {
console.log(`⏰ Starting scheduled monitoring every ${intervalMinutes} minutes`);
// Run immediately
await this.runFullCheck();
// Then run on interval
setInterval(() => {
this.runFullCheck().catch(console.error);
}, intervalMinutes * 60 * 1000);
}
}
5. Complete Working Example
Put it all together with real configuration:
async function main() {
const apiKey = process.env.GEMINI_API_KEY;
if (!apiKey) {
throw new Error('Set GEMINI_API_KEY environment variable');
}
// For testing, start with MockMerchant
const merchant = new MockMerchant();
await merchant.start();
try {
const monitor = new PriceMonitor(apiKey);
// Add products to monitor
monitor.addWatch(merchant.domain, 'wireless headphones', 15); // 15% threshold
monitor.addWatch(merchant.domain, 'laptop stand', 10);
monitor.addWatch(merchant.domain, 'mechanical keyboard', 20);
// Run a single check
await monitor.runFullCheck();
// For continuous monitoring:
// await monitor.startScheduled(30); // Every 30 minutes
} finally {
await merchant.stop();
}
}
// Run with error handling
main().catch(console.error);
6. Production Deployment
For production monitoring, consider these enhancements:
Database Storage
Replace JSON files with PostgreSQL or MongoDB:
// Example with PostgreSQL
import { Pool } from 'pg';
class DatabasePriceTracker extends PriceTracker {
private db: Pool;
constructor(apiKey: string, connectionString: string) {
super(apiKey);
this.db = new Pool({ connectionString });
}
async saveSnapshot(snapshot: ProductSnapshot): Promise<void> {
await this.db.query(`
INSERT INTO price_snapshots
(product_id, title, price_amount, currency, availability, timestamp, merchant_domain)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, [
snapshot.productId,
snapshot.title,
snapshot.price.amount,
snapshot.price.currency,
snapshot.availability,
snapshot.timestamp,
snapshot.merchantDomain
]);
}
}
Real Merchant Discovery
Replace mock merchant with real UCP discovery:
async function discoverRealMerchants(): Promise<string[]> {
const domains = [
'shop.example.com',
'store.anothersite.com',
// Add UCP-enabled merchant domains
];
const ucpClient = new UcpClient();
const validDomains: string[] = [];
for (const domain of domains) {
try {
await ucpClient.discover(domain);
validDomains.push(domain);
console.log(`✅ UCP enabled: ${domain}`);
} catch {
console.log(`❌ UCP not found: ${domain}`);
}
}
return validDomains;
}
Error Recovery & Alerts
Add webhook notifications:
async function sendSlackAlert(change: any) {
const webhook = process.env.SLACK_WEBHOOK_URL;
if (!webhook) return;
await fetch(webhook, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
text: `🔔 Price Alert: ${change.current.title} ${change.direction} by ${change.changePercent}%`,
attachments: [{
color: change.direction === 'decreased' ? 'good' : 'warning',
fields: [
{ title: 'Product', value: change.current.title, short: true },
{ title: 'Store', value: change.current.merchantDomain, short: true },
{ title: 'Previous', value: `$${change.previousPrice / 100}`, short: true },
{ title: 'Current', value: `$${change.currentPrice / 100}`, short: true },
]
}]
})
});
}
Why This Approach Works
Protocol-Based > Screen Scraping: UCP/ACP provide structured APIs that don't break when sites redesign.
Agent Intelligence: The LLM adapter handles natural language search queries and adapts to different merchant layouts automatically.
Multi-Protocol: Auto-detection between UCP and ACP means maximum merchant coverage without code changes.
Type Safety: Full TypeScript types prevent runtime errors in production monitoring systems.
Next Steps
- Scale Testing: Use MockMerchant with different product catalogs
- Add More LLMs: Try Claude, OpenAI, or local Ollama models
- Visual Monitoring: Add screenshot capture for manual verification
- Market Analysis: Track competitor pricing across multiple merchants
- Inventory Alerts: Monitor stock levels, not just prices
The complete code is available as examples in the @agorio/sdk repository. The SDK handles the protocol complexity so you can focus on business logic.
Found this useful? Star @agorio/sdk on GitHub and let me know what you build with agentic commerce!
Top comments (0)