DEV Community

架构师小白
架构师小白

Posted on

观察者模式深度指南:构建响应式事件系统的艺术

观察者模式深度指南:构建响应式事件系统的艺术

在软件开发中,我们经常需要在对象之间建立一对多的依赖关系,当一个对象的状态发生变化时,所有依赖于它的对象都能自动得到通知。观察者模式正是解决这一问题的经典设计模式。

为什么需要观察者模式?

想象一下你在开发一个股票交易应用:

  • 用户订阅了某只股票的价格变动
  • 当价格变化时,系统需要通知所有订阅的用户
  • 有的用户用手机APP看,有的用网页,还有的可能需要邮件通知

如果不用观察者模式,你可能需要:

# 糟糕的实现方式
class Stock:
    def update_price(self, new_price):
        # 每加一个通知渠道,就要修改这个类
        if self.mobile_users:
            send_push_notification()
        if self.web_users:
            update_web_ui()
        if self.email_users:
            send_email()
Enter fullscreen mode Exit fullscreen mode

这违反了开闭原则——每增加一种通知方式,都要修改核心类。

观察者模式定义

观察者模式(Observer Pattern) 是一种行为型设计模式,定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,当主题对象的状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。

核心角色

┌─────────────────┐      ┌─────────────────┐
│    Subject      │◄────│    Observer     │
│  (被观察者)     │     │   (观察者)      │
├─────────────────┤     ├─────────────────┤
│ +attach()      │     │ +update()      │
│ +detach()      │     └─────────────────┘
│ +notify()      │            △
└─────────────────┘            │
       ▲                       │
       │ 1:N                  │ implements
       ▼                      │
┌─────────────────┐     ┌─────────────────┐
│ ConcreteSubject │────►│ConcreteObserver│
│ (具体主题)      │     │ (具体观察者)    │
├─────────────────┤     ├─────────────────┤
│ +getState()    │     │ +update()      │
│ +setState()    │     │ +display()    │
└─────────────────┘     └─────────────────┘
Enter fullscreen mode Exit fullscreen mode

类图解释

  • Subject(主题/被观察者):知道所有的观察者,提供attach和detach方法
  • Observer(观察者):为所有的观察者定义统一的更新接口
  • ConcreteSubject:维护真实状态,在状态改变时通知观察者
  • ConcreteObserver:实现Observer的更新接口

Python 实现

基础版本

from abc import ABC, abstractmethod
from typing import List

# 观察者抽象基类
class Observer(ABC):
    @abstractmethod
    def update(self, message: str) -> None:
        pass

# 具体观察者:邮件订阅者
class EmailObserver(Observer):
    def __init__(self, email: str):
        self.email = email

    def update(self, message: str) -> None:
        print(f"📧 发送给 {self.email}: {message}")

# 具体观察者:短信订阅者  
class SMSObserver(Observer):
    def __init__(self, phone: str):
        self.phone = phone

    def update(self, message: str) -> None:
        print(f"📱 发送给 {self.phone}: {message}")

# 具体观察者:APP推送
class AppPushObserver(Observer):
    def __init__(self, device_id: str):
        self.device_id = device_id

    def update(self, message: str) -> None:
        print(f"🔔 推送到设备 {self.device_id}: {message}")

# 主题类
class Subject:
    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        self._observers.append(observer)
        print(f"✅ 已添加观察者")

    def detach(self, observer: Observer) -> None:
        self._observers.remove(observer)
        print(f"❌ 已移除观察者")

    def notify(self, message: str) -> None:
        for observer in self._observers:
            observer.update(message)

# 股票价格主题
class StockSubject(Subject):
    def __init__(self, symbol: str):
        super().__init__()
        self._symbol = symbol
        self._price = 0.0

    @property
    def price(self) -> float:
        return self._price

    @price.setter
    def price(self, new_price: float) -> None:
        old_price = self._price
        self._price = new_price
        if old_price != new_price:
            self.notify(f"股票 {self._symbol} 价格从 {old_price} 变更为 {new_price}")

# 使用示例
stock = StockSubject("AAPL")
email_user = EmailObserver("user@example.com")
sms_user = SMSObserver("13800138000")
app_user = AppPushObserver("device123")

stock.attach(email_user)
stock.attach(sms_user)
stock.attach(app_user)

print("\n=== 价格第一次变更 ===")
stock.price = 150.0

print("\n=== 价格第二次变更 ===") 
stock.price = 155.0
Enter fullscreen mode Exit fullscreen mode

运行结果:

✅ 已添加观察者
✅ 已添加观察者
✅ 已添加观察者

