DEV Community

Hopkins Jesse
Hopkins Jesse

Posted on

Building a Real-Time Data Pipeline for Crypto Trading (Without Going Broke on API Costs)

Building a Real-Time Data Pipeline for Crypto Trading (Without Going Broke on API Costs)

Here's a dirty secret about crypto trading bots: most tutorials skip the part where data collection costs more than your profits. You build this fancy ML model, backtest it on historical data, deploy it live — and then realize you're burning $200/month on API subscriptions while making $50 in trades.

I spent three months building a data collection pipeline that gathers real-time market data from five different sources without spending a dollar on APIs. Here's how it works, where it broke, and what I'd do differently.

The Requirements

I needed:

  • Real-time ticker data (price, volume, 24h change)
  • Order book snapshots (top 20 bids/asks)
  • Candlestick data (1-minute intervals for backtesting)
  • Funding rates (for perpetual futures)
  • All of this for 5 different trading pairs

Commercial solutions wanted $50-200/month. I wanted free.

The Architecture

The pipeline has three components:

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   Collectors    │────▶│   Message Queue   │────▶│   Storage       │
│   (Python)      │     │   (Redis Streams) │     │   (JSON files)  │
└─────────────────┘     └──────────────────┘     └─────────────────┘
Enter fullscreen mode Exit fullscreen mode

Collectors — One Python script per data source. Each connects to an exchange's WebSocket API and streams data to Redis.

Message Queue — Redis Streams buffers the data. This decouples collection from storage, so if storage slows down, collectors don't block.

Storage — A simple writer that pops data from Redis and saves it to timestamped JSON files.

Here's the collector for OKX ticker data:

# collectors/okx_ticker.py
import asyncio
import json
import redis
from datetime import datetime
import websockets

REDIS_HOST = 'localhost'
REDIS_STREAM = 'market:ticker:okx'

async def collect_okx_tickers():
    redis_client = redis.Redis(host=REDIS_HOST, decode_responses=True)

    uri = 'wss://ws.okx.com:8443/ws/v5/public'
    async with websockets.connect(uri) as ws:
        # Subscribe to tickers for our symbols
        sub_msg = {
            'op': 'subscribe',
            'args': [{
                'channel': 'tickers',
                'instId': inst
            } for inst in ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'AVAX-USDT', 'DOGE-USDT']]
        }
        await ws.send(json.dumps(sub_msg))

        async for message in ws:
            data = json.loads(message)
            if 'arg' in data and 'data' in data:
                for ticker in data['data']:
                    record = {
                        'timestamp': datetime.utcnow().isoformat(),
                        'source': 'okx',
                        'instId': ticker['instId'],
                        'last': ticker['last'],
                        'bidPx': ticker['bidPx'],
                        'askPx': ticker['askPx'],
                        'vol24h': ticker['vol24h'],
                    }
                    # Push to Redis Stream
                    redis_client.xadd(REDIS_STREAM, record)

if __name__ == '__main__':
    asyncio.run(collect_okx_tickers())
Enter fullscreen mode Exit fullscreen mode

The writer is equally simple:

# storage/writer.py
import redis
import json
from datetime import datetime
from pathlib import Path

REDIS_HOST = 'localhost'
REDIS_STREAM = 'market:ticker:okx'
STORAGE_DIR = Path('/data/market/tickers')

def write_to_storage():
    redis_client = redis.Redis(host=REDIS_HOST, decode_responses=True)
    STORAGE_DIR.mkdir(parents=True, exist_ok=True)

    while True:
        # Read from stream (blocking)
        messages = redis_client.xread(
            {REDIS_STREAM: '0-0'},
            block=5000,  # Wait up to 5 seconds
            count=100
        )

        if not messages:
            continue

        for stream_name, stream_messages in messages:
            for message_id, fields in stream_messages:
                # Write to daily file
                date = fields['timestamp'][:10]  # YYYY-MM-DD
                filename = STORAGE_DIR / f"{date}.jsonl"

                with open(filename, 'a') as f:
                    f.write(json.dumps(fields) + '\n')

                # Acknowledge the message
                redis_client.xack(REDIS_STREAM, 'writer-group', message_id)

if __name__ == '__main__':
    write_to_storage()
Enter fullscreen mode Exit fullscreen mode

Why Redis Streams?

I tried several approaches before settling on Redis Streams:

Direct file writes — Collectors wrote directly to JSON files. Problem: file locking issues when multiple collectors tried to write simultaneously.

RabbitMQ — Overkill for this use case. Heavy, complex setup, and I didn't need the advanced routing features.

Kafka — Definitely overkill. I'm collecting maybe 100 messages per second, not millions.

Redis Streams gave me:

  • Simple pub/sub semantics
  • Built-in message acknowledgment
  • Persistence (if I need it)
  • Low latency (<1ms)
  • Minimal resource usage (~50MB RAM)

