- Published on
RxJS(四):深入理解 Observable 和 Observer
- Authors
- Name
- 青雲
什么是 Observable
Observable 的定义与特点
Observable 是 RxJS 中最核心的概念之一,代表了一个可以被观察的数据流。Observable 的核心代码在 observable.ts 文件中。以下是 Observable 类的简化实现:
// Observable.ts
export class Observable<T> implements Subscribable<T> {
constructor(
private _subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic
) {}
// 核心订阅方法
subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription {
const subscriber =
observerOrNext instanceof Subscriber ? observerOrNext : new Subscriber(observerOrNext)
subscriber.add(this._trySubscribe(subscriber))
return subscriber
}
private _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink)
} catch (err) {
sink.error(err)
}
}
}
Observable 的构造函数接收一个可选的 subscribe 函数作为参数,这个设计很关键,构造函数本身并不执行任何订阅逻辑,只是存储了订阅函数供后续使用。
惰性订阅机制
Observable 是惰性的,只有在有订阅者订阅时才会执行。
- 当创建一个 Observable 时(通过
new Observable()
),只是定义了一个数据流的蓝图,但并不会立即执行任何操作 - 只有当调用
subscribe()
方法时,传入的订阅函数才会被执行,开始真正的数据流处理 - 每次调用
subscribe()
都会创建一个新的执行上下文,这意味着数据流是独立的
示例:
// 创建 Observable - 这时候什么都不会发生
const observable = new Observable((subscriber) => {
console.log('开始执行')
subscriber.next(1)
subscriber.next(2)
subscriber.complete()
})
// 只有调用 subscribe 时才会执行
observable.subscribe({
next: (value) => console.log(value),
complete: () => console.log('完成'),
})
这种惰性订阅机制的设计让 RxJS 能够:
- 提高性能:避免不必要的计算和资源消耗
- 更好的内存管理:资源只在需要时被分配
- 提供更好的控制流:用户可以精确控制数据流的开始和结束
- 支持多次订阅:同一个 Observable 可以被多次订阅,每次都是独立的执行上下文
Observable 与 Iterator 的对比
在编写和理解 RxJS 中的 Observable 时,了解 Observable 和 Iterator 之间的对比是非常重要的。两者虽然都是处理数据流的数据结构,但在概念和行为上有显著的区别。
数据流动方向
- Iterator(拉取模式 Pull):
const iterator = [1, 2, 3][Symbol.iterator]()
console.log(iterator.next().value) // 消费者主动拉取数据
console.log(iterator.next().value)
- Observable(推送模式 Push):
const observable = new Observable((subscriber) => {
subscriber.next(1) // 生产者主动推送数据
subscriber.next(2)
subscriber.complete()
})
observable.subscribe((value) => console.log(value))
同步与异步
- Iterator:
- 同步操作:每次调用 next() 都会立即返回值
- 一次性获取:通常一次性获取所有值
const iterator = [1, 2, 3][Symbol.iterator]()
while (true) {
const { done, value } = iterator.next()
if (done) break
console.log(value) // 同步输出
}
- Observable:
- 异步操作:可以随时发送值
- 持续性流动:可以持续不断地发送值
const observable = new Observable((subscriber) => {
setTimeout(() => subscriber.next(1), 1000)
setTimeout(() => subscriber.next(2), 2000)
setTimeout(() => subscriber.complete(), 3000)
})
值的数量和时间
- Iterator:
- 有限数量:通常处理有限集合
- 同步时间:值的获取时间是确定的
- Observable:
- 无限数量:可以处理无限流
- 异步时间:值可以在任意时间点到达
// 无限流示例
const infiniteObservable = new Observable((subscriber) => {
let count = 0
const interval = setInterval(() => {
subscriber.next(count++)
}, 1000)
return () => clearInterval(interval) // 清理函数
})
错误处理
- Iterator:
- 同步错误:使用 try/catch 捕获
- 简单错误处理:通常只需要处理同步错误
try {
for (const value of collection) {
// 处理值
}
} catch (error) {
// 处理错误
}
- Observable:
- 异步错误:通过 error 回调处理
- 完整错误处理:可以处理异步和同步错误
observable.subscribe({
next: (value) => console.log(value),
error: (error) => console.error('Error:', error),
complete: () => console.log('Complete'),
})
资源管理
- Iterator:
- 简单清理:通常不需要特别的清理机制
- 手动控制:需要手动控制迭代的结束
- Observable:
- 自动清理:通过 unsubscribe 自动清理资源
- 完整生命周期:包含订阅、执行、清理的完整周期
const subscription = observable.subscribe(/*...*/)
// 稍后取消订阅
subscription.unsubscribe() // 自动清理所有相关资源
组合性
- Iterator:
- 有限组合:主要通过循环和生成器函数组合
- 同步组合:组合操作是同步的
- Observable:
- 强大组合:通过各种操作符(operators)进行复杂组合
- 异步组合:可以组合异步操作
observable1
.pipe(
map((x) => x * 2),
filter((x) => x > 10),
mergeMap((x) => observable2)
)
.subscribe(/*...*/)
Observer 的角色
Observer 的定义与方法
Observer 是一个包含 next, error 和 complete 方法的对象,用于接收 Observable 推送的消息。
interface Observer<T> {
next: (value: T) => void // 处理下一个值
error: (err: any) => void // 处理错误
complete: () => void // 处理完成
}
Subscriber 是 Observer 接口的具体实现,它继承自 Subscription 并实现了 Observer 接口:
export class Subscriber<T> extends Subscription implements Observer<T> {
protected isStopped: boolean = false
protected destination: Observer<T>
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null) {
super()
this.destination =
destination instanceof Subscriber ? destination : createSafeObserver(destination)
}
}
订阅机制详解
核心组成
- Subscription(订阅)
Subscription
代表了一个可清理的资源,主要用于管理 Observable 的生命周期。它提供了用于取消订阅(释放资源)的方法,并可以组合多个订阅。
主要功能:
- unsubscribe(): 取消订阅并释放资源。
- add(): 组合多个订阅,确保多个资源可以一起清理。
通过 Subscription
,我们可以管理订阅的生命周期,并确保在订阅结束时进行必要的清理工作。
const subscription = observable.subscribe({
next: (value) => console.log(value),
complete: () => console.log('Complete'),
})
subscription.unsubscribe()
- SafeObserver(安全观察者)
SafeObserver
是一个包装原始观察者的类,主要用于提供错误处理和完成处理的保护。它确保通知按正确的顺序发送,并防止在完成或错误后继续发送值。
主要功能:
- 包装原始观察者,提供错误处理和完成处理的保护。
- 确保通知按正确的顺序发送。
- 防止在完成或错误后继续发送值。
// Subscriber.ts
export class Subscriber<T> extends Subscription implements Observer<T> {
protected isStopped: boolean = false;
protected destination: Observer<T>;
constructor(destination?: Subscriber<any> | Partial<Observer<any>> | ((value: any) => void) | null) {
super();
this.destination = destination instanceof Subscriber ? destination : createSafeObserver(destination);
}
... ...
... ...
}
通过 SafeObserver
,我们确保在通知观察者时,即使发生错误或者完成后,也能正确地处理,避免出现不一致的状态。
订阅资源管理类图
工作流程
订阅初始化阶段
当调用 observable.subscribe()
时,会创建一个新的 Subscriber
实例。
Subscriber 构造函数初始化步骤:
- 继承自
Subscription
类,获得资源管理能力。 - 设置
destination
对象,用于处理数据流(根据传入的观察者创建安全观察者)。 - 初始化状态标志
isStopped = false
,用于跟踪订阅状态。 - 设置数据处理方法:
next
、error
和complete
。
数据流转阶段
数据流转通过三个核心方法 next, error, 和 complete 实现:
next(value): 处理正常数据
next(value: T): void {
if (!this.isStopped) {
this._next(value);
} else {
handleStoppedNotification(nextNotification(value), this);
}
}
error(err): 处理错误情况
error(err: any): void {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
} else {
handleStoppedNotification(errorNotification(err), this);
}
}
complete(): 处理完成信号
complete(): void {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
} else {
handleStoppedNotification(COMPLETE_NOTIFICATION, this);
}
}
每个方法都遵循相同的处理模式:
- 检查
isStopped
状态。 - 如果已停止,调用
handleStoppedNotification
。 - 如果未停止,执行相应的内部处理方法(
_next
、_error
、_complete
)。
资源管理机制
通过 Subscription 提供的机制管理资源:
add(teardown): 添加子订阅或清理函数
add(teardown: TeardownLogic): void {
if (teardown && teardown !== this) {
if (this.closed) {
execFinalizer(teardown);
} else {
this._finalizers ??= new Set();
this._finalizers.add(teardown);
}
}
}
remove(teardown): 移除之前添加的资源
remove(teardown: Exclude<TeardownLogic, void>): void {
this._finalizers?.delete(teardown);
}
unsubscribe(): 取消订阅,清理所有资源
unsubscribe(): void {
if (!this.closed) {
this.closed = true;
try {
this.initialTeardown?.();
} catch (e) {
// Handle initial teardown error
}
this._finalizers?.forEach(finalizer => {
try {
execFinalizer(finalizer);
} catch (err) {
// Handle finalizer error
}
});
this._finalizers = null;
}
}
订阅链管理
订阅可以形成层级关系,通过 add() 方法可以组合多个订阅,当父订阅被取消时,所有子订阅也会被自动取消。
if (hasAddAndUnsubscribe(destination)) {
destination.add(this)
}
使用 Set 数据结构存储 finalizers
,确保资源清理的唯一性。
错误处理机制
错误可以在多个层级处理:
- Subscriber 级别: 通过
error()
方法处理。 - Observable 级别: 通过定义的错误处理逻辑处理。
- 全局级别: 通过
GlobalConfig.onUnhandledError
处理。
性能优化考虑
使用 V8 引擎的隐藏类优化确保性能:
// 确保所有属性按固定顺序初始化
this._next = this._nextOverride ? overrideNext : this._next
this._error = this._errorOverride ? overrideError : this._error
this._complete = this._completeOverride ? overrideComplete : this._complete
完整的生命周期
创建阶段
const subscription = observable.subscribe(observer)
活跃阶段
- 处理数据流:
next()
- 处理错误:
error()
- 处理完成:
complete()
终止阶段
subscription.unsubscribe()
订阅生命周期序列图
这种设计实现了以下几方面的优势:
- 灵活的资源管理: 通过 Subscription 管理生命周期和资源清理。
- 可靠的错误处理: 提供多个层级的错误处理机制,确保运行时的安全性。
- 可组合的数据流: 通过操作符链式调用和 pipe 方法实现数据流的变换和组合。
- 自动的内存管理: 通过 unsubscribe 方法自动释放资源,防止内存泄漏。
- 优秀的性能表现: 使用 V8 引擎的隐藏类优化,确保高效运行。
Observable 的创建方式
RxJS 提供了多种工厂方法来创建 Observable,例如 of, from, interval 等。这些创建方式覆盖了不同的使用场景:
- 同步数据:of, from(array)
- 异步数据:fromPromise, fromEvent
- 流式数据:fromIterable, fromAsyncIterable
- 定时数据:interval, timer
- 条件创建:defer
- 错误处理:throwError
选择合适的创建方式取决于:
- 数据源类型(同步/异步)
- 数据流特征(单值/多值)
- 执行时机(即时/延迟)
- 错误处理需求
- 资源管理需求
通过这些不同的创建方式,RxJS 提供了强大而灵活的响应式编程能力。
总结
在深入分析了 RxJS 的 Observable 和订阅机制后,可以总结出其关键设计模式和核心机制,以及它们所带来的优势。RxJS 通过精心的设计,实现了一种高效、类型安全且易于扩展的响应式编程范式,使得异步数据流的处理变得更加简单、可维护,并能够满足各种复杂的应用场景。
主要设计模式
观察者模式
- Observer 接口定义观察行为:
Observer
接口定义了next
,error
, 和complete
方法,用于处理 Observable 推送的数据。
- Observable 管理观察者集合:
Observable
通过subscribe
方法管理观察者集合,确保数据流能够正确地传递给所有订阅者。
发布/订阅模式
- 通过
subscribe
方法实现松耦合:- 订阅者与 Observable 之间通过
subscribe
方法建立联系,从而实现松耦合。这种模式允许轻松增加或移除订阅者。
- 订阅者与 Observable 之间通过
- 支持多播和单播:
- Observable 可以同时被多个观察者订阅(多播),也可以单独处理每个订阅(单播),实现灵活的数据流处理。
责任链模式
- 通过
pipe
操作符链式处理:pipe
方法允许将多个操作符按顺序组合起来,形成一个操作链。这种链式处理方式能够清晰地表达数据流的处理步骤和顺序。
组合模式
- 订阅可以组合成树形结构:
- 通过
Subscription
的add
方法,多个订阅可以组合在一起,形成层级结构。当父订阅被取消时,所有子订阅也会被自动取消,保证资源管理的一致性。
- 通过
关键优势
类型安全
- 完整的泛型支持:
- RxJS 使用 TypeScript 提供了完整的泛型支持,使得类型检查更加严格和可靠。
- 类型推导友好:
- RxJS 的 API 设计非常注重类型推导,能够在代码提示和编译期捕捉错误,提高开发效率和代码质量。
资源管理
- 自动清理机制:
- 通过
Subscription
管理订阅的生命周期,确保在订阅结束时自动清理资源,有效防止内存泄漏。
- 通过
错误处理
- 全面的错误捕获:
- RxJS 提供了全面的错误处理机制,通过
error
方法捕获和处理 Observable 中的错误,确保数据流在出现异常时能正确反应。
- RxJS 提供了全面的错误处理机制,通过
- 可配置的错误处理:
- 错误处理机制可以在多个层级进行配置,从局部的 Subscriber 错误处理到全局的错误处理配置,提供灵活性和可维护性。
扩展性
- 操作符机制:
- RxJS 提供了丰富的内置操作符,并允许用户轻松实现自定义操作符,以满足各种复杂的数据流处理需求。
- 自定义观察者:
- 用户可以创建自定义的观察者,实现特定需求的处理逻辑,并将其与 Observable 进行无缝集成。
性能优化
- V8 引擎优化:
- RxJS 在设计和实现上采用了 V8 引擎的最佳实践,如隐藏类优化,确保高性能运行。
- 内存管理优化:
- 通过严格的资源管理和及时的清理机制,RxJS 有效避免了内存泄漏问题,确保应用的稳定性和高效性。