模式

一种订阅机制, 在可观察对象事件发生时通知多个 “观察” 该对象的其他对象。中文以订阅者(观察者)和订阅对象(可观察对象)更容易理解,而发布者理解为统一的通知部门。

啊〰老师老师,有人就要问了,为什么不用Kafka?Redis?RabbitMQ?
没有为什么,Kafka、Redis、RabbitMQ都是消息队列,但观察者模式是一种更加通用的模式,可以用于非使命必达的场景。

  1. 发布者 (Publisher):
    • 定义:当可观察对象发生变更,筛选对应的订阅者并发布他们关注的内容
  2. 订阅者 (Subscriber):
    • 定义:除了有update方法,订阅者还需要实现逻辑来处理发布者的通知参数

场景

这个模式的生活场景巨多,就比如 一蓑烟雨 的博客就有文章订阅 哈哈哈

  • 邮箱订阅:给感兴趣的人推送更新,当然现在不感兴趣也会被迫收到。
  • 期刊订阅:小学订阅的小学生之友,还有英语老师让大家(可自愿)订阅的英语报。
  • 菜市场:和老板娘说有漂亮的五花肉记得打电话给我。就是她有时候会忘记。
  • 群聊通知:排除掉开启了免打扰的成员,剩下的都是订阅者。

案例

简单点

一个商品降价订阅通知,商品为小米SU7,为了能在线分享用 TypeScript 写案例分享。

以下代码点击 codesandbox 按钮即可运行。
Edit ThatCoder-Design

观察者接口

定义了基本的观察者接口,有观察者的信息和可观察对象的变更回调方法update()

观察者接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Observer.ts 观察者接口
export interface Observer {
  // 可观察对象变更回调
  update(product: string, price: number): void;
  userUUID: string;
  email: string;
  subscriptionType: SubscriptionType;
  discountThreshold?: number; // 仅对 DISCOUNT_TO 类型有效
}

// 订阅类型枚举
export class SubscriptionType {
  private constructor(public readonly model: string) {}

  static readonly IN_STOCK = new SubscriptionType("IN_STOCK");
  static readonly DISCOUNT = new SubscriptionType("DISCOUNT");
  static readonly DISCOUNT_TO = new SubscriptionType("DISCOUNT_TO");

  getDescription(): string {
    switch (this.model) {
      case "IN_STOCK":
        return "来货通知";
      case "DISCOUNT":
        return "降价通知";
      case "DISCOUNT_TO":
        return "降价到预期通知";
      default:
        return "未知订阅";
    }
  }
}

观察者实现

实现了观察者,增加了发送邮箱这个实际的通知方法,在update()实现通知调用

观察者接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// UserObserver.ts 实现具体的观察者,处理不同类型的通知
import {logger} from "../util/Logger"
import { Observer, SubscriptionType } from "./Observer";

export class UserObserver implements Observer {
  constructor(
    public userUUID: string,
    public email: string,
    public subscriptionType: SubscriptionType,
    public discountThreshold?: number // 仅对 DISCOUNT_TO 类型有效
  ) {}

  update(product: string, price: number): void {
    switch (this.subscriptionType) {
      case SubscriptionType.IN_STOCK:
        this.sendEmailNotification(`${product} 来货了!`);
        break;
      case SubscriptionType.DISCOUNT:
        this.sendEmailNotification(`${product} 现在已经降价至 $${price}!`);
        break;
      case SubscriptionType.DISCOUNT_TO:
        this.sendEmailNotification(
          `${product} 现在已经降价至 $${price}, 满足您期待的降价 $${
            this.discountThreshold ?? 0
          }% !`
        );
        break;
    }
  }

  private sendEmailNotification(message: string): void {
    logger.info(`发送邮件 ${this.email}: ${message}`);
  }
}

可观察者接口

定义了基本的可观察者接口,主要有订阅、取消订阅、通知三要素。