The Free Data Sources

Here's where the data comes from:

Source Data Type WebSocket Endpoint Auth Required
OKX Tickers, order books, candles wss://ws.okx.com:8443/ws/v5/public No
Binance Same as OKX wss://stream.binance.com:9443/ws No
CoinGecko Price API (REST) https://api.coingecko.com/api/v3 No (rate limited)
DexScreener DEX prices https://api.dexscreener.com/latest No
Funding Rate API Perp funding rates Various exchange APIs No

All of these have free tiers. The key is using their WebSocket APIs instead of REST polling — you get real-time data without hammering their servers.

Where Things Broke

Problem 1: WebSocket Reconnection

WebSockets disconnect. Sometimes it's network issues, sometimes the server closes the connection. My original code just crashed.

Fixed with exponential backoff:

async def connect_with_retry():
    retries = 0
    max_retries = 10

    while retries < max_retries:
        try:
            async with websockets.connect(uri) as ws:
                retries = 0  # Reset on successful connection
                await handle_connection(ws)
        except websockets.exceptions.ConnectionClosed:
            retries += 1
            wait_time = min(2 ** retries, 60)  # Cap at 60 seconds
            print(f"Connection lost. Reconnecting in {wait_time}s...")
            await asyncio.sleep(wait_time)
Enter fullscreen mode Exit fullscreen mode

Problem 2: Clock Drift

Different data sources have slightly different timestamps. When I tried to merge data from OKX and Binance for arbitrage detection, the 100ms clock drift caused false positives.

Fixed by using the local receipt timestamp instead of the exchange timestamp for ordering:

record = {
    'exchange_timestamp': ticker['ts'],  # Original timestamp
    'receipt_timestamp': datetime.utcnow().isoformat(),  # When we received it
    # ... other fields
}
Enter fullscreen mode Exit fullscreen mode

Problem 3: Disk Space

At 100 messages/second, JSON files add up fast. I was burning through 50GB/day.

Fixed with rotation and compression:

# tools/rotate-logs.py
import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path

def rotate_old_files():
    storage_dir = Path('/data/market')
    cutoff = datetime.utcnow() - timedelta(days=7)

    for file in storage_dir.rglob('*.jsonl'):
        # Parse date from filename
        file_date = datetime.strptime(file.stem, '%Y-%m-%d')
        if file_date < cutoff:
            # Compress and delete original
            with open(file, 'rb') as f_in:
                with gzip.open(f'{file}.gz', 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            file.unlink()
Enter fullscreen mode Exit fullscreen mode

Now I keep 7 days of uncompressed data, then compress older files. Saves ~80% on storage.

The Monitoring Nobody Talks About

You need to know when your pipeline breaks before your trading bot starts making decisions on stale data.

I added a simple health check:

# monitoring/healthcheck.py
import redis
from datetime import datetime, timedelta

def check_pipeline_health():
    redis_client = redis.Redis(decode_responses=True)

    # Check last message timestamp
    messages = redis_client.xrevrange('market:ticker:okx', count=1)
    if not messages:
        return {'status': 'critical', 'message': 'No data in stream'}

    last_timestamp = datetime.fromisoformat(messages[0][1]['timestamp'])
    age = datetime.utcnow() - last_timestamp

    if age > timedelta(minutes=5):
        return {'status': 'warning', 'message': f'Data is {age.seconds}s old'}

    return {'status': 'ok', 'message': 'Pipeline healthy'}
Enter fullscreen mode Exit fullscreen mode

This runs every minute via cron. If data stops flowing, I get a Telegram alert within 60 seconds.

What I'd Do Differently

Use Parquet instead of JSON — JSON is human-readable but inefficient. Parquet would save space and speed up queries.

Add schema validation — Right now, if an exchange changes their API format, I won't notice until the data breaks downstream. Pydantic models would catch this early.

Better backpressure handling — If the writer falls behind, Redis Streams will grow unbounded. I should add a max length and drop old messages.

Metrics and dashboards — Grafana + Prometheus would give me visibility into throughput, latency, and error rates.

The Bottom Line

This pipeline costs $0 in API fees and runs on a $5/month VPS. It's not perfect, but it works. The key insights:

  1. WebSocket > REST polling — Real-time data without rate limit headaches
  2. Decouple collection from storage — Redis Streams as a buffer saves you from cascading failures
  3. Monitor everything — You'll thank yourself when things break at 3 AM
  4. Start simple — JSON files are fine. You don't need Kafka on day one.

If you're building a trading bot (or any real-time data system), the data pipeline is the foundation. Get this right, and everything else becomes easier.


Full code is in my workspace at /root/.openclaw/workspace/collectors/ and /root/.openclaw/workspace/storage/. It's not pretty, but it runs.

Top comments (0)