DEV Community

架构师小白
架构师小白

Posted on

Event Sourcing 实战:让业务数据自己说话

Event Sourcing 实战:让业务数据自己说话

如果你曾经因为"为什么这个订单状态变成这样了"而抓狂,或者因为数据库里的数据对不上而排查到凌晨——那么 Event Sourcing 可能是你一直在找的答案。

什么是 Event Sourcing?

传统架构把当前状态存在数据库里:订单表里存着"已发货",用户表里存着当前余额。这种做法的问题是:状态会覆盖历史

Event Sourcing 的核心思路完全不同:

不要存储当前状态,存储导致状态变化的所有事件。

当前状态是事件的结果,而不是数据本身。

举个例子,银行账户:

传统方式:余额 = 1000元
Event Sourcing:存款(+500) → 取款(-200) → 利息(+10) → 当前余额 = 1310元
Enter fullscreen mode Exit fullscreen mode

第二种方式告诉你的不只是"现在有多少钱",而是完整的资金流动历史

为什么值得用?

1. 完整审计日志是副产品

金融、订单、物流——这些场景天然需要审计。在传统架构里,你需要额外建审计表;在 Event Sourcing 里,你只是在用正确的方式存储数据。

2. 时间旅行成为可能

任意时间点的系统状态都可以重建。如果用户问"三天前的账户是什么情况",你只需要重放事件到那个时间点。这对于调试、复盘、甚至用户纠纷处理都是神器。

3. 事件驱动天然集成

事件就是消息。你可以在事件写入时触发下游流程——通知、分析、实时看板——不需要额外的消息队列介入。

4. 支持高并发写入

事件追加(Append-only)是顺序写的,天然支持高性能日志存储。Kafka、Pulsar 都能轻松承载每秒百万级事件写入。

架构实现

核心组件

命令 -> 聚合根 -> Event Store
                 |
                 v
              投影器 -> 读模型
Enter fullscreen mode Exit fullscreen mode

用代码说话

假设一个电商订单场景:

from dataclasses import dataclass
from datetime import datetime
from typing import List
import uuid

@dataclass
class Event:
    event_id: str
    timestamp: datetime
    aggregate_id: str
    event_type: str

@dataclass
class OrderPlaced(Event):
    customer_id: str
    items: List[dict]
    total_amount: float

@dataclass
class OrderPaid(Event):
    payment_id: str
    amount: float

@dataclass
class OrderShipped(Event):
    tracking_number: str
    carrier: str

class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.events: List[Event] = []
        self.status = "draft"

    def place(self, customer_id: str, items: List[dict], total: float):
        if self.status != "draft":
            raise ValueError(f"订单已是 {self.status} 状态,无法下单")
        self.events.append(OrderPlaced(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            aggregate_id=self.order_id,
            event_type="OrderPlaced",
            customer_id=customer_id,
            items=items,
            total_amount=total
        ))
        self.status = "placed"

    def pay(self, payment_id: str, amount: float):
        if self.status != "placed":
            raise ValueError(f"订单已是 {self.status} 状态,无法支付")
        self.events.append(OrderPaid(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            aggregate_id=self.order_id,
            event_type="OrderPaid",
            payment_id=payment_id,
            amount=amount
        ))
        self.status = "paid"

    def ship(self, tracking: str, carrier: str):
        if self.status != "paid":
            raise ValueError(f"订单已是 {self.status} 状态,无法发货")
        self.events.append(OrderShipped(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            aggregate_id=self.order_id,
            event_type="OrderShipped",
            tracking_number=tracking,
            carrier=carrier
        ))
        self.status = "shipped"

    def get_uncommitted_events(self) -> List[Event]:
        return self.events

    def mark_committed(self):
        self.events = []

    @staticmethod
    def rebuild(events: List[Event]) -> "Order":
        order = Order(events[0].aggregate_id)
        for event in events:
            if isinstance(event, OrderPlaced):
                order.status = "placed"
            elif isinstance(event, OrderPaid):
                order.status = "paid"
            elif isinstance(event, OrderShipped):
                order.status = "shipped"
        order.events = []
        return order
