DEV Community

郑沛沛
郑沛沛

Posted on

Real-Time Apps with WebSockets in Python: Chat, Notifications, and Live Data

WebSockets enable real-time, bidirectional communication. Here's how to build real-time features that actually scale.

WebSocket vs HTTP

HTTP: Client asks, server answers. One request, one response.
WebSocket: Both sides can send messages anytime. Persistent connection.

Use WebSockets for: chat, live notifications, collaborative editing, live dashboards, multiplayer games.

Basic WebSocket Server with FastAPI

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import list

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

    async def send_personal(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{username}")
async def websocket_endpoint(websocket: WebSocket, username: str):
    await manager.connect(websocket)
    await manager.broadcast(f"{username} joined the chat")
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"{username}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"{username} left the chat")
Enter fullscreen mode Exit fullscreen mode

JavaScript Client

const ws = new WebSocket('ws://localhost:8000/ws/alice');

ws.onopen = () => {
    console.log('Connected');
    ws.send('Hello everyone!');
};

ws.onmessage = (event) => {
    console.log('Received:', event.data);
    // Update UI
    const messages = document.getElementById('messages');
    messages.innerHTML += `<p>${event.data}</p>`;
};

ws.onclose = () => console.log('Disconnected');
ws.onerror = (error) => console.error('Error:', error);
Enter fullscreen mode Exit fullscreen mode

Room-Based Chat

from collections import defaultdict

class RoomManager:
    def __init__(self):
        self.rooms: dict[str, list[WebSocket]] = defaultdict(list)

    async def join_room(self, room: str, websocket: WebSocket):
        await websocket.accept()
        self.rooms[room].append(websocket)

    def leave_room(self, room: str, websocket: WebSocket):
        self.rooms[room].remove(websocket)
        if not self.rooms[room]:
            del self.rooms[room]

    async def broadcast_to_room(self, room: str, message: str):
        for ws in self.rooms.get(room, []):
            await ws.send_text(message)

rooms = RoomManager()

@app.websocket("/ws/{room}/{username}")
async def room_chat(websocket: WebSocket, room: str, username: str):
    await rooms.join_room(room, websocket)
    await rooms.broadcast_to_room(room, f"{username} joined #{room}")
    try:
        while True:
            data = await websocket.receive_text()
            await rooms.broadcast_to_room(room, f"{username}: {data}")
    except WebSocketDisconnect:
        rooms.leave_room(room, websocket)
        await rooms.broadcast_to_room(room, f"{username} left #{room}")
Enter fullscreen mode Exit fullscreen mode

JSON Messages with Types

import json
from pydantic import BaseModel

class WSMessage(BaseModel):
    type: str  # "chat", "typing", "status"
    data: dict

@app.websocket("/ws/{username}")
async def typed_websocket(websocket: WebSocket, username: str):
    await manager.connect(websocket)
    try:
        while True:
            raw = await websocket.receive_text()
            msg = WSMessage.model_validate_json(raw)

            if msg.type == "chat":
                await manager.broadcast(json.dumps({
                    "type": "chat",
                    "data": {"user": username, "text": msg.data["text"]}
                }))
            elif msg.type == "typing":
                await manager.broadcast(json.dumps({
                    "type": "typing",
                    "data": {"user": username}
                }))
    except WebSocketDisconnect:
        manager.disconnect(websocket)
Enter fullscreen mode Exit fullscreen mode

Heartbeat / Keep-Alive

import asyncio

async def heartbeat(websocket: WebSocket):
    while True:
        try:
            await asyncio.sleep(30)
            await websocket.send_text(json.dumps({"type": "ping"}))
        except Exception:
            break

@app.websocket("/ws/{username}")
async def ws_with_heartbeat(websocket: WebSocket, username: str):
    await manager.connect(websocket)
    beat_task = asyncio.create_task(heartbeat(websocket))
    try:
        while True:
            data = await websocket.receive_text()
            msg = json.loads(data)
            if msg["type"] == "pong":
                continue
            await manager.broadcast(data)
    except WebSocketDisconnect:
        beat_task.cancel()
        manager.disconnect(websocket)
Enter fullscreen mode Exit fullscreen mode

Key Takeaways

  1. Use WebSockets for real-time bidirectional communication
  2. ConnectionManager pattern handles multiple clients cleanly
  3. Room-based architecture for scoped broadcasting
  4. Use JSON messages with a type field for structured communication
  5. Implement heartbeats to detect dead connections

6. For multi-server scaling, use Redis Pub/Sub as a message broker

🚀 Level up your AI workflow! Check out my AI Developer Mega Prompt Pack — 80 battle-tested prompts for developers. $9.99

Top comments (0)