Event Sourcing 实战:让业务数据自己说话
如果你曾经因为"为什么这个订单状态变成这样了"而抓狂,或者因为数据库里的数据对不上而排查到凌晨——那么 Event Sourcing 可能是你一直在找的答案。
什么是 Event Sourcing?
传统架构把当前状态存在数据库里:订单表里存着"已发货",用户表里存着当前余额。这种做法的问题是:状态会覆盖历史。
Event Sourcing 的核心思路完全不同:
不要存储当前状态,存储导致状态变化的所有事件。
当前状态是事件的结果,而不是数据本身。
举个例子,银行账户:
传统方式:余额 = 1000元
Event Sourcing:存款(+500) → 取款(-200) → 利息(+10) → 当前余额 = 1310元
第二种方式告诉你的不只是"现在有多少钱",而是完整的资金流动历史。
为什么值得用?
1. 完整审计日志是副产品
金融、订单、物流——这些场景天然需要审计。在传统架构里,你需要额外建审计表;在 Event Sourcing 里,你只是在用正确的方式存储数据。
2. 时间旅行成为可能
任意时间点的系统状态都可以重建。如果用户问"三天前的账户是什么情况",你只需要重放事件到那个时间点。这对于调试、复盘、甚至用户纠纷处理都是神器。
3. 事件驱动天然集成
事件就是消息。你可以在事件写入时触发下游流程——通知、分析、实时看板——不需要额外的消息队列介入。
4. 支持高并发写入
事件追加(Append-only)是顺序写的,天然支持高性能日志存储。Kafka、Pulsar 都能轻松承载每秒百万级事件写入。
架构实现
核心组件
命令 -> 聚合根 -> Event Store
|
v
投影器 -> 读模型
用代码说话
假设一个电商订单场景:
from dataclasses import dataclass
from datetime import datetime
from typing import List
import uuid
@dataclass
class Event:
event_id: str
timestamp: datetime
aggregate_id: str
event_type: str
@dataclass
class OrderPlaced(Event):
customer_id: str
items: List[dict]
total_amount: float
@dataclass
class OrderPaid(Event):
payment_id: str
amount: float
@dataclass
class OrderShipped(Event):
tracking_number: str
carrier: str
class Order:
def __init__(self, order_id: str):
self.order_id = order_id
self.events: List[Event] = []
self.status = "draft"
def place(self, customer_id: str, items: List[dict], total: float):
if self.status != "draft":
raise ValueError(f"订单已是 {self.status} 状态,无法下单")
self.events.append(OrderPlaced(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
aggregate_id=self.order_id,
event_type="OrderPlaced",
customer_id=customer_id,
items=items,
total_amount=total
))
self.status = "placed"
def pay(self, payment_id: str, amount: float):
if self.status != "placed":
raise ValueError(f"订单已是 {self.status} 状态,无法支付")
self.events.append(OrderPaid(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
aggregate_id=self.order_id,
event_type="OrderPaid",
payment_id=payment_id,
amount=amount
))
self.status = "paid"
def ship(self, tracking: str, carrier: str):
if self.status != "paid":
raise ValueError(f"订单已是 {self.status} 状态,无法发货")
self.events.append(OrderShipped(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
aggregate_id=self.order_id,
event_type="OrderShipped",
tracking_number=tracking,
carrier=carrier
))
self.status = "shipped"
def get_uncommitted_events(self) -> List[Event]:
return self.events
def mark_committed(self):
self.events = []
@staticmethod
def rebuild(events: List[Event]) -> "Order":
order = Order(events[0].aggregate_id)
for event in events:
if isinstance(event, OrderPlaced):
order.status = "placed"
elif isinstance(event, OrderPaid):
order.status = "paid"
elif isinstance(event, OrderShipped):
order.status = "shipped"
order.events = []
return order
关键点:
- 命令验证状态:下单、支付、发货只能在正确的状态下执行
- 事件不可变:一旦写入 Event Store,永不修改
- 状态重建:通过重放事件可以还原任意时刻的订单状态
Event Store 设计
class OrderRepository:
def __init__(self, client):
self.client = client
def save(self, order: Order):
events = order.get_uncommitted_events()
if not events:
return
stream_name = f"order-{order.order_id}"
for event in events:
self.client.append_event(
stream=stream_name,
event_type=event.event_type,
data=event,
expected_version=order.version
)
order.mark_committed()
def get_by_id(self, order_id: str) -> Order:
stream_name = f"order-{order_id}"
events = self.client.read_stream(stream_name)
if not events:
return None
return Order.rebuild(events)
投影器:构建读模型
事件写入后,你需要把数据投影到读模型(用于查询):
class OrderReadModel:
def __init__(self, db):
self.db = db
def project(self, event: Event):
if isinstance(event, OrderPlaced):
self.db.execute(
"INSERT INTO orders (id, customer, total, status, created_at) VALUES (?, ?, ?, ?, ?)",
[event.aggregate_id, event.customer_id, event.total_amount, "placed", event.timestamp]
)
elif isinstance(event, OrderPaid):
self.db.execute(
"UPDATE orders SET status = 'paid', paid_at = ? WHERE id = ?",
[event.timestamp, event.aggregate_id]
)
elif isinstance(event, OrderShipped):
self.db.execute(
"UPDATE orders SET status = 'shipped', tracking = ? WHERE id = ?",
[event.tracking_number, event.aggregate_id]
)
实战中的挑战
1. 事件版本演化
业务在变,事件结构也会变。需要 Upcasting 来做版本迁移:
def upcast_v1_to_v2(event_data: dict) -> dict:
if event_data.get("version", 1) < 2:
event_data["currency"] = event_data.get("currency", "CNY")
return event_data
2. 快照优化
订单如果有一万条事件,重放会很慢。定期生成快照可以解决这个问题:
class Snapshot:
stream: str
version: int
state: dict
timestamp: datetime
3. 查询性能
Event Store 是 append-only 日志,复杂查询很慢。需要投影到专门的读模型数据库(PostgreSQL、Elasticsearch、ClickHouse)。
什么时候不该用?
- 简单 CRUD 应用:状态简单且不需要审计
- 需要强事务一致性:Event Sourcing 的最终一致性不是所有场景都适合
- 团队不熟悉:技术债务是真实存在的
工具链推荐
- EventStoreDB:专门为 Event Sourcing 设计的数据库
- Marten(.NET):基于 PostgreSQL 的轻量实现
- EventFlow(.NET):完整的 DDD + ES 框架
- Confluent Kafka:分布式事件流平台
- Axon Framework(Java):成熟的 CQRS/ES 框架
总结
Event Sourcing 不是银弹,但它解决的是真实痛点:当你的业务需要完整的历史、可追溯的决策、或者事件驱动的下游集成时,它值得认真考虑。
它的代价是复杂度——你需要在投影器、版本迁移、快照上投入额外的工程量。选择它的理由应该是业务需要,而不是技术酷炫。
如果你正在构建一个金融系统、订单处理流程、或者任何需要"为什么变成这样"的场景——现在你知道该往哪个方向走了。
*本文是软件架构系列的延续,建议从系统设计基础指南开始构建你的架构知识体系。
Top comments (0)