DEV Community

agenthustler
agenthustler

Posted on

Building a Personalized Deal Finder Across 50+ Retail Sites

Everyone loves a good deal, but nobody has time to check 50 websites daily. Let's build a deal finder that scrapes retail sites, learns your preferences, and alerts you when prices drop on items you care about.

How Deal Finders Work

The best deal finders combine three things:

  1. Price scraping across multiple retailers
  2. Historical price tracking to identify real deals vs fake markdowns
  3. Personalization to surface deals you actually want

Setting Up

pip install requests beautifulsoup4 pandas scikit-learn schedule
Enter fullscreen mode Exit fullscreen mode

We'll use ScraperAPI to handle anti-bot protections on major retail sites:

import requests
from bs4 import BeautifulSoup
import json
import re

SCRAPER_KEY = "YOUR_SCRAPERAPI_KEY"

def scrape(url, render=True):
    """Fetch page through ScraperAPI."""
    params = {
        "api_key": SCRAPER_KEY,
        "url": url,
        "render": str(render).lower()
    }
    resp = requests.get(
        "http://api.scraperapi.com",
        params=params,
        timeout=60
    )
    return BeautifulSoup(resp.text, "html.parser")
Enter fullscreen mode Exit fullscreen mode

Multi-Retailer Price Scrapers

class RetailScraper:
    """Base class for retail site scrapers."""

    def search(self, query):
        raise NotImplementedError

    def parse_price(self, text):
        match = re.search(r"\$(\d+[,\d]*\.?\d*)", text.replace(",", ""))
        return float(match.group(1)) if match else None


class AmazonScraper(RetailScraper):
    def search(self, query):
        url = f"https://www.amazon.com/s?k={query.replace(' ', '+')}"
        soup = scrape(url)
        products = []

        for item in soup.select("[data-component-type='s-search-result']"):
            title = item.select_one("h2 span")
            price = item.select_one(".a-price .a-offscreen")
            rating = item.select_one(".a-icon-alt")
            link = item.select_one("h2 a")

            if title and price:
                products.append({
                    "title": title.text.strip(),
                    "price": self.parse_price(price.text),
                    "rating": rating.text if rating else "N/A",
                    "url": f"https://www.amazon.com{link['href']}" if link else "",
                    "retailer": "Amazon"
                })
        return products[:10]


class WalmartScraper(RetailScraper):
    def search(self, query):
        url = f"https://www.walmart.com/search?q={query.replace(' ', '+')}"
        soup = scrape(url)
        products = []

        for item in soup.select("[data-item-id]"):
            title = item.select_one("[data-automation-id='product-title']")
            price = item.select_one("[data-automation-id='product-price']")
            link = item.select_one("a")

            if title and price:
                products.append({
                    "title": title.text.strip(),
                    "price": self.parse_price(price.text),
                    "url": f"https://www.walmart.com{link['href']}" if link else "",
                    "retailer": "Walmart"
                })
        return products[:10]


class TargetScraper(RetailScraper):
    def search(self, query):
        url = f"https://www.target.com/s?searchTerm={query.replace(' ', '+')}"
        soup = scrape(url)
        products = []

        for item in soup.select("[data-test='product-grid'] li"):
            title = item.select_one("[data-test='product-title']")
            price = item.select_one("[data-test='current-price']")
            link = item.select_one("a")

            if title and price:
                products.append({
                    "title": title.text.strip(),
                    "price": self.parse_price(price.text),
                    "url": f"https://www.target.com{link['href']}" if link else "",
                    "retailer": "Target"
                })
        return products[:10]
Enter fullscreen mode Exit fullscreen mode

Cross-Retailer Search

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

class DealFinder:
    def __init__(self):
        self.scrapers = [
            AmazonScraper(),
            WalmartScraper(),
            TargetScraper(),
        ]

    def search_all(self, query):
        """Search across all retailers concurrently."""
        all_products = []

        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(s.search, query): s.__class__.__name__
                for s in self.scrapers
            }
            for future in futures:
                try:
                    products = future.result(timeout=30)
                    all_products.extend(products)
                except Exception as e:
                    print(f"{futures[future]} failed: {e}")

        df = pd.DataFrame(all_products)
        if not df.empty:
            df = df.sort_values("price")
        return df

