DEV Community

San Si wu

Low-Latency WebSocket Optimization: Heartbeat, Reconnect & Flow Control Best Practices (Python)

Financial market data (stocks, futures, forex, indices, and funds) has strict real-time requirements: end-to-end latency must stay in the millisecond range, throughput often reaches tens of thousands of messages per second, and messages must arrive in order without loss or duplication. Generic WebSocket keep-alive strategies often fall short here. Heartbeat intervals that are too long detect dead connections late, slow or heavyweight reconnection logic misses bursts of market data, and simplistic flow control lets a fast feed overwhelm the client. This article presents a production-validated optimization framework tailored to the characteristics of financial market data.

I. Heartbeat Keep-Alive: Beyond Ping/Pong

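The WebSocket protocol provides Ping/Pong control frames, but they are not a sufficient liveness check on their own. Some network intermediaries, such as proxies and load balancers (Nginx, AWS ALB), may filter or delay these frames, and protocol-level pongs are usually answered automatically by the peer's WebSocket library, so a pong shows that the network stack is alive but not that the application is still pushing data. The result can be a "zombie connection" that looks open but delivers nothing. Application-layer heartbeats, which travel the same path as business messages, are therefore the more reliable choice.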

1.1 Application-Layer Heartbeat Design

  • Clients send application-level pings at regular intervals (e.g., {"type":"ping","ts":123456}), and the server replies with a pong, ideally echoing ts so the client can compute the round-trip time.
  • Interval selection: 25–30 seconds. This stays well below typical NAT idle timeouts (often 60–120 seconds) without generating excessive heartbeat traffic.
  • Timeout detection: If no pong is received after two consecutive heartbeats, the connection is deemed invalid, triggering immediate reconnection.
  • RTT monitoring: Record heartbeat round-trip time; when RTT consistently increases, issue early warnings or switch access points.
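The rules above can be captured in a small, I/O-free bookkeeping class that is easy to unit-test. This is a sketch under assumptions: the ping format follows the example above, the pong is assumed to echo the ping's ts (in milliseconds), "no pong after two consecutive heartbeats" is interpreted as no pong for 2 × interval seconds, and "RTT consistently increases" is interpreted as strictly rising across the last few samples. HeartbeatMonitor and its methods are illustrative names, not part of any SDK.

```python
import json
from collections import deque
from typing import Optional


class HeartbeatMonitor:
    """Pure bookkeeping for application-layer heartbeats (no sockets, no threads).

    Times are passed in explicitly (seconds, e.g. from time.monotonic()), which
    keeps the logic deterministic and testable.
    """

    def __init__(self, interval: float = 25.0, max_missed: int = 2, rtt_window: int = 5):
        self.interval = interval
        self.max_missed = max_missed
        self.started: Optional[float] = None    # time of the first ping
        self.last_pong: Optional[float] = None  # time of the most recent pong
        self.rtts = deque(maxlen=rtt_window)    # most recent RTT samples, seconds

    def make_ping(self, now: float) -> str:
        """Build a ping; ts is in milliseconds and only the client interprets it."""
        if self.started is None:
            self.started = now
        return json.dumps({"type": "ping", "ts": int(now * 1000)})

    def on_pong(self, raw: str, now: float) -> float:
        """Record a pong that echoes the ping's ts; returns the RTT in seconds."""
        msg = json.loads(raw)
        rtt = now - msg["ts"] / 1000.0
        self.last_pong = now
        self.rtts.append(rtt)
        return rtt

    def is_dead(self, now: float) -> bool:
        """True once no pong has arrived for max_missed heartbeat intervals."""
        reference = self.last_pong if self.last_pong is not None else self.started
        if reference is None:
            return False  # no ping sent yet, nothing to judge
        return now - reference > self.interval * self.max_missed

    def rtt_rising(self) -> bool:
        """Early-warning signal: the RTT window is full and strictly increasing."""
        samples = list(self.rtts)
        return (len(samples) == self.rtts.maxlen
                and all(later > earlier for earlier, later in zip(samples, samples[1:])))
```

A driver thread would call make_ping() every interval, route incoming pongs to on_pong(), reconnect when is_dead() returns True, and raise a warning (or switch access points) when rtt_rising() does.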

1.2 Code Example

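Using the iTick API WebSocket SDK as an example, the class below adds an application-layer heartbeat guard on top of the SDK, so dead connections are caught by two layers of detection: protocol-level Ping/Pong inside the SDK and application-level ping/pong. The SDK method names used in this and the following examples are assumptions; check them against your SDK's documentation.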

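import json
import threading
import time
from itick_sdk import Client   # Example SDK, replace with your actual API

class HeartbeatGuard:
    """Sends application-layer pings and declares the connection dead when
    no pong has arrived for `max_missed` consecutive heartbeat intervals."""

    def __init__(self, client: Client, on_dead_callback,
                 interval=25, max_missed=2):
        self.client = client
        self.on_dead = on_dead_callback
        self.interval = interval
        self.max_missed = max_missed
        self.last_pong = time.monotonic()
        self.last_rtt = None               # Seconds, if the server echoes ts
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        self._stop.clear()
        self.last_pong = time.monotonic()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop.set()

    def _run(self):
        while not self._stop.is_set():
            # No pong for max_missed intervals: treat the link as a zombie,
            # even if the SDK still reports the socket as connected
            if time.monotonic() - self.last_pong > self.interval * self.max_missed:
                self.last_pong = time.monotonic()   # Avoid firing again immediately
                self.on_dead()
            # Send application-layer ping (assumes the SDK can send custom messages)
            try:
                self.client.send_websocket_message(json.dumps(
                    {"type": "ping", "ts": int(time.monotonic() * 1000)}))
            except Exception:
                pass   # A failed send shows up as a missing pong
            # Sleep for one interval, but wake up immediately if stop() is called
            self._stop.wait(self.interval)

    def record_pong(self, pong_msg=None):
        """Call from the message handler whenever a {"type": "pong"} arrives."""
        now = time.monotonic()
        self.last_pong = now
        if pong_msg and "ts" in pong_msg:
            self.last_rtt = now - pong_msg["ts"] / 1000   # Echoed ts is in ms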

Key point: even if the SDK already implements protocol-level Ping/Pong, an additional application-layer heartbeat catches "zombie connections" that still look open at the protocol level but no longer deliver data.

II. Disconnection Reconnection: Exponential Backoff + Session Recovery

2.1 Core Elements of Reconnection Strategy

  • Exponential backoff: Avoids reconnection storms; initial interval of 1 second, doubling after each failure, with an upper limit of 30–60 seconds.
  • Random jitter: Multiply the delay by a random coefficient between 0.8–1.2 to prevent mass simultaneous reconnections from numerous clients.
  • Network state awareness: Listen for online/offline events and reconnect only when the network is available.
  • State recovery: Upon successful reconnection, resubscribe to previous topics and use message sequence numbers (seq) to retrieve missing data.
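The backoff arithmetic and the "reconnect only when the network is available" rule can be sketched independently of any SDK. Two assumptions: Python has no portable online/offline event, so network_available() substitutes a simple TCP reachability probe (which host and port to probe is up to you), and connect is any callable that raises OSError on failure. backoff_delay(), network_available() and reconnect_loop() are hypothetical helpers.

```python
import random
import socket
import time
from typing import Callable, Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Exponential backoff capped at `cap`, then multiplied by a 0.8-1.2 jitter factor."""
    rng = rng or random.Random()
    # min(attempt, 32) keeps 2 ** attempt from growing without bound
    delay = min(cap, base * 2 ** min(attempt, 32))
    return delay * (0.8 + 0.4 * rng.random())


def network_available(host: str, port: int, timeout: float = 1.0) -> bool:
    """Crude reachability probe: can a TCP connection to host:port be opened?"""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def reconnect_loop(connect: Callable[[], None], is_online: Callable[[], bool],
                   sleep: Callable[[float], None] = time.sleep,
                   max_attempts: int = 10,
                   rng: Optional[random.Random] = None) -> int:
    """Retry `connect` with jittered backoff; returns how many retries were needed."""
    for attempt in range(max_attempts):
        if is_online():  # skip the connection attempt while the network is down
            try:
                connect()
                return attempt
            except OSError:
                pass  # connection failed: fall through to the backoff sleep
        sleep(backoff_delay(attempt, rng=rng))
    raise ConnectionError(f"gave up after {max_attempts} attempts")
```

For example, reconnect_loop(sdk_connect, lambda: network_available("api.example.com", 443)), where sdk_connect and the probed host are placeholders. Injecting sleep and rng keeps the loop testable without real delays.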

2.2 Reconnection Implementation with Jitter and Backoff

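import random
import threading
import time
from itick_sdk import Client   # Example SDK; method names below are assumed

class ReconnectingClient:
    def __init__(self, token):
        self.client = Client(token)
        self.reconnect_attempt = 0
        self.base_delay = 1.0        # 1 second
        self.max_delay = 30.0        # Maximum 30 seconds
        self.subscribed_symbols = [] # Saved so they can be restored after reconnecting
        self._manual_close = False
        self._reconnect_lock = threading.Lock()   # Only one reconnect loop at a time
        # Register the close handler once, before the first connection
        self.client.set_on_close(self._on_close)

    def connect(self):
        # Assumed SDK connection method; assumed to raise on failure
        self.client.connect_websocket()

    def close(self):
        # Mark the close as intentional so _on_close does not reconnect,
        # then close the connection with whatever method your SDK provides
        self._manual_close = True

    def _on_close(self, code, reason):
        if self._manual_close:
            return
        self._schedule_reconnect()

    def _schedule_reconnect(self):
        # Blocks while retrying; in production run it on its own thread so the
        # SDK callback thread is not held up. Concurrent calls are ignored.
        if not self._reconnect_lock.acquire(blocking=False):
            return
        try:
            while not self._manual_close:
                # Exponential backoff (capped) + jitter in [0.8, 1.2)
                exponent = min(self.reconnect_attempt, 16)
                delay = min(self.max_delay, self.base_delay * (2 ** exponent))
                delay = delay * (0.8 + 0.4 * random.random())
                print(f"Reconnecting in {delay:.2f}s (attempt {self.reconnect_attempt+1})")
                time.sleep(delay)
                self.reconnect_attempt += 1
                try:
                    self.connect()
                except Exception as exc:   # Narrow to your SDK's connection errors
                    print(f"Reconnect failed: {exc}")
                    continue
                self.reconnect_attempt = 0   # Success: reset the backoff
                # Resubscribe after successful reconnection
                if self.subscribed_symbols:
                    self.client.subscribe(self.subscribed_symbols)
                return
        finally:
            self._reconnect_lock.release()

    def subscribe(self, symbols):
        self.subscribed_symbols = list(symbols)
        self.client.subscribe(self.subscribed_symbols)   # SDK subscription method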

2.3 Leveraging Sequence Numbers for Disconnection Recovery

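Financial market data requires zero loss and zero duplication. To support this, each pushed message should carry an incrementing sequence number (seq). The client stores the last seq it processed (last_seq); a gap detected while connected triggers a retransmission request, and after a reconnect the client includes last_seq so the server can replay the missing messages. Both mechanisms require server-side protocol support.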

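import json

class SeqRecoveryClient(ReconnectingClient):
    def __init__(self, token):
        super().__init__(token)
        self.last_seq = None       # None until the first sequenced message arrives
        self.highest_seen = 0      # Highest seq received so far (in order or not)
        self.pending = {}          # Out-of-order messages keyed by seq (also dedupes)

    def on_message(self, msg):
        seq = msg.get('seq')
        if seq is None:
            self._process(msg)     # Unsequenced message: nothing to order
            return
        if self.last_seq is None:
            self.last_seq = seq - 1   # Use the first message as the baseline
        if seq == self.last_seq + 1:
            self._process(msg)
            self.last_seq = seq
            self.highest_seen = max(self.highest_seen, seq)
            self._process_pending()
        elif seq > self.last_seq + 1:
            if seq not in self.pending:
                self.pending[seq] = msg
                # Gap (e.g. messages lost around a reconnect or dropped upstream):
                # request only the range not covered by an earlier request.
                # A production client would also re-request after a timeout.
                gap_start = max(self.last_seq, self.highest_seen) + 1
                if gap_start <= seq - 1:
                    self._request_retransmit(gap_start, seq - 1)
            self.highest_seen = max(self.highest_seen, seq)
        # else: seq <= last_seq, a duplicate message; discard

    def _process_pending(self):
        # Release buffered messages as long as they continue the sequence
        while self.last_seq + 1 in self.pending:
            msg = self.pending.pop(self.last_seq + 1)
            self._process(msg)
            self.last_seq = msg['seq']

    def _process(self, msg):
        pass   # Business logic goes here

    def _request_retransmit(self, from_seq, to_seq):
        # Send retransmission request (requires protocol support)
        self.client.send_websocket_message(json.dumps({
            'action': 'nack',
            'from': from_seq,
            'to': to_seq
        }))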
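The class above only repairs gaps noticed while the connection is up. The reconnect half of the scheme, where the client sends last_seq when it resubscribes and the server replays from a bounded history, could look roughly like the following. Everything here is hypothetical protocol design: the resume_from field, the JSON layout, and the server-side ReplayBuffer are assumptions, not part of the iTick API.

```python
import json
from collections import deque
from typing import Iterable, List, Optional


def build_resume_request(symbols: Iterable[str], last_seq: Optional[int]) -> str:
    """Client side: resubscribe and, if anything was processed, ask to resume."""
    request = {"action": "subscribe", "symbols": list(symbols)}
    if last_seq is not None:
        request["resume_from"] = last_seq + 1  # first seq the client has NOT seen
    return json.dumps(request)


class ReplayBuffer:
    """Server side: keeps the most recent `capacity` messages, numbered 1, 2, 3, ..."""

    def __init__(self, capacity: int = 10_000):
        if capacity < 1:
            raise ValueError("capacity must be positive")
        self.buffer = deque(maxlen=capacity)  # oldest entries are evicted automatically
        self.next_seq = 1

    def publish(self, payload: dict) -> dict:
        """Stamp a payload with the next seq and remember it for replay."""
        message = dict(payload, seq=self.next_seq)
        self.next_seq += 1
        self.buffer.append(message)
        return message

    def replay(self, from_seq: int) -> Optional[List[dict]]:
        """Messages with seq >= from_seq, or None if some were already evicted."""
        oldest = self.buffer[0]["seq"] if self.buffer else self.next_seq
        if from_seq < oldest:
            return None  # history is gone: the client needs a full snapshot instead
        return [m for m in self.buffer if m["seq"] >= from_seq]
```

Returning None when the requested range has been evicted forces an explicit fallback (for example, a fresh snapshot) instead of silently skipping data, which matters given the zero-loss requirement.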

III. Flow Control: Preventing Client Overload

WebSocket is a full-duplex channel, and the server can push data far faster than the client can process it. Without flow control, this leads to unbounded memory growth, UI freezes, or even process crashes.

3.1 Message Queue + Rate Limiting

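Core concept: place received messages in a bounded queue and have an independent consumer thread process them at a fixed rate (e.g., 100 messages per second). When the queue is full, new messages are dropped; because this conflicts with the zero-loss requirement, drops should be counted and alerted on, and ideally avoided through backpressure (Section 3.3).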

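from collections import deque
import threading
import time

class FlowController:
    def __init__(self, max_size=500, rate_limit=100):
        self.queue = deque(maxlen=max_size)
        self.rate_limit = rate_limit   # Maximum messages processed per second
        self.processed = 0             # Messages processed in the current window
        self.window_start = time.monotonic()
        self.dropped = 0               # Feeds the queue_overflow_total metric
        self.lock = threading.Lock()

    def enqueue(self, msg):
        """Called from the network thread; never waits for processing."""
        with self.lock:
            if len(self.queue) == self.queue.maxlen:
                # Queue is full: drop the message and count it (or trigger an alert)
                self.dropped += 1
                return False
            self.queue.append(msg)
            return True

    def depth(self):
        """Current backlog, e.g. for pending_queue_size or backpressure checks."""
        with self.lock:
            return len(self.queue)

    def consume(self, callback):
        """Called in a loop by a single consumer thread; returns messages handled."""
        now = time.monotonic()
        if now - self.window_start >= 1.0:   # Start a new one-second window
            self.processed = 0
            self.window_start = now

        with self.lock:
            count = min(self.rate_limit - self.processed, len(self.queue))
            batch = [self.queue.popleft() for _ in range(count)]
        # Run callbacks outside the lock so enqueue() is never blocked by processing
        for msg in batch:
            callback(msg)
        self.processed += len(batch)
        return len(batch)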
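FlowController uses a fixed one-second window, so up to rate_limit messages can be released in a single burst at the start of each window. A token bucket is a common alternative (a different technique from the one above) that enforces the same average rate while bounding the burst size. The sketch below is illustrative; TokenBucket and its parameters are hypothetical names, and the clock is injectable for testing.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows `rate` events per second on average, with bursts of at most `burst`."""

    def __init__(self, rate: float, burst: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.burst = burst
        self.clock = clock
        self.tokens = burst       # start full
        self.updated = clock()

    def _refill(self) -> None:
        now = self.clock()
        # Add tokens in proportion to elapsed time, never beyond the burst size
        self.tokens = min(self.burst, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_acquire(self, n: float = 1.0) -> bool:
        """Take n tokens if available; returns False (without waiting) otherwise."""
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

    def wait_time(self, n: float = 1.0) -> float:
        """Seconds until n tokens will be available (0.0 if they already are)."""
        self._refill()
        return max(0.0, (n - self.tokens) / self.rate)
```

A consumer thread could then loop: if the queue is non-empty and try_acquire() succeeds, pop and process one message; otherwise sleep for wait_time() (or a short idle delay when the queue is empty).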

3.2 Priority Scheduling

In market data, tick data (individual trades) is far more time-sensitive than market depth data beyond the best bid/ask levels. Separate queues per priority level let the client process the most important messages first.

from collections import deque

class PriorityDispatcher:
    def __init__(self):
        self.high = deque()   # tick
        self.medium = deque() # quote
        self.low = deque()    # depth, etc.

    def dispatch(self, msg):
        if msg.get('type') == 'tick':
            self.high.append(msg)
        elif msg.get('type') == 'quote':
            self.medium.append(msg)
        else:
            self.low.append(msg)

    def process_one(self, callback):
        # Prioritize high-priority queue
        if self.high:
            callback(self.high.popleft())
            return True
        if self.medium:
            callback(self.medium.popleft())
            return True
        if self.low:
            callback(self.low.popleft())
            return True
        return False
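With strict priority, as in PriorityDispatcher, a sustained burst of ticks can starve the low-priority queue indefinitely. One common mitigation, offered here as an alternative rather than taken from the design above, is weighted round-robin: each round serves up to a fixed number of messages per level, highest priority first. The weights (8, 3, 1) and the class name are illustrative assumptions.

```python
from collections import deque
from typing import Callable, Sequence


class WeightedDispatcher:
    """tick > quote > everything else, served by weighted round-robin."""

    LEVELS = {"tick": 0, "quote": 1}   # any other type goes to level 2

    def __init__(self, weights: Sequence[int] = (8, 3, 1)):
        if len(weights) != 3 or min(weights) < 1:
            raise ValueError("need three positive weights")
        self.weights = tuple(weights)
        self.queues = [deque(), deque(), deque()]

    def dispatch(self, msg: dict) -> None:
        self.queues[self.LEVELS.get(msg.get("type"), 2)].append(msg)

    def run_round(self, callback: Callable[[dict], None]) -> int:
        """Serve up to weights[i] messages from queue i; returns messages handled."""
        handled = 0
        for queue, weight in zip(self.queues, self.weights):
            for _ in range(min(weight, len(queue))):
                callback(queue.popleft())
                handled += 1
        return handled
```

Ticks still get the largest share of every round, but depth updates keep making progress during a tick storm.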

3.3 Backpressure and Server Negotiation

When the client backlog exceeds a threshold (e.g., queue depth > 200), the client can proactively send a control message (an ordinary application message, not a WebSocket control frame) asking the server to reduce its push frequency or switch to batched pushes. This requires protocol-level support, for example:

{ "action": "slow", "reason": "queue_full" }
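Sending "slow" every time the queue depth crosses 200 would make the client flap between states. A minimal sketch with hysteresis: send the "slow" request when depth rises above a high watermark, and send a resume request only once it falls below a lower watermark. The "resume" action, the "queue_drained" reason, and the 50-message low watermark are hypothetical; the server must understand whatever messages you choose.

```python
import json
from typing import Callable, Optional


class BackpressureSignaller:
    """Asks the server to slow down above `high` and to resume below `low`."""

    def __init__(self, send: Callable[[str], None], high: int = 200, low: int = 50):
        if not 0 <= low < high:
            raise ValueError("require 0 <= low < high")
        self.send = send          # e.g. the SDK's send-message function (assumed)
        self.high = high
        self.low = low
        self.throttled = False

    def update(self, depth: int) -> Optional[str]:
        """Call with the current queue depth; returns the action sent, if any."""
        if not self.throttled and depth > self.high:
            self.throttled = True
            payload = {"action": "slow", "reason": "queue_full"}
        elif self.throttled and depth < self.low:
            self.throttled = False
            payload = {"action": "resume", "reason": "queue_drained"}
        else:
            return None  # no state change, nothing to send
        self.send(json.dumps(payload))
        return payload["action"]
```

The consumer loop can call update() with the current queue depth after each batch; only state transitions produce messages.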

IV. Complete Client Skeleton (Based on Sample SDK)

Combine the above modules into a robust client class:

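import json
import threading
import time

class RobustWebSocketClient(ReconnectingClient):
    """Reuses ReconnectingClient, HeartbeatGuard, FlowController and
    PriorityDispatcher from the sections above; SDK method names are assumed."""

    def __init__(self, token):
        super().__init__(token)   # Creates self.client (the SDK Client)
        self.flow_ctrl = FlowController(max_size=1000, rate_limit=200)
        self.dispatcher = PriorityDispatcher()
        self.heartbeat = HeartbeatGuard(self.client, self._on_connection_dead)

        # Set callbacks
        self.client.set_message_handler(self._on_raw_message)

    def _on_raw_message(self, raw_msg):
        msg = json.loads(raw_msg) if isinstance(raw_msg, (str, bytes)) else raw_msg
        # Handle pongs immediately: if they queued behind a backlog, a busy but
        # healthy connection could be mistaken for a dead one
        if msg.get('type') == 'pong':
            self.heartbeat.record_pong()
            return
        # Everything else goes through the bounded flow-control queue
        self.flow_ctrl.enqueue(msg)

    def _consumer_loop(self):
        while True:
            # Move up to the per-second budget into the priority queues...
            self.flow_ctrl.consume(self.dispatcher.dispatch)
            # ...then handle them, highest priority first
            handled = False
            while self.dispatcher.process_one(self._handle_msg):
                handled = True
            if not handled:
                time.sleep(0.001)   # Idle: wait 1 ms instead of spinning

    def _handle_msg(self, msg):
        # Business logic, e.g., update UI, storage, etc.
        pass

    def start(self):
        self.connect()   # Start connection (ReconnectingClient.connect)
        # Start consumer thread
        threading.Thread(target=self._consumer_loop, daemon=True).start()
        # Start heartbeat guardian
        self.heartbeat.start()

    def _on_connection_dead(self):
        # Zombie connection detected. If the SDK can close the stale socket,
        # do that first; reconnect on a separate thread so the heartbeat
        # thread keeps running.
        threading.Thread(target=self._schedule_reconnect, daemon=True).start()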

V. Observability and Monitoring Metrics

Production environments must expose the following metrics for troubleshooting and capacity planning:

| Metric | Meaning | Alert recommendation |
|---|---|---|
| heartbeat_timeout_total | Number of application-layer heartbeat timeouts | Check the network immediately if > 0 |
| reconnect_total | Total number of reconnections | Alert if more than 5 per minute |
| queue_overflow_total | Messages dropped because the queue was full | Alert if > 0 |
| end_to_end_latency_p99 | 99th-percentile latency from server send to client callback | Alert if > 200 ms |
| pending_queue_size | Current number of backlogged messages | Alert if > 500 |
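As a rough sketch, a client could track these metrics and evaluate the thresholds from the table in-process, assuming nearest-rank p99 over a sliding window of recent latencies and a 60-second window for the reconnect rate. ClientMetrics and percentile() are hypothetical names; a real deployment would more likely export the counters to a metrics system and alert on their rate of increase there, rather than on lifetime totals as done here to mirror the table's "> 0" rules.

```python
from collections import deque
from typing import Iterable, List


def percentile(values: Iterable[float], pct: int) -> float:
    """Nearest-rank percentile of a non-empty collection; pct is an int in 1..100."""
    ordered = sorted(values)
    rank = -(-pct * len(ordered) // 100)  # ceil(pct * n / 100) using integer math
    return ordered[rank - 1]


class ClientMetrics:
    def __init__(self, latency_window: int = 10_000):
        self.heartbeat_timeout_total = 0
        self.reconnect_total = 0
        self.queue_overflow_total = 0
        self.pending_queue_size = 0
        self.latencies_ms = deque(maxlen=latency_window)  # sliding window for p99
        self._reconnect_times = deque()                    # for the per-minute rate

    def record_reconnect(self, now: float) -> None:
        self.reconnect_total += 1
        self._reconnect_times.append(now)

    def record_latency(self, ms: float) -> None:
        self.latencies_ms.append(ms)

    def alerts(self, now: float) -> List[str]:
        """Names of metrics currently breaching the thresholds from the table."""
        while self._reconnect_times and now - self._reconnect_times[0] > 60.0:
            self._reconnect_times.popleft()  # keep only the last minute
        firing = []
        if self.heartbeat_timeout_total > 0:
            firing.append("heartbeat_timeout_total")
        if len(self._reconnect_times) > 5:
            firing.append("reconnect_total")
        if self.queue_overflow_total > 0:
            firing.append("queue_overflow_total")
        if self.latencies_ms and percentile(self.latencies_ms, 99) > 200:
            firing.append("end_to_end_latency_p99")
        if self.pending_queue_size > 500:
            firing.append("pending_queue_size")
        return firing
```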

VI. Conclusion

Low-latency push optimization is a systematic engineering challenge; relying solely on the WebSocket protocol or SDK default behaviors is insufficient. This article presents a three-layer optimization strategy:

  • Heartbeat layer: Application-layer heartbeats + RTT monitoring for rapid detection of zombie connections.
  • Reconnection layer: Exponential backoff + random jitter + session recovery to ensure fast and smooth data flow restoration after disconnection.
  • Flow control layer: Bounded queues + rate limiting + priority scheduling to prevent clients from being overwhelmed by data surges.

These strategies have been validated across thousands of production nodes, significantly enhancing stability in weak network conditions. Finally, adjust parameters according to your business scenario: high-frequency trading may shorten heartbeats to 10 seconds and increase queue limits, while ordinary information services can appropriately relax rate limits.
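One way to keep these parameters in one place is a small frozen dataclass, which also makes the dead-connection threshold explicit: interval × missed heartbeats, i.e. 25 s × 2 = 50 s of silence with the defaults used in this article, versus 10 s × 2 = 20 s for the high-frequency profile. The defaults mirror the values used above; doubling queue_max_size for the high-frequency profile is an illustrative guess, since no number is given. PushTuning is a hypothetical name.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class PushTuning:
    heartbeat_interval_s: float = 25.0
    heartbeat_max_missed: int = 2
    reconnect_base_s: float = 1.0
    reconnect_cap_s: float = 30.0
    queue_max_size: int = 500
    rate_limit_per_s: int = 100
    backpressure_threshold: int = 200

    @property
    def dead_after_s(self) -> float:
        """Silence after which the connection is declared dead."""
        return self.heartbeat_interval_s * self.heartbeat_max_missed


DEFAULT = PushTuning()
# 10 s heartbeat as suggested above; the doubled queue is an illustrative choice
HIGH_FREQUENCY = replace(DEFAULT, heartbeat_interval_s=10.0,
                         queue_max_size=DEFAULT.queue_max_size * 2)
```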

Reference documentation: https://docs.itick.org/sdk/python-sdk
GitHub: https://github.com/itick-org/
