Processing real-time futures market data requires efficient WebSocket handling. Let's build a production-ready Node.js service that connects to a futures data feed, processes tick data, and computes OHLCV bars in real time.
Architecture
WebSocket Feed -> Parser -> Aggregator -> OHLCV Builder -> Event Emitter
The WebSocket Client
const WebSocket = require('ws');
const EventEmitter = require('events');
/**
 * Streaming client for a futures market-data WebSocket feed.
 *
 * Connects to the configured endpoint, subscribes to the configured symbols,
 * re-emits incoming messages as events, and aggregates trades into
 * fixed-interval OHLCV bars (one in-progress bar per symbol).
 *
 * Events:
 *  - 'trade' (trade): every trade message, as received.
 *  - 'quote' (quote): every quote message, as received.
 *  - 'bar'   (bar):   a finished bar, emitted when the first trade of a later
 *                     interval arrives for the same symbol.
 */
class FuturesDataFeed extends EventEmitter {
  /**
   * @param {object} config
   * @param {string} config.url - WebSocket endpoint to connect to.
   * @param {string} config.apiKey - Sent as a Bearer token in the Authorization header.
   * @param {string[]} [config.symbols=['ES', 'NQ']] - Symbols subscribed to on connect.
   * @param {number} [config.barInterval=60000] - Bar width, in the same unit as
   *   trade timestamps (presumably milliseconds; confirm against the feed).
   */
  constructor(config) {
    super();
    this.url = config.url;
    this.apiKey = config.apiKey;
    this.symbols = config.symbols || ['ES', 'NQ'];
    this.ws = null;
    this.reconnectDelay = 1000;
    this.maxReconnectDelay = 30000;
    this.bars = new Map();
    this.barInterval = config.barInterval || 60000;
    // Reconnect bookkeeping so disconnect() can stop the retry loop.
    this.shouldReconnect = false;
    this.reconnectTimer = null;
  }

  /**
   * Opens the WebSocket and wires up its handlers. When the socket closes, a
   * reconnect is scheduled with exponential backoff (doubling up to
   * maxReconnectDelay) unless disconnect() has been called.
   */
  connect() {
    this.shouldReconnect = true;
    console.log(`Connecting to ${this.url}...`);
    const socket = new WebSocket(this.url, {
      headers: { 'Authorization': `Bearer ${this.apiKey}` }
    });
    this.ws = socket;

    socket.on('open', () => {
      console.log('Connected to futures data feed');
      // A successful connection resets the backoff.
      this.reconnectDelay = 1000;
      this.subscribe(this.symbols);
    });

    socket.on('message', (raw) => this.handleRawMessage(raw));

    socket.on('close', (code) => {
      // Do not reconnect after disconnect(), or for a socket that has already
      // been superseded by a newer connect() call.
      if (!this.shouldReconnect || this.ws !== socket) {
        console.log(`Disconnected (${code}).`);
        return;
      }
      console.log(`Disconnected (${code}). Reconnecting in ${this.reconnectDelay}ms...`);
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connect();
      }, this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    });

    socket.on('error', (err) => console.error('WebSocket error:', err.message));
  }

  /**
   * Stops the feed: cancels any pending reconnect and closes the socket.
   * Safe to call when not connected.
   */
  disconnect() {
    this.shouldReconnect = false;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      this.ws.close();
    }
  }

  /**
   * Sends a subscribe request for the trades and quotes channels.
   * @param {string[]} symbols
   */
  subscribe(symbols) {
    this.ws.send(JSON.stringify({
      action: 'subscribe',
      symbols: symbols,
      channels: ['trades', 'quotes']
    }));
  }

  /**
   * Parses one raw frame and dispatches it. Parse failures and failures
   * raised while handling (e.g. by an event listener) are logged separately,
   * so a listener bug is not misreported as malformed JSON.
   * @param {string|Buffer} raw - Frame payload containing JSON text.
   */
  handleRawMessage(raw) {
    let data;
    try {
      data = JSON.parse(raw);
    } catch (err) {
      console.error('Parse error:', err.message);
      return;
    }
    try {
      this.handleMessage(data);
    } catch (err) {
      console.error('Message handling error:', err.message);
    }
  }

  /**
   * Routes a decoded message by its `type`; unknown types are ignored.
   * @param {object} data
   */
  handleMessage(data) {
    // Valid JSON can still be null or a primitive; there is nothing to route.
    if (data === null || typeof data !== 'object') return;
    switch (data.type) {
      case 'trade': this.processTrade(data); break;
      case 'quote': this.emit('quote', data); break;
    }
  }

  /**
   * Emits the trade and folds it into the symbol's current bar.
   * @param {{symbol: string, price: number, volume: number, timestamp: number}} trade
   */
  processTrade(trade) {
    const { symbol, price, volume, timestamp } = trade;
    this.emit('trade', trade);
    this.updateBar(symbol, price, volume, timestamp);
  }

  /**
   * Updates the in-progress OHLCV bar for `symbol`.
   *
   * A trade in a later interval than the current bar emits the current bar
   * and starts a new one. A trade in an earlier interval (late or
   * out-of-order delivery) is left out of bar aggregation, because its bar
   * has already been emitted. Trades with an unusable timestamp or price are
   * skipped as well.
   *
   * @param {string} symbol
   * @param {number} price
   * @param {number} volume
   * @param {number} timestamp
   */
  updateBar(symbol, price, volume, timestamp) {
    const barStart = Math.floor(timestamp / this.barInterval) * this.barInterval;
    // A NaN bar start never equals itself, so it would emit a bogus bar on
    // every trade; a NaN price would poison high/low for the whole bar.
    if (!Number.isFinite(barStart) || !Number.isFinite(Number(price))) return;

    const current = this.bars.get(symbol);

    // Late tick: rolling the bar backwards would emit the current bar early
    // and then emit it a second time once in-order trades resume.
    if (current !== undefined && barStart < current.timestamp) return;

    if (current === undefined || current.timestamp !== barStart) {
      if (current !== undefined) this.emit('bar', current);
      this.bars.set(symbol, {
        symbol, timestamp: barStart,
        open: price, high: price, low: price, close: price,
        volume, trades: 1,
      });
      return;
    }

    current.high = Math.max(current.high, price);
    current.low = Math.min(current.low, price);
    current.close = price;
    current.volume += volume;
    current.trades += 1;
  }
}
Tick Data Aggregator with Rolling Stats
/**
 * Keeps a rolling window of the most recent ticks per symbol and computes
 * summary statistics over that window.
 */
