Financial market data (stocks, futures, forex, indices, and funds) has demanding real-time requirements: end-to-end latency must be held to the millisecond level, throughput often reaches tens of thousands of messages per second, and messages must arrive in order without loss or duplication. Generic WebSocket keep-alive strategies often fall short here. Heartbeat intervals that are too long detect dropped connections too late, slow or heavyweight reconnection logic lets bursts of market data slip by, and naive flow control lets message surges overwhelm clients. This article presents a production-validated optimization framework tailored to the characteristics of financial market data.
I. Heartbeat Keep-Alive: Beyond Ping/Pong
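The WebSocket protocol provides Ping/Pong control frames, but they are not always a reliable liveness signal. Proxies and load balancers in the path (such as Nginx or AWS ALB) may handle them differently from data frames, and Pong replies are usually generated automatically by the WebSocket library, so they can keep flowing even after the application has stopped receiving market data. The result is a "zombie connection" that looks healthy but delivers nothing. An application-layer heartbeat travels the same path as business messages, which makes it a more reliable check.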
1.1 Application-Layer Heartbeat Design
- Heartbeat messages: Clients send business-level pings at regular intervals (e.g., `{"type":"ping","ts":123456}`), and the server responds with `pong`.
- Interval selection: 25–30 seconds. This stays well below typical NAT idle timeouts (usually 60–120 seconds) while avoiding excessive resource consumption.
- Timeout detection: If no `pong` is received for two consecutive heartbeats, the connection is deemed invalid and reconnection is triggered immediately.
- RTT monitoring: Record each heartbeat's round-trip time; when RTT rises steadily, issue an early warning or switch to another access point.
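The RTT-monitoring rule can be sketched as a small helper. This is a minimal sketch, assuming the server echoes the client's `ts` field back in its `pong` (the ping above carries `ts`, but the echo is an assumption), and reading "rises steadily" as "strictly increasing across the last few samples". `RttTrend` and its window size are hypothetical names chosen for illustration.

```python
from collections import deque


class RttTrend:
    """Hypothetical helper: keeps the last `window` heartbeat RTTs and reports
    when they rise monotonically, a cue to warn or switch access points."""

    def __init__(self, window=4):
        self.samples = deque(maxlen=window)

    def record(self, ping_ts_ms, pong_recv_ms):
        # ping_ts_ms is the "ts" the client put in its ping; assumes the server
        # echoes it in the pong, so both values come from the client's clock
        rtt = pong_recv_ms - ping_ts_ms
        self.samples.append(rtt)
        return rtt

    def is_rising(self):
        s = list(self.samples)
        # Only judge a full window, and require every sample to exceed the previous one
        return len(s) == self.samples.maxlen and all(a < b for a, b in zip(s, s[1:]))


trend = RttTrend(window=4)
for sent, recv in [(0, 20), (1000, 1025), (2000, 2040), (3000, 3070)]:
    trend.record(sent, recv)
print(trend.is_rising())  # True: RTTs are 20, 25, 40, 70 ms
```

A strict monotonic test is sensitive to a single noisy sample; comparing a short moving average against a longer baseline is a more forgiving variant of the same idea.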
1.2 Code Example
Taking the iTick API WebSocket SDK as an example, we add an application-layer heartbeat guardian on top of the SDK to achieve dual-layer detection.
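```python
import json
import threading
import time

from itick_sdk import Client  # Example SDK; replace with your actual API


class HeartbeatGuard:
    """Application-layer heartbeat: declares the connection dead after
    `max_missed` consecutive pings go unanswered."""

    def __init__(self, client: Client, on_dead_callback,
                 interval=25, timeout=10, max_missed=2):
        self.client = client
        self.on_dead = on_dead_callback
        self.interval = interval      # Seconds between pings
        self.timeout = timeout        # Seconds to wait for each pong
        self.max_missed = max_missed  # Consecutive misses before declaring the link dead
        self.missed = 0
        self.last_ping = None
        self.last_pong = time.time()
        self.rtt = None
        self._running = False
        self._thread = None

    def start(self):
        self._running = True
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._running = False

    def _run(self):
        while self._running:
            self.last_ping = time.time()
            try:
                # Send application-layer ping (method name assumed; use what your SDK offers)
                self.client.send_websocket_message(
                    json.dumps({"type": "ping", "ts": int(self.last_ping * 1000)}))
            except Exception:
                pass  # A failed send shows up below as a missed pong
            time.sleep(self.timeout)
            if self.last_pong < self.last_ping:  # No pong since this ping was sent
                self.missed += 1
                if self.missed >= self.max_missed:
                    self.missed = 0
                    # Fire even if the SDK still reports "connected": that is the zombie case
                    self.on_dead()
            time.sleep(max(0, self.interval - self.timeout))

    def record_pong(self):
        """Call from the message handler whenever a {"type": "pong"} message arrives."""
        self.last_pong = time.time()
        if self.last_ping is not None:
            self.rtt = self.last_pong - self.last_ping
        self.missed = 0
```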
Key point: Even if the SDK already implements protocol-level WebSocket Ping/Pong, an additional application-layer heartbeat detects "zombie connections" that the protocol-level check can miss.
II. Disconnection Reconnection: Exponential Backoff + Session Recovery
2.1 Core Elements of Reconnection Strategy
- Exponential backoff: Avoids reconnection storms; initial interval of 1 second, doubling after each failure, with an upper limit of 30–60 seconds.
- Random jitter: Multiply the delay by a random coefficient between 0.8–1.2 to prevent mass simultaneous reconnections from numerous clients.
- Network state awareness: Listen for `online`/`offline` events (or an equivalent OS-level network signal) and reconnect only when the network is available.
- State recovery: Upon successful reconnection, resubscribe to previous topics and use message sequence numbers (`seq`) to retrieve missing data.
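The backoff and jitter rules above reduce to a short formula: delay = min(cap, base × 2^attempt) × jitter, with jitter drawn from [0.8, 1.2). A minimal sketch, where `backoff_delay` and its injectable `rng` parameter are hypothetical (the `rng` hook exists only so the schedule can be checked deterministically):

```python
import random


def backoff_delay(attempt, base=1.0, cap=30.0, rng=random.random):
    """Delay in seconds before reconnect attempt `attempt` (0-based):
    base * 2**attempt, capped at `cap`, then scaled by a jitter factor in [0.8, 1.2)."""
    raw = min(cap, base * (2 ** attempt))
    return raw * (0.8 + 0.4 * rng())


# With rng fixed at 0.5 the jitter factor is exactly 1.0, exposing the raw schedule
print([backoff_delay(n, rng=lambda: 0.5) for n in range(7)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Because jitter is applied after the cap, an individual delay can reach about 1.2 × the cap (36 s with a 30 s cap); apply the cap after jitter instead if the upper limit must be strict.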
2.2 Reconnection Implementation with Jitter and Backoff
```python
import random
import time

from itick_sdk import Client  # Example SDK; method names below are assumptions


class ReconnectingClient:
    def __init__(self, token):
        self.client = Client(token)
        self.reconnect_attempt = 0
        self.base_delay = 1.0          # 1 second
        self.max_delay = 30.0          # Maximum 30 seconds
        self.subscribed_symbols = []   # Saved subscription list for recovery
        self._manual_close = False

    def connect(self):
        # Register the close handler before connecting so no close event is missed
        self.client.set_on_close(self._on_close)
        self.client.connect_websocket()  # Assuming SDK's connection method

    def _on_close(self, code, reason):
        if self._manual_close:
            return
        self._schedule_reconnect()

    def _schedule_reconnect(self):
        while not self._manual_close:
            # Exponential backoff + jitter (factor 0.8 to 1.2)
            delay = min(self.max_delay, self.base_delay * (2 ** self.reconnect_attempt))
            delay *= 0.8 + 0.4 * random.random()
            print(f"Reconnecting in {delay:.2f}s (attempt {self.reconnect_attempt + 1})")
            time.sleep(delay)
            self.reconnect_attempt += 1
            try:
                self.connect()
            except Exception as exc:
                print(f"Reconnect failed: {exc}")
                continue  # Back off further and retry
            self.reconnect_attempt = 0  # Reset backoff once a connection succeeds
            # Resubscribe after successful reconnection
            if self.subscribed_symbols:
                self.client.subscribe(self.subscribed_symbols)
            return

    def subscribe(self, symbols):
        self.subscribed_symbols = list(symbols)
        self.client.subscribe(symbols)  # SDK subscription method
```
2.3 Leveraging Sequence Numbers for Disconnection Recovery
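Financial market data requires zero loss and zero duplication. TCP already delivers bytes in order within a single connection, so gaps typically appear across reconnections or when the server drops messages, for example for a slow consumer. It is therefore recommended that every pushed message carry an incrementing `seq` number. The client stores the last processed value as `last_seq` and sends it when reconnecting so the server can replay the missed messages; gaps detected mid-stream can be requested explicitly with a retransmission request, as in the `nack` message below.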
```python
import json


class SeqRecoveryClient(ReconnectingClient):
    def __init__(self, token):
        super().__init__(token)
        self.last_seq = 0
        self.pending = {}         # Out-of-order messages keyed by seq
        self.requested_upto = 0   # Highest seq already covered by a retransmit request

    def on_message(self, msg):
        seq = msg.get('seq')
        if seq is None:
            self._process(msg)    # Messages without seq (e.g., pong) bypass ordering
            return
        if seq <= self.last_seq or seq in self.pending:
            return                # Duplicate message, discard
        if seq == self.last_seq + 1:
            self._process(msg)
            self.last_seq = seq
            self._process_pending()
        else:
            # Gap detected: buffer this message and request only the not-yet-requested range
            self.pending[seq] = msg
            start = max(self.last_seq + 1, self.requested_upto + 1)
            if start <= seq - 1:
                self._request_retransmit(start, seq - 1)
                self.requested_upto = seq - 1

    def _process_pending(self):
        # Drain buffered messages that are now contiguous with last_seq
        while self.last_seq + 1 in self.pending:
            msg = self.pending.pop(self.last_seq + 1)
            self._process(msg)
            self.last_seq = msg['seq']

    def _process(self, msg):
        pass  # Business logic goes here

    def _request_retransmit(self, from_seq, to_seq):
        # Send retransmission request (requires protocol support)
        self.client.send_websocket_message(json.dumps({
            'action': 'nack',
            'from': from_seq,
            'to': to_seq,
        }))
```
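The class above handles gaps detected mid-stream; the reconnect half of the scheme (sending `last_seq` so the server can replay) is not shown. A minimal sketch follows, assuming a hypothetical `resume` request whose field names are placeholders for whatever your server protocol defines. `merge_replay` (also hypothetical) de-duplicates replayed and live messages, which can overlap right after a reconnect, and returns only the contiguous run that can be processed now.

```python
import json


def build_resume_request(symbols, last_seq):
    """Hypothetical message sent right after reconnecting: resubscribe and ask the
    server to replay everything after last_seq. Action and field names are assumptions."""
    return json.dumps({"action": "resume", "symbols": list(symbols), "last_seq": last_seq})


def merge_replay(last_seq, messages):
    """Return (in-order messages ready to process, new last_seq, leftover messages
    still waiting on a gap). Messages at or below last_seq are dropped as duplicates."""
    fresh = {}
    for msg in messages:
        if msg["seq"] > last_seq:
            fresh.setdefault(msg["seq"], msg)  # First copy wins; later duplicates are ignored
    ordered = []
    while last_seq + 1 in fresh:
        last_seq += 1
        ordered.append(fresh.pop(last_seq))
    leftover = [fresh[s] for s in sorted(fresh)]
    return ordered, last_seq, leftover


# Replay (42, 43) interleaved with live pushes (44, 46), one duplicate 42, one stale 40
msgs = [{"seq": 44}, {"seq": 42}, {"seq": 43}, {"seq": 42}, {"seq": 46}, {"seq": 40}]
ordered, new_last, leftover = merge_replay(41, msgs)
print([m["seq"] for m in ordered], new_last, [m["seq"] for m in leftover])
# [42, 43, 44] 44 [46]
```

Message 46 stays in `leftover` because 45 is still missing; it would feed the gap-handling path above.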
III. Flow Control: Preventing Client Overload
WebSocket is a full-duplex channel, and the server may push messages far faster than the client can process them. Without flow control, this leads to unbounded memory growth, frozen UIs, or even process crashes.
3.1 Message Queue + Rate Limiting
Core concept: Place received messages into a bounded queue, with an independent consumer extracting and processing them at a fixed rate (e.g., 100 messages per second).
```python
from collections import deque
import threading
import time


class FlowController:
    def __init__(self, max_size=500, rate_limit=100):
        self.queue = deque(maxlen=max_size)
        self.rate_limit = rate_limit  # Maximum messages processed per second
        self.processed = 0            # Messages processed in the current window
        self.window_start = time.monotonic()
        self.lock = threading.Lock()

    def enqueue(self, msg):
        """Called from the network thread; returns False if the message is dropped."""
        with self.lock:
            if len(self.queue) == self.queue.maxlen:
                # Queue is full; discard the message or trigger an alert
                return False
            self.queue.append(msg)
            return True

    def consume(self, callback):
        """Called in a loop by a single independent consumer thread."""
        now = time.monotonic()
        if now - self.window_start >= 1.0:
            # Start a new one-second rate-limit window
            self.processed = 0
            self.window_start = now
        with self.lock:
            count = min(self.rate_limit - self.processed, len(self.queue))
            batch = [self.queue.popleft() for _ in range(count)]
        # Invoke callbacks outside the lock so slow handlers don't block enqueue()
        for msg in batch:
            callback(msg)
        self.processed += len(batch)
        return len(batch)
```
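The fixed one-second window in `FlowController` lets the whole per-second quota run back-to-back at the start of each window and then stalls until the next one. If smoother pacing matters, a token bucket is a common alternative (a swapped-in technique, not part of the original design). A minimal sketch; `TokenBucket`, `burst`, and the injectable `clock` are illustrative names:

```python
import time


class TokenBucket:
    """Token-bucket rate limiter: refills `rate` tokens per second up to `burst`,
    so admissions are spread across the second instead of clustering at its start."""

    def __init__(self, rate=100.0, burst=10, clock=time.monotonic):
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)  # Start full so a short initial burst is allowed
        self.clock = clock
        self.last = clock()

    def try_acquire(self):
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond the burst size
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Deterministic demo with a fake clock
t = [0.0]
bucket = TokenBucket(rate=100, burst=10, clock=lambda: t[0])
admitted = sum(bucket.try_acquire() for _ in range(50))        # 10: only the burst
t[0] += 0.05                                                   # 50 ms later
admitted_later = sum(bucket.try_acquire() for _ in range(50))  # 5 new tokens
print(admitted, admitted_later)  # 10 5
```

In the consumer thread, call `try_acquire()` before handling each message and sleep briefly when it returns False; `burst` bounds how many messages can run back-to-back after an idle period.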
3.2 Priority Scheduling
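In market data, tick data (individual trades) has significantly higher priority than order-book depth beyond the best bid and ask. Multiple queues can be used to process messages by priority; the dispatcher below uses three tiers: ticks, quotes, and depth (plus any other) messages.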
```python
from collections import deque


class PriorityDispatcher:
    def __init__(self):
        self.high = deque()    # tick
        self.medium = deque()  # quote
        self.low = deque()     # depth, etc.

    def dispatch(self, msg):
        if msg.get('type') == 'tick':
            self.high.append(msg)
        elif msg.get('type') == 'quote':
            self.medium.append(msg)
        else:
            self.low.append(msg)

    def process_one(self, callback):
        """Process one message, always preferring higher-priority queues.
        Returns False when all queues are empty."""
        for queue in (self.high, self.medium, self.low):
            if queue:
                callback(queue.popleft())
                return True
        return False
```
3.3 Backpressure and Server Negotiation
When client backlog exceeds a threshold (e.g., queue depth > 200), proactively send control frames to the server requesting reduced push frequency or switching to batch push. This requires protocol-level support, for example:
{ "action": "slow", "reason": "queue_full" }
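A minimal sketch of the client-side trigger, assuming the server understands the `slow` message above and a hypothetical `resume` message that restores the normal push rate. The two thresholds form a hysteresis band, so the client does not flood the server with control messages while the queue depth hovers around 200. `BackpressureSignal` and the `low` threshold of 50 are illustrative choices.

```python
import json


class BackpressureSignal:
    """Sends {"action": "slow", "reason": "queue_full"} once queue depth exceeds
    `high`, and a hypothetical {"action": "resume"} once it drops below `low`."""

    def __init__(self, send, high=200, low=50):
        self.send = send          # Callable that writes a text frame to the socket
        self.high = high
        self.low = low
        self.throttled = False    # True while the server has been asked to slow down

    def update(self, queue_depth):
        if not self.throttled and queue_depth > self.high:
            self.send(json.dumps({"action": "slow", "reason": "queue_full"}))
            self.throttled = True
        elif self.throttled and queue_depth < self.low:
            self.send(json.dumps({"action": "resume"}))
            self.throttled = False


sent = []
bp = BackpressureSignal(sent.append)
for depth in [100, 250, 300, 120, 40, 60]:
    bp.update(depth)
print(len(sent))  # 2: one "slow" at depth 250, one "resume" at depth 40
```

Call `update()` with the current backlog (for example, `len(flow_ctrl.queue)`) on each pass of the consumer loop.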
IV. Complete Client Skeleton (Based on Sample SDK)
Combine the above modules into a robust client class:
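```python
import json
import threading
import time

# HeartbeatGuard, ReconnectingClient, FlowController and PriorityDispatcher
# are the classes defined in the sections above.


class RobustWebSocketClient:
    def __init__(self, token):
        # Reuse the reconnecting wrapper's SDK client so all modules share one connection
        self.reconnector = ReconnectingClient(token)
        self.client = self.reconnector.client
        self.flow_ctrl = FlowController(max_size=1000, rate_limit=200)
        self.dispatcher = PriorityDispatcher()
        self.heartbeat = HeartbeatGuard(self.client, self._on_connection_dead)
        # Set callbacks (handler-registration method name assumed)
        self.client.set_message_handler(self._on_raw_message)

    def _on_raw_message(self, raw_msg):
        # Assumes the SDK delivers JSON text or an already-parsed dict
        msg = json.loads(raw_msg) if isinstance(raw_msg, (str, bytes)) else raw_msg
        if msg.get('type') == 'pong':
            # Application-layer pong: feed the heartbeat guard instead of queueing it
            self.heartbeat.record_pong()
            return
        # Everything else goes through flow control first;
        # enqueue() returns False on overflow (count it as queue_overflow_total)
        self.flow_ctrl.enqueue(msg)

    def _consumer_loop(self):
        while True:
            # Admit up to rate_limit messages/s from the bounded queue into the priority queues
            self.flow_ctrl.consume(self.dispatcher.dispatch)
            # Drain admitted messages in priority order (tick > quote > depth)
            while self.dispatcher.process_one(self._handle_msg):
                pass
            time.sleep(0.001)  # 1 ms scheduling interval

    def _handle_msg(self, msg):
        # Business logic, e.g., update UI, storage, etc.
        pass

    def start(self, symbols):
        # Connect and subscribe; the reconnector remembers symbols for recovery
        self.reconnector.connect()
        self.reconnector.subscribe(symbols)
        # Start consumer thread
        threading.Thread(target=self._consumer_loop, daemon=True).start()
        # Start heartbeat guardian
        self.heartbeat.start()

    def _on_connection_dead(self):
        # Trigger reconnection with backoff and resubscription
        self.reconnector._schedule_reconnect()
```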
V. Observability and Monitoring Metrics
Production environments must expose the following metrics for troubleshooting and capacity planning:
| Metric | Meaning | Alert Recommendation |
|---|---|---|
| `heartbeat_timeout_total` | Application-layer heartbeat timeout count | Check the network immediately if > 0 |
| `reconnect_total` | Total reconnection count | Alert if > 5 per minute |
| `queue_overflow_total` | Messages dropped due to queue overflow | Alert if > 0 |
| `end_to_end_latency_p99` | 99th-percentile latency from server send to client callback | Alert if > 200 ms |
| `pending_queue_size` | Current backlog message count | Alert if > 500 |
VI. Conclusion
Low-latency push optimization is a systematic engineering challenge; relying solely on the WebSocket protocol or SDK default behaviors is insufficient. This article presents a three-layer optimization strategy:
- Heartbeat layer: Application-layer heartbeats + RTT monitoring for rapid detection of zombie connections.
- Reconnection layer: Exponential backoff + random jitter + session recovery to ensure fast and smooth data flow restoration after disconnection.
- Flow control layer: Bounded queues + rate limiting + priority scheduling to prevent clients from being overwhelmed by data surges.
These strategies have been validated across thousands of production nodes, significantly enhancing stability in weak network conditions. Finally, adjust parameters according to your business scenario: high-frequency trading may shorten heartbeats to 10 seconds and increase queue limits, while ordinary information services can appropriately relax rate limits.
Reference documentation: https://docs.itick.org/sdk/python-sdk
GitHub: https://github.com/itick-org/