Commodity prices move fast — oil, gold, copper, wheat. Traders, analysts, and supply chain managers need real-time visibility. Here's how to build an automated commodity price tracker that scrapes, stores, and alerts on price movements.
Data Sources
- Trading Economics: Comprehensive commodity overview pages
- Investing.com: Real-time futures prices
- CME Group: Official exchange data for major commodities
- World Bank: Monthly commodity price indices (Pink Sheet)
Building the Price Scraper
# Implementation is proprietary (that IS the moat).
# Skip the build — use our ready-made Apify actor:
# see the CTA below for the link (fpr=yw6md3).
ScraperAPI handles the JavaScript rendering needed for real-time price widgets on financial sites.
Scraping Trading Economics
def scrape_trading_economics(self, category="metals"):
    """Scrape one Trading Economics commodity category page.

    Fetches ``https://tradingeconomics.com/commodities/<category>`` through
    ``self.scrape`` and reads the first ``<table>`` whose CSS class contains
    the substring ``"table"``.

    Args:
        category: URL path segment of the category page
            (default ``"metals"``).

    Returns:
        A list of dicts with the keys ``name``, ``price``, ``change_pct``
        and ``source``. The list is empty when no page or no matching
        table is available.
    """
    url = f"https://tradingeconomics.com/commodities/{category}"
    soup = self.scrape(url)
    # NOTE(review): self.scrape is defined elsewhere; this guard assumes it
    # may return None when a fetch fails -- confirm against its contract.
    if soup is None:
        return []

    table = soup.find("table", class_=lambda c: c and "table" in str(c))
    if not table:
        return []

    commodities = []
    # The first <tr> is skipped as the header row.
    for row in table.find_all("tr")[1:]:
        cols = row.find_all("td")
        if len(cols) < 5:
            continue

        price = self.parse_number(cols[1].get_text(strip=True))
        # parse_number signals failure with None, so test for None rather
        # than truthiness: a genuine 0.0 quote must not be discarded.
        if price is None:
            continue

        commodities.append({
            "name": cols[0].get_text(strip=True),
            "price": price,
            # Column 4 is read as the percentage change; this depends on
            # the page's table layout -- verify against the live site.
            "change_pct": self.parse_number(cols[4].get_text(strip=True)),
            "source": "tradingeconomics",
        })
    return commodities
@staticmethod
def parse_number(text):
cleaned = re.sub(r"[^\d.\-]", "", text)
try:
return float(cleaned)
except ValueError:
return None
Recording and Change Detection
def record_prices(self, commodities, threshold_pct=2):
    """Store a batch of prices and report significant moves.

    For each commodity the most recently stored price (highest ``id``) is
    read from the ``prices`` table, the new price is inserted, and the
    percentage move between the two is compared with ``threshold_pct``.
    All inserts are committed together at the end.

    Args:
        commodities: Iterable of dicts with required keys ``name`` and
            ``price``; ``change_pct`` and ``source`` are optional and are
            stored as NULL when absent.
        threshold_pct: Absolute percentage move that must be exceeded for
            an alert to be produced. Defaults to 2 (percent).

    Returns:
        A list of alert dicts with the keys ``commodity``, ``prev_price``,
        ``new_price`` and ``movement_pct`` (rounded to two decimals).
    """
    # Local import: only the ``datetime`` class is known to be in scope.
    from datetime import timezone

    alerts = []
    # Naive UTC timestamp, same string format that datetime.utcnow() gave,
    # without relying on the deprecated utcnow().
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    for c in commodities:
        # Read the previous price before inserting the new row.
        cursor = self.db.execute(
            "SELECT price FROM prices WHERE commodity = ? ORDER BY id DESC LIMIT 1",
            (c["name"],)
        )
        prev = cursor.fetchone()

        self.db.execute(
            "INSERT INTO prices (commodity, price, change_pct, source, scraped_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (c["name"], c["price"], c.get("change_pct"), c.get("source"), now)
        )

        # No history, or a NULL/zero previous price: a percentage move is
        # undefined (and would divide by zero), so skip the check.
        if not prev or not prev[0]:
            continue

        prev_price = prev[0]
        movement = ((c["price"] - prev_price) / prev_price) * 100
        if abs(movement) > threshold_pct:
            alerts.append({
                "commodity": c["name"],
                "prev_price": prev_price,
                "new_price": c["price"],
                "movement_pct": round(movement, 2),
            })

    self.db.commit()
    return alerts
Multi-Source Price Aggregation
def aggregate_prices(self, categories=None, delay=3):
    """Scrape several Trading Economics categories and average by name.

    Each category is fetched with ``self.scrape_trading_economics``.
    Commodities that appear more than once (same ``name``) are collapsed
    into a single entry whose price is the arithmetic mean.

    Args:
        categories: Category slugs to scrape. Defaults to
            ``["metals", "energy", "grains", "softs"]``.
        delay: Seconds to pause between consecutive requests (default 3).

    Returns:
        A list of dicts with the keys ``name``, ``price`` (mean of all
        observations) and ``sources`` (number of observations), in order
        of first appearance.
    """
    if categories is None:
        categories = ["metals", "energy", "grains", "softs"]

    all_commodities = []
    for index, cat in enumerate(categories):
        # Be respectful: pause between requests, but not after the last
        # one, where the wait would only slow the caller down.
        if index:
            time.sleep(delay)
        all_commodities.extend(self.scrape_trading_economics(cat))

    # Group every observed price under its commodity name.
    grouped = {}
    for c in all_commodities:
        grouped.setdefault(c["name"], []).append(c["price"])

    return [{
        "name": name,
        "price": sum(prices) / len(prices),
        "sources": len(prices),
    } for name, prices in grouped.items()]
Historical Trend Analysis
def get_price_history(self, commodity, days=30):
    """Return stored prices for one commodity over the last ``days`` days.

    Args:
        commodity: Value matched against the ``commodity`` column.
        days: Size of the look-back window in days (default 30).

    Returns:
        A list of ``{"price": ..., "timestamp": ...}`` dicts ordered by
        ``scraped_at``; empty when nothing matches.
    """
    # Timestamps are stored in ISO form with a "T" separator, whereas
    # datetime('now', ...) yields "YYYY-MM-DD HH:MM:SS". Comparing the raw
    # strings would let every row from the cutoff date through regardless
    # of its time ("T" sorts after " "), so normalise the stored value
    # with datetime() before comparing.
    cursor = self.db.execute("""
        SELECT price, scraped_at FROM prices
        WHERE commodity = ?
        AND datetime(scraped_at) > datetime('now', ?)
        ORDER BY scraped_at
    """, (commodity, f"-{days} days"))
    return [{
        "price": row[0],
        "timestamp": row[1]
    } for row in cursor.fetchall()]
def calculate_volatility(self, commodity, days=30):
    """Summarise price volatility for a commodity over a look-back window.

    Volatility is the sample standard deviation of the simple returns
    between consecutive stored prices, expressed in percent and rounded to
    two decimals.

    Args:
        commodity: Commodity name passed to ``get_price_history``.
        days: Look-back window in days (default 30).

    Returns:
        A dict with the keys ``commodity``, ``volatility``, ``min_price``,
        ``max_price``, ``current`` and ``period_days``, or ``None`` when
        there are too few usable prices to compute a standard deviation.
    """
    import statistics

    history = self.get_price_history(commodity, days)
    # Missing/zero prices are dropped: they cannot serve as a divisor.
    prices = [h["price"] for h in history if h["price"]]

    # stdev needs at least two returns, i.e. at least three prices. The
    # check must come after filtering, otherwise a short or partly-empty
    # history reaches statistics.stdev and raises StatisticsError.
    if len(prices) < 3:
        return None

    returns = [(cur - prev) / prev for prev, cur in zip(prices, prices[1:])]
    return {
        "commodity": commodity,
        "volatility": round(statistics.stdev(returns) * 100, 2),
        "min_price": min(prices),
        "max_price": max(prices),
        "current": prices[-1],
        "period_days": days,
    }
Alert System
import smtplib
from email.mime.text import MIMEText
def send_commodity_alert(alerts, email_to, smtp_host="smtp.gmail.com",
                         smtp_port=587, username="your-email",
                         password="app-password"):
    """Email a plain-text summary of price alerts over SMTP with STARTTLS.

    Args:
        alerts: List of alert dicts with the keys ``commodity``,
            ``movement_pct``, ``prev_price`` and ``new_price``. Nothing is
            sent when the list is empty.
        email_to: Recipient address.
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port.
        username: SMTP login name; also used as the ``From`` address.
        password: SMTP password. The default is a placeholder -- pass real
            credentials from configuration rather than editing the source.

    Returns:
        None.
    """
    if not alerts:
        return

    lines = ["Commodity Price Alert\n"]
    for a in alerts:
        direction = "UP" if a["movement_pct"] > 0 else "DOWN"
        lines.append(
            f"{a['commodity']}: {direction} {abs(a['movement_pct']):.1f}% "
            f"(${a['prev_price']:.2f} → ${a['new_price']:.2f})"
        )

    msg = MIMEText("\n".join(lines))
    msg["Subject"] = f"Commodity Alert: {len(alerts)} significant movements"
    # send_message() derives the envelope sender from the From/Sender
    # header, so it has to be set explicitly.
    msg["From"] = username
    msg["To"] = email_to

    with smtplib.SMTP(smtp_host, smtp_port) as server:
        server.starttls()
        server.login(username, password)
        server.send_message(msg)
Running 24/7
import schedule
def price_check_cycle():
    """Run one scrape -> record -> alert pass and print a one-line summary.

    Any exception raised during the pass is logged with its traceback and
    swallowed. This function runs as a scheduled job, and an exception
    escaping it would propagate out of ``schedule.run_pending()`` and stop
    the long-running loop; a single failed scrape should not do that.
    """
    import logging

    try:
        tracker = CommodityTracker(API_KEY)
        prices = tracker.aggregate_prices()
        alerts = tracker.record_prices(prices)
        if alerts:
            send_commodity_alert(alerts, "trader@example.com")
            print(f"{len(alerts)} alerts sent")
        else:
            print(f"Tracked {len(prices)} commodities — no alerts")
    except Exception:
        # Top-level job boundary: record the failure and let the next
        # scheduled run try again.
        logging.getLogger(__name__).exception("Price check cycle failed")
# Register the price check to run once every 2 hours.
# NOTE(review): the schedule carries no market-hours window, so the job runs
# around the clock; add a time-of-day check if that restriction is wanted.
schedule.every(2).hours.do(price_check_cycle)
# Poll the scheduler once a minute for as long as the process lives.
while True:
    schedule.run_pending()
    time.sleep(60)
Scaling and Reliability
Financial data scraping requires reliability. ThorData residential proxies prevent IP blocks from financial sites. ScrapeOps monitors success rates and alerts when extraction patterns break due to site changes.
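Commodity price tracking is essential for traders, procurement teams, and economic analysts. This Python system handles scraping, change detection, historical storage, and scheduled email alerting — a solid foundation for a complete monitoring solution. The code shown pulls every category from Trading Economics; prices from additional sources can be merged through the same aggregation step, which averages entries that share a commodity name.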
Happy scraping!
Top comments (0)