Published on

RxJS(四):深入理解 Observable 和 Observer

Authors
  • avatar
    Name
    青雲
    Twitter

什么是 Observable

Observable 的定义与特点

Observable 是 RxJS 中最核心的概念之一,代表了一个可以被观察的数据流。Observable 的核心代码在 observable.ts 文件中。以下是 Observable 类的简化实现:

// Observable.ts
export class Observable<T> implements Subscribable<T> {
  constructor(
    private _subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic
  ) {}

  // 核心订阅方法
  subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription {
    const subscriber =
      observerOrNext instanceof Subscriber ? observerOrNext : new Subscriber(observerOrNext)
    subscriber.add(this._trySubscribe(subscriber))
    return subscriber
  }

  private _trySubscribe(sink: Subscriber<T>): TeardownLogic {
    try {
      return this._subscribe(sink)
    } catch (err) {
      sink.error(err)
    }
  }
}

Observable 的构造函数接收一个可选的 subscribe 函数作为参数,这个设计很关键,构造函数本身并不执行任何订阅逻辑,只是存储了订阅函数供后续使用。

惰性订阅机制

Observable 是惰性的,只有在有订阅者订阅时才会执行。

  • 当创建一个 Observable 时(通过 new Observable()),只是定义了一个数据流的蓝图,但并不会立即执行任何操作
  • 只有当调用 subscribe() 方法时,传入的订阅函数才会被执行,开始真正的数据流处理
  • 每次调用 subscribe() 都会创建一个新的执行上下文,这意味着数据流是独立的

示例:

// 创建 Observable - 这时候什么都不会发生
const observable = new Observable((subscriber) => {
  console.log('开始执行')
  subscriber.next(1)
  subscriber.next(2)
  subscriber.complete()
})

// 只有调用 subscribe 时才会执行
observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('完成'),
})

这种惰性订阅机制的设计让 RxJS 能够:

  1. 提高性能:避免不必要的计算和资源消耗
  2. 更好的内存管理:资源只在需要时被分配
  3. 提供更好的控制流:用户可以精确控制数据流的开始和结束
  4. 支持多次订阅:同一个 Observable 可以被多次订阅,每次都是独立的执行上下文

Observable 与 Iterator 的对比

在编写和理解 RxJS 中的 Observable 时,了解 Observable 和 Iterator 之间的对比是非常重要的。两者虽然都是处理数据流的数据结构,但在概念和行为上有显著的区别。

数据流动方向

  • Iterator(拉取模式 Pull):
const iterator = [1, 2, 3][Symbol.iterator]()
console.log(iterator.next().value) // 消费者主动拉取数据
console.log(iterator.next().value)
  • Observable(推送模式 Push):
const observable = new Observable((subscriber) => {
  subscriber.next(1) // 生产者主动推送数据
  subscriber.next(2)
  subscriber.complete()
})

observable.subscribe((value) => console.log(value))

同步与异步

  • Iterator:
    • 同步操作:每次调用 next() 都会立即返回值
    • 一次性获取:通常一次性获取所有值
const iterator = [1, 2, 3][Symbol.iterator]()
while (true) {
  const { done, value } = iterator.next()
  if (done) break
  console.log(value) // 同步输出
}
  • Observable:
    • 异步操作:可以随时发送值
    • 持续性流动:可以持续不断地发送值
const observable = new Observable((subscriber) => {
  setTimeout(() => subscriber.next(1), 1000)
  setTimeout(() => subscriber.next(2), 2000)
  setTimeout(() => subscriber.complete(), 3000)
})

值的数量和时间

  • Iterator:
    • 有限数量:通常处理有限集合
    • 同步时间:值的获取时间是确定的
  • Observable:
    • 无限数量:可以处理无限流
    • 异步时间:值可以在任意时间点到达
// 无限流示例
const infiniteObservable = new Observable((subscriber) => {
  let count = 0
  const interval = setInterval(() => {
    subscriber.next(count++)
  }, 1000)

  return () => clearInterval(interval) // 清理函数
})

错误处理

  • Iterator:
    • 同步错误:使用 try/catch 捕获
    • 简单错误处理:通常只需要处理同步错误
try {
  for (const value of collection) {
    // 处理值
  }
} catch (error) {
  // 处理错误
}
  • Observable:
    • 异步错误:通过 error 回调处理
    • 完整错误处理:可以处理异步和同步错误
observable.subscribe({
  next: (value) => console.log(value),
  error: (error) => console.error('Error:', error),
  complete: () => console.log('Complete'),
})

资源管理

  • Iterator:
    • 简单清理:通常不需要特别的清理机制
    • 手动控制:需要手动控制迭代的结束
  • Observable:
    • 自动清理:通过 unsubscribe 自动清理资源
    • 完整生命周期:包含订阅、执行、清理的完整周期
const subscription = observable.subscribe(/*...*/)
// 稍后取消订阅
subscription.unsubscribe() // 自动清理所有相关资源

组合性

  • Iterator:
    • 有限组合:主要通过循环和生成器函数组合
    • 同步组合:组合操作是同步的
  • Observable:
    • 强大组合:通过各种操作符(operators)进行复杂组合
    • 异步组合:可以组合异步操作
