- Published on
RxJS源码解析(五): Scheduler--高效管理异步操作
- Authors
- Name
- 青雲
在 RxJS 中,Scheduler (调度器) 是一个重要的概念,它用于控制和管理 Observable 中的异步操作的执行时机。理解 Scheduler 的工作原理和使用方法,可以帮助我们更好地掌控 RxJS 中的异步流程。
基本概念
Scheduler(调度器)本质上是一个数据结构,它知道如何根据优先级或其他标准存储和队列任务,并在适当的时候执行这些任务。它由三个核心部分组成:
- 数据结构:用于存储和优先化任务,在 RxJS 中每个任务被包装成一个 Action 对象。Action 对象包含了需要执行的工作函数、状态数据以及执行延迟等信息。
- 执行上下文:定义任务在何时何地被执行,不同类型的调度器(如异步、队列等)会有不同的执行策略。例如,
asyncScheduler
使用setTimeout
或setInterval
来异步执行任务,而queueScheduler
则是递归同步执行任务。 - 虚拟时钟:通过
now()
方法提供时间概念,使得调度器可以处理基于时间的操作,如延迟执行、定时执行等。虚拟时钟使得调度器能够模拟时间流逝,从而支持复杂的时间操作和测试。
这三个组成部分协同工作,使得 Scheduler 能够精确控制任务的执行时机和方式,是 RxJS 中进行时间和并发管理的核心机制。
核心组件
让我们深入分析 Scheduler 的两个核心组件及其源码实现。
类图
Scheduler 类
首先看 Scheduler 的基础实现:
export class Scheduler implements SchedulerLike {
public static now: () => number = dateTimestampProvider.now;
constructor(
private schedulerActionCtor: typeof Action,
now: () => number = Scheduler.now
) {
this.now = now;
}
public now: () => number;
public schedule<T>(
work: (this: SchedulerAction<T>, state?: T) => void,
delay: number = 0,
state?: T
): Subscription {
return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
}
}
这段代码展示了 Scheduler 的核心设计:
- 时间管理:
- 通过静态的
now
方法提供时间概念 - 默认使用
dateTimestampProvider.now
获取真实时间 - 可以在构造函数中传入自定义的
now
方法,实现虚拟时间
- 通过静态的
- 任务创建:
schedulerActionCtor
是一个 Action 构造器,决定了如何创建任务- 这种设计允许不同的调度器使用不同的 Action 实现
- 调度接口:
schedule
方法是核心调度接口- 接收工作函数(work)、延迟时间(delay)和状态(state)
- 返回 Subscription 对象,允许取消调度的任务
Action 类层次结构
基础 Action 类
export class Action<T> extends Subscription {
constructor(
scheduler: Scheduler,
work: (this: SchedulerAction<T>, state?: T) => void
) {
super();
}
public schedule(state?: T, delay: number = 0): Subscription {
return this;
}
}
Action 类提供了最基础的抽象:
- 继承自 Subscription,具备可取消的能力
- 定义了基本的调度接口
- 作为所有具体 Action 实现的基类
异步 Action 实现
export class AsyncAction<T> extends Action<T> {
public id: TimerHandle | undefined; // 用于存储定时器ID
public state?: T; // 存储任务状态
public delay!: number; // 延迟时间
protected pending: boolean = false; // 任务是否待执行
constructor(
protected scheduler: AsyncScheduler,
protected work: (this: SchedulerAction<T>, state?: T) => void
) {
super(scheduler, work);
}
public schedule(state?: T, delay: number = 0): Subscription {
if (this.closed) {
return this;
}
// 更新状态
this.state = state;
const id = this.id;
const scheduler = this.scheduler;
// 如果已有定时器ID,先回收
if (id != null) {
this.id = this.recycleAsyncId(scheduler, id, delay);
}
// 设置任务为待执行状态
this.pending = true;
this.delay = delay;
// 请求新的定时器ID
this.id = this.requestAsyncId(scheduler, this.id, delay);
return this;
}
}
AsyncAction 实现了异步执行的核心逻辑:
- 通过 id 管理定时器
- 使用 pending 标记任务状态
- schedule 方法实现了任务的异步调度
- 提供了定时器管理机制(requestAsyncId/recycleAsyncId)
队列 Action 实现
export class QueueAction<T> extends AsyncAction<T> {
public schedule(state?: T, delay: number = 0): Subscription {
// 有延迟时使用异步方式
if (delay > 0) {
return super.schedule(state, delay);
}
// 无延迟时同步执行
this.delay = delay;
this.state = state;
this.scheduler.flush(this);
return this;
}
}
QueueAction 展示了特殊的执行策略:
- 继承自 AsyncAction,保留异步能力
- 当 delay 为 0 时,通过 flush 立即同步执行
- 当 delay 大于 0 时,回退到异步执行模式
工作流程
任务创建阶段
在 Scheduler 中创建一个新的 Action。Action 是调度任务的基本单元,通过封装待执行的工作函数和相关状态信息来实现。
new this.schedulerActionCtor<T>(this, work)
这就像你去餐厅点餐:
work
是你要点的菜(要执行的任务)schedulerActionCtor
是厨师(决定如何处理这个任务)- 创建的 Action 实例就像一张订单
任务调度阶段
每个 Action 都有自己的 schedule
方法,用于安排具体的调度逻辑。此方法会确定任务何时执行,以及使用何种策略(例如,立即执行、延迟执行等)。
.schedule(state, delay)
相当于确定这个订单的具体细节:
- state 是配菜要求(任务需要的数据)
- delay 是预约时间(任务延迟多久执行)
执行控制阶段
根据调度策略,任务可以异步或同步地执行。
// 异步执行
this.id = this.requestAsyncId(scheduler, this.id, delay);
// 同步执行
this.scheduler.flush(this);
就像餐厅处理订单的两种方式:
- 异步执行:像预约订单,到点再做
- 同步执行:像堂食订单,立即开始做
资源清理阶段
执行完任务后,清理相关资源,例如取消或回收定时器,以确保不留有多余的定时器引用,从而避免内存泄漏。
this.recycleAsyncId(scheduler, id, delay);
就像完成订单后的清理工作:
- 清理定时器(收拾餐桌)
- 释放资源(回收餐具)
流程图
内置调度器类型
AsyncScheduler(异步调度器)
简介
最基础的调度器,使用 JavaScript 的 setTimeout/setInterval 机制来调度任务。它像一个普通的定时任务系统,可以处理延迟执行的任务。
export class AsyncScheduler extends Scheduler {
// 存储待执行的任务队列
public actions: Array<AsyncAction<any>> = [];
// 标记是否有任务正在执行
public _active: boolean = false;
// 存储当前的定时器句柄
public _scheduled: TimerHandle | undefined;
public flush(action: AsyncAction<any>): void {
const { actions } = this;
if (this._active) {
// 如果当前有任务在执行,将新任务加入队列
actions.push(action);
return;
}
// ... 执行任务
}
}
特点
- 使用 setTimeout/setInterval 执行任务
- 支持延迟执行
- 任务按照队列顺序执行
- 适合处理异步操作和定时任务
使用场景
延时操作
// 延迟5秒后发出值
of(1, 2, 3).pipe(
delay(5000, asyncScheduler)
).subscribe(console.log);
定时轮询
// 每2秒轮询一次
interval(2000, asyncScheduler).pipe(
switchMap(() => this.api.getData())
).subscribe(data => console.log(data));
控制操作符的执行时机
// 强制 observeOn 使用异步调度
observable.pipe(
observeOn(asyncScheduler)
).subscribe(console.log);
延迟订阅
// 5秒后才开始订阅
const subscription = observable
.pipe(subscribeOn(asyncScheduler, 5000))
.subscribe(console.log);
节流控制
// 使用 asyncScheduler 实现防抖
fromEvent(button, 'click').pipe(
debounceTime(1000, asyncScheduler)
).subscribe(() => console.log('Clicked!'));
QueueScheduler(队列调度器)
简介
一个特殊的调度器,继承自 AsyncScheduler,但通过 QueueAction 实现了同步执行的能力。它能够立即执行没有延迟的任务,适合处理同步递归操作。
// QueueScheduler.ts
export class QueueScheduler extends AsyncScheduler {}
// QueueAction.ts
export class QueueAction<T> extends AsyncAction<T> {
public schedule(state?: T, delay: number = 0): Subscription {
// 有延迟时使用异步调度
if (delay > 0) {
return super.schedule(state, delay);
}
// 无延迟时立即执行
this.delay = delay;
this.state = state;
this.scheduler.flush(this);
return this;
}
public execute(state: T, delay: number): any {
// 有延迟或已关闭时使用父类执行方式
return delay > 0 || this.closed ?
super.execute(state, delay) :
this._execute(state, delay);
}
protected requestAsyncId(
scheduler: QueueScheduler,
id?: TimerHandle,
delay: number = 0
): TimerHandle {
// 有延迟时使用异步方式
if ((delay != null && delay > 0) ||
(delay == null && this.delay > 0)) {
return super.requestAsyncId(scheduler, id, delay);
}
// 否则立即执行
scheduler.flush(this);
return 0; // 返回 0 作为有效的 TimerHandle
}
}
特点
- 继承自 AsyncScheduler,保留异步能力
- 无延迟任务立即同步执行
- 有延迟任务使用异步模式
- 通过 flush 方法直接执行任务
- 适合处理递归操作
使用场景
递归操作
// 使用 queueScheduler 处理递归操作
const source = of(1, 2, 3).pipe(
observeOn(queueScheduler),
map(x => {
console.log('Processing:', x);
return x * x;
})
);
source.subscribe(console.log);
// 输出:
// Processing: 1
// 1
// Processing: 2
// 4
// Processing: 3
// 9
同步队列处理
// 确保操作按顺序同步执行
const queue = new Subject();
queue.pipe(
observeOn(queueScheduler),
tap(x => console.log('Processing:', x)),
map(x => x * 2)
).subscribe(console.log);
queue.next(1); // 立即处理
queue.next(2); // 立即处理
queue.next(3); // 立即处理
控制并发操作
// 使用 queueScheduler 控制操作的执行顺序
merge(
of('A', 'B', 'C'),
of(1, 2, 3)
).pipe(
observeOn(queueScheduler)
).subscribe(console.log);
// 输出会按顺序执行,不会交错
确保同步执行
// 强制同步执行,即使是异步操作
from(Promise.resolve('Hello')).pipe(
subscribeOn(queueScheduler)
).subscribe(console.log);
AsapScheduler(微任务调度器)
简介
使用微任务机制的调度器,通过 Immediate
工具类实现,它会在当前同步代码执行完成后立即执行任务。
// AsapAction.ts
export class AsapAction<T> extends AsyncAction<T> {
protected requestAsyncId(scheduler: AsapScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
// 有延迟时使用普通的异步调度
if (delay !== null && delay > 0) {
return super.requestAsyncId(scheduler, id, delay);
}
// 将动作推入调度器队列
scheduler.actions.push(this);
// 使用 immediateProvider 调度微任务
return scheduler._scheduled || (
scheduler._scheduled = immediateProvider.setImmediate(
scheduler.flush.bind(scheduler, undefined)
)
);
}
}
// Immediate.ts
export const Immediate = {
setImmediate(cb: () => void): number {
const handle = nextHandle++;
activeHandles[handle] = true;
if (!resolved) {
resolved = Promise.resolve();
}
// 使用 Promise 实现微任务调度
resolved.then(() => findAndClearHandle(handle) && cb());
return handle;
},
clearImmediate(handle: number): void {
findAndClearHandle(handle);
}
};
特点
- 使用 Promise 实现微任务机制
- 执行时机比 setTimeout 更早
- 在当前同步代码执行完立即执行
- 通过 Immediate 工具类管理微任务
使用场景
快速响应的异步操作
// 使用 asapScheduler 实现快速的异步响应
of(1, 2, 3).pipe(
observeOn(asapScheduler)
).subscribe(console.log);
// 会在当前同步代码执行完后立即执行
微任务级别的状态更新
// 用于状态更新,确保在当前同步代码执行完后立即更新
const state$ = new BehaviorSubject(0);
state$.pipe(
observeOn(asapScheduler),
tap(state => updateUI(state))
).subscribe();
// 多个状态更新会在同一个微任务中批量处理
state$.next(1);
state$.next(2);
state$.next(3);
DOM 更新优化
// 在 DOM 操作中使用,确保更新及时但不阻塞主线程
fromEvent(input, 'input').pipe(
map(e => e.target.value),
observeOn(asapScheduler),
tap(value => updateSearchResults(value))
).subscribe();
Promise 链中的操作
// 与 Promise 配合使用,保持相似的执行时序
from(Promise.resolve('data')).pipe(
observeOn(asapScheduler),
map(data => processData(data))
).subscribe(console.log);
AnimationFrameScheduler(动画帧调度器)
简介
专门用于处理动画相关操作的调度器,使用浏览器的 requestAnimationFrame
API,确保任务在下一帧渲染前执行。
// AnimationFrameAction.ts
export class AnimationFrameAction<T> extends AsyncAction<T> {
protected requestAsyncId(
scheduler: AnimationFrameScheduler,
id?: TimerHandle,
delay: number = 0
): TimerHandle {
// 有延迟时使用普通的异步调度
if (delay !== null && delay > 0) {
return super.requestAsyncId(scheduler, id, delay);
}
// 将动作推入调度器队列
scheduler.actions.push(this);
// 使用 animationFrameProvider 请求动画帧
return scheduler._scheduled || (
scheduler._scheduled = animationFrameProvider.requestAnimationFrame(
() => scheduler.flush(undefined)
)
);
}
protected recycleAsyncId(
scheduler: AnimationFrameScheduler,
id?: TimerHandle,
delay: number = 0
): TimerHandle | undefined {
if (delay != null ? delay > 0 : this.delay > 0) {
return super.recycleAsyncId(scheduler, id, delay);
}
// 清理不需要的动画帧请求
const { actions } = scheduler;
if (id != null && actions[actions.length - 1]?.id !== id) {
animationFrameProvider.cancelAnimationFrame(id as number);
scheduler._scheduled = undefined;
}
return undefined;
}
}
// animationFrameProvider.ts
export const animationFrameProvider: AnimationFrameProvider = {
schedule(callback) {
let request = requestAnimationFrame;
let cancel = cancelAnimationFrame;
const { delegate } = animationFrameProvider;
if (delegate) {
request = delegate.requestAnimationFrame;
cancel = delegate.cancelAnimationFrame;
}
const handle = request((timestamp) => {
cancel = undefined;
callback(timestamp);
});
return new Subscription(() => cancel?.(handle));
},
// ... 其他方法
};
特点
- 使用浏览器的 requestAnimationFrame API
- 任务在下一帧渲染前执行
- 支持动画帧的取消和重新调度
- 提供可定制的动画帧处理机制(通过 delegate)
- 适合处理视觉相关的操作
使用场景
平滑动画
// 创建平滑的计数动画
interval(0, animationFrameScheduler).pipe(
map(frame => Math.min(frame, 100)),
takeWhile(v => v <= 100)
).subscribe(value => {
element.style.width = `${value}%`;
});
视觉效果过渡
// 实现平滑的颜色过渡
const startColor = [0, 0, 0];
const endColor = [255, 255, 255];
interval(0, animationFrameScheduler).pipe(
map(frame => {
const progress = Math.min(frame / 60, 1); // 假设60帧完成过渡
return startColor.map((start, i) =>
Math.round(start + (endColor[i] - start) * progress)
);
}),
takeWhile(([r]) => r <= endColor[0])
).subscribe(([r, g, b]) => {
element.style.backgroundColor = `rgb(${r},${g},${b})`;
});
滚动动画
// 实现平滑滚动
const scroll$ = fromEvent(window, 'scroll').pipe(
observeOn(animationFrameScheduler),
map(() => window.scrollY)
);
// 平滑滚动到指定位置
function smoothScrollTo(target: number) {
const start = window.scrollY;
interval(0, animationFrameScheduler).pipe(
map(frame => {
const progress = Math.min(frame / 30, 1); // 30帧完成滚动
return start + (target - start) * easeInOut(progress);
}),
takeWhile(current => Math.abs(current - target) > 1)
).subscribe(y => window.scrollTo(0, y));
}
Canvas 动画
// Canvas 动画绘制
const canvas = document.querySelector('canvas');
const ctx = canvas.getContext('2d');
interval(0, animationFrameScheduler).pipe(
map(frame => ({
x: Math.cos(frame / 10) * 100,
y: Math.sin(frame / 10) * 100
}))
).subscribe(pos => {
ctx.clearRect(0, 0, canvas.width, canvas.height);
ctx.beginPath();
ctx.arc(pos.x + canvas.width/2, pos.y + canvas.height/2, 10, 0, Math.PI * 2);
ctx.fill();
});
内置调度器工作流程图
调度器选择指南
执行机制对比
调度器类型 | 执行机制 | 执行时机 | 任务队列 |
---|---|---|---|
AsyncScheduler | setTimeout/setInterval | 宏任务,最后执行 | 异步队列 |
QueueScheduler | 同步执行 | 立即执行 | 同步队列 |
AsapScheduler | Promise(微任务) | 当前同步代码后立即执行 | 微任务队列 |
AnimationFrameScheduler | requestAnimationFrame | 下一帧渲染前执行 | 动画帧队列 |
使用场景对比
调度器类型 | 主要场景 | 适用情况 | 不适用情况 |
---|---|---|---|
AsyncScheduler | + 延时操作 + 定时轮询 + 延迟订阅 | + 需要特定延迟的操 + 周期性任务 + 常规异步操作 | + 需要立即响应的场 + 动画相关操作 |
QueueScheduler | + 递归操作 + 同步队列处理 + 控制执行顺序 | + 需要立即执行的操 + 需要保证执行顺序 + 处理同步递归 | + 有延迟需求的场景 + 耗时操作 |
AsapScheduler | + 快速响应的异步操作 + 状态更新 + DOM更新 | + 需要快速异步响应 + UI状态更新 + Promise相关操作 | + 需要特定延迟的场景 + 动画效果 |
AnimationFrameScheduler | + 动画效果 + 视觉更新 + Canvas绘制 | + 平滑动画 + 视觉效果过渡 + 性能敏感的UI更新 | + 非视觉相关的操作 + 需要精确延时的场景 |
性能特征对比
调度器类型 | 响应速度 | CPU消耗 | 内存占用 | 适用规模 |
---|---|---|---|---|
AsyncScheduler | 最慢 | 低 | 低 | 大规模任务 |
QueueScheduler | 最快 | 高 | 低 | 小规模任务 |
AsapScheduler | 较快 | 中 | 低 | 中等规模任务 |
AnimationFrameScheduler | 适中 | 较高 | 中 | 动画相关人物 |
选择建议
根据不同的场景选择合适的调度器:
- 根据任务类型选择:
- 普通异步任务 → AsyncScheduler
- 同步/递归任务 → QueueScheduler
- UI状态更新 → AsapScheduler
- 动画/视觉效果 → AnimationFrameScheduler
- 根据性能需求选择:
- 需要低CPU消耗 → AsyncScheduler
- 需要快速响应 → QueueScheduler/AsapScheduler
- 需要流畅动画 → AnimationFrameScheduler
- 根据执行时机选择:
- 需要延迟执行 → AsyncScheduler
- 需要立即执行 → QueueScheduler
- 需要微任务执行 → AsapScheduler
- 需要帧同步 → AnimationFrameScheduler
- 实际应用建议:
- 可以在不同场景组合使用多个调度器
- 优先考虑默认行为,不要过度优化
- 根据实际性能表现选择合适的调度器
- 注意监控和测试不同调度器的效果
总结
通过对 RxJS Scheduler 的深入探讨,我们可以得出以下关键结论:
- 架构设计
- 分层设计:Scheduler 采用分层设计,包含基础的调度器类和特定的实现类。
- 灵活机制:Action 类体系提供了灵活的任务执行机制。
- 解耦设计:通过组合模式实现了调度器和动作的解耦。
- 性能考虑
- 调度器选择:选择合适的调度器对应用性能有重要影响。
- 响应与资源平衡:需要在响应速度和资源消耗之间找到平衡。
- 任务规模适配:不同规模的任务适合不同的调度器。
- 最佳实践
- 场景适配:根据具体场景选择合适的调度器。
- 避免过度优化:避免过度优化和不必要的调度器使用。
- 资源管理:注意资源管理和避免内存泄漏问题。
- 未来展望
- 新调度机制:随着 Web 平台的发展,可能会出现新的调度机制。
- 性能优化:调度器的性能优化空间仍然存在。
通过合理使用这些调度器,我们可以更好地控制异步操作的执行时序,提供更好的用户体验。理解调度器的工作原理和适用场景,是构建高性能 RxJS 应用的关键。在实际开发中,应该根据具体需求选择合适的调度器,同时关注性能指标和资源使用情况,以确保应用的最佳表现。