Real-time market data is essential for active traders. Here's how to build a lightweight dashboard using Python, WebSockets, and a simple web frontend.
Architecture
Market Data Provider (WebSocket)
↓
Python Backend (asyncio + websockets)
↓
Browser Dashboard (EventSource/SSE)
Backend: WebSocket Consumer
import asyncio
import json
import logging
from collections import defaultdict

import websockets
class MarketDataHandler:
    """Consume a market-data WebSocket feed and fan ticks out to subscribers.

    ``latest_prices`` maps each symbol to its most recent tick dict
    (``price``, ``bid``, ``ask``, ``volume``, ``timestamp``).
    ``subscribers`` is a set of queue objects (expected to behave like
    ``asyncio.Queue``) that receive every tick through ``put_nowait``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self):
        self.latest_prices = {}
        self.subscribers = set()

    async def connect_feed(self, url, symbols):
        """Connect to *url*, subscribe to *symbols*, and process every frame.

        Runs until the WebSocket iteration ends. Individual malformed frames
        are skipped by ``handle_message`` instead of ending the feed.
        """
        async with websockets.connect(url) as ws:
            # Subscribe to symbols
            await ws.send(json.dumps({
                'action': 'subscribe',
                'symbols': symbols,
            }))
            async for message in ws:
                await self.handle_message(message)

    async def handle_message(self, message):
        """Parse one raw feed frame and publish it if it is a usable tick.

        A frame is ignored (with a logged warning where it looks broken)
        when it is not valid JSON, is not a JSON object, or carries a symbol
        but lacks ``price`` or ``timestamp``. Frames without a symbol are
        ignored silently.
        """
        try:
            data = json.loads(message)
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            self._log.warning('Skipping undecodable feed frame: %s', exc)
            return

        if not isinstance(data, dict):
            self._log.warning(
                'Skipping non-object feed frame of type %s',
                type(data).__name__,
            )
            return

        symbol = data.get('symbol')
        if not symbol:
            return

        try:
            tick = {
                'price': data['price'],
                'bid': data.get('bid'),
                'ask': data.get('ask'),
                'volume': data.get('volume', 0),
                'timestamp': data['timestamp'],
            }
        except KeyError as exc:
            # One incomplete frame must not take the whole feed down.
            self._log.warning(
                'Skipping frame for %s: missing field %s', symbol, exc
            )
            return

        self.latest_prices[symbol] = tick
        await self.notify_subscribers(symbol, tick)

    async def notify_subscribers(self, symbol, data):
        """Push ``{'symbol': symbol, **data}`` to every subscriber queue.

        A subscriber whose queue raises ``asyncio.QueueFull`` is removed from
        ``subscribers`` and receives no further ticks.
        """
        dead = set()
        for queue in self.subscribers:
            try:
                # Each subscriber gets its own dict so it can mutate it freely.
                queue.put_nowait({'symbol': symbol, **data})
            except asyncio.QueueFull:
                dead.add(queue)
        self.subscribers -= dead
Adding Alert Logic
class PriceAlerts:
    """Registry of one-shot price alerts.

    Each alert is a dict stored in ``alerts``; once its callback has been
    invoked the alert is marked ``triggered`` and never fires again.
    """

    def __init__(self):
        self.alerts = []

    def add_alert(self, symbol, condition, threshold, callback):
        """Register an alert for *symbol*.

        *condition* is ``'above'`` or ``'below'``; *callback* is later called
        as ``callback(symbol, price, threshold)``.
        """
        entry = dict(
            symbol=symbol,
            condition=condition,  # 'above' or 'below'
            threshold=threshold,
            callback=callback,
            triggered=False,
        )
        self.alerts.append(entry)

    def check(self, symbol, price):
        """Fire every untriggered alert on *symbol* that *price* has crossed."""
        for entry in self.alerts:
            if entry['symbol'] != symbol:
                continue
            if entry['triggered']:
                continue

            kind = entry['condition']
            limit = entry['threshold']
            # Strict comparisons: a price equal to the threshold does not fire.
            crossed = (
                (kind == 'above' and price > limit)
                or (kind == 'below' and price < limit)
            )
            if crossed:
                entry['callback'](symbol, price, limit)
                entry['triggered'] = True
Spread Monitor
class SpreadMonitor:
    """Track recent bid/ask spreads per symbol and flag unusually wide ones.

    ``spreads`` maps symbol -> list of the most recent spread percentages
    (at most ``window`` entries).
    """

    def __init__(self, window_size=100):
        self.spreads = defaultdict(list)
        self.window = window_size

    def update(self, symbol, bid, ask):
        """Record the spread for *symbol* and report whether it is abnormal.

        The spread is ``(ask - bid) / ask * 100``. It is compared against the
        average of the spreads recorded *before* this call, so the sample
        under test does not dilute its own baseline.

        Returns:
            ``{'alert': 'wide_spread', 'current': ..., 'average': ...}`` when
            the spread is more than twice the prior average (``average`` is
            that prior average); otherwise ``None``. Also ``None``, with
            nothing recorded, when *bid* or *ask* is missing or *ask* is not
            positive.
        """
        # Quotes can arrive without a side, and ask == 0 would divide by zero.
        if bid is None or ask is None or ask <= 0:
            return None

        spread_pct = (ask - bid) / ask * 100
        history = self.spreads[symbol]

        # Baseline comes from earlier samples only; none yet means no verdict.
        baseline = sum(history) / len(history) if history else None

        history.append(spread_pct)
        if len(history) > self.window:
            del history[:len(history) - self.window]

        # Alert if spread is 2x normal
        if baseline is not None and spread_pct > baseline * 2:
            return {
                'alert': 'wide_spread',
                'current': spread_pct,
                'average': baseline,
            }
        return None
Simple HTTP Server for Dashboard
from http.server import HTTPServer, SimpleHTTPRequestHandler
import threading
def serve_dashboard(port=8080):
    """Serve static files over HTTP from a background daemon thread.

    Args:
        port: TCP port to listen on. ``0`` lets the OS pick a free port.

    Returns:
        The running ``HTTPServer``. Call ``shutdown()`` followed by
        ``server_close()`` on it to stop serving and release the socket.
    """
    # NOTE(review): '' binds every interface and SimpleHTTPRequestHandler
    # exposes the process's working directory -- confirm that is intended
    # before running this on a machine reachable from other hosts.
    server = HTTPServer(('', port), SimpleHTTPRequestHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    # Report the port actually bound so port=0 prints a usable URL.
    print(f'Dashboard: http://localhost:{server.server_port}')
    return server
Data Storage
import sqlite3
from datetime import datetime
class TickDatabase:
    """Append-only SQLite store for ticks with batched commits.

    Rows go into a ``ticks`` table (timestamp, symbol, price, bid, ask,
    volume). Inserts are committed once every ``commit_every`` calls to
    ``store``; call ``flush()`` or ``close()`` to persist the remainder.
    """

    def __init__(self, db_path='ticks.db', commit_every=100):
        """Open (or create) the database at *db_path*.

        Args:
            db_path: SQLite database path.
            commit_every: Number of stored ticks per commit; values below 1
                are treated as 1.
        """
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('''CREATE TABLE IF NOT EXISTS ticks (
            timestamp TEXT, symbol TEXT,
            price REAL, bid REAL, ask REAL, volume INTEGER
        )''')
        self.commit_every = max(1, int(commit_every))
        self._pending = 0  # inserts since the last commit

    def store(self, symbol, data):
        """Insert one tick for *symbol*, stamped with the local wall-clock time.

        *data* must contain ``price``; ``bid`` and ``ask`` default to NULL
        and ``volume`` to 0.
        """
        self.conn.execute(
            'INSERT INTO ticks VALUES (?, ?, ?, ?, ?, ?)',
            (
                datetime.now().isoformat(),
                symbol,
                data['price'],
                data.get('bid'),
                data.get('ask'),
                data.get('volume', 0),
            ),
        )
        # Deterministic batching: a counter bounds how many ticks can be
        # lost on a crash, which a random draw could not guarantee.
        self._pending += 1
        if self._pending >= self.commit_every:
            self.flush()

    def flush(self):
        """Commit any inserts that have not been committed yet."""
        self.conn.commit()
        self._pending = 0

    def close(self):
        """Commit outstanding inserts and close the connection."""
        self.flush()
        self.conn.close()
Use Cases
- Monitor multiple instruments simultaneously
- Track spread widening during news events
- Log tick data for later analysis
- Set price alerts without keeping a chart open
Tools like this help you understand market behavior across different instruments and conditions. If you're evaluating which markets to trade, propfirmkey.com has detailed breakdowns of what instruments each firm offers.
What's your market data setup? Are you using commercial feeds or free alternatives?
Top comments (0)