=== 价格第一次变更 ===
📧 发送给 user@example.com: 股票 AAPL 价格从 0.0 变更为 150.0
📱 发送给 13800138000: 股票 AAPL 价格从 0.0 变更为 150.0
🔔 推送到设备 device123: 股票 AAPL 价格从 0.0 变更为 150.0

=== 价格第二次变更 ===
📧 发送给 user@example.com: 股票 AAPL 价格从 150.0 变更为 155.0
📱 发送给 13800138000: 股票 AAPL 价格从 150.0 变更为 155.0
🔔 推送到设备 device123: 股票 AAPL 价格从 150.0 变更为 155.0
Enter fullscreen mode Exit fullscreen mode

实际应用场景

1. 微信公众号/博客订阅

class BlogSubscriber(Observer):
    def __init__(self, name: str):
        self.name = name
        self.articles = []

    def update(self, article: dict) -> None:
        self.articles.append(article)
        print(f"✍️ {self.name} 收到新文章: {article['title']}")

class Blog(Subject):
    def __init__(self):
        super().__init__()
        self._articles = []

    def publish(self, title: str, content: str) -> None:
        article = {"title": title, "content": content}
        self._articles.append(article)
        self.notify(article)

# 使用
blog = Blog()
blog.attach(BlogSubscriber("读者小明"))
blog.attach(BlogSubscriber("读者小红"))

blog.publish("深入理解Python装饰器", "装饰器是Python最强大的特性之一...")
Enter fullscreen mode Exit fullscreen mode

2. 事件总线(Event Bus)

class EventBus:
    """全局事件总线"""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._topics = {}
        return cls._instance

    def subscribe(self, topic: str, callback: Callable) -> None:
        if topic not in self._topics:
            self._topics[topic] = []
        self._topics[topic].append(callback)

    def publish(self, topic: str, data: Any) -> None:
        if topic in self._topics:
            for callback in self._topics[topic]:
                callback(data)

# 使用
event_bus = EventBus()

def handle_user_registered(user: dict):
    print(f"👋 欢迎新用户: {user['name']}")

def handle_user_registeredAnalytics(user: dict):
    print(f"📊 上报注册事件: {user}")

event_bus.subscribe("user.registered", handle_user_registered)
event_bus.subscribe("user.registered", handle_user_registeredAnalytics)

# 触发事件
event_bus.publish("user.registered", {"name": "张三", "email": "zhangsan@example.com"})
Enter fullscreen mode Exit fullscreen mode

观察者模式 vs 发布-订阅模式

很多人把这两个概念混为一谈,但实际上有细微区别:

特性 观察者模式 发布-订阅模式
耦合度 Subject 和 Observer 直接通信 通过消息队列解耦
通信方式 同步调用 通常异步
消息格式 紧耦合 解耦(通过消息格式化)
典型实现 Java Observable/Observable Kafka、RabbitMQ

优点与缺点

✅ 优点

  1. 开闭原则:新增观察者无需修改主题代码
  2. 动态关系:可以在运行时建立/解除观察关系
  3. 触发机制:支持广播式通信
  4. 解耦:主题和观察者之间是抽象耦合

❌ 缺点

  1. 通知顺序:如果观察者多,通知时间可能较长
  2. 循环依赖:注意避免死循环
  3. 内存泄漏:未及时detach可能导致问题

实际框架中的运用

Django Signals

from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=User)
def user_created_handler(sender, instance, created, **kwargs):
    if created:
        send_welcome_email(instance)
        create_default_profile(instance)
        track_registration_event(instance)
Enter fullscreen mode Exit fullscreen mode

JavaScript EventEmitter

const EventEmitter = require('events');

class Stock extends EventEmitter {
  setPrice(newPrice) {
    const oldPrice = this.price;
    this.price = newPrice;
    if (oldPrice !== newPrice) {
      this.emit('priceChanged', { 
        oldPrice, 
        newPrice,
        symbol: this.symbol 
      });
    }
  }
}

const stock = new Stock('AAPL');
stock.on('priceChanged', (data) => console.log('价格变动:', data));
stock.setPrice(150);
Enter fullscreen mode Exit fullscreen mode

总结

观察者模式是建筑响应式系统的基础:

  1. 核心思想:建立一对多的依赖关系
  2. 实现关键:抽象Observer接口 + Subject管理
  3. 常见变体:事件总线、信号系统
  4. 最佳实践:确保及时detach,避免内存泄漏

掌握观察者模式,你就能轻松构建各种响应式、系统联动、事件驱动的应用!


下期预告:我们将介绍与观察者模式互补的发布-订阅模式,以及如何在大型系统中使用消息队列实现真正的解耦。

如果你喜欢这篇文章,欢迎点赞、收藏、转发!关注后第一时间获取最新教程。

设计模式 #软件架构 #Python #后端开发

Top comments (0)