WebSockets enable real-time, bidirectional communication. Here's how to build real-time features that actually scale.
WebSocket vs HTTP
HTTP: Client asks, server answers. One request, one response.
WebSocket: Both sides can send messages anytime. Persistent connection.
Use WebSockets for: chat, live notifications, collaborative editing, live dashboards, multiplayer games.
Basic WebSocket Server with FastAPI
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import list
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
async def send_personal(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{username}")
async def websocket_endpoint(websocket: WebSocket, username: str):
await manager.connect(websocket)
await manager.broadcast(f"{username} joined the chat")
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"{username}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"{username} left the chat")
JavaScript Client
const ws = new WebSocket('ws://localhost:8000/ws/alice');
ws.onopen = () => {
console.log('Connected');
ws.send('Hello everyone!');
};
ws.onmessage = (event) => {
console.log('Received:', event.data);
// Update UI
const messages = document.getElementById('messages');
messages.innerHTML += `<p>${event.data}</p>`;
};
ws.onclose = () => console.log('Disconnected');
ws.onerror = (error) => console.error('Error:', error);
Room-Based Chat
from collections import defaultdict
class RoomManager:
def __init__(self):
self.rooms: dict[str, list[WebSocket]] = defaultdict(list)
async def join_room(self, room: str, websocket: WebSocket):
await websocket.accept()
self.rooms[room].append(websocket)
def leave_room(self, room: str, websocket: WebSocket):
self.rooms[room].remove(websocket)
if not self.rooms[room]:
del self.rooms[room]
async def broadcast_to_room(self, room: str, message: str):
for ws in self.rooms.get(room, []):
await ws.send_text(message)
rooms = RoomManager()
@app.websocket("/ws/{room}/{username}")
async def room_chat(websocket: WebSocket, room: str, username: str):
await rooms.join_room(room, websocket)
await rooms.broadcast_to_room(room, f"{username} joined #{room}")
try:
while True:
data = await websocket.receive_text()
await rooms.broadcast_to_room(room, f"{username}: {data}")
except WebSocketDisconnect:
rooms.leave_room(room, websocket)
await rooms.broadcast_to_room(room, f"{username} left #{room}")
JSON Messages with Types
import json
from pydantic import BaseModel
class WSMessage(BaseModel):
type: str # "chat", "typing", "status"
data: dict
@app.websocket("/ws/{username}")
async def typed_websocket(websocket: WebSocket, username: str):
await manager.connect(websocket)
try:
while True:
raw = await websocket.receive_text()
msg = WSMessage.model_validate_json(raw)
if msg.type == "chat":
await manager.broadcast(json.dumps({
"type": "chat",
"data": {"user": username, "text": msg.data["text"]}
}))
elif msg.type == "typing":
await manager.broadcast(json.dumps({
"type": "typing",
"data": {"user": username}
}))
except WebSocketDisconnect:
manager.disconnect(websocket)
Heartbeat / Keep-Alive
import asyncio
async def heartbeat(websocket: WebSocket):
while True:
try:
await asyncio.sleep(30)
await websocket.send_text(json.dumps({"type": "ping"}))
except Exception:
break
@app.websocket("/ws/{username}")
async def ws_with_heartbeat(websocket: WebSocket, username: str):
await manager.connect(websocket)
beat_task = asyncio.create_task(heartbeat(websocket))
try:
while True:
data = await websocket.receive_text()
msg = json.loads(data)
if msg["type"] == "pong":
continue
await manager.broadcast(data)
except WebSocketDisconnect:
beat_task.cancel()
manager.disconnect(websocket)
Key Takeaways
- Use WebSockets for real-time bidirectional communication
- ConnectionManager pattern handles multiple clients cleanly
- Room-based architecture for scoped broadcasting
- Use JSON messages with a type field for structured communication
- Implement heartbeats to detect dead connections
6. For multi-server scaling, use Redis Pub/Sub as a message broker
🚀 Level up your AI workflow! Check out my AI Developer Mega Prompt Pack — 80 battle-tested prompts for developers. $9.99
Top comments (0)