Building a Real-Time Data Pipeline for Crypto Trading (Without Going Broke on API Costs)
Here's a dirty secret about crypto trading bots: most tutorials skip the part where data collection costs more than your profits. You build this fancy ML model, backtest it on historical data, deploy it live — and then realize you're burning $200/month on API subscriptions while making $50 in trades.
I spent three months building a data collection pipeline that gathers real-time market data from five different sources without spending a dollar on APIs. Here's how it works, where it broke, and what I'd do differently.
The Requirements
I needed:
- Real-time ticker data (price, volume, 24h change)
- Order book snapshots (top 20 bids/asks)
- Candlestick data (1-minute intervals for backtesting)
- Funding rates (for perpetual futures)
- All of this for 5 different trading pairs
Commercial solutions wanted $50-200/month. I wanted free.
The Architecture
The pipeline has three components:
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Collectors │────▶│ Message Queue │────▶│ Storage │
│ (Python) │ │ (Redis Streams) │ │ (JSON files) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
Collectors — One Python script per data source. Each connects to an exchange's WebSocket API and streams data to Redis.
Message Queue — Redis Streams buffers the data. This decouples collection from storage, so if storage slows down, collectors don't block.
Storage — A simple writer that pops data from Redis and saves it to timestamped JSON files.
Here's the collector for OKX ticker data:
# collectors/okx_ticker.py
import asyncio
import json
import redis
from datetime import datetime
import websockets
REDIS_HOST = 'localhost'
REDIS_STREAM = 'market:ticker:okx'
async def collect_okx_tickers():
redis_client = redis.Redis(host=REDIS_HOST, decode_responses=True)
uri = 'wss://ws.okx.com:8443/ws/v5/public'
async with websockets.connect(uri) as ws:
# Subscribe to tickers for our symbols
sub_msg = {
'op': 'subscribe',
'args': [{
'channel': 'tickers',
'instId': inst
} for inst in ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'AVAX-USDT', 'DOGE-USDT']]
}
await ws.send(json.dumps(sub_msg))
async for message in ws:
data = json.loads(message)
if 'arg' in data and 'data' in data:
for ticker in data['data']:
record = {
'timestamp': datetime.utcnow().isoformat(),
'source': 'okx',
'instId': ticker['instId'],
'last': ticker['last'],
'bidPx': ticker['bidPx'],
'askPx': ticker['askPx'],
'vol24h': ticker['vol24h'],
}
# Push to Redis Stream
redis_client.xadd(REDIS_STREAM, record)
if __name__ == '__main__':
asyncio.run(collect_okx_tickers())
The writer is equally simple:
# storage/writer.py
import redis
import json
from datetime import datetime
from pathlib import Path
REDIS_HOST = 'localhost'
REDIS_STREAM = 'market:ticker:okx'
STORAGE_DIR = Path('/data/market/tickers')
def write_to_storage():
redis_client = redis.Redis(host=REDIS_HOST, decode_responses=True)
STORAGE_DIR.mkdir(parents=True, exist_ok=True)
while True:
# Read from stream (blocking)
messages = redis_client.xread(
{REDIS_STREAM: '0-0'},
block=5000, # Wait up to 5 seconds
count=100
)
if not messages:
continue
for stream_name, stream_messages in messages:
for message_id, fields in stream_messages:
# Write to daily file
date = fields['timestamp'][:10] # YYYY-MM-DD
filename = STORAGE_DIR / f"{date}.jsonl"
with open(filename, 'a') as f:
f.write(json.dumps(fields) + '\n')
# Acknowledge the message
redis_client.xack(REDIS_STREAM, 'writer-group', message_id)
if __name__ == '__main__':
write_to_storage()
Why Redis Streams?
I tried several approaches before settling on Redis Streams:
Direct file writes — Collectors wrote directly to JSON files. Problem: file locking issues when multiple collectors tried to write simultaneously.
RabbitMQ — Overkill for this use case. Heavy, complex setup, and I didn't need the advanced routing features.
Kafka — Definitely overkill. I'm collecting maybe 100 messages per second, not millions.
Redis Streams gave me:
- Simple pub/sub semantics
- Built-in message acknowledgment
- Persistence (if I need it)
- Low latency (<1ms)
- Minimal resource usage (~50MB RAM)
The Free Data Sources
Here's where the data comes from:
| Source | Data Type | WebSocket Endpoint | Auth Required |
|---|---|---|---|
| OKX | Tickers, order books, candles | wss://ws.okx.com:8443/ws/v5/public |
No |
| Binance | Same as OKX | wss://stream.binance.com:9443/ws |
No |
| CoinGecko | Price API (REST) | https://api.coingecko.com/api/v3 |
No (rate limited) |
| DexScreener | DEX prices | https://api.dexscreener.com/latest |
No |
| Funding Rate API | Perp funding rates | Various exchange APIs | No |
All of these have free tiers. The key is using their WebSocket APIs instead of REST polling — you get real-time data without hammering their servers.
Where Things Broke
Problem 1: WebSocket Reconnection
WebSockets disconnect. Sometimes it's network issues, sometimes the server closes the connection. My original code just crashed.
Fixed with exponential backoff:
async def connect_with_retry():
retries = 0
max_retries = 10
while retries < max_retries:
try:
async with websockets.connect(uri) as ws:
retries = 0 # Reset on successful connection
await handle_connection(ws)
except websockets.exceptions.ConnectionClosed:
retries += 1
wait_time = min(2 ** retries, 60) # Cap at 60 seconds
print(f"Connection lost. Reconnecting in {wait_time}s...")
await asyncio.sleep(wait_time)
Problem 2: Clock Drift
Different data sources have slightly different timestamps. When I tried to merge data from OKX and Binance for arbitrage detection, the 100ms clock drift caused false positives.
Fixed by using the local receipt timestamp instead of the exchange timestamp for ordering:
record = {
'exchange_timestamp': ticker['ts'], # Original timestamp
'receipt_timestamp': datetime.utcnow().isoformat(), # When we received it
# ... other fields
}
Problem 3: Disk Space
At 100 messages/second, JSON files add up fast. I was burning through 50GB/day.
Fixed with rotation and compression:
# tools/rotate-logs.py
import gzip
import shutil
from datetime import datetime, timedelta
from pathlib import Path
def rotate_old_files():
storage_dir = Path('/data/market')
cutoff = datetime.utcnow() - timedelta(days=7)
for file in storage_dir.rglob('*.jsonl'):
# Parse date from filename
file_date = datetime.strptime(file.stem, '%Y-%m-%d')
if file_date < cutoff:
# Compress and delete original
with open(file, 'rb') as f_in:
with gzip.open(f'{file}.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
file.unlink()
Now I keep 7 days of uncompressed data, then compress older files. Saves ~80% on storage.
The Monitoring Nobody Talks About
You need to know when your pipeline breaks before your trading bot starts making decisions on stale data.
I added a simple health check:
# monitoring/healthcheck.py
import redis
from datetime import datetime, timedelta
def check_pipeline_health():
redis_client = redis.Redis(decode_responses=True)
# Check last message timestamp
messages = redis_client.xrevrange('market:ticker:okx', count=1)
if not messages:
return {'status': 'critical', 'message': 'No data in stream'}
last_timestamp = datetime.fromisoformat(messages[0][1]['timestamp'])
age = datetime.utcnow() - last_timestamp
if age > timedelta(minutes=5):
return {'status': 'warning', 'message': f'Data is {age.seconds}s old'}
return {'status': 'ok', 'message': 'Pipeline healthy'}
This runs every minute via cron. If data stops flowing, I get a Telegram alert within 60 seconds.
What I'd Do Differently
Use Parquet instead of JSON — JSON is human-readable but inefficient. Parquet would save space and speed up queries.
Add schema validation — Right now, if an exchange changes their API format, I won't notice until the data breaks downstream. Pydantic models would catch this early.
Better backpressure handling — If the writer falls behind, Redis Streams will grow unbounded. I should add a max length and drop old messages.
Metrics and dashboards — Grafana + Prometheus would give me visibility into throughput, latency, and error rates.
The Bottom Line
This pipeline costs $0 in API fees and runs on a $5/month VPS. It's not perfect, but it works. The key insights:
- WebSocket > REST polling — Real-time data without rate limit headaches
- Decouple collection from storage — Redis Streams as a buffer saves you from cascading failures
- Monitor everything — You'll thank yourself when things break at 3 AM
- Start simple — JSON files are fine. You don't need Kafka on day one.
If you're building a trading bot (or any real-time data system), the data pipeline is the foundation. Get this right, and everything else becomes easier.
Full code is in my workspace at /root/.openclaw/workspace/collectors/ and /root/.openclaw/workspace/storage/. It's not pretty, but it runs.
Top comments (0)