Everyone loves a good deal, but nobody has time to check 50 websites daily. Let's build a deal finder that scrapes retail sites, learns your preferences, and alerts you when prices drop on items you care about.
How Deal Finders Work
The best deal finders combine three things:
- Price scraping across multiple retailers
- Historical price tracking to distinguish genuine deals from fake markdowns
- Personalization to surface deals you actually want
Setting Up
pip install requests beautifulsoup4 pandas scikit-learn schedule
We'll use ScraperAPI to handle anti-bot protections on major retail sites:
import json
import re
from urllib.parse import quote_plus

import requests
from bs4 import BeautifulSoup
# ScraperAPI credential -- replace with your own key.
SCRAPER_KEY = "YOUR_SCRAPERAPI_KEY"

def scrape(url, render=True):
    """Fetch *url* through ScraperAPI and return it parsed as BeautifulSoup.

    Args:
        url: Target page to fetch.
        render: When True, ask ScraperAPI to execute JavaScript before
            returning the HTML (needed for most modern retail sites).

    Returns:
        A BeautifulSoup tree of the fetched page.

    Raises:
        requests.HTTPError: If ScraperAPI returns a non-2xx status
            (bad key, exhausted quota, blocked request, ...).
    """
    params = {
        "api_key": SCRAPER_KEY,
        "url": url,
        "render": str(render).lower(),
    }
    resp = requests.get(
        # HTTPS so the API key is not transmitted in cleartext.
        "https://api.scraperapi.com",
        params=params,
        timeout=60,
    )
    # Fail loudly instead of silently parsing an error page as if it
    # were a product listing.
    resp.raise_for_status()
    return BeautifulSoup(resp.text, "html.parser")
Multi-Retailer Price Scrapers
class RetailScraper:
    """Base class for retail site scrapers.

    Subclasses implement `search`; `parse_price` is shared.
    """

    def search(self, query):
        """Return a list of product dicts for *query* (subclass hook)."""
        raise NotImplementedError

    def parse_price(self, text):
        """Extract the first dollar amount in *text* as a float, else None.

        Commas are stripped first, so "$1,299.99" parses as 1299.99.
        (Because of that strip, a comma class inside the pattern would be
        dead weight, so the pattern matches digits and an optional
        fractional part only.)
        """
        match = re.search(r"\$(\d+\.?\d*)", text.replace(",", ""))
        return float(match.group(1)) if match else None
class AmazonScraper(RetailScraper):
    """Scrape Amazon search results for product listings."""

    def search(self, query):
        """Return up to 10 product dicts matching *query* on Amazon.

        Each dict has title, price, rating, url and retailer keys.
        """
        # quote_plus URL-encodes '&', '#', unicode, etc. -- replacing
        # only spaces produced broken URLs for such queries.
        url = f"https://www.amazon.com/s?k={quote_plus(query)}"
        soup = scrape(url)
        products = []
        for item in soup.select("[data-component-type='s-search-result']"):
            title = item.select_one("h2 span")
            price = item.select_one(".a-price .a-offscreen")
            rating = item.select_one(".a-icon-alt")
            link = item.select_one("h2 a")
            if title and price:  # skip tiles missing either field (ads, etc.)
                products.append({
                    "title": title.text.strip(),
                    "price": self.parse_price(price.text),
                    "rating": rating.text if rating else "N/A",
                    "url": f"https://www.amazon.com{link['href']}" if link else "",
                    "retailer": "Amazon",
                })
        return products[:10]
class WalmartScraper(RetailScraper):
    """Scrape Walmart search results for product listings."""

    def search(self, query):
        """Return up to 10 product dicts matching *query* on Walmart."""
        # quote_plus URL-encodes '&', '#', unicode, etc. -- replacing
        # only spaces produced broken URLs for such queries.
        url = f"https://www.walmart.com/search?q={quote_plus(query)}"
        soup = scrape(url)
        products = []
        for item in soup.select("[data-item-id]"):
            title = item.select_one("[data-automation-id='product-title']")
            price = item.select_one("[data-automation-id='product-price']")
            link = item.select_one("a")
            if title and price:  # skip tiles missing either field
                products.append({
                    "title": title.text.strip(),
                    "price": self.parse_price(price.text),
                    "url": f"https://www.walmart.com{link['href']}" if link else "",
                    "retailer": "Walmart",
                })
        return products[:10]