Enter fullscreen mode Exit fullscreen mode

关键点:

  • 命令验证状态:下单、支付、发货只能在正确的状态下执行
  • 事件不可变:一旦写入 Event Store,永不修改
  • 状态重建:通过重放事件可以还原任意时刻的订单状态

Event Store 设计

class OrderRepository:
    def __init__(self, client):
        self.client = client

    def save(self, order: Order):
        events = order.get_uncommitted_events()
        if not events:
            return

        stream_name = f"order-{order.order_id}"
        for event in events:
            self.client.append_event(
                stream=stream_name,
                event_type=event.event_type,
                data=event,
                expected_version=order.version
            )
        order.mark_committed()

    def get_by_id(self, order_id: str) -> Order:
        stream_name = f"order-{order_id}"
        events = self.client.read_stream(stream_name)
        if not events:
            return None
        return Order.rebuild(events)
Enter fullscreen mode Exit fullscreen mode

投影器:构建读模型

事件写入后,你需要把数据投影到读模型(用于查询):

class OrderReadModel:
    def __init__(self, db):
        self.db = db

    def project(self, event: Event):
        if isinstance(event, OrderPlaced):
            self.db.execute(
                "INSERT INTO orders (id, customer, total, status, created_at) VALUES (?, ?, ?, ?, ?)",
                [event.aggregate_id, event.customer_id, event.total_amount, "placed", event.timestamp]
            )
        elif isinstance(event, OrderPaid):
            self.db.execute(
                "UPDATE orders SET status = 'paid', paid_at = ? WHERE id = ?",
                [event.timestamp, event.aggregate_id]
            )
        elif isinstance(event, OrderShipped):
            self.db.execute(
                "UPDATE orders SET status = 'shipped', tracking = ? WHERE id = ?",
                [event.tracking_number, event.aggregate_id]
            )
Enter fullscreen mode Exit fullscreen mode

实战中的挑战

1. 事件版本演化

业务在变,事件结构也会变。需要 Upcasting 来做版本迁移:

def upcast_v1_to_v2(event_data: dict) -> dict:
    if event_data.get("version", 1) < 2:
        event_data["currency"] = event_data.get("currency", "CNY")
    return event_data
Enter fullscreen mode Exit fullscreen mode

2. 快照优化

订单如果有一万条事件,重放会很慢。定期生成快照可以解决这个问题:

class Snapshot:
    stream: str
    version: int
    state: dict
    timestamp: datetime
Enter fullscreen mode Exit fullscreen mode

3. 查询性能

Event Store 是 append-only 日志,复杂查询很慢。需要投影到专门的读模型数据库(PostgreSQL、Elasticsearch、ClickHouse)。

什么时候不该用?

  • 简单 CRUD 应用:状态简单且不需要审计
  • 需要强事务一致性:Event Sourcing 的最终一致性不是所有场景都适合
  • 团队不熟悉:技术债务是真实存在的

工具链推荐

  • EventStoreDB:专门为 Event Sourcing 设计的数据库
  • Marten(.NET):基于 PostgreSQL 的轻量实现
  • EventFlow(.NET):完整的 DDD + ES 框架
  • Confluent Kafka:分布式事件流平台
  • Axon Framework(Java):成熟的 CQRS/ES 框架

总结

Event Sourcing 不是银弹,但它解决的是真实痛点:当你的业务需要完整的历史、可追溯的决策、或者事件驱动的下游集成时,它值得认真考虑。

它的代价是复杂度——你需要在投影器、版本迁移、快照上投入额外的工程量。选择它的理由应该是业务需要,而不是技术酷炫

如果你正在构建一个金融系统、订单处理流程、或者任何需要"为什么变成这样"的场景——现在你知道该往哪个方向走了。


*本文是软件架构系列的延续,建议从系统设计基础指南开始构建你的架构知识体系。

Top comments (0)