Commodity prices move fast — oil, gold, copper, wheat. Traders, analysts, and supply chain managers need real-time visibility. Here's how to build an automated commodity price tracker that scrapes, stores, and alerts on price movements.
Data Sources
- Trading Economics: Comprehensive commodity overview pages
- Investing.com: Real-time futures prices
- CME Group: Official exchange data for major commodities
- World Bank: Monthly commodity price indices (Pink Sheet)
Building the Price Scraper
import requests
from bs4 import BeautifulSoup
import re
import json
from datetime import datetime
import sqlite3
import time
# ScraperAPI key used for every proxied request. Replace with your own key —
# better yet, load it from an environment variable instead of hard-coding it.
API_KEY = "YOUR_SCRAPERAPI_KEY"
class CommodityTracker:
    """Scrapes commodity prices through ScraperAPI and stores them in SQLite."""

    def __init__(self, api_key, db_path="commodities.db"):
        """Open (or create) the SQLite store and ensure the prices table exists.

        api_key: ScraperAPI key used for every proxied request.
        db_path: SQLite file path; ":memory:" is handy for testing.
        """
        self.api_key = api_key
        self.db = sqlite3.connect(db_path)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS prices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                commodity TEXT, price REAL,
                change_pct REAL, unit TEXT,
                source TEXT, scraped_at TEXT
            )
        """)

    def scrape(self, url, render=True):
        """Fetch `url` via ScraperAPI and return a parsed BeautifulSoup tree.

        render: ask ScraperAPI to execute JavaScript — needed for the live
        price widgets on financial sites; pass False for static pages to
        save API credits.

        Raises requests.HTTPError on a failed proxy response instead of
        silently parsing an error page as if it were price data.
        """
        params = {
            "api_key": self.api_key,
            "url": url,
            "render": "true" if render else "false",
        }
        resp = requests.get(
            "https://api.scraperapi.com", params=params, timeout=60
        )
        resp.raise_for_status()  # fail loudly rather than scrape an error body
        return BeautifulSoup(resp.text, "html.parser")
ScraperAPI handles the JavaScript rendering needed for real-time price widgets on financial sites.
Scraping Trading Economics
def scrape_trading_economics(self, category="metals"):
    """Scrape one Trading Economics category page (metals/energy/grains/softs).

    Returns a list of dicts: {name, price, change_pct, source}. Returns an
    empty list when the expected table is missing (layout change or a
    blocked fetch).
    """
    url = f"https://tradingeconomics.com/commodities/{category}"
    soup = self.scrape(url)
    commodities = []
    table = soup.find("table", class_=lambda c: c and "table" in str(c))
    if not table:
        return commodities
    for row in table.find_all("tr")[1:]:  # skip the header row
        cols = row.find_all("td")
        if len(cols) >= 5:
            name = cols[0].get_text(strip=True)
            price = self.parse_number(cols[1].get_text(strip=True))
            change = self.parse_number(cols[4].get_text(strip=True))
            # `is not None`, not truthiness: a legitimate 0.0 price would
            # otherwise be silently dropped.
            if price is not None:
                commodities.append({
                    "name": name,
                    "price": price,
                    "change_pct": change,
                    "source": "tradingeconomics",
                })
    return commodities
@staticmethod
def parse_number(text):
cleaned = re.sub(r"[^\d.\-]", "", text)
try:
return float(cleaned)
except ValueError:
return None
Recording and Change Detection
def record_prices(self, commodities, alert_threshold_pct=2.0):
    """Persist scraped prices and return alerts for significant moves.

    commodities: dicts with at least {"name", "price"}; "change_pct" and
        "source" are optional.
    alert_threshold_pct: absolute percent move vs. the last stored price
        that triggers an alert (default 2%, matching the original behavior).

    Returns a list of alert dicts:
        {commodity, prev_price, new_price, movement_pct}.
    """
    alerts = []
    # NOTE(review): timestamps are stored as naive-UTC isoformat() strings;
    # keep this format in sync with get_price_history's cutoff comparison.
    now = datetime.utcnow().isoformat()
    for c in commodities:
        # Fetch the most recent stored price for this commodity.
        cursor = self.db.execute(
            "SELECT price FROM prices WHERE commodity = ? ORDER BY id DESC LIMIT 1",
            (c["name"],)
        )
        prev = cursor.fetchone()
        # Store the new observation.
        self.db.execute(
            "INSERT INTO prices (commodity, price, change_pct, source, scraped_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (c["name"], c["price"], c.get("change_pct"), c.get("source"), now)
        )
        # Alert on a significant movement; the truthiness check on prev[0]
        # also guards the division against a stored zero price.
        if prev and prev[0]:
            movement = ((c["price"] - prev[0]) / prev[0]) * 100
            if abs(movement) > alert_threshold_pct:
                alerts.append({
                    "commodity": c["name"],
                    "prev_price": prev[0],
                    "new_price": c["price"],
                    "movement_pct": round(movement, 2),
                })
    self.db.commit()  # one commit for the whole batch
    return alerts
Multi-Category Price Aggregation (averaging duplicate listings)
def aggregate_prices(self, categories=None, delay=3.0):
    """Scrape every Trading Economics category page and average duplicates.

    categories: category slugs to fetch; defaults to the original
        ["metals", "energy", "grains", "softs"].
    delay: seconds to sleep between page fetches (politeness throttle).

    Returns [{"name", "price", "sources"}] where "price" is the mean of all
    observations of the same commodity name and "sources" is their count.
    (Despite the section title, all data comes from one site — the averaging
    covers commodities that appear on more than one category page.)
    """
    if categories is None:
        categories = ["metals", "energy", "grains", "softs"]
    all_commodities = []
    for cat in categories:
        all_commodities.extend(self.scrape_trading_economics(cat))
        time.sleep(delay)  # be respectful to the target site
    # Group observed prices by commodity name, then average each group.
    aggregated = {}
    for c in all_commodities:
        aggregated.setdefault(c["name"], []).append(c["price"])
    return [
        {"name": name, "price": sum(vals) / len(vals), "sources": len(vals)}
        for name, vals in aggregated.items()
    ]
Historical Trend Analysis
def get_price_history(self, commodity, days=30):
    """Return [{"price", "timestamp"}] rows for `commodity`, newest window only.

    days: size of the look-back window.

    The cutoff is computed in Python as a naive-UTC isoformat() string so it
    compares correctly against the stored timestamps: rows are written with
    datetime.isoformat() ('T' separator, microseconds) while SQLite's
    datetime('now', ...) emits a space separator — comparing across the two
    formats lexicographically is fragile.
    """
    from datetime import timedelta  # stdlib; file-level import only pulls in datetime
    cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
    cursor = self.db.execute(
        "SELECT price, scraped_at FROM prices "
        "WHERE commodity = ? AND scraped_at > ? "
        "ORDER BY scraped_at",
        (commodity, cutoff),
    )
    return [{"price": row[0], "timestamp": row[1]} for row in cursor.fetchall()]
def calculate_volatility(self, commodity, days=30):
    """Compute return-based volatility stats for `commodity` over `days` days.

    Returns a summary dict, or None when there are fewer than three usable
    prices (two period returns are the minimum for a sample stdev).
    """
    import statistics
    history = self.get_price_history(commodity, days)
    # Filter missing/zero prices BEFORE the length check: the original
    # checked len(history) but then filtered, so exactly two rows (or rows
    # with None prices) made statistics.stdev raise StatisticsError, and a
    # zero price would divide by zero in the return calculation.
    prices = [h["price"] for h in history if h["price"]]
    if len(prices) < 3:
        return None
    returns = [(prices[i] - prices[i - 1]) / prices[i - 1]
               for i in range(1, len(prices))]
    return {
        "commodity": commodity,
        "volatility": round(statistics.stdev(returns) * 100, 2),
        "min_price": min(prices),
        "max_price": max(prices),
        "current": prices[-1],
        "period_days": days,
    }
Alert System
import smtplib
from email.mime.text import MIMEText
def send_commodity_alert(alerts, email_to,
                         smtp_host="smtp.gmail.com", smtp_port=587,
                         smtp_user="your-email", smtp_password="app-password"):
    """Email a summary of significant commodity price movements.

    alerts: alert dicts from record_prices(); no-op when empty.
    email_to: destination address.
    smtp_*: server and credential parameters so deployments can inject real
        values (e.g. from environment variables) instead of editing source.
    """
    if not alerts:
        return
    lines = ["Commodity Price Alert\n"]
    for a in alerts:
        direction = "UP" if a["movement_pct"] > 0 else "DOWN"
        lines.append(
            f"{a['commodity']}: {direction} {abs(a['movement_pct']):.1f}% "
            f"(${a['prev_price']:.2f} → ${a['new_price']:.2f})"
        )
    msg = MIMEText("\n".join(lines))
    msg["Subject"] = f"Commodity Alert: {len(alerts)} significant movements"
    msg["To"] = email_to
    msg["From"] = smtp_user  # many SMTP servers reject mail without a From header
    with smtplib.SMTP(smtp_host, smtp_port) as server:
        server.starttls()
        server.login(smtp_user, smtp_password)
        server.send_message(msg)
Running 24/7
import schedule
def price_check_cycle():
    """One full tracking pass: scrape all categories, record, alert on big moves.

    Exceptions are caught and logged at this job boundary so that a single
    transient failure (blocked fetch, network blip, SMTP error) does not
    propagate out of schedule.run_pending() and kill the 24/7 loop.
    """
    try:
        tracker = CommodityTracker(API_KEY)
        prices = tracker.aggregate_prices()
        alerts = tracker.record_prices(prices)
        if alerts:
            send_commodity_alert(alerts, "trader@example.com")
            print(f"{len(alerts)} alerts sent")
        else:
            print(f"Tracked {len(prices)} commodities — no alerts")
    except Exception as exc:  # broad by design: top-level scheduled-job boundary
        print(f"Price check failed: {exc}")
# Check every 2 hours during market hours
# NOTE(review): `schedule` is a third-party package (pip install schedule).
# Jobs only fire inside run_pending(), so the 60 s sleep sets the polling
# granularity; the first run happens ~2 hours after startup — call
# price_check_cycle() once before the loop if an immediate check is wanted.
schedule.every(2).hours.do(price_check_cycle)
while True:
    schedule.run_pending()
    time.sleep(60)
Scaling and Reliability
Financial data scraping requires reliability. ThorData residential proxies prevent IP blocks from financial sites. ScrapeOps monitors success rates and alerts when extraction patterns break due to site changes.
Commodity price tracking is essential for traders, procurement teams, and economic analysts. This Python system handles multi-source scraping, change detection, historical storage, and real-time alerting — a complete monitoring solution.
Happy scraping!
Top comments (0)