DEV Community

Propfirmkey
Propfirmkey

Posted on

WebSocket Real-Time Futures Data Processing in Node.js

Processing real-time futures market data requires efficient WebSocket handling. Let's build a production-ready Node.js service that connects to a futures data feed, processes tick data, and computes OHLCV bars in real time.

Architecture

WebSocket Feed -> Parser -> Aggregator -> OHLCV Builder -> Event Emitter
Enter fullscreen mode Exit fullscreen mode

The WebSocket Client

const WebSocket = require('ws');
const EventEmitter = require('events');

/**
 * WebSocket client for a futures market-data feed.
 *
 * Connects to `config.url`, subscribes to the configured symbols, and
 * re-emits incoming messages as events:
 *   - 'trade' : every trade message, unchanged
 *   - 'quote' : every quote message, unchanged
 *   - 'bar'   : a completed OHLCV bar, emitted when the first trade of a
 *               later interval arrives for the same symbol
 *
 * The connection is re-established with exponential backoff after a close
 * until `disconnect()` is called.
 */
class FuturesDataFeed extends EventEmitter {
  /**
   * @param {object} config
   * @param {string} config.url - WebSocket endpoint of the feed.
   * @param {string} config.apiKey - Sent as a Bearer token in the handshake.
   * @param {string[]} [config.symbols] - Symbols to subscribe to (default ES, NQ).
   * @param {number} [config.barInterval] - Bar length, in the same unit as
   *   trade timestamps (default 60000).
   */
  constructor(config) {
    super();
    this.url = config.url;
    this.apiKey = config.apiKey;
    this.symbols = config.symbols || ['ES', 'NQ'];
    this.ws = null;
    this.reconnectDelay = 1000;
    this.maxReconnectDelay = 30000;
    this.bars = new Map();
    this.barInterval = config.barInterval || 60000;
    // Reconnect bookkeeping so disconnect() can stop the backoff loop.
    this.reconnectTimer = null;
    this.shouldReconnect = true;
  }

  /**
   * Opens the socket and wires up the open/message/close/error handlers.
   * Safe to call again after disconnect().
   */
  connect() {
    this.shouldReconnect = true;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;

    console.log(`Connecting to ${this.url}...`);
    const ws = new WebSocket(this.url, {
      headers: { 'Authorization': `Bearer ${this.apiKey}` }
    });
    this.ws = ws;

    ws.on('open', () => {
      console.log('Connected to futures data feed');
      // A successful connection resets the backoff.
      this.reconnectDelay = 1000;
      this.subscribe(this.symbols);
    });

    ws.on('message', (raw) => {
      let data;
      try {
        data = JSON.parse(raw);
      } catch (err) {
        console.error('Parse error:', err.message);
        return;
      }
      // Handled separately so an exception thrown by a 'trade'/'quote'/'bar'
      // listener is not misreported as a parse failure.
      try {
        this.handleMessage(data);
      } catch (err) {
        console.error('Message handling error:', err.message);
      }
    });

    ws.on('close', (code) => {
      // Ignore the close of a socket that has already been replaced.
      if (this.ws !== ws) return;
      if (!this.shouldReconnect) {
        console.log(`Disconnected (${code}).`);
        return;
      }
      console.log(`Disconnected (${code}). Reconnecting in ${this.reconnectDelay}ms...`);
      this.reconnectTimer = setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    });

    ws.on('error', (err) => console.error('WebSocket error:', err.message));
  }

  /**
   * Closes the socket (if any) and cancels any pending reconnect attempt.
   * No further reconnects happen until connect() is called again.
   */
  disconnect() {
    this.shouldReconnect = false;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    if (this.ws) {
      this.ws.close();
    }
  }

  /**
   * Sends a subscribe request for the trades and quotes channels.
   * @param {string[]} symbols
   */
  subscribe(symbols) {
    this.ws.send(JSON.stringify({
      action: 'subscribe',
      symbols: symbols,
      channels: ['trades', 'quotes']
    }));
  }

  /**
   * Routes a parsed message by its `type` field; unknown types are ignored.
   * @param {object} data
   */
  handleMessage(data) {
    switch (data.type) {
      case 'trade': this.processTrade(data); break;
      case 'quote': this.emit('quote', data); break;
    }
  }

  /**
   * Emits the raw trade, then folds it into the symbol's current bar.
   * @param {{symbol: string, price: number, volume: number, timestamp: number}} trade
   */
  processTrade(trade) {
    const { symbol, price, volume, timestamp } = trade;
    this.emit('trade', trade);
    this.updateBar(symbol, price, volume, timestamp);
  }

