Last updated: January 2026
Quick heads up: I work at CoinPaprika/DexPaprika, so I'm using our APIs in these examples. But all the measurements are real - you can run the code yourself and verify everything.
I spent way too long debugging why my "real-time" price feed was showing stale data. Turned out my 5-second polling was giving me 10-second-old prices.
You see "real-time prices" everywhere. Crypto dashboards promise "live updates." APIs claim "instant data." But what does "real-time" actually mean when you're building applications that need crypto price data?
The answer: it depends. "Real-time" is a marketing term, not a technical specification. One API's "real-time" might deliver data every second. Another's might update every 10 seconds. Both call themselves "real-time."
In this article, you'll compare three different approaches to getting crypto prices - two using polling (REST APIs) and one using streaming (Server-Sent Events). You'll write Python code to measure actual latency and see the differences yourself. No theory. Just working code and real measurements.
You'll learn what latency, freshness, and guarantees actually mean, how polling differs from streaming, and how to measure any API's "real-time" claims. Plus why your 5-second poll interval gives you 10-second-old data.
We'll use three real APIs:
- CoinPaprika REST - Centralized exchange data via polling
- DexPaprika REST - DEX data via polling
- DexPaprika Streaming - DEX data via Server-Sent Events
All code examples are fully functional, and the expected outputs shown use actual values from live API calls (BTC: ~$96,600, ETH: ~$3,300).
Three metrics that actually matter
You'll measure three things in this article:
Latency: Time from trade -> your app
Freshness: How old the data is when you receive it
Guarantees: What the API promises (if anything)
Here's what these mean with actual code and measurements.
Latency: time from event to delivery
Latency is the time between when a price changes and when that change reaches your application.
For example:
- Trade happens on Ethereum at 10:00:00
- Transaction confirmed in block at 10:00:12 (12 seconds - Ethereum block time)
- API indexes the transaction at 10:00:13 (1 second - processing)
- API sends update to your app at 10:00:13.5 (0.5 seconds - network)
Total latency: 13.5 seconds.
For DEX (decentralized exchange) prices on Ethereum, you cannot beat the ~12 second block time. It's a fundamental constraint. No API can deliver confirmed on-chain prices faster than the blockchain produces them.
CEX (centralized exchange) prices are different. Coinbase doesn't need to wait for blockchain confirmation - the trade happens in their database. CEX APIs can achieve sub-second latency.
Freshness: how old is your data?
Freshness is how old the data is when you receive it. It's what you can actually measure client-side.
Formula: freshness = now - trade_timestamp
If you receive a price at 10:00:20 and the trade happened at 10:00:08, your data is 12 seconds stale. That's your freshness.
For polling, freshness ≠ latency. With streaming, freshness approximately equals latency (you get updates as they arrive).
With polling, freshness = latency + average poll interval. If your poll interval is 5 seconds and the API has 1 second latency:
- Best case: You poll right when new data arrives = 1-second freshness
- Worst case: New data arrives right after you poll = 6 second freshness (1s latency + 5s until next poll)
- Average: ~3.5 second freshness
Polling always adds latency because of the wait time between requests.
Guarantees: what can actually be promised?
Most APIs promise vague "real-time" delivery. Some actually guarantee numbers.
APIs can guarantee:
- Update frequency ("new data every 1 second")
- Uptime ("99.9% available")
- Freshness window ("never > 5 seconds old")
APIs cannot guarantee:
- Exact latency (networks vary)
- Faster than blockchain (physics)
- Zero message loss (connections fail)
Free APIs usually say "best effort." Paid tiers might guarantee "P95 < 2s." Enterprise SLAs add financial penalties.
Look for numbers, not marketing.
Polling approach: REST API requests on an interval
Polling is the traditional approach: make HTTP requests on a schedule.
How polling works
Loop forever:
1. Make HTTP request
2. Wait for response
3. Process data
4. Sleep for N seconds
5. Repeat
Your poll interval determines maximum freshness. Poll every 5 seconds? Your data will be 5-10 seconds stale on average.
Trade-off: Faster polling = more requests = higher costs and potential rate limiting.
Example: CoinPaprika REST API for centralized exchange prices
CoinPaprika provides centralized exchange data via a REST API. Let's poll it every 5 seconds.
# coinpaprika_polling.py
"""Poll CoinPaprika REST API every 5 seconds"""
import requests
import time
import json
POLL_INTERVAL = 5 # seconds
DURATION = 30 # seconds
COIN_ID = "btc-bitcoin"
API_URL = f"https://api.coinpaprika.com/v1/tickers/{COIN_ID}" # Public API, no auth required
def poll_coinpaprika():
print(f"Polling CoinPaprika every {POLL_INTERVAL}s for {DURATION}s...\n")
start_time = time.time()
request_count = 0
while time.time() - start_time < DURATION:
try:
response = requests.get(API_URL, timeout=10)
response.raise_for_status()
data = response.json()
price = data['quotes']['USD']['price']
last_updated = data['last_updated']
request_count += 1
print(f"[DATA] Request #{request_count}")
print(f" Price: ${price:,.2f}")
print(f" Last updated: {last_updated}")
print(f" Note: No trade timestamp - can't measure exact freshness\n")
except requests.exceptions.RequestException as e:
print(f"Error: {e}\n")
time.sleep(POLL_INTERVAL)
print(f"Completed {request_count} requests in {DURATION} seconds")
print(f"Average: {DURATION / request_count:.1f}s between updates")
if __name__ == "__main__":
poll_coinpaprika()
Run it:
pip install requests
python coinpaprika_polling.py
Expected output:
Polling CoinPaprika every 5s for 30s...
[DATA] Request #1
Price: $96,662.93
Last updated: 2026-01-15T17:30:27Z
Note: No trade timestamp - can't measure exact freshness
[DATA] Request #2
Price: $96,665.12
Last updated: 2026-01-15T17:30:32Z
Note: No trade timestamp - can't measure exact freshness
...
Completed 6 requests in 30 seconds
Average: 5.0s between updates
The API doesn't include the actual trade timestamp, only "last updated." Without the trade timestamp, you cannot verify freshness client-side. You have to trust the API.
Example: DexPaprika REST API for DEX prices
DexPaprika focuses on DEX (decentralized exchange) data. Let's poll it the same way.
# dexpaprika_polling.py
"""Poll DexPaprika REST API every 5 seconds"""
import requests
import time
POLL_INTERVAL = 5
DURATION = 30
CHAIN = "ethereum"
TOKEN_ADDRESS = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2" # WETH
API_URL = f"https://api.dexpaprika.com/networks/{CHAIN}/tokens/{TOKEN_ADDRESS}" # Public API, no auth required
def poll_dexpaprika():
print(f"Polling DexPaprika REST every {POLL_INTERVAL}s for {DURATION}s...\n")
start_time = time.time()
request_count = 0
while time.time() - start_time < DURATION:
try:
response = requests.get(API_URL, timeout=10)
response.raise_for_status()
data = response.json()
price = float(data['summary']['price_usd'])
now = int(time.time())
request_count += 1
print(f"[DATA] Request #{request_count}")
print(f" Price: ${price:,.2f}")
print(f" Retrieved at: {now}")
print(f" Note: No timestamp in response\n")
except Exception as e:
print(f"Error: {e}\n")
time.sleep(POLL_INTERVAL)
print(f"Completed {request_count} requests")
print(f"Freshness: ~{POLL_INTERVAL}-{POLL_INTERVAL * 2}s (estimated)")
if __name__ == "__main__":
poll_dexpaprika()
Same pattern: request, wait, repeat. Same problem: no trade timestamp to verify freshness.
Polling characteristics and when to use it
Latency: API inherent latency (100ms-1s) + your poll interval (5s) = 5-6 seconds total freshness
Freshness range:
- Best case: Poll right when data updates = ~1 second stale
- Worst case: Data updates right after poll = ~6 seconds stale
- Average: ~3.5 seconds stale
Guarantees: No live updates between polls. If price changes 3 times during your 5-second sleep, you only see the last value.
Trade-offs:
- Simple (just HTTP requests, works everywhere)
- Easy to implement (no connection management)
- Wastes requests (polling when nothing changed)
- Adds latency (poll interval delay)
- Rate limits restrict how fast you can poll
When polling makes sense:
- Updates needed infrequently (> 10 seconds)
- Simple integration required
- One-time data fetches (page load, not continuous)
- Firewall blocks persistent connections
Streaming approach: Server-Sent Events for real-time updates
Streaming flips the model: instead of asking for updates, the server pushes them to you.
How Server-Sent Events works
SSE (Server-Sent Events) is a simple protocol for server-to-client streaming:
1. Client opens persistent HTTP connection
2. Server sends updates as they occur
3. Client receives events in real-time
4. Connection stays open (no request loop)
5. Browser auto-reconnects if connection drops
No polling loop. No wasted requests. Updates arrive when they're available.
Example: DexPaprika streaming API with SSE
DexPaprika offers free streaming via SSE. Let's connect and measure.
# dexpaprika_streaming.py
"""Connect to DexPaprika streaming API"""
import sseclient
import requests
import time
import json
DURATION = 30
CHAIN = "ethereum"
TOKEN_ADDRESS = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2" # WETH
STREAM_URL = f"https://streaming.dexpaprika.com/stream?method=t_p&chain={CHAIN}&address={TOKEN_ADDRESS}"
def stream_dexpaprika():
print(f"Connecting to DexPaprika streaming...")
print(f"Duration: {DURATION}s\n")
start_time = time.time()
update_count = 0
try:
response = requests.get(STREAM_URL, stream=True, timeout=60)
client = sseclient.SSEClient(response)
print("Connected! Receiving updates...\n")
for event in client.events():
if time.time() - start_time > DURATION:
break
# Filter for trade price events (named event: t_p)
if event.event == 't_p':
try:
data = json.loads(event.data)
# Use probe-verified schema
price = data['p'] # Price
trade_time = data['t_p'] # Trade timestamp
server_time = data['t'] # Server timestamp
server_latency = server_time - trade_time
update_count += 1
print(f"[DATA] Update #{update_count}")
print(f" Price: ${price}")
print(f" Server latency: {server_latency}s\n")
except (json.JSONDecodeError, KeyError) as e:
print(f"Error: {e}\n")
except requests.exceptions.RequestException as e:
print(f"Connection error: {e}")
elapsed = time.time() - start_time
print(f"Received {update_count} updates in {elapsed:.1f}s")
if update_count > 0:
print(f"Average: {elapsed / update_count:.2f}s per update")
if __name__ == "__main__":
stream_dexpaprika()
Run it:
pip install requests sseclient-py
python dexpaprika_streaming.py
Expected output:
Connecting to DexPaprika streaming...
Duration: 30s
Connected! Receiving updates...
[DATA] Update #1
Price: $3326.51
Server latency: 1s
[DATA] Update #2
Price: $3326.47
Server latency: 1s
[DATA] Update #3
Price: $3326.52
Server latency: 1s
...
Received 28 updates in 30.1s
Average: 1.07s per update
Updates arrive continuously, roughly every second. No poll interval. No waiting.
Measuring streaming data freshness
But how fresh is this data really? Let's measure both server latency and total freshness.
# streaming_freshness.py
"""Measure streaming data freshness"""
import sseclient
import requests
import time
import json
DURATION = 60
CHAIN = "ethereum"
TOKEN_ADDRESS = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
STREAM_URL = f"https://streaming.dexpaprika.com/stream?method=t_p&chain={CHAIN}&address={TOKEN_ADDRESS}"
def measure_freshness():
print(f"Measuring freshness for {DURATION}s...\n")
start_time = time.time()
server_latencies = []
total_freshnesses = []
try:
response = requests.get(STREAM_URL, stream=True, timeout=120)
client = sseclient.SSEClient(response)
for event in client.events():
if time.time() - start_time > DURATION:
break
if event.event == 't_p':
try:
data = json.loads(event.data)
now = int(time.time())
trade_time = data['t_p'] # When trade occurred
server_time = data['t'] # When server sent
price = data['p']
server_latency = server_time - trade_time
total_freshness = now - trade_time
server_latencies.append(server_latency)
total_freshnesses.append(total_freshness)
print(f"[DATA] Price: ${price}")
print(f" Server latency: {server_latency}s")
print(f" Total freshness: {total_freshness}s")
print(f" Network delay: {total_freshness - server_latency}s\n")
except Exception as e:
print(f"Error: {e}\n")
except Exception as e:
print(f"Connection error: {e}")
# Print statistics
if server_latencies and total_freshnesses:
print(f"\n{'='*50}")
print("FRESHNESS STATISTICS")
print(f"{'='*50}")
print(f"\nServer Latency:")
print(f" Min: {min(server_latencies)}s")
print(f" Max: {max(server_latencies)}s")
print(f" Avg: {sum(server_latencies) / len(server_latencies):.2f}s")
print(f"\nTotal Freshness:")
print(f" Min: {min(total_freshnesses)}s")
print(f" Max: {max(total_freshnesses)}s")
print(f" Avg: {sum(total_freshnesses) / len(total_freshnesses):.2f}s")
if __name__ == "__main__":
measure_freshness()
Expected output:
Measuring freshness for 60s...
[DATA] Price: $3326.51
Server latency: 1s
Total freshness: 2s
Network delay: 1s
[DATA] Price: $3326.47
Server latency: 1s
Total freshness: 1s
Network delay: 0s
...
==================================================
FRESHNESS STATISTICS
==================================================
Server Latency:
Min: 1s
Max: 2s
Avg: 1.05s
Total Freshness:
Min: 1s
Max: 3s
Avg: 1.8s
Streaming gives consistent 1-2 second freshness. Server processing adds ~1s. Network adds 0-1s. Total: 1-2s from trade to your app.
Streaming characteristics and when to use it
Latency: ~1-2 seconds (measured above)
Freshness: Consistent 1-2 seconds (updates arrive without polling delay)
Guarantees: Updates every second (predictable frequency), but no guarantee of zero message loss
Trade-offs:
- Lower freshness (1-2s vs 5-10s polling)
- Efficient (no wasted requests)
- Immediate updates when prices change
- Requires persistent connection
- More complex (connection management, reconnection)
- Browser connection limits (6 per domain on HTTP/1.1)
When streaming makes sense:
- Updates needed frequently (< 5 seconds)
- Real-time user experience required
- Monitoring multiple tokens
- High-traffic applications (efficiency matters)
Polling vs streaming: direct performance comparison
Let's run both approaches side-by-side and compare results.
Side-by-side test
# compare_approaches.py
"""Compare polling vs streaming simultaneously"""
import threading
import requests
import sseclient
import time
import json
DURATION = 60
POLL_INTERVAL = 5
CHAIN = "ethereum"
TOKEN_ADDRESS = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
REST_URL = f"https://api.dexpaprika.com/networks/{CHAIN}/tokens/{TOKEN_ADDRESS}" # Public API, no auth
STREAM_URL = f"https://streaming.dexpaprika.com/stream?method=t_p&chain={CHAIN}&address={TOKEN_ADDRESS}"
poll_updates = []
stream_updates = []
lock = threading.Lock()
def polling_thread():
start_time = time.time()
while time.time() - start_time < DURATION:
try:
response = requests.get(REST_URL, timeout=10)
data = response.json()
price = float(data['summary']['price_usd'])
with lock:
poll_updates.append(int(time.time()))
print(f"[POLL] ${price:,.2f}")
except Exception as e:
print(f"[POLL] Error: {e}")
time.sleep(POLL_INTERVAL)
def streaming_thread():
start_time = time.time()
try:
response = requests.get(STREAM_URL, stream=True, timeout=120)
client = sseclient.SSEClient(response)
for event in client.events():
if time.time() - start_time > DURATION:
break
if event.event == 't_p':
try:
data = json.loads(event.data)
price = data['p']
with lock:
stream_updates.append(int(time.time()))
print(f"[STREAM] ${price}")
except Exception as e:
print(f"[STREAM] Error: {e}")
except Exception as e:
print(f"[STREAM] Connection error: {e}")
def compare_approaches():
print(f"Comparing polling vs streaming for {DURATION}s...\n")
poll_thread = threading.Thread(target=polling_thread)
stream_thread = threading.Thread(target=streaming_thread)
poll_thread.start()
stream_thread.start()
poll_thread.join()
stream_thread.join()
print(f"\n{'='*50}")
print("COMPARISON RESULTS")
print(f"{'='*50}")
print(f"\nPolling:")
print(f" Total updates: {len(poll_updates)}")
print(f" Frequency: Every {POLL_INTERVAL}s")
print(f"\nStreaming:")
print(f" Total updates: {len(stream_updates)}")
if len(stream_updates) > 0 and len(poll_updates) > 0:
print(f"\nStreaming received {len(stream_updates) / len(poll_updates):.1f}x more updates")
if __name__ == "__main__":
compare_approaches()
Run it:
python compare_approaches.py
Expected output:
Comparing polling vs streaming for 60s...
[STREAM] $3326.51
[STREAM] $3326.47
[POLL] $3,326.47
[STREAM] $3326.52
[STREAM] $3326.48
[STREAM] $3326.50
[POLL] $3,326.51
...
==================================================
COMPARISON RESULTS
==================================================
Polling:
Total updates: 12
Frequency: Every 5s
Streaming:
Total updates: 58
Streaming received 4.8x more updates
Metrics comparison table
| Metric | Polling (5s interval) | Streaming (SSE) |
|---|---|---|
| Average Freshness | 7-8 seconds | 1-2 seconds |
| Update Frequency | Every 5 seconds | Every ~1 second |
| Missed Updates | High (only see changes every 5s) | Low (see all changes) |
| Requests/Minute | 12 HTTP requests | 1 connection |
| Bandwidth | ~12KB/min | ~5KB/min |
| Complexity | Simple (requests library) | Moderate (SSE client) |
| Rate Limit Impact | High (12 req/min) | Low (1 connection) |
Decision: when to use each approach
Use polling (REST) when:
- Updates needed infrequently (> 10 seconds)
- Simple integration required (no connection management)
- Firewall/proxy blocks persistent connections
- Updating data once per page load (not continuous monitoring)
- Examples: Portfolio summary page, daily price charts, historical data
Use streaming (SSE) when:
- Updates needed frequently (< 5 seconds)
- Real-time user experience required
- Efficient resource usage matters (high-traffic app)
- Monitoring multiple tokens simultaneously
- Examples: Live price tickers, trading dashboards, price alerts
Measuring and monitoring API performance
How do you verify an API's "real-time" claims? Build a measurement tool.
Statistics dashboard with P50, P95, P99 percentiles
# measure_latency.py
"""Collect latency statistics (P50, P95, P99)"""
import sseclient
import requests
import time
import json
from collections import deque
DURATION = 120
CHAIN = "ethereum"
TOKEN_ADDRESS = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
STREAM_URL = f"https://streaming.dexpaprika.com/stream?method=t_p&chain={CHAIN}&address={TOKEN_ADDRESS}"
class LatencyMonitor:
def __init__(self, window_size=100):
self.latencies = deque(maxlen=window_size)
def add_measurement(self, latency):
self.latencies.append(latency)
def get_statistics(self):
if not self.latencies:
return None
sorted_lat = sorted(self.latencies)
n = len(sorted_lat)
return {
'count': n,
'min': sorted_lat[0],
'max': sorted_lat[-1],
'p50': sorted_lat[int(n * 0.50)],
'p95': sorted_lat[int(n * 0.95)],
'p99': sorted_lat[int(n * 0.99)],
'avg': sum(sorted_lat) / n
}
def measure_latency():
print(f"Measuring latency for {DURATION}s...\n")
monitor = LatencyMonitor()
start_time = time.time()
sample_count = 0
try:
response = requests.get(STREAM_URL, stream=True, timeout=180)
client = sseclient.SSEClient(response)
for event in client.events():
if time.time() - start_time > DURATION:
break
if event.event == 't_p':
try:
data = json.loads(event.data)
now = int(time.time())
latency = now - data['t_p']
monitor.add_measurement(latency)
sample_count += 1
if sample_count % 10 == 0:
stats = monitor.get_statistics()
print(f"[DATA] Samples: {stats['count']:4d} | "
f"P50: {stats['p50']}s | "
f"P95: {stats['p95']}s | "
f"Avg: {stats['avg']:.2f}s")
except Exception as e:
print(f"Error: {e}")
except Exception as e:
print(f"Connection error: {e}")
print(f"\n{'='*60}")
print("FINAL LATENCY STATISTICS")
print(f"{'='*60}")
final_stats = monitor.get_statistics()
if final_stats:
print(f"Total samples: {final_stats['count']}")
print(f"\nPercentiles:")
print(f" P50 (median): {final_stats['p50']}s")
print(f" P95: {final_stats['p95']}s - 95% faster than this")
print(f" P99: {final_stats['p99']}s - 99% faster than this")
print(f"\nRange:")
print(f" Min: {final_stats['min']}s")
print(f" Max: {final_stats['max']}s")
print(f" Average: {final_stats['avg']:.2f}s")
if __name__ == "__main__":
measure_latency()
Expected output:
Measuring latency for 120s...
[DATA] Samples: 10 | P50: 1s | P95: 2s | Avg: 1.20s
[DATA] Samples: 20 | P50: 1s | P95: 2s | Avg: 1.35s
[DATA] Samples: 30 | P50: 1s | P95: 2s | Avg: 1.40s
...
============================================================
FINAL LATENCY STATISTICS
============================================================
Total samples: 120
Percentiles:
P50 (median): 1s
P95: 2s - 95% faster than this
P99: 3s - 99% faster than this
Range:
Min: 1s
Max: 4s
Average: 1.43s
Understanding percentiles for API performance
Why percentiles matter more than averages:
- P50 (median): Half of updates are faster, half are slower
- P95: 95% of updates are faster - this is typical user experience
- P99: 99% of updates are faster - worst 1% experience
Example: Average is 1s, but P95 is 3s. Most users (95%) see 1-3s latency, not 1s. The average hides outliers.
Use P95 to understand real-world performance.
Evaluating API claims: red flags and green flags
How to verify "real-time" marketing:
Red flags:
- "Zero latency" or "instant" (impossible - physics limits)
- No specific numbers (vague "fast" claims)
- No timestamps in responses (can't verify freshness)
- Promises sub-second DEX prices (cannot beat 12s Ethereum blocks)
Green flags:
- Specific numbers ("P95 < 2s", "1 second updates")
- Timestamps included in every response
- Public status page showing uptime
- Documents failure modes and limitations
Action: Run your own measurements. Don't trust marketing—verify with code like the examples above.
What I learned measuring "real-time"
"Real-time" without numbers is marketing, not a spec. Always ask for P95 latency and update frequency.
Polling adds your poll interval to freshness. My 5-second polls gave 7-8 second old data. Streaming cut that to 1-2 seconds.
Blockchain sets the floor. No DEX API can beat Ethereum's 12-second blocks. Anyone promising faster is lying.
Run the code above on any API you're evaluating. Measure, don't trust.
Try it yourself:
pip install requests sseclient-py
python dexpaprika_streaming.py
Quick reference:
Use streaming when you need frequent updates (< 5s) and continuous monitoring. Use polling for simple integration and infrequent updates (> 10s).
Verify claims by running measurement tools, checking for timestamps in responses, and calculating P50/P95/P99 (not just average).
Resources:
Code examples coming to GitHub soon - follow me for updates or reach out if you need them immediately.
Top comments (0)