class TargetScraper(RetailScraper):
    """Scrape Target search results for product listings."""

    def search(self, query):
        """Return up to 10 product dicts matching *query* on Target."""
        # quote_plus URL-encodes '&', '#', unicode, etc. -- replacing
        # only spaces produced broken URLs for such queries.
        url = f"https://www.target.com/s?searchTerm={quote_plus(query)}"
        soup = scrape(url)
        products = []
        for item in soup.select("[data-test='product-grid'] li"):
            title = item.select_one("[data-test='product-title']")
            price = item.select_one("[data-test='current-price']")
            link = item.select_one("a")
            if title and price:  # skip tiles missing either field
                products.append({
                    "title": title.text.strip(),
                    "price": self.parse_price(price.text),
                    "url": f"https://www.target.com{link['href']}" if link else "",
                    "retailer": "Target",
                })
        return products[:10]
Cross-Retailer Search
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
class DealFinder:
    """Fan a product query out to every configured retailer scraper."""

    def __init__(self):
        # One scraper instance per supported retailer.
        self.scrapers = [
            AmazonScraper(),
            WalmartScraper(),
            TargetScraper(),
        ]

    def search_all(self, query):
        """Search across all retailers concurrently."""
        rows = []
        with ThreadPoolExecutor(max_workers=5) as pool:
            # Map each submitted future back to its scraper's class name
            # so failures can be attributed in the log line below.
            pending = {
                pool.submit(scraper.search, query): type(scraper).__name__
                for scraper in self.scrapers
            }
            for fut, scraper_name in pending.items():
                try:
                    rows.extend(fut.result(timeout=30))
                except Exception as e:
                    # One broken retailer shouldn't sink the whole search.
                    print(f"{scraper_name} failed: {e}")
        frame = pd.DataFrame(rows)
        if frame.empty:
            return frame
        return frame.sort_values("price")