observable1
  .pipe(
    map((x) => x * 2),
    filter((x) => x > 10),
    mergeMap((x) => observable2)
  )
  .subscribe(/*...*/)

Observer 的角色

Observer 的定义与方法

Observer 是一个包含 next, error 和 complete 方法的对象,用于接收 Observable 推送的消息。

interface Observer<T> {
  next: (value: T) => void // 处理下一个值
  error: (err: any) => void // 处理错误
  complete: () => void // 处理完成
}

Subscriber 是 Observer 接口的具体实现,它继承自 Subscription 并实现了 Observer 接口:

export class Subscriber<T> extends Subscription implements Observer<T> {
  protected isStopped: boolean = false
  protected destination: Observer<T>

  constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null) {
    super()
    this.destination =
      destination instanceof Subscriber ? destination : createSafeObserver(destination)
  }
}

订阅机制详解

核心组成

  • Subscription(订阅)

Subscription 代表了一个可清理的资源,主要用于管理 Observable 的生命周期。它提供了用于取消订阅(释放资源)的方法,并可以组合多个订阅。

主要功能:

  • unsubscribe(): 取消订阅并释放资源。
  • add(): 组合多个订阅,确保多个资源可以一起清理。

通过 Subscription,我们可以管理订阅的生命周期,并确保在订阅结束时进行必要的清理工作。

const subscription = observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Complete'),
})

subscription.unsubscribe()
  • SafeObserver(安全观察者)

SafeObserver 是一个包装原始观察者的类,主要用于提供错误处理和完成处理的保护。它确保通知按正确的顺序发送,并防止在完成或错误后继续发送值。

主要功能:

  • 包装原始观察者,提供错误处理和完成处理的保护。
  • 确保通知按正确的顺序发送。
  • 防止在完成或错误后继续发送值。
// Subscriber.ts
export class Subscriber<T> extends Subscription implements Observer<T> {
  protected isStopped: boolean = false;
  protected destination: Observer<T>;

  constructor(destination?: Subscriber<any> | Partial<Observer<any>> | ((value: any) => void) | null) {
    super();
    this.destination = destination instanceof Subscriber ? destination : createSafeObserver(destination);
  }

  ... ...
  ... ...

}

通过 SafeObserver,我们确保在通知观察者时,即使发生错误或者完成后,也能正确地处理,避免出现不一致的状态。

订阅资源管理类图

工作流程

订阅初始化阶段

当调用 observable.subscribe() 时,会创建一个新的 Subscriber 实例。

Subscriber 构造函数初始化步骤:

  • 继承自 Subscription 类,获得资源管理能力。
  • 设置 destination 对象,用于处理数据流(根据传入的观察者创建安全观察者)。
  • 初始化状态标志 isStopped = false,用于跟踪订阅状态。
  • 设置数据处理方法:nexterrorcomplete

数据流转阶段

数据流转通过三个核心方法 next, error, 和 complete 实现:

next(value): 处理正常数据

next(value: T): void {
  if (!this.isStopped) {
    this._next(value);
  } else {
    handleStoppedNotification(nextNotification(value), this);
  }
}

error(err): 处理错误情况

error(err: any): void {
  if (!this.isStopped) {
    this.isStopped = true;
    this._error(err);
  } else {
    handleStoppedNotification(errorNotification(err), this);
  }
}

complete(): 处理完成信号

complete(): void {
  if (!this.isStopped) {
    this.isStopped = true;
    this._complete();
  } else {
    handleStoppedNotification(COMPLETE_NOTIFICATION, this);
  }
}

每个方法都遵循相同的处理模式:

  • 检查 isStopped 状态。
  • 如果已停止,调用 handleStoppedNotification
  • 如果未停止,执行相应的内部处理方法(_next_error_complete)。

资源管理机制

通过 Subscription 提供的机制管理资源:

add(teardown): 添加子订阅或清理函数

add(teardown: TeardownLogic): void {
  if (teardown && teardown !== this) {
    if (this.closed) {
      execFinalizer(teardown);
    } else {
      this._finalizers ??= new Set();
      this._finalizers.add(teardown);
    }
  }
}

remove(teardown): 移除之前添加的资源

remove(teardown: Exclude<TeardownLogic, void>): void {
  this._finalizers?.delete(teardown);
}

unsubscribe(): 取消订阅,清理所有资源

unsubscribe(): void {
  if (!this.closed) {
    this.closed = true;

    try {
      this.initialTeardown?.();
    } catch (e) {
      // Handle initial teardown error
    }

    this._finalizers?.forEach(finalizer => {
      try {
        execFinalizer(finalizer);
      } catch (err) {
        // Handle finalizer error
      }
    });

    this._finalizers = null;
  }
}

订阅链管理

订阅可以形成层级关系,通过 add() 方法可以组合多个订阅,当父订阅被取消时,所有子订阅也会被自动取消。

if (hasAddAndUnsubscribe(destination)) {
  destination.add(this)
}

使用 Set 数据结构存储 finalizers,确保资源清理的唯一性。