可观察者接口
1
2
3
4
5
6
7
8
9
10
11
12
13
// Observable.ts 定义一个可观察对象接口,包括订阅、取消订阅和通知方法
import { Observer } from "../Observer";

export interface Observable {
  // 订阅
  subscribe(observer: Observer): void;

  // 取消订阅
  unsubscribe(observer: Observer): void;

  // 通知
  notifyObservers(): void;
}

可观察者实现

实现了一个商品观察对象

可观察者实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// ProductObservable.ts  实现具体的可观察对象(商品通知器)
import { Observable } from "./Observable";
import { Observer, SubscriptionType } from "../Observer";
import { logger } from "../../util/Logger";

export class ProductObservable implements Observable {
  private publishers: Observer[] = [];
  private currentPrice: number = 0.0;
  private originalPrice: number = 100.0; // 原始价格,用于比较

  constructor(private product: string) {
    logger.info(
      `创建可观察对象(商品:${product}),价格 $${this.originalPrice}`
    );
  }

  subscribe(publisher: Observer): void {
    this.publishers.push(publisher);
    logger.info(
      `用户UUID: ${publisher.userUUID} ,成功订阅商品 ${
        this.product
      } ,订阅类型 ${publisher.subscriptionType.getDescription()}.`
    );
  }

  unsubscribe(publisher: Observer): void {
    this.publishers = this.publishers.filter(
      (obs) => obs.userUUID !== publisher.userUUID
    );
    logger.info(
      `用户UUID: ${publisher.userUUID} ,取消订阅商品 ${this.product} `
    );
  }

  notifyObservers(): void {
    for (const publisher of this.publishers) {
      switch (publisher.subscriptionType) {
        case SubscriptionType.IN_STOCK:
          publisher.update(this.product, this.currentPrice);
          break;
        case SubscriptionType.DISCOUNT:
          if (this.currentPrice < this.originalPrice) {
            publisher.update(this.product, this.currentPrice);
          }
          break;
        case SubscriptionType.DISCOUNT_TO:
          if (this.currentPrice <= (publisher.discountThreshold ?? 0)) {
            publisher.update(this.product, this.currentPrice);
          }
          break;
      }
    }
  }

  productRestocked(): void {
    logger.info(`商品 ${this.product} 采购成功`);
    this.notifyObservers();
  }

  productDiscounted(newPrice: number): void {
    this.currentPrice = newPrice;
    if (newPrice === this.originalPrice) {
      logger.info(`商品 ${this.product} 恢复原价`);
    } else {
      logger.info(`商品 ${this.product} 降价至: $${this.currentPrice}`);
    }
    this.notifyObservers();
  }
}

测试效果

创建 小米SU7 这个可观察对象
三个用户关注了 小米SU7,关注类型不一样
在 小米SU7 库存和价格变动时候可以观测到对应的通知变化

测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// main.ts
import { ProductObservable } from "./observable/ProductObservable";
import { UserObserver } from "./UserObserver";
import { SubscriptionType } from "./Observer";
import { logger } from "../util/Logger";

export const TestObserver = () => {
  // 创建可观察对象(商品通知器)
  const su7Notifier = new ProductObservable("小米SU7");

  // 创建观察者(用户)
  const user1 = new UserObserver(
    "UUID-1111",
    "user1@thatcoder.cn",
    SubscriptionType.IN_STOCK
  );
  const user2 = new UserObserver(
    "UUID-2222",
    "user2@thatcoder.cn",
    SubscriptionType.DISCOUNT
  );
  const user3 = new UserObserver(
    "UUID-3333",
    "user3@thatcoder.cn",
    SubscriptionType.DISCOUNT_TO,
    50
  );

  // 用户1订阅iPhone 15有货通知
  su7Notifier.subscribe(user1);
  // 用户2订阅iPhone 15降价通知
  su7Notifier.subscribe(user2);
  // 用户3订阅iPhone 15降价到50%通知
  su7Notifier.subscribe(user3);

  // 商品到货,通知相关用户
  su7Notifier.productRestocked();

  // 商品降价,通知相关用户
  su7Notifier.productDiscounted(60.0);

  // 商品恢复原价
  su7Notifier.productDiscounted(100.0);

  // 商品降价到50%,通知相关用户
  su7Notifier.productDiscounted(45.0);

  // 用户1取消iPhone 15的订阅
  su7Notifier.unsubscribe(user1);

  // 商品到货,通知剩余的用户
  su7Notifier.productRestocked();
};

