前言
在现代Web应用中,实时通知已经成为标配功能。无论是即时消息、系统告警、协作编辑还是实时数据推送,WebSocket都是最可靠的技术选择。本文将手把手教你用FastAPI和WebSocket构建一个生产级的实时通知系统,支持多实例部署和消息持久化。
系统架构概览
我们的实时通知系统包含以下核心组件:
客户端 <--WebSocket--> FastAPI实例1 <--Redis Pub/Sub--> FastAPI实例2
|
Redis/DB (消息持久化)
- ConnectionManager:管理所有WebSocket连接
- 用户路由:将消息精确推送给目标用户
- Redis Pub/Sub:支持多实例部署时的消息广播
- 消息持久化:离线消息存储和补发
- 心跳保活:检测断线并自动重连
基础架构:ConnectionManager
首先实现一个连接管理器,负责维护所有活跃的WebSocket连接。
[Code example omitted for brevity - see full article on WD Tech Blog]
FastAPI WebSocket端点
接下来实现WebSocket端点,处理连接、消息接收和心跳保活。
[Code example omitted for brevity - see full article on WD Tech Blog]
Redis Pub/Sub:多实例支持
在生产环境中,通常会有多个FastAPI实例运行。Redis Pub/Sub用于在实例之间传递消息。
import redis.asyncio as redis
import json
from typing import Optional
import asyncio
class RedisPubSub:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.pubsub = None
self._running = False
async def connect(self):
"""建立Redis连接"""
self.redis = redis.from_url(self.redis_url, decode_responses=True)
self.pubsub = self.redis.pubsub()
self._running = True
async def publish(self, channel: str, message: dict):
"""发布消息到指定频道"""
await self.redis.publish(channel, json.dumps(message, ensure_ascii=False))
async def subscribe(self, channel: str, callback):
"""订阅频道并注册回调"""
await self.pubsub.subscribe(channel)
asyncio.create_task(self._listen(callback))
async def _listen(self, callback):
"""监听消息"""
async for message in self.pubsub.listen():
if message["type"] == "message":
data = json.loads(message["data"])
await callback(data)
async def close(self):
"""关闭连接"""
self._running = False
if self.pubsub:
await self.pubsub.unsubscribe()
await self.pubsub.close()
if self.redis:
await self.redis.close()
# 初始化Redis Pub/Sub
pubsub = RedisPubSub()
@app.on_event("startup")
async def startup():
await pubsub.connect()
# 订阅通知频道
async def handle_notification(message: dict):
target_user = message.get("target_user")
if target_user:
await manager.send_personal_message(message, target_user)
else:
await manager.broadcast(message, message.get("channel"))
await pubsub.subscribe("notifications", handle_notification)
@app.on_event("shutdown")
async def shutdown():
await pubsub.close()
消息持久化:离线消息补发
当用户离线时,消息需要被存储起来,等用户上线后补发。
import redis.asyncio as redis
from datetime import datetime, timedelta
import json
class MessageStore:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis: Optional[redis.Redis] = None
async def connect(self):
self.redis = redis.from_url(self.redis_url, decode_responses=True)
async def store_offline_message(self, user_id: str, message: dict):
"""存储离线消息到Redis列表"""
message["stored_at"] = datetime.now().isoformat()
message_id = str(uuid.uuid4())
await self.redis.hset(
f"offline:{user_id}",
message_id,
json.dumps(message, ensure_ascii=False)
)
# 设置过期时间(7天)
await self.redis.expire(f"offline:{user_id}", timedelta(days=7))
async def get_offline_messages(self, user_id: str) -> list:
"""获取离线消息"""
messages = []
data = await self.redis.hgetall(f"offline:{user_id}")
for msg_id, msg_data in data.items():
messages.append(json.loads(msg_data))
return messages
async def clear_offline_messages(self, user_id: str):
"""清除已读的离线消息"""
await self.redis.delete(f"offline:{user_id}")
async def close(self):
if self.redis:
await self.redis.close()
message_store = MessageStore()
# 发送通知的统一接口
async def send_notification(
target_user: str,
notification_type: str,
content: str,
channel: Optional[str] = None,
data: Optional[dict] = None,
):
"""发送通知的统一入口"""
message = {
"id": str(uuid.uuid4()),
"type": notification_type,
"content": content,
"data": data or {},
"timestamp": datetime.now().isoformat(),
"target_user": target_user,
"channel": channel,
}
if manager.is_online(target_user):
# 用户在线,直接发送
await manager.send_personal_message(message, target_user)
else:
# 用户离线,存储消息
await message_store.store_offline_message(target_user, message)
# 通过Redis广播(多实例场景)
await pubsub.publish("notifications", message)
REST API:发送通知接口
除了WebSocket,还需要提供REST API让后端服务发送通知。
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
class NotificationRequest(BaseModel):
target_user: str
notification_type: str
content: str
channel: Optional[str] = None
data: Optional[dict] = None
@app.post("/api/notifications/send")
async def send_notification_api(request: NotificationRequest):
"""发送通知的REST API"""
await send_notification(
target_user=request.target_user,
notification_type=request.notification_type,
content=request.content,
channel=request.channel,
data=request.data,
)
return {"status": "ok", "message": "通知已发送"}
@app.post("/api/notifications/broadcast")
async def broadcast_notification(
notification_type: str,
content: str,
channel: Optional[str] = None,
):
"""广播通知"""
message = {
"id": str(uuid.uuid4()),
"type": notification_type,
"content": content,
"timestamp": datetime.now().isoformat(),
"channel": channel,
}
await manager.broadcast(message, channel)
await pubsub.publish("notifications", message)
return {"status": "ok", "message": "广播通知已发送"}
@app.get("/api/notifications/online-count")
async def get_online_count():
"""获取在线用户数"""
return {"online_count": manager.get_online_count()}
前端JavaScript代码
最后是前端JavaScript代码,处理WebSocket连接、心跳保活和消息接收。
class NotificationClient {
constructor(token) {
this.token = token;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.heartbeatInterval = null;
this.handlers = new Map();
}
connect() {
const wsUrl = `wss://your-domain.com/ws/notifications?token=${this.token}`;
this.ws = new WebSocket(wsUrl);
this.ws.onopen = () => {
console.log('WebSocket已连接');
this.reconnectAttempts = 0;
this.startHeartbeat();
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'pong') return;
// 触发对应的消息处理器
const handler = this.handlers.get(message.type);
if (handler) {
handler(message);
}
// 触发通用处理器
const globalHandler = this.handlers.get('*');
if (globalHandler) {
globalHandler(message);
}
};
this.ws.onclose = (event) => {
console.log(`WebSocket已断开: code=${event.code}`);
this.stopHeartbeat();
this.reconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
};
}
on(type, handler) {
this.handlers.set(type, handler);
}
subscribe(channel) {
this.send({ type: 'subscribe', channel });
}
unsubscribe(channel) {
this.send({ type: 'unsubscribe', channel });
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
this.send({ type: 'ping' });
}, 30000); // 每30秒发送心跳
}
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}
reconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('达到最大重连次数,停止重连');
return;
}
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
console.log(`${delay}ms后尝试第${this.reconnectAttempts}次重连...`);
setTimeout(() => this.connect(), delay);
}
disconnect() {
this.stopHeartbeat();
if (this.ws) {
this.ws.close();
}
}
}
// 使用示例
const client = new NotificationClient('your-jwt-token');
client.on('new_message', (msg) => {
showNotification(msg.content, msg.data);
});
client.on('system_alert', (msg) => {
showAlertBanner(msg.content);
});
client.on('*', (msg) => {
console.log('收到通知:', msg);
});
client.connect();
部署建议
Nginx配置
location /ws/ {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 86400; # WebSocket长连接超时
}
性能优化建议
- 连接数管理:单实例建议控制在10000个并发连接以内
- 消息压缩:大消息使用gzip压缩后再发送
- 限流保护:对消息发送频率进行限制,防止滥用
- 监控告警:监控在线连接数、消息延迟和异常断线率
总结
本文实现了一个完整的实时通知系统,核心特性包括:
- ConnectionManager:统一的连接管理
- 用户级路由:精确的消息推送
- Redis Pub/Sub:多实例消息广播
- 消息持久化:离线消息存储和补发
- 心跳保活:自动检测断线并重连
- 频道订阅:灵活的消息分组
这个架构可以直接用于生产环境,根据实际需求进行水平扩展即可。
Originally published at WD Tech Blog
More tech articles at wdsega.github.io
Top comments (0)