  /**
   * Folds one trade into the in-progress bar for `symbol`.
   *
   * A trade that falls in a later interval emits the in-progress bar and
   * opens a new one. A trade with an unusable timestamp, or one that belongs
   * to an interval earlier than the in-progress bar (a late tick), is left
   * out of bar building: that bar was already emitted, and reopening it
   * would emit the current bar prematurely and move bar time backwards.
   *
   * @param {string} symbol
   * @param {number} price
   * @param {number} volume
   * @param {number} timestamp
   */
  updateBar(symbol, price, volume, timestamp) {
    const barStart = Math.floor(timestamp / this.barInterval) * this.barInterval;
    // NaN never equals itself, so a bad timestamp would otherwise emit and
    // reset the bar on every trade.
    if (!Number.isFinite(barStart)) return;

    const current = this.bars.get(symbol);
    if (current && barStart < current.timestamp) return;

    if (!current || current.timestamp !== barStart) {
      if (current) this.emit('bar', current);
      this.bars.set(symbol, {
        symbol, timestamp: barStart,
        open: price, high: price, low: price, close: price,
        volume, trades: 1,
      });
      return;
    }

    current.high = Math.max(current.high, price);
    current.low = Math.min(current.low, price);
    current.close = price;
    current.volume += volume;
    current.trades += 1;
  }
}
Enter fullscreen mode Exit fullscreen mode

Tick Data Aggregator with Rolling Stats

/**
 * Keeps the most recent `windowSize` ticks per symbol and computes rolling
 * statistics over that window.
 */
class TickAggregator {
  /**
   * @param {number} [windowSize=100] - Maximum ticks retained per symbol.
   */
  constructor(windowSize = 100) {
    this.windowSize = windowSize;
    this.ticks = new Map();
  }

  /**
   * Appends a tick for `symbol`, evicting the oldest tick once the window
   * is full. The tick is stamped with the local arrival time.
   * @param {string} symbol
   * @param {number} price
   * @param {number} volume
   */
  addTick(symbol, price, volume) {
    if (!this.ticks.has(symbol)) this.ticks.set(symbol, []);
    const buffer = this.ticks.get(symbol);
    buffer.push({ price, volume, time: Date.now() });
    if (buffer.length > this.windowSize) buffer.shift();
  }

  /**
   * Computes statistics over the current window for `symbol`.
   *
   * `mean`, `std` and `vwap` are fixed-decimal strings (2, 4 and 2 places).
   * `std` is the population standard deviation of price. `vwap` is `null`
   * when the window's total volume is not positive, because a
   * volume-weighted price is undefined there (dividing would yield "NaN").
   *
   * @param {string} symbol
   * @returns {{symbol: string, last: number, mean: string, std: string,
   *   vwap: (string|null), totalVolume: number, tickCount: number}|null}
   *   `null` when no ticks are stored for the symbol.
   */
  getStats(symbol) {
    const buffer = this.ticks.get(symbol);
    if (!buffer || buffer.length === 0) return null;

    // One pass for the sums instead of re-reducing the arrays per field.
    let priceSum = 0;
    let notional = 0;
    let totalVolume = 0;
    for (const tick of buffer) {
      priceSum += tick.price;
      notional += tick.price * tick.volume;
      totalVolume += tick.volume;
    }

    const mean = priceSum / buffer.length;
    let squaredDeviation = 0;
    for (const tick of buffer) {
      squaredDeviation += (tick.price - mean) ** 2;
    }
    const variance = squaredDeviation / buffer.length;

    return {
      symbol, last: buffer[buffer.length - 1].price,
      mean: mean.toFixed(2),
      std: Math.sqrt(variance).toFixed(4),
      vwap: totalVolume > 0 ? (notional / totalVolume).toFixed(2) : null,
      totalVolume,
      tickCount: buffer.length,
    };
  }
}
Enter fullscreen mode Exit fullscreen mode

Putting It Together

// Rolling statistics over the last 200 ticks of each symbol.
const aggregator = new TickAggregator(200);

// Feed configuration: endpoint, credentials from the environment, the
// contracts to follow, and the bar length.
const feed = new FuturesDataFeed({
  url: 'wss://your-data-feed.com/ws',
  apiKey: process.env.DATA_FEED_KEY,
  symbols: ['ES', 'NQ', 'CL'],
  barInterval: 60000,
});

// Every trade goes into the rolling window.
feed.on('trade', ({ symbol, price, volume }) => {
  aggregator.addTick(symbol, price, volume);
});

// Each completed bar is printed, followed by the rolling stats if any exist.
feed.on('bar', ({ symbol, open, high, low, close, volume }) => {
  console.log(`[BAR] ${symbol} O:${open} H:${high} L:${low} C:${close} V:${volume}`);
  const stats = aggregator.getStats(symbol);
  if (!stats) return;
  console.log(`  VWAP: ${stats.vwap} | Std: ${stats.std}`);
});

feed.connect();
Enter fullscreen mode Exit fullscreen mode

Performance Tips

  1. Use binary WebSocket frames when available -- JSON parsing is expensive at high tick rates
  2. Emit events in batches rather than once per tick to reduce event loop overhead
  3. Use SharedArrayBuffer to share tick data across Node.js worker threads for multi-threaded aggregation

For traders building systems to use with funded accounts, choosing a firm that supports API-friendly platforms is key. PropFirmKey lists which platforms and data feeds each firm supports -- for example, Alpha Futures offers connectivity through major platforms that support WebSocket data feeds.

Top comments (0)