finder = DealFinder()
results = finder.search_all("sony wh-1000xm5")
print(results[["title", "price", "retailer"]].head(10))
Enter fullscreen mode Exit fullscreen mode

Price History Tracking

import sqlite3
from datetime import datetime, date

class PriceTracker:
    def __init__(self, db="prices.db"):
        self.conn = sqlite3.connect(db)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS prices (
                id INTEGER PRIMARY KEY,
                product_key TEXT,
                title TEXT,
                price REAL,
                retailer TEXT,
                url TEXT,
                date TEXT
            )
        """)
        self.conn.commit()

    def record(self, products_df):
        """Save current prices."""
        today = date.today().isoformat()
        for _, row in products_df.iterrows():
            key = f"{row['retailer']}:{row['title'][:50]}"
            self.conn.execute(
                "INSERT INTO prices VALUES (NULL,?,?,?,?,?,?)",
                (key, row["title"], row["price"],
                 row["retailer"], row.get("url", ""), today)
            )
        self.conn.commit()

    def is_good_deal(self, product_key, current_price):
        """Check if current price is historically low."""
        cursor = self.conn.execute(
            "SELECT MIN(price), AVG(price) FROM prices WHERE product_key = ?",
            (product_key,)
        )
        row = cursor.fetchone()
        if row[0] is None:
            return False, {}

        min_price, avg_price = row
        return current_price <= min_price, {
            "current": current_price,
            "historical_low": min_price,
            "average": round(avg_price, 2),
            "discount_vs_avg": round((1 - current_price / avg_price) * 100, 1)
        }
Enter fullscreen mode Exit fullscreen mode

Personalized Deal Scoring

class DealPersonalizer:
    def __init__(self):
        self.preferences = {}

    def set_preferences(self, categories=None, max_price=None,
                        brands=None, min_discount=10):
        """Set user preferences for deal filtering."""
        self.preferences = {
            "categories": categories or [],
            "max_price": max_price,
            "brands": brands or [],
            "min_discount": min_discount
        }

    def score_deal(self, product, price_history):
        """Score a deal based on preferences and price history."""
        score = 0

        # Price history score (0-40 points)
        if price_history.get("discount_vs_avg", 0) > 0:
            score += min(price_history["discount_vs_avg"], 40)

        # Brand preference (0-20 points)
        for brand in self.preferences.get("brands", []):
            if brand.lower() in product["title"].lower():
                score += 20
                break

        # Under budget (0-20 points)
        max_price = self.preferences.get("max_price")
        if max_price and product["price"] <= max_price:
            score += 20

        # Historical low (0-20 points)
        if price_history.get("current") == price_history.get("historical_low"):
            score += 20

        return score
Enter fullscreen mode Exit fullscreen mode

Putting It All Together

def daily_deal_check(watchlist, finder, tracker, personalizer):
    """Run daily deal check and alert on good finds."""
    great_deals = []

    for query in watchlist:
        results = finder.search_all(query)
        tracker.record(results)

        for _, product in results.iterrows():
            key = f"{product['retailer']}:{product['title'][:50]}"
            is_low, history = tracker.is_good_deal(key, product["price"])
            score = personalizer.score_deal(product, history)

            if score >= 50:  # Only alert on high-scoring deals
                great_deals.append({
                    **product.to_dict(),
                    "score": score,
                    "history": history
                })

    if great_deals:
        print(f"\nFound {len(great_deals)} great deals!")
        for deal in sorted(great_deals, key=lambda d: -d["score"]):
            print(f"  [{deal['score']}] {deal['title'][:60]}")
            print(f"       ${deal['price']} at {deal['retailer']}")
            if deal["history"]:
                print(f"       {deal['history']['discount_vs_avg']}% below average")
    return great_deals
Enter fullscreen mode Exit fullscreen mode

Scaling to 50+ Sites

For scraping dozens of retail sites reliably, use ThorData for residential proxy rotation and ScrapeOps to monitor which scrapers are working. Different retailers need different approaches: some work with simple HTTP requests, others need full JavaScript rendering.

Conclusion

A personalized deal finder saves real money by automating the tedious work of checking multiple retailers. The key differentiator is price history, as it separates genuine deals from fake "sales" where the price was raised before being "discounted." Start with 3-5 retailers, build up price history for a month, then expand your coverage.

The best deals are the ones you don't have to hunt for.

Top comments (0)