# Example: compare prices for one product across all configured retailers
# (performs live network requests through ScraperAPI).
finder = DealFinder()
results = finder.search_all("sony wh-1000xm5")
# Cheapest listings first, since search_all sorts by price.
print(results[["title", "price", "retailer"]].head(10))
Price History Tracking
import sqlite3
from datetime import datetime, date
class PriceTracker:
    """Persist observed prices in SQLite and flag historically low ones."""

    def __init__(self, db="prices.db"):
        """Open (creating if necessary) the price-history database at *db*."""
        self.conn = sqlite3.connect(db)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS prices (
                id INTEGER PRIMARY KEY,
                product_key TEXT,
                title TEXT,
                price REAL,
                retailer TEXT,
                url TEXT,
                date TEXT
            )
        """)
        self.conn.commit()

    def record(self, products_df):
        """Save one price row per product in *products_df*, dated today."""
        today = date.today().isoformat()
        rows = []
        for _, row in products_df.iterrows():
            # Key by retailer + truncated title so repeat listings of the
            # same product accumulate history under one key.
            key = f"{row['retailer']}:{row['title'][:50]}"
            rows.append((key, row["title"], row["price"],
                         row["retailer"], row.get("url", ""), today))
        # Name the columns explicitly (robust to schema reordering) and
        # insert the whole batch in one executemany call.
        self.conn.executemany(
            "INSERT INTO prices (product_key, title, price, retailer, url, date) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            rows,
        )
        self.conn.commit()

    def is_good_deal(self, product_key, current_price):
        """Return (is_historic_low, stats) for *current_price*.

        Only rows recorded BEFORE today count as history. Without that
        filter, a price recorded moments ago would always equal its own
        minimum, making every item look like a historic low.
        """
        today = date.today().isoformat()
        cursor = self.conn.execute(
            "SELECT MIN(price), AVG(price) FROM prices "
            "WHERE product_key = ? AND date < ?",
            (product_key, today),
        )
        min_price, avg_price = cursor.fetchone()
        if min_price is None:
            # No prior history -> cannot judge; not a confirmed deal.
            return False, {}
        return current_price <= min_price, {
            "current": current_price,
            "historical_low": min_price,
            "average": round(avg_price, 2),
            # Guard the division: a zero average (e.g. freebies) would
            # otherwise raise ZeroDivisionError.
            "discount_vs_avg": round((1 - current_price / avg_price) * 100, 1)
            if avg_price else 0.0,
        }
Personalized Deal Scoring
class DealPersonalizer:
    """Score deals (0-100) against a user's stated preferences."""

    def __init__(self):
        # Empty until set_preferences is called; score_deal tolerates that.
        self.preferences = {}

    def set_preferences(self, categories=None, max_price=None,
                        brands=None, min_discount=10):
        """Set user preferences for deal filtering.

        Args:
            categories: Product categories of interest (currently unused
                by score_deal; kept for forward compatibility).
            max_price: Budget ceiling; deals at or under it score higher.
            brands: Preferred brand names matched against product titles.
            min_discount: Stored preference; not used by score_deal.
        """
        self.preferences = {
            "categories": categories or [],
            "max_price": max_price,
            "brands": brands or [],
            "min_discount": min_discount,
        }

    def score_deal(self, product, price_history):
        """Score a deal based on preferences and price history."""
        score = 0
        # Price history: up to 40 points for discount below average.
        if price_history.get("discount_vs_avg", 0) > 0:
            score += min(price_history["discount_vs_avg"], 40)
        # Brand preference: 20 points, awarded at most once.
        for brand in self.preferences.get("brands", []):
            if brand.lower() in product["title"].lower():
                score += 20
                break
        # Under budget: 20 points.
        max_price = self.preferences.get("max_price")
        if max_price and product["price"] <= max_price:
            score += 20
        # Historical low: 20 points -- but only with real history. A bare
        # `get("current") == get("historical_low")` compared None == None
        # for empty histories and handed 20 points to unknown items.
        current = price_history.get("current")
        if current is not None and current == price_history.get("historical_low"):
            score += 20
        return score
Putting It All Together
def daily_deal_check(watchlist, finder, tracker, personalizer):
    """Run daily deal check and alert on good finds."""
    hits = []
    for query in watchlist:
        frame = finder.search_all(query)
        tracker.record(frame)
        for _, item in frame.iterrows():
            # Same key scheme the tracker uses: retailer + truncated title.
            product_key = f"{item['retailer']}:{item['title'][:50]}"
            _, history = tracker.is_good_deal(product_key, item["price"])
            score = personalizer.score_deal(item, history)
            # Only surface deals that clear the alert threshold.
            if score >= 50:
                enriched = item.to_dict()
                enriched["score"] = score
                enriched["history"] = history
                hits.append(enriched)
    if hits:
        print(f"\nFound {len(hits)} great deals!")
        for deal in sorted(hits, key=lambda d: -d["score"]):
            print(f" [{deal['score']}] {deal['title'][:60]}")
            print(f" ${deal['price']} at {deal['retailer']}")
            if deal["history"]:
                print(f" {deal['history']['discount_vs_avg']}% below average")
    return hits
Scaling to 50+ Sites
For scraping dozens of retail sites reliably, use ThorData for residential proxy rotation and ScrapeOps to monitor which scrapers are working. Different retailers need different approaches: some work with simple HTTP requests, others need full JavaScript rendering.
Conclusion
A personalized deal finder saves real money by automating the tedious work of checking multiple retailers. The key differentiator is price history, as it separates genuine deals from fake "sales" where the price was raised before being "discounted." Start with 3-5 retailers, build up price history for a month, then expand your coverage.
The best deals are the ones you don't have to hunt for.
Top comments (0)