Building Real-Time Applications with Python WebSockets: A Practical Guide

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

I've been working with real-time data processing systems for several years, and Python combined with WebSockets offers exceptional capabilities for building responsive applications. Let me share what I've learned from implementing these solutions across various projects.

WebSockets have revolutionized how we think about client-server communication. Unlike traditional HTTP, which follows a request-response pattern, WebSockets maintain persistent connections allowing for true bidirectional data flow. This creates opportunities for applications where immediacy matters - from financial trading platforms to collaborative tools.

Python makes WebSocket implementation remarkably straightforward. The language's asynchronous capabilities pair perfectly with the event-driven nature of WebSocket communication. When handling thousands of simultaneous connections, this becomes critical for performance.

The Foundation: Python's WebSocket Libraries

The websockets library stands out as the most robust pure-Python implementation. It builds on Python's asyncio framework, making it ideal for high-concurrency applications.

A basic WebSocket server looks like this:

import asyncio
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def main():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()  # Run forever

asyncio.run(main())

This simple example demonstrates the elegant API that websockets provides. The server accepts connections and echoes back any messages it receives.

For the client side:

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello world!")
        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.run(hello())

While these examples are basic, they illustrate the core pattern. The real power comes when building more complex systems.

Real-Time Data Broadcasting

When working with real-time data, I often need to broadcast updates to multiple clients simultaneously. Here's a pattern I've found effective:

import asyncio
import json
import websockets
from datetime import datetime

# Track connected clients
connected_clients = set()

def register(websocket):
    connected_clients.add(websocket)

def unregister(websocket):
    connected_clients.discard(websocket)

async def broadcast(message):
    if connected_clients:
        # return_exceptions=True keeps one failed send from aborting the others
        await asyncio.gather(
            *[client.send(message) for client in connected_clients],
            return_exceptions=True
        )

async def handler(websocket):
    # Register new client
    register(websocket)
    try:
        # Process incoming messages
        async for message in websocket:
            data = json.loads(message)

            # Add timestamp and process data
            data["server_time"] = datetime.now().isoformat()
            processed = json.dumps(data)

            # Broadcast to all clients
            await broadcast(processed)
    finally:
        # Unregister when the connection closes
        unregister(websocket)

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # Run forever

if __name__ == "__main__":
    asyncio.run(main())

This pattern maintains a set of connected clients and provides a broadcast mechanism to send messages to all of them. The handler function processes incoming messages and forwards them to all connected clients.
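
A minimal client for exercising this server might look like the sketch below; the JSON fields are arbitrary, since the server only attaches a timestamp and relays whatever it receives:

import asyncio
import json
import websockets

async def send_and_listen():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        # Send one JSON message; the fields are illustrative
        await websocket.send(json.dumps({"sensor": "demo", "value": 42}))

        # Print every broadcast the server relays back
        async for message in websocket:
            print(f"Broadcast received: {message}")

asyncio.run(send_and_listen())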

Integration with FastAPI

When building modern web applications, I frequently use FastAPI for its performance and simplicity. Integrating WebSockets with FastAPI creates a powerful combination:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            parsed_data = json.loads(data)

            # Process data here
            processed_data = {
                "source": parsed_data.get("source", "unknown"),
                "value": float(parsed_data.get("value", 0)) * 1.5,
                "processed": True
            }

            # Broadcast to all clients
            await manager.broadcast(json.dumps(processed_data))
    except WebSocketDisconnect:
        manager.disconnect(websocket)

This approach allows you to combine RESTful endpoints with WebSocket connections in the same application, giving you flexibility to design APIs that best suit your needs.
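
As a brief illustration of that flexibility, a plain REST route can reuse the same ConnectionManager to push a message to every connected WebSocket client. The /notify path and Notification model below are assumptions for the example, not part of any fixed API:

from pydantic import BaseModel

class Notification(BaseModel):
    message: str

# Hypothetical REST endpoint that pushes to all connected WebSocket clients
@app.post("/notify")
async def notify_clients(notification: Notification):
    await manager.broadcast(json.dumps({"notification": notification.message}))
    return {"delivered_to": len(manager.active_connections)}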

Scaling with Redis Pub/Sub

As applications grow, running everything in a single process becomes impractical. I've found Redis Pub/Sub to be an excellent solution for distributing WebSocket messages across multiple server instances:

import asyncio
import json
import websockets
import redis.asyncio as aioredis  # aioredis has been folded into redis-py

# Redis connection
redis = None

async def redis_listener(channel_name):
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel_name)

    async for message in pubsub.listen():
        if message['type'] == 'message':
            data = message['data'].decode('utf-8')
            await broadcast(data)

# WebSocket connections
connected_clients = set()

def register(websocket):
    connected_clients.add(websocket)

def unregister(websocket):
    connected_clients.discard(websocket)

async def broadcast(message):
    if connected_clients:
        # return_exceptions=True keeps one failed send from aborting the others
        await asyncio.gather(
            *[client.send(message) for client in connected_clients],
            return_exceptions=True
        )

async def handler(websocket):
    register(websocket)
    try:
        async for message in websocket:
            # Publish message to Redis
            await redis.publish("data_channel", message)
    finally:
        unregister(websocket)

async def main():
    global redis
    redis = aioredis.from_url("redis://localhost")

    # Start Redis listener
    asyncio.create_task(redis_listener("data_channel"))

    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # Run forever

if __name__ == "__main__":
    asyncio.run(main())

This pattern allows multiple server instances to communicate through Redis. When a server receives a message from a client, it publishes it to Redis, which then distributes it to all servers. Each server then broadcasts the message to its connected clients.
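
Because fan-out happens through Redis, messages don't have to originate from a WebSocket client at all. Here's a sketch of a standalone publisher, assuming the same redis://localhost instance and data_channel name used above:

import asyncio
import json
import redis.asyncio as aioredis

async def publish_update():
    redis = aioredis.from_url("redis://localhost")

    # Every server instance subscribed to data_channel relays this to its clients
    payload = json.dumps({"source": "batch-job", "value": 99})
    await redis.publish("data_channel", payload)

    # Close the client (aclose() in recent redis-py releases)
    await redis.aclose()

asyncio.run(publish_update())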

Processing Data Streams in Real-Time

Real-time data often comes in streams that need processing before delivery to clients. Here's a pattern I use for processing streaming data:

import asyncio
import json
import websockets
import numpy as np
from collections import deque

# Data processing class
class StreamProcessor:
    def __init__(self, window_size=10):
        self.window = deque(maxlen=window_size)

    def process(self, value):
        # Add value to window
        self.window.append(value)

        # Calculate statistics
        if len(self.window) > 1:
            return {
                "value": value,
                "mean": np.mean(self.window),
                "std": np.std(self.window),
                "min": np.min(self.window),
                "max": np.max(self.window)
            }
        return {"value": value}

# Shared processor instance
processor = StreamProcessor()

# WebSocket connections
connected_clients = set()

def register(websocket):
    connected_clients.add(websocket)

def unregister(websocket):
    connected_clients.discard(websocket)

async def broadcast(message):
    if connected_clients:
        # return_exceptions=True keeps one failed send from aborting the others
        await asyncio.gather(
            *[client.send(message) for client in connected_clients],
            return_exceptions=True
        )

async def handler(websocket):
    register(websocket)
    try:
        async for message in websocket:
            data = json.loads(message)

            # Process the incoming data
            value = float(data.get("value", 0))
            processed = processor.process(value)

            # Broadcast processed data
            await broadcast(json.dumps(processed))
    finally:
        unregister(websocket)

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # Run forever

if __name__ == "__main__":
    asyncio.run(main())

This example maintains a sliding window of values and computes statistics in real-time as new data arrives. This pattern is useful for monitoring systems, financial data processing, and other applications that need to track trends over time.
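
Because StreamProcessor is independent of the transport, its behaviour is easy to check on its own. A quick standalone run, with arbitrary values and a small window, looks like this:

# Quick standalone check of the sliding-window statistics
demo_processor = StreamProcessor(window_size=3)

for value in [10.0, 12.0, 11.0, 15.0]:
    print(demo_processor.process(value))

# The first call returns only {"value": 10.0}; subsequent calls add
# mean/std/min/max computed over at most the last 3 values.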

Authentication and Security

In production applications, authentication is critical. Here's how I typically handle WebSocket authentication:

import asyncio
import json
import websockets
import jwt

# Secret key for JWT
SECRET_KEY = "your-secret-key"

# Authenticate function
async def authenticate(token):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.InvalidTokenError:
        return None

async def handler(websocket):
    # Wait for authentication message
    try:
        message = await asyncio.wait_for(websocket.recv(), timeout=10.0)
        data = json.loads(message)

        if "token" not in data:
            await websocket.close(1008, "Authentication required")
            return

        # Verify token
        user = await authenticate(data["token"])
        if not user:
            await websocket.close(1008, "Invalid credentials")
            return

        # Authentication successful
        await websocket.send(json.dumps({"status": "authenticated", "user": user["username"]}))

        # Process messages
        async for message in websocket:
            # Handle authorized messages
            pass

    except asyncio.TimeoutError:
        await websocket.close(1013, "Authentication timeout")

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # Run forever

if __name__ == "__main__":
    asyncio.run(main())

This pattern requires clients to send an authentication token on connection. The server validates the token before allowing the client to proceed with normal communication.
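
On the client side, the token would normally come from your authentication service at login time. The sketch below signs a token locally purely to illustrate the handshake; the username claim matches what the server above expects:

import asyncio
import json
import websockets
import jwt

SECRET_KEY = "your-secret-key"  # Must match the server's key

async def authenticated_client():
    # Illustrative only: a real client would receive this token from an auth service
    token = jwt.encode({"username": "alice"}, SECRET_KEY, algorithm="HS256")

    async with websockets.connect("ws://localhost:8765") as websocket:
        # The first frame carries the token, as the server requires
        await websocket.send(json.dumps({"token": token}))
        reply = await websocket.recv()
        print(f"Auth response: {reply}")

asyncio.run(authenticated_client())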

Efficient Binary Data Transfer

For applications dealing with large datasets, binary data transfer is more efficient than JSON. I use this pattern for sending binary data:

import asyncio
import json
import websockets
import numpy as np
import io

async def handler(websocket):
    async for message in websocket:
        # The message carries parameters for the data to generate,
        # for example the shape of the array
        params = json.loads(message)
        shape = tuple(params.get("shape", [1000, 1000]))

        # Generate a large numpy array
        data = np.random.random(shape)

        # Convert array to binary
        buffer = io.BytesIO()
        np.save(buffer, data, allow_pickle=False)
        binary_data = buffer.getvalue()

        # Send binary data
        await websocket.send(binary_data)

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        await asyncio.Future()  # Run forever

if __name__ == "__main__":
    asyncio.run(main())

On the client side, you would receive and decode the binary data:

import asyncio
import websockets
import numpy as np
import io

async def receive_data():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        # Send request for data
        await websocket.send('{"shape": [1000, 1000]}')

        # Receive binary data
        binary_data = await websocket.recv()

        # Convert binary to numpy array
        buffer = io.BytesIO(binary_data)
        data = np.load(buffer, allow_pickle=False)

        print(f"Received array with shape: {data.shape}")

asyncio.run(receive_data())

This approach is particularly useful for scientific applications, image processing, or any scenario involving large datasets.

Handling Connection Interruptions

Network connections are inherently unreliable. I implement reconnection logic to maintain system resilience:

import asyncio
import websockets
import random
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def connect_with_retry(uri, max_retries=None):
    retries = 0
    backoff_factor = 1.5
    base_delay = 1.0

    while max_retries is None or retries < max_retries:
        try:
            connection = await websockets.connect(uri)
            logger.info(f"Connected to {uri}")
            return connection
        except (OSError, websockets.WebSocketException) as e:
            retries += 1

            if max_retries is not None and retries >= max_retries:
                logger.error(f"Max retries ({max_retries}) exceeded. Giving up.")
                raise

            # Calculate exponential backoff with jitter
            delay = min(60, base_delay * (backoff_factor ** (retries - 1)))
            jitter = random.uniform(0, 0.1 * delay)
            wait_time = delay + jitter

            logger.warning(f"Connection failed: {e}. Retrying in {wait_time:.2f} seconds...")
            await asyncio.sleep(wait_time)

async def client_with_reconnect():
    uri = "ws://localhost:8765"

    while True:
        try:
            websocket = await connect_with_retry(uri)

            async with websocket:
                # Normal message processing
                while True:
                    try:
                        message = await websocket.recv()
                        # Process message
                        print(f"Received: {message}")
                    except websockets.ConnectionClosed:
                        logger.warning("Connection closed. Reconnecting...")
                        break

        except Exception as e:
            logger.error(f"Fatal error: {e}")
            return

if __name__ == "__main__":
    asyncio.run(client_with_reconnect())

This pattern implements exponential backoff with jitter, which prevents thundering herd problems when multiple clients try to reconnect simultaneously after a server outage.
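
To get a feel for the schedule, the same formula can be evaluated on its own. Ignoring jitter, the delays grow geometrically until they reach the 60-second cap:

# Delay (in seconds) before each retry, ignoring jitter
base_delay, backoff_factor = 1.0, 1.5

for retries in range(1, 9):
    delay = min(60, base_delay * (backoff_factor ** (retries - 1)))
    print(f"Retry {retries}: wait ~{delay:.2f}s")

# Retry 1 waits ~1.00s, retry 2 ~1.50s, retry 3 ~2.25s, and so on,
# with the jitter adding up to 10% on top of each delay.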

Optimizing Performance with Connection Pooling

For applications making outgoing WebSocket connections to multiple services, connection pooling improves efficiency:

import asyncio
import websockets
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class WebSocketPool:
    def __init__(self, uri, pool_size=5, timeout=10):
        self.uri = uri
        self.pool_size = pool_size
        self.timeout = timeout
        self.pool = asyncio.Queue(maxsize=pool_size)
        self.active_connections = 0
        self.lock = asyncio.Lock()

    async def initialize(self):
        for _ in range(self.pool_size):
            connection = await self._create_connection()
            if connection:
                await self.pool.put(connection)

    async def _create_connection(self):
        try:
            self.active_connections += 1
            return await websockets.connect(self.uri, ping_interval=20, ping_timeout=20)
        except Exception as e:
            self.active_connections -= 1
            logger.error(f"Failed to create connection: {e}")
            return None

    async def get_connection(self):
        try:
            # Try to get a connection from the pool
            connection = await asyncio.wait_for(self.pool.get(), timeout=self.timeout)

            # close_code is None while the connection is still open
            if connection.close_code is None:
                return connection

            # Connection was closed, create a new one
            logger.info("Replacing closed connection")
            self.active_connections -= 1
            return await self._create_connection()

        except asyncio.TimeoutError:
            # Pool is empty, create a new connection if under limit
            async with self.lock:
                if self.active_connections < self.pool_size * 2:  # Allow some overflow
                    logger.info("Creating overflow connection")
                    return await self._create_connection()
                else:
                    raise Exception("Connection pool exhausted")

    async def release_connection(self, connection):
        if connection.close_code is None:
            try:
                # put_nowait is synchronous; it raises QueueFull if the pool is full
                self.pool.put_nowait(connection)
            except asyncio.QueueFull:
                # If the pool is full, close the extra connection
                await connection.close()
                self.active_connections -= 1
        else:
            # Connection is closed, decrement counter
            self.active_connections -= 1

    async def close(self):
        while not self.pool.empty():
            connection = await self.pool.get()
            await connection.close()
            self.active_connections -= 1

# Example usage
async def example():
    pool = WebSocketPool("ws://localhost:8765", pool_size=3)
    await pool.initialize()

    async def send_message(msg):
        connection = await pool.get_connection()
        try:
            await connection.send(msg)
            response = await connection.recv()
            return response
        finally:
            await pool.release_connection(connection)

    # Send multiple messages concurrently
    tasks = [send_message(f"Message {i}") for i in range(10)]
    results = await asyncio.gather(*tasks)

    # Close the pool
    await pool.close()

    return results

if __name__ == "__main__":
    results = asyncio.run(example())
    print(results)

This connection pool manages a set of WebSocket connections, reusing them across multiple operations. This reduces the overhead of establishing new connections for each operation.

I've applied these patterns across various domains - from financial trading systems to IoT platforms - and they've proven reliable and efficient. The asynchronous nature of Python's WebSocket implementations makes it particularly well-suited for handling large numbers of concurrent connections with minimal resource usage.

Remember that WebSockets maintain persistent connections, so they're ideal for applications requiring real-time updates or bidirectional communication. However, they do require careful management of connection state and proper error handling to ensure reliability.

When implementing these patterns in production systems, consider adding monitoring, detailed logging, and health checks to maintain system visibility. With proper implementation, Python WebSockets can provide exceptional performance for real-time data processing applications.
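
As a small example of that kind of visibility, a periodic task can log the connection count while the library's built-in ping settings weed out unresponsive clients. This is only a sketch; the intervals and the echo handler are placeholders:

import asyncio
import logging
import websockets

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

connected_clients = set()

async def report_health(interval=30):
    # Periodically log how many clients are connected
    while True:
        logger.info("Active WebSocket connections: %d", len(connected_clients))
        await asyncio.sleep(interval)

async def handler(websocket):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            await websocket.send(message)
    finally:
        connected_clients.discard(websocket)

async def main():
    asyncio.create_task(report_health())
    # ping_interval/ping_timeout let the server detect and drop dead connections
    async with websockets.serve(handler, "localhost", 8765,
                                ping_interval=20, ping_timeout=20):
        await asyncio.Future()  # Run forever

asyncio.run(main())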


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

