
Iurii Rogulia

Posted on • Originally published at iurii.rogulia.fi

Bybit Grid Trading Bot in Python: Architecture and Risk

Grid trading is one of those strategies that sounds simple until you implement it in live markets. The concept: place buy orders below the current price at fixed intervals, place sell orders above at the same intervals. When price moves down, you fill buys. When it moves back up, you fill sells. The spread between grid levels is your profit.
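As a minimal sketch of that idea, assuming a fixed absolute step and ignoring fees, funding, and order sizing, the levels around a reference price can be computed as follows. The function name `fixed_grid_levels` is illustrative and not taken from the bot's codebase.

```python
def fixed_grid_levels(
    price: float, step: float, levels: int
) -> tuple[list[float], list[float]]:
    """Return (buy_prices, sell_prices) spaced `step` apart around `price`."""
    if step <= 0 or levels < 1:
        raise ValueError("step must be positive and levels must be >= 1")
    buys = [price - step * i for i in range(1, levels + 1)]   # below the market
    sells = [price + step * i for i in range(1, levels + 1)]  # above the market
    return buys, sells


buys, sells = fixed_grid_levels(price=50_000.0, step=100.0, levels=3)
print(buys)   # [49900.0, 49800.0, 49700.0]
print(sells)  # [50100.0, 50200.0, 50300.0]
```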

In practice, you're dealing with a futures exchange, perpetual funding rates, partial fills, exchange API quirks, crash recovery when your VPS restarts at 3am, and the specific category of bugs that only appear when two concurrent events arrive 50 milliseconds apart.

I've been running this system in production on Bybit since October 2025. It's at v2.12.0 now, with 248 commits and ~43,770 lines of Python. This article covers the engineering decisions — architecture, state management, risk control, and testing. Not the trading strategy specifics. The full project write-up is in the Perpetual Futures Grid Trading System card.


Why Python, Not TypeScript

My default for most production systems is TypeScript. But for this project I chose Python, and I'd make the same choice again.

In my experience, pybit is the best-maintained Bybit SDK in any language: the Bybit team maintains it officially, and it tracks API changes closely. The pandas ecosystem is genuinely better for time-series analysis: calculating ATR (Average True Range) from OHLCV data in pandas is 10 lines; doing the same in TypeScript without a library is painful. And Python's threading model, with explicit threading.Thread, RLock, and Queue, keeps concurrent state management readable, where JavaScript's event loop needs more ceremony.

System Architecture

MultiAccountBot (orchestrator)
  → TradingAccount[] (per account, fully isolated)
      → GridStrategy[] (per symbol)
          → StateManager, OrderExecutor, EventQueue, TPCalculator
      → BybitClient (REST), BalanceManager, ProfitProtectionManager
  → PublicWebSocket[] (shared per symbol — minimize exchange connections)
  → PrivateWebSocket[] (per account)

The design is intentionally multi-tenant from the start. Each TradingAccount is fully isolated: separate log files, separate state files, separate emergency stop flags. But PublicWebSocket connections to price feeds are shared across accounts trading the same symbol. If I'm running three accounts all trading BTCUSDT, they share one WebSocket subscription to the BTCUSDT orderbook — three subscriptions would be wasteful and could trigger Bybit's connection limits.
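One way to share a single upstream subscription per symbol is a small registry that opens the feed for the first listener and closes it when the last one leaves. This is a sketch under assumptions: `SharedFeedRegistry`, `open_feed`, and `close_feed` are hypothetical names, and the real PublicWebSocket wiring is not shown in this article.

```python
import threading
from typing import Callable

PriceCallback = Callable[[str, float], None]


class SharedFeedRegistry:
    """Hypothetical registry: one upstream subscription per symbol, many listeners."""

    def __init__(
        self,
        open_feed: Callable[[str], None],
        close_feed: Callable[[str], None],
    ):
        self._open_feed = open_feed    # called once when a symbol gets its first listener
        self._close_feed = close_feed  # called when the last listener leaves
        self._listeners: dict[str, list[PriceCallback]] = {}
        self._lock = threading.RLock()

    def subscribe(self, symbol: str, callback: PriceCallback) -> None:
        with self._lock:
            if symbol not in self._listeners:
                self._listeners[symbol] = []
                self._open_feed(symbol)
            self._listeners[symbol].append(callback)

    def unsubscribe(self, symbol: str, callback: PriceCallback) -> None:
        with self._lock:
            listeners = self._listeners.get(symbol, [])
            if callback in listeners:
                listeners.remove(callback)
            if symbol in self._listeners and not listeners:
                del self._listeners[symbol]
                self._close_feed(symbol)

    def dispatch(self, symbol: str, price: float) -> None:
        """Fan one upstream tick out to every account listening on the symbol."""
        with self._lock:
            listeners = list(self._listeners.get(symbol, []))
        # Callbacks run outside the lock so a slow listener cannot block subscribers.
        for callback in listeners:
            callback(symbol, price)
```

With three accounts subscribed to BTCUSDT, `open_feed` runs once and every tick is delivered three times locally.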

Event-Driven, Not asyncio

I deliberately chose threading over asyncio. This is a financial system where predictability matters more than throughput.

WebSocket callbacks are non-blocking: they immediately put events into a priority queue. A dedicated EventWorker thread processes them sequentially. This means you always know the order of operations.

import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedEvent:
    priority: int
    timestamp: float
    event: Any = field(compare=False)

class EventQueue:
    def __init__(self):
        self._queue: list[PrioritizedEvent] = []
        self._lock = threading.RLock()
        self._not_empty = threading.Condition(self._lock)

    def put(self, event: Any, priority: int) -> None:
        with self._not_empty:
            heapq.heappush(
                self._queue,
                PrioritizedEvent(priority=priority, timestamp=time.time(), event=event)
            )
            self._not_empty.notify()

    def get(self, timeout: float = 1.0) -> Any | None:
        with self._not_empty:
            deadline = time.time() + timeout
            while not self._queue:
                remaining = deadline - time.time()
                if remaining <= 0:
                    return None
                self._not_empty.wait(timeout=remaining)
            return heapq.heappop(self._queue).event

Priority 0 = execution events (fills). Priority 1 = order events (new order, cancelled). Priority 2 = position updates. This ordering prevents a critical bug: an order state update arriving before its corresponding fill event could cause the strategy to miscount active orders.
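The worker side of this design can be sketched with the standard library's `queue.PriorityQueue`. This is not the bot's EventWorker, only an illustration of the same pattern: callbacks enqueue and return, and one thread drains events in priority order. The constants, the `_STOP` sentinel, and the sequence counter are assumptions made for the example.

```python
import itertools
import queue
import threading

FILL, ORDER, POSITION = 0, 1, 2  # lower number is processed first
_STOP = object()                 # sentinel that tells the worker to exit


class EventWorker(threading.Thread):
    """Drains a priority queue on one thread so handlers never run concurrently."""

    def __init__(self, handler):
        super().__init__(daemon=True)
        self._events: queue.PriorityQueue = queue.PriorityQueue()
        self._seq = itertools.count()  # tie-breaker keeps FIFO order within a priority
        self._handler = handler

    def submit(self, priority: int, event: dict) -> None:
        # Called from WebSocket callbacks: enqueue and return immediately.
        self._events.put((priority, next(self._seq), event))

    def stop(self) -> None:
        # Sorts after every real priority, so queued events are handled first.
        self._events.put((float("inf"), next(self._seq), _STOP))

    def run(self) -> None:
        while True:
            _, _, event = self._events.get()
            if event is _STOP:
                return
            self._handler(event)


processed = []
worker = EventWorker(processed.append)
worker.submit(ORDER, {"type": "order"})
worker.submit(POSITION, {"type": "position"})
worker.submit(FILL, {"type": "execution"})
worker.stop()
worker.start()  # started after enqueueing so the ordering is deterministic here
worker.join()
print([e["type"] for e in processed])  # ['execution', 'order', 'position']
```

The fill is handled first even though it was submitted last, which is the property the priority scheme relies on.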

Eight explicit RLock instances guard shared resources across components. No shared mutable state without a lock. In a financial system, a race condition isn't a test failure — it's a wrong trade.
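A short illustration of why a re-entrant lock is convenient for this kind of guarded state: a method that holds the lock can call other locked methods on the same object. The `ActiveOrders` class is hypothetical and exists only for this example.

```python
import threading


class ActiveOrders:
    """Hypothetical shared state guarded by a re-entrant lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self._active: dict[str, float] = {}

    def add(self, order_id: str, price: float) -> None:
        with self._lock:
            self._active[order_id] = price

    def remove(self, order_id: str) -> None:
        with self._lock:
            self._active.pop(order_id, None)

    def replace(self, old_id: str, new_id: str, price: float) -> None:
        # Hold the lock across both steps so no other thread sees a half-applied
        # update. The nested calls re-acquire the same RLock; a plain Lock would
        # deadlock here.
        with self._lock:
            self.remove(old_id)
            self.add(new_id, price)

    def snapshot(self) -> dict[str, float]:
        # Return a copy so callers cannot mutate the guarded dict.
        with self._lock:
            return dict(self._active)
```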

Grid Strategy: Dynamic Steps Based on Volatility

Static grid spacing doesn't work well across market regimes. A fixed $100 spacing is too tight in a high-volatility month (every level fills instantly, you accumulate huge positions) and too wide in a low-volatility month (no fills at all).

The solution: calculate grid step size from ATR (Average True Range) — a measure of how much an asset moves in a typical period.

import pandas as pd
import numpy as np

def calculate_grid_step(
    ohlcv_data: pd.DataFrame,
    config: GridConfig
) -> float:
    """
    Calculate grid step as EMA of daily ATR%, scaled to the trading horizon.

    ohlcv_data: DataFrame with columns [open, high, low, close, volume]
    Returns: step size as percentage of current price
    """
    # True Range: max of (H-L), |H-prev_close|, |L-prev_close|
    high = ohlcv_data['high']
    low = ohlcv_data['low']
    prev_close = ohlcv_data['close'].shift(1)

    tr = pd.concat([
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs()
    ], axis=1).max(axis=1)

    # Daily ATR as percentage of close price
    daily_atr_pct = tr / ohlcv_data['close']

    # EMA of ATR% over lookback period
    atr_ema = daily_atr_pct.ewm(span=config.atr_lookback_periods).mean().iloc[-1]

    # Scale to trading horizon (sqrt of time rule)
    # Over N days, expected move scales as sqrt(N) * daily_volatility
    base_step = atr_ema * np.sqrt(config.horizon_days)

    return base_step * config.step_scale_factor

Levels aren't equally spaced — they increase with distance from the current price using a square root scaling:

def calculate_level_steps(
    base_step: float,
    num_levels: int,
    current_price: float
) -> list[float]:
    """
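    Per-level increments grow with the square root of the level number:

    Level 1: base_step
    Level 2: base_step * sqrt(2)
    Level N: base_step * sqrt(N)

    This concentrates grid levels near the current price (where fills are likely)
    and spaces them wider further out (where fills represent larger moves).

    Returns the cumulative price offset of each level from current_price.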
    """
    steps = []
    cumulative = 0.0

    for level in range(1, num_levels + 1):
        level_step = base_step * np.sqrt(level) * current_price
        cumulative += level_step
        steps.append(cumulative)

    return steps

Position sizing uses martingale doubling: qty[level] = base_qty * 2**(level - 1), with levels numbered from 1. Level 1 buys 0.01 BTC, level 2 buys 0.02, level 3 buys 0.04. The deeper the drawdown, the larger the position, which is how the strategy recovers when price eventually returns.
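To make the exposure concrete, here is a small sketch of the doubling schedule. It assumes levels are numbered from 1 and that level 1 uses the base quantity, which matches the 0.01 / 0.02 / 0.04 example; the function names are illustrative.

```python
def level_quantities(base_qty: float, num_levels: int) -> list[float]:
    """Order size per level, doubling each time; level 1 uses base_qty."""
    return [base_qty * 2 ** (level - 1) for level in range(1, num_levels + 1)]


def cumulative_exposure(base_qty: float, num_levels: int) -> float:
    """Total position if every level fills: base_qty * (2**num_levels - 1)."""
    return sum(level_quantities(base_qty, num_levels))


for level, qty in enumerate(level_quantities(0.01, 8), start=1):
    print(f"level {level}: order {qty:.2f} BTC")
print(f"total if all 8 fill: {cumulative_exposure(0.01, 8):.2f} BTC")
```

With a base of 0.01 BTC and eight levels, the eighth order alone is 1.28 BTC and the filled total is 2.55 BTC, which is 255 times the first order.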

Both the grid steps and the base quantity are locked after initial calculation. This is critical. If you recalculate grid steps on every restart, you end up with levels that don't match your open positions. The exchange knows your orders by price; your bot knows them by level number. A mismatch between what the exchange holds and what the bot thinks it holds triggers false emergency stops.
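The locking rule can be sketched as "compute once, then always read back the persisted value". The helper below is hypothetical: the `grid_params` key and the function name are assumptions, and the caller is expected to persist `state` afterwards.

```python
from typing import Callable


def get_locked_grid_params(
    state: dict,
    compute_step: Callable[[], float],
    base_qty: float,
) -> dict:
    """Compute grid parameters once, then always reuse the persisted values."""
    locked = state.get("grid_params")
    if locked is None:
        locked = {"step": compute_step(), "base_qty": base_qty}
        state["grid_params"] = locked  # caller persists `state` afterwards
    return locked


state: dict = {}
first = get_locked_grid_params(state, lambda: 0.02, base_qty=0.001)
# After a restart volatility may have changed, but the locked values win.
second = get_locked_grid_params(state, lambda: 0.05, base_qty=0.002)
print(first == second)  # True
```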

State Persistence: Atomic JSON Writes

No database. State is stored in JSON files with atomic writes:

import json
import os
import tempfile
import threading
from pathlib import Path

class StateManager:
    def __init__(self, state_path: Path):
        self._path = state_path
        self._lock = threading.RLock()

    def save(self, state: dict) -> None:
        """Atomic write using OS rename — survives crashes mid-write."""
        with self._lock:
            dir_path = self._path.parent
            with tempfile.NamedTemporaryFile(
                mode='w',
                dir=dir_path,
                delete=False,
                suffix='.tmp'
            ) as f:
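                json.dump(state, f, indent=2, default=str)
                # Flush Python and OS buffers so the data is on disk before the rename
                f.flush()
                os.fsync(f.fileno())
                temp_path = f.name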

            # os.replace is atomic on Linux (rename syscall)
            # The old file is never partially written
            os.replace(temp_path, self._path)

    def load(self) -> dict | None:
        with self._lock:
            if not self._path.exists():
                return None
            with open(self._path) as f:
                return json.load(f)

The os.replace atomicity guarantee: on Linux (where this runs under systemd), a rename is atomic at the OS level. Either the old file exists, or the new file exists — there's no intermediate state where neither exists. A crash during json.dump leaves the .tmp file but doesn't corrupt the main state file.

The exchange API is the source of truth on restart. The state file is an optimization — it saves the reconstruction work. If the state file is corrupt or missing, the 6-step recovery workflow reconstructs everything from the exchange.

10-Level Risk Management

The risk management system has 10 distinct protection levels, each with a specific trigger and response:

from enum import IntEnum
from dataclasses import dataclass, field

class RiskLevel(IntEnum):
    NONE = 0
    HIGH_IM_RATE = 1          # Initial margin rate >= 90%
    HIGH_IM_WITH_PROFIT = 2   # High IM + position is profitable → close now
    TRAILING_STOP = 3         # Lock profit: if price drops X% from peak, close
    POSITION_SIZE_LIMIT = 4   # Position value > 40% of risk limit → block new orders
    POSITION_TIMEOUT = 5      # Position stuck at max level too long → force close
    BOTH_SIDES_PROFITABLE = 6 # Long AND short both profitable → close both
    TP_SAFETY = 7             # Not enough margin to reopen after TP → pause TP
    TP_SAFETY_PRE = 8         # Pre-check: block protective orders if margin insufficient
    LEVEL_RATE_LIMIT = 9      # Budget: max N level-ups per 48h sliding window
    EMERGENCY_STOP = 10       # File-based flag: requires manual reset

@dataclass
class RiskState:
    active_level: RiskLevel = RiskLevel.NONE
    emergency_flag_path: str = ""
    peak_unrealized_pnl: float = 0.0
    level_up_timestamps: list[float] = field(default_factory=list)

Level 7 and 8 need explanation — they're the most non-obvious.

At high grid levels, you've accumulated a large losing position. The take-profit (TP) order exists at your entry average, waiting for price to recover. But here's the problem: if the TP fills, you close the position. That frees up margin. But to continue the strategy, you need to immediately reopen the position (place new grid orders). If the freed margin isn't enough to reopen — because you're deeply leveraged — then taking profit actually kills the strategy.

The solution:

def check_tp_safety(
    self,
    current_position: Position,
    tp_order: Order,
    available_margin: float
) -> bool:
    """
    Returns True if it's safe to keep TP active.
    Returns False if TP would close position but leave insufficient margin to reopen.
    """
    # Estimate margin needed to reopen at current grid level
    required_margin_to_reopen = self._estimate_reopen_margin(
        current_position.level,
        current_position.symbol
    )

    # Margin freed when TP closes position
    margin_freed_by_tp = self._estimate_freed_margin(tp_order)

    projected_available = available_margin + margin_freed_by_tp

    # Hysteresis: 5% buffer to enter safety mode, 10% buffer to exit
    if not self._in_tp_safety_mode:
        safe = projected_available >= required_margin_to_reopen * 1.05
    else:
        safe = projected_available >= required_margin_to_reopen * 1.10

    return safe

If TP safety triggers (Level 7), the protective orders (grid orders below current price) are blocked (Level 8). The position stays open, TP stays active, and you wait for the position to close when price recovers. Counterintuitive — but without this check, the bot would keep averaging down while TP is unreachable.
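The hysteresis in that check is easier to see as a small state machine. The sketch below reuses the 1.05 and 1.10 ratios from the function above; the `TpSafetyGate` class and its `update` method are assumptions for illustration, since the article does not show where the mode flag is flipped.

```python
class TpSafetyGate:
    """Hysteresis around the reopen-margin check."""

    ENTER_RATIO = 1.05  # below this coverage, safety mode switches on
    EXIT_RATIO = 1.10   # coverage must recover to this before it switches off

    def __init__(self):
        self.in_safety_mode = False

    def update(self, projected_available: float, required_to_reopen: float) -> bool:
        """Apply one margin reading; return True while safety mode is active."""
        threshold = self.EXIT_RATIO if self.in_safety_mode else self.ENTER_RATIO
        self.in_safety_mode = projected_available < required_to_reopen * threshold
        return self.in_safety_mode


gate = TpSafetyGate()
readings = [104.0, 107.0, 111.0, 107.0]  # projected margin, required = 100
print([gate.update(r, 100.0) for r in readings])  # [True, True, False, False]
```

The same reading of 107 keeps safety mode on while it is active and leaves it off once it has cleared, which prevents flapping when margin hovers near the threshold.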

Level Rate Limiting (Level 9)

Martingale doubling means each level-up doubles the order size. After 8 level-ups, the next order is 256× the initial order size, and the accumulated position is larger still. Level 9 limits how frequently you can level up:

import math

def check_level_rate_limit(
    self,
    max_levels: int,
    current_timestamps: list[float],
    now: float
) -> bool:
    """
    Budget: ceil(max_levels * 0.5) level-ups per 48-hour sliding window.
    Returns True if a new level-up is allowed.
    """
    budget = math.ceil(max_levels * 0.5)
    window_start = now - 48 * 3600  # 48-hour window

    # Count level-ups in the window
    recent_level_ups = [t for t in current_timestamps if t >= window_start]

    return len(recent_level_ups) < budget

If the budget is exhausted, new grid orders below the current deepest level are blocked. The existing position can still TP or be closed by other risk levels, but you can't dig deeper.

pytz and the Timezone Problem

Bybit's API returns timestamps in UTC milliseconds. Python's datetime.now() gives local time, and on a Finnish VPS that might be UTC+2 or UTC+3 depending on DST. Mix these up and your ATR calculations, level rate limit windows, and daily reset logic silently produce wrong results.

The fix: always be explicit about timezones.

import pytz
from datetime import datetime, timezone

UTC = pytz.UTC
HELSINKI = pytz.timezone("Europe/Helsinki")

def parse_bybit_timestamp(ms: int) -> datetime:
    """Convert Bybit millisecond timestamp to timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

def get_utc_now() -> datetime:
    """Always use UTC for internal calculations."""
    return datetime.now(tz=timezone.utc)

def is_same_day_utc(ts1: datetime, ts2: datetime) -> bool:
    """Compare days in UTC, regardless of local time."""
    t1 = ts1.astimezone(timezone.utc)
    t2 = ts2.astimezone(timezone.utc)
    return t1.date() == t2.date()

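The naive mistake: datetime.now() without a timezone. On a machine in Helsinki during summer, this returns a naive datetime holding local wall-clock time (UTC+3). Subtracting two such datetimes gives a correct duration as long as no DST change falls between them, but compare one to datetime(2026, 4, 19, 0, 0, 0) meant as UTC midnight and the comparison silently succeeds while being three hours off. Comparing a naive datetime to a timezone-aware one raises TypeError, which is at least loud.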

Always create timezone-aware datetimes. Use pytz.UTC or datetime.timezone.utc consistently. Pass tz=timezone.utc everywhere you construct a datetime.
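The failure modes are easy to reproduce. This sketch uses a fixed UTC+3 offset as a stand-in for Helsinki summer time (an assumption that keeps the example independent of the system timezone database) and an arbitrary example instant.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+3 offset standing in for Helsinki summer time (assumption).
HELSINKI_SUMMER = timezone(timedelta(hours=3))

utc_time = datetime(2026, 4, 18, 22, 30, tzinfo=timezone.utc)
local_time = utc_time.astimezone(HELSINKI_SUMMER)

# Same instant, so aware datetimes compare equal across zones...
assert utc_time == local_time
# ...but the calendar date differs, which matters for daily-reset logic.
print(utc_time.date(), local_time.date())  # 2026-04-18 2026-04-19

# Stripping tzinfo reproduces what datetime.now() would return on that server.
naive_local = local_time.replace(tzinfo=None)
utc_midnight = datetime(2026, 4, 19, 0, 0, 0)  # intended as UTC, but naive

# Naive vs naive compares wall-clock digits and silently reports "past
# midnight", although UTC midnight is still 90 minutes away.
print(naive_local >= utc_midnight)  # True

# Naive vs aware at least fails loudly.
try:
    naive_local >= utc_time
except TypeError as exc:
    print("TypeError:", exc)
```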

PyYAML Configuration

The bot is configured via YAML — one file per account:

# config/account-1.yaml
account_id: "account-1"
environment: "mainnet"
api_key: "${BYBIT_API_KEY_1}" # Resolved from environment at startup
api_secret: "${BYBIT_SECRET_1}"

symbols:
  BTCUSDT:
    enabled: true
    base_quantity: 0.001 # BTC per unit at level 1
    max_levels: 8
    horizon_days: 10
    atr_lookback_periods: 10
    step_scale_factor: 0.3
    leverage: 10

  ETHUSDT:
    enabled: false

risk:
  max_position_pct_of_balance: 0.40 # Level 4 trigger
  trailing_stop_drawdown_pct: 0.15 # Level 3 trigger
  level_rate_limit_window_hours: 48 # Level 9 window

The loader resolves those placeholders before parsing:

# config/loader.py
import yaml
import os
import re
from pathlib import Path

def load_config(config_path: Path) -> dict:
    """Load YAML config with environment variable interpolation."""
    with open(config_path) as f:
        raw = f.read()

    # Replace ${VAR_NAME} with environment variable values
    def replace_env(match: re.Match) -> str:
        var_name = match.group(1)
        value = os.environ.get(var_name)
        if value is None:
            raise ValueError(f"Environment variable {var_name} not set")
        return value

    interpolated = re.sub(r'\$\{([^}]+)\}', replace_env, raw)
    return yaml.safe_load(interpolated)

The ${VAR_NAME} interpolation means API keys stay in environment variables (managed by systemd's EnvironmentFile), not in the YAML files that might end up in version control.

Testing with pytest

The test suite has 280 test functions across 19 files. The challenging parts to test are: components that interact with the Bybit API, components that depend on wall-clock time, and components with complex state machines.

For Bybit API interaction, I use dependency injection — pass the client as an argument rather than importing it:

# tests/test_grid_strategy.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime, timezone

from src.strategy.grid import GridStrategy
from src.models import Order, Position, Symbol

@pytest.fixture
def mock_bybit_client():
    """Mock Bybit REST client — avoids real API calls in tests."""
    client = MagicMock()
    client.get_positions.return_value = {"result": {"list": []}}
    client.place_order.return_value = {"retCode": 0, "result": {"orderId": "test-123"}}
    return client

@pytest.fixture
def grid_strategy(mock_bybit_client):
    config = {
        "base_quantity": 0.001,
        "max_levels": 5,
        "leverage": 10,
    }
    return GridStrategy(
        symbol=Symbol.BTCUSDT,
        config=config,
        client=mock_bybit_client
    )

def test_grid_orders_placed_on_start(grid_strategy, mock_bybit_client):
    """On strategy start, grid orders should be placed below current price."""
    current_price = 50_000.0
    grid_strategy.start(current_price=current_price)

    # Should have placed orders at each grid level
    assert mock_bybit_client.place_order.call_count == grid_strategy.config["max_levels"]

def test_level_up_on_fill(grid_strategy, mock_bybit_client):
    """When a grid order fills, the strategy should place the next level."""
    grid_strategy.start(current_price=50_000.0)
    initial_level = grid_strategy.current_level

    # Simulate a fill event at level 1
    fill_event = {
        "orderId": "level-1-order",
        "side": "Buy",
        "qty": "0.001",
        "execType": "Trade"
    }
    grid_strategy.handle_execution(fill_event)

    assert grid_strategy.current_level == initial_level + 1

def test_emergency_stop_blocks_orders(grid_strategy, mock_bybit_client, tmp_path):
    """After emergency stop, no new orders should be placed."""
    emergency_flag = tmp_path / "emergency_stop"
    emergency_flag.touch()

    grid_strategy._emergency_flag_path = str(emergency_flag)
    grid_strategy.start(current_price=50_000.0)

    # No orders should be placed when emergency stop is active.
    # pytest passes the same mock instance that the grid_strategy fixture received.
    assert mock_bybit_client.place_order.call_count == 0

For time-dependent tests, I inject a clock function:

# src/risk/rate_limiter.py
from typing import Callable
import math
import time

class LevelRateLimiter:
    def __init__(
        self,
        max_levels: int,
        window_hours: int,
        clock: Callable[[], float] = time.time  # Injectable for testing
    ):
        self._budget = math.ceil(max_levels * 0.5)
        self._window_seconds = window_hours * 3600
        self._clock = clock
        self._timestamps: list[float] = []

    def can_level_up(self) -> bool:
        now = self._clock()
        window_start = now - self._window_seconds
        self._timestamps = [t for t in self._timestamps if t >= window_start]
        return len(self._timestamps) < self._budget

    def record_level_up(self) -> None:
        self._timestamps.append(self._clock())

# tests/test_rate_limiter.py
import math

from src.risk.rate_limiter import LevelRateLimiter

def test_rate_limit_exhausted_after_budget():
    fake_time = [1_000_000.0]  # Start at a fixed timestamp

    limiter = LevelRateLimiter(
        max_levels=8,
        window_hours=48,
        clock=lambda: fake_time[0]
    )

    budget = math.ceil(8 * 0.5)  # 4

    for _ in range(budget):
        assert limiter.can_level_up() is True
        limiter.record_level_up()

    # Budget exhausted
    assert limiter.can_level_up() is False

    # Advance time past the window
    fake_time[0] += 48 * 3600 + 1

    # Budget should reset
    assert limiter.can_level_up() is True

Monitoring in Production

The bot runs as a systemd service on a Vultr VPS. Monitoring is minimal but effective:

Telegram notifications for every significant event: level-up, TP execution, risk level trigger, error requiring attention. Each account has its own Telegram chat.

Structured JSON logs to files, rotated daily. When I need to debug a production incident, grep '"level": "ERROR"' logs/account-1-2026-04-18.json | jq gives me a clean timeline.

Emergency stop file — a file on disk that, when it exists, causes the bot to stop trading and send a Telegram alert. I can create this file from anywhere (SSH, a simple webhook endpoint) to halt trading immediately without deploying code.

import os

def check_emergency_stop(flag_path: str) -> bool:
    """Check if emergency stop flag file exists."""
    return os.path.exists(flag_path)

# In the main event loop
if check_emergency_stop(config.emergency_flag_path):
    logger.critical("Emergency stop flag detected — halting trading")
    telegram.send(f"EMERGENCY STOP: {account_id}")
    break
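The structured JSON logs mentioned above can be produced with the standard `logging` module. This is a sketch, not the bot's logging setup: the `JsonFormatter` class, the `build_logger` helper, and the field names other than `level` are assumptions.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # Default separators emit '"level": "ERROR"', which the grep pattern expects.
        return json.dumps(payload)


def build_logger(name: str, handler: logging.Handler) -> logging.Logger:
    """Attach a JSON-formatting handler to the named logger."""
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger
```

For daily rotation, the handler passed in could be a `logging.handlers.TimedRotatingFileHandler` created with `when="midnight"` and `utc=True`.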

If you're building algorithmic trading systems and want to talk through architecture decisions — state persistence, risk management, crash recovery, testing concurrent state machines — get in touch.

I'm available for freelance projects and long-term engagements. If you're building something similar and need a technical consultation before committing to an architecture, that's a good starting point.


Related project: Perpetual Futures Grid Trading System — the full project this article is based on.

Related reading: UUID v7 in Production: Why Your Database Hates v4 — identifier strategy for the persistent state tables this bot relies on.