测试结果

和预想一致,可观察对象只需要关注自己的变动就可以了,用户考虑的就多了(还要点击订阅)。
降价到60,所以用户3不被通知
用户1取消订阅,所以来货了也不被通知
当然这是最简单的示例

运行结果
运行结果

Spring监听机制

Spring有EventListener类似去定义一个事件的处理逻辑,相当于在里面写了订阅者的通知方法。ApplicationEventPublisher会去发布定义的事件,相当于可观察者的对象发生了变动。不同的是我们只关心发布和处理逻辑即可,中间的调用交给了Listener

生命周期事件

在包 org.springframework.context.event 下面有很多与 ApplicationContext 生命周期相关的事件,这些事件都继承自 ApplicationContextEvent,包括 ContextRefreshedEvent, ContextStartedEvent, ContextStoppedEvent, ContextClosedEvent
到了对应的生命周期会调用订阅。

启动和刷新
1
2
3
4
5
6
7
8
9
10
import org.springframework.context.ApplicationListener
import org.springframework.context.event.ContextRefreshedEvent
import org.springframework.stereotype.Component

@Component
class StartupListener : ApplicationListener<ContextRefreshedEvent> {
override fun onApplicationEvent(event: ContextRefreshedEvent) {
println("应用刷新成功!")
}
}

事务监听

@TransactionalEventListener
举例一个下单成功后的发布事务

事件定义
1
data class OrderPlacedEvent(val orderId: String, val userEmail: String)
事件处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.context.event.TransactionalEventListener
import org.springframework.stereotype.Component

@Component
class OrderPlacedEventListener {

@TransactionalEventListener
@Async
fun handleOrderPlacedEvent(event: OrderPlacedEvent) {
// 发送订单确认邮件
val orderId = event.orderId
val userEmail = event.userEmail
println("发送 $orderId 信息到用户邮箱 $userEmail")
// 实际发送邮件的逻辑...
}
}
事件触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class OrderService(private val eventPublisher: ApplicationEventPublisher) {

@Transactional
fun save(order: Order) {
// 处理下单逻辑...
// 发布事件
eventPublisher.publishEvent(OrderPlacedEvent(orderId, userEmail))
}
}

总结

优点

  • 代码解耦:观察者和订阅者的逻辑分开,订阅者只引用了抽象的发布者接口,每个可观察者只需要关注自己的实现。
  • 抽象耦合:如上代码解耦后逻辑上依然保持着抽象的耦合,订阅者只需要注册订阅即可

缺点

  • 隐式依赖:抽象耦合就代表着事件通知机制是隐式的,系统的行为可能变得难以预测和理解。及时补充文档,不然就慢慢DEBUG。
  • 瞬时峰值:某个可观察对象有大量订阅时,触发update带来的巨额性能开销可能会导致性能瓶颈,甚至系统阻塞。注意异步和削峰。
  • 并发问题:多线程中,事件的发布和订阅者的变动可能带来并发问题。需要复杂的同步机制来确保线程安全,比如ConcurrentModificationException。除了线程安全的集合可能还需要考虑显式锁、读写锁或原子操作。
IDEA的监听耳机
IDEA的监听耳机

本站由 钟意 使用 Stellar 1.28.1 主题创建。
又拍云 提供CDN加速/云存储服务
vercel 提供托管服务
湘ICP备2023019799号-1
总访问 次 | 本页访问