错误处理机制

错误可以在多个层级处理:

  • Subscriber 级别: 通过 error() 方法处理。
  • Observable 级别: 通过定义的错误处理逻辑处理。
  • 全局级别: 通过 GlobalConfig.onUnhandledError 处理。

性能优化考虑

使用 V8 引擎的隐藏类优化确保性能:

// 确保所有属性按固定顺序初始化
this._next = this._nextOverride ? overrideNext : this._next
this._error = this._errorOverride ? overrideError : this._error
this._complete = this._completeOverride ? overrideComplete : this._complete

完整的生命周期

创建阶段

const subscription = observable.subscribe(observer)

活跃阶段

  • 处理数据流:next()
  • 处理错误:error()
  • 处理完成:complete()

终止阶段

subscription.unsubscribe()

订阅生命周期序列图

这种设计实现了以下几方面的优势:

  • 灵活的资源管理: 通过 Subscription 管理生命周期和资源清理。
  • 可靠的错误处理: 提供多个层级的错误处理机制,确保运行时的安全性。
  • 可组合的数据流: 通过操作符链式调用和 pipe 方法实现数据流的变换和组合。
  • 自动的内存管理: 通过 unsubscribe 方法自动释放资源,防止内存泄漏。
  • 优秀的性能表现: 使用 V8 引擎的隐藏类优化,确保高效运行。

Observable 的创建方式

RxJS 提供了多种工厂方法来创建 Observable,例如 of, from, interval 等。这些创建方式覆盖了不同的使用场景:

  • 同步数据:of, from(array)
  • 异步数据:fromPromise, fromEvent
  • 流式数据:fromIterable, fromAsyncIterable
  • 定时数据:interval, timer
  • 条件创建:defer
  • 错误处理:throwError

选择合适的创建方式取决于:

  • 数据源类型(同步/异步)
  • 数据流特征(单值/多值)
  • 执行时机(即时/延迟)
  • 错误处理需求
  • 资源管理需求

通过这些不同的创建方式,RxJS 提供了强大而灵活的响应式编程能力。

总结

在深入分析了 RxJS 的 Observable 和订阅机制后,可以总结出其关键设计模式和核心机制,以及它们所带来的优势。RxJS 通过精心的设计,实现了一种高效、类型安全且易于扩展的响应式编程范式,使得异步数据流的处理变得更加简单、可维护,并能够满足各种复杂的应用场景。

主要设计模式

观察者模式

  • Observer 接口定义观察行为:
    • Observer 接口定义了 next, error, 和 complete 方法,用于处理 Observable 推送的数据。
  • Observable 管理观察者集合:
    • Observable 通过 subscribe 方法管理观察者集合,确保数据流能够正确地传递给所有订阅者。

发布/订阅模式

  • 通过 subscribe 方法实现松耦合:
    • 订阅者与 Observable 之间通过 subscribe 方法建立联系,从而实现松耦合。这种模式允许轻松增加或移除订阅者。
  • 支持多播和单播:
    • Observable 可以同时被多个观察者订阅(多播),也可以单独处理每个订阅(单播),实现灵活的数据流处理。

责任链模式

  • 通过 pipe 操作符链式处理:
    • pipe 方法允许将多个操作符按顺序组合起来,形成一个操作链。这种链式处理方式能够清晰地表达数据流的处理步骤和顺序。

组合模式

  • 订阅可以组合成树形结构:
    • 通过 Subscriptionadd 方法,多个订阅可以组合在一起,形成层级结构。当父订阅被取消时,所有子订阅也会被自动取消,保证资源管理的一致性。

关键优势

类型安全

  • 完整的泛型支持:
    • RxJS 使用 TypeScript 提供了完整的泛型支持,使得类型检查更加严格和可靠。
  • 类型推导友好:
    • RxJS 的 API 设计非常注重类型推导,能够在代码提示和编译期捕捉错误,提高开发效率和代码质量。

资源管理

  • 自动清理机制:
    • 通过 Subscription 管理订阅的生命周期,确保在订阅结束时自动清理资源,有效防止内存泄漏。

错误处理

  • 全面的错误捕获:
    • RxJS 提供了全面的错误处理机制,通过 error 方法捕获和处理 Observable 中的错误,确保数据流在出现异常时能正确反应。
  • 可配置的错误处理:
    • 错误处理机制可以在多个层级进行配置,从局部的 Subscriber 错误处理到全局的错误处理配置,提供灵活性和可维护性。

扩展性

  • 操作符机制:
    • RxJS 提供了丰富的内置操作符,并允许用户轻松实现自定义操作符,以满足各种复杂的数据流处理需求。
  • 自定义观察者:
    • 用户可以创建自定义的观察者,实现特定需求的处理逻辑,并将其与 Observable 进行无缝集成。

性能优化

  • V8 引擎优化:
    • RxJS 在设计和实现上采用了 V8 引擎的最佳实践,如隐藏类优化,确保高性能运行。
  • 内存管理优化:
    • 通过严格的资源管理和及时的清理机制,RxJS 有效避免了内存泄漏问题,确保应用的稳定性和高效性。