class TickAggregator {
  /**
   * @param {number} [windowSize=100] - Maximum number of ticks retained per symbol.
   */
  constructor(windowSize = 100) {
    this.windowSize = windowSize;
    this.ticks = new Map();
  }

  /**
   * Appends a tick to the symbol's window, evicting the oldest tick once the
   * window is full. Each tick is stamped with the local receive time.
   * @param {string} symbol
   * @param {number} price
   * @param {number} volume
   */
  addTick(symbol, price, volume) {
    if (!this.ticks.has(symbol)) this.ticks.set(symbol, []);
    const buffer = this.ticks.get(symbol);
    buffer.push({ price, volume, time: Date.now() });
    if (buffer.length > this.windowSize) buffer.shift();
  }

  /**
   * Computes statistics over the symbol's current window.
   *
   * @param {string} symbol
   * @returns {?{symbol: string, last: number, mean: string, std: string,
   *   vwap: ?string, totalVolume: number, tickCount: number}}
   *   `null` when no ticks are buffered for the symbol. `mean`, `std` and
   *   `vwap` are fixed-precision strings; `std` is the population standard
   *   deviation of prices. `vwap` is `null` when the window carries no
   *   positive total volume, since the volume-weighted average is undefined.
   */
  getStats(symbol) {
    const buffer = this.ticks.get(symbol);
    if (!buffer || buffer.length === 0) return null;

    // Single pass for the sums; total volume is computed once and reused.
    let priceSum = 0;
    let notional = 0;
    let totalVolume = 0;
    for (const tick of buffer) {
      priceSum += tick.price;
      notional += tick.price * tick.volume;
      totalVolume += tick.volume;
    }
    const mean = priceSum / buffer.length;

    let squaredDeviation = 0;
    for (const tick of buffer) {
      squaredDeviation += (tick.price - mean) ** 2;
    }
    const variance = squaredDeviation / buffer.length;

    return {
      symbol,
      last: buffer[buffer.length - 1].price,
      mean: mean.toFixed(2),
      std: Math.sqrt(variance).toFixed(4),
      // Guard the division: a zero-volume window used to yield the string "NaN".
      vwap: totalVolume > 0 ? (notional / totalVolume).toFixed(2) : null,
      totalVolume,
      tickCount: buffer.length,
    };
  }
}
Putting It Together
// Feed settings; the API key is read from the DATA_FEED_KEY environment variable.
const feedConfig = {
  url: 'wss://your-data-feed.com/ws',
  apiKey: process.env.DATA_FEED_KEY,
  symbols: ['ES', 'NQ', 'CL'],
  barInterval: 60000,
};
const feed = new FuturesDataFeed(feedConfig);

// Rolling statistics over the last 200 ticks of each symbol.
const aggregator = new TickAggregator(200);

// Every trade goes into the rolling window.
feed.on('trade', ({ symbol, price, volume }) => {
  aggregator.addTick(symbol, price, volume);
});

// Print each completed bar, followed by the rolling stats when available.
feed.on('bar', (bar) => {
  const summary = `[BAR] ${bar.symbol} O:${bar.open} H:${bar.high} L:${bar.low} C:${bar.close} V:${bar.volume}`;
  console.log(summary);

  const stats = aggregator.getStats(bar.symbol);
  if (!stats) return;
  console.log(` VWAP: ${stats.vwap} | Std: ${stats.std}`);
});

feed.connect();
Performance Tips
- Use binary WebSocket frames when available -- JSON parsing is expensive at high tick rates
- Emit events in batches rather than once per tick to reduce event-loop overhead
- Use SharedArrayBuffer for multi-threaded aggregation on Node.js worker threads
For traders building systems to use with funded accounts, choosing a firm that supports API-friendly platforms is key. PropFirmKey lists which platforms and data feeds each firm supports -- for example, Alpha Futures offers connectivity through major platforms that support WebSocket data feeds.
Top comments (0)