DEV Community

Propfirmkey
Propfirmkey

Posted on

Event-Driven Architecture for Trading Systems

Most trading systems start as simple scripts and grow into unmaintainable spaghetti. Event-driven architecture prevents this.

Why Event-Driven?

In trading, things happen asynchronously:

  • Market data arrives
  • Signals are generated
  • Orders are placed
  • Fills are received
  • Risk limits are checked

Trying to handle all of this in a sequential loop creates tight coupling and makes testing impossible.

The Event Bus

from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Any
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

@dataclass
class Event:
    type: str
    data: dict
    timestamp: datetime = field(default_factory=datetime.now)

class EventBus:
    def __init__(self):
        self.handlers = defaultdict(list)
        self.event_log = []

    def subscribe(self, event_type: str, handler: Callable):
        self.handlers[event_type].append(handler)

    def publish(self, event: Event):
        self.event_log.append(event)
        for handler in self.handlers.get(event.type, []):
            try:
                handler(event)
            except Exception as e:
                logger.error(f"Handler error for {event.type}: {e}")
Enter fullscreen mode Exit fullscreen mode

Components as Event Handlers

class MarketDataFeed:
    def __init__(self, bus: EventBus):
        self.bus = bus

    def on_tick(self, symbol, price, volume):
        self.bus.publish(Event('TICK', {
            'symbol': symbol,
            'price': price,
            'volume': volume
        }))

    def on_bar(self, symbol, ohlcv):
        self.bus.publish(Event('BAR', {
            'symbol': symbol,
            **ohlcv
        }))


class SignalGenerator:
    def __init__(self, bus: EventBus):
        self.bus = bus
        self.bus.subscribe('BAR', self.on_bar)
        self.prices = defaultdict(list)

    def on_bar(self, event):
        symbol = event.data['symbol']
        close = event.data['close']
        self.prices[symbol].append(close)

        if len(self.prices[symbol]) >= 20:
            ma20 = sum(self.prices[symbol][-20:]) / 20
            if close > ma20 and self.prices[symbol][-2] <= ma20:
                self.bus.publish(Event('SIGNAL', {
                    'symbol': symbol,
                    'direction': 'long',
                    'price': close,
                    'reason': 'MA20 crossover'
                }))


class RiskManager:
    def __init__(self, bus: EventBus, max_risk=0.02):
        self.bus = bus
        self.max_risk = max_risk
        self.daily_pnl = 0
        self.positions = {}
        self.bus.subscribe('SIGNAL', self.on_signal)
        self.bus.subscribe('FILL', self.on_fill)

    def on_signal(self, event):
        # Check risk limits before passing to execution
        if self.daily_pnl < -self.max_risk * 100000:
            logger.warning("Daily loss limit — signal rejected")
            return

        self.bus.publish(Event('ORDER', {
            **event.data,
            'quantity': self.calculate_size(event.data)
        }))

    def calculate_size(self, signal_data):
        return 1  # Simplified

    def on_fill(self, event):
        self.daily_pnl += event.data.get('pnl', 0)


class ExecutionEngine:
    def __init__(self, bus: EventBus):
        self.bus = bus
        self.bus.subscribe('ORDER', self.on_order)

    def on_order(self, event):
        # Execute order (paper or live)
        fill_price = event.data['price']  # Simplified
        self.bus.publish(Event('FILL', {
            'symbol': event.data['symbol'],
            'price': fill_price,
            'quantity': event.data['quantity']
        }))
Enter fullscreen mode Exit fullscreen mode

Wiring It Together

def create_trading_system():
    bus = EventBus()

    feed = MarketDataFeed(bus)
    signals = SignalGenerator(bus)
    risk = RiskManager(bus)
    execution = ExecutionEngine(bus)

    # Add logging
    bus.subscribe('SIGNAL', lambda e: logger.info(f"Signal: {e.data}"))
    bus.subscribe('ORDER', lambda e: logger.info(f"Order: {e.data}"))
    bus.subscribe('FILL', lambda e: logger.info(f"Fill: {e.data}"))

    return bus, feed
Enter fullscreen mode Exit fullscreen mode

Benefits

  1. Testability — test each component in isolation
  2. Extensibility — add new features by subscribing to events
  3. Debugging — event log shows exactly what happened and when
  4. Flexibility — swap components without touching others

Event Replay for Debugging

class EventReplayer:
    def __init__(self, bus: EventBus):
        self.bus = bus

    def replay(self, events, speed=1.0):
        for i, event in enumerate(events):
            if i > 0:
                delay = (event.timestamp - events[i-1].timestamp).total_seconds()
                time.sleep(delay / speed)
            self.bus.publish(event)
Enter fullscreen mode Exit fullscreen mode

This architecture scales from a simple paper trading system to a production system handling thousands of events per second. The same patterns apply whether you're building personal tools or systems that integrate with multiple trading platforms. For an overview of what different platforms offer, propfirmkey.com covers 30+ firms.


What architecture pattern does your trading system use?

Top comments (0)