Published on

RxJS源码解析(五): Scheduler--高效管理异步操作

Authors
  • avatar
    Name
    青雲
    Twitter

在 RxJS 中,Scheduler (调度器) 是一个重要的概念,它用于控制和管理 Observable 中的异步操作的执行时机。理解 Scheduler 的工作原理和使用方法,可以帮助我们更好地掌控 RxJS 中的异步流程。

基本概念

Scheduler(调度器)本质上是一个数据结构,它知道如何根据优先级或其他标准存储和队列任务,并在适当的时候执行这些任务。它由三个核心部分组成:

  1. 数据结构:用于存储和优先化任务,在 RxJS 中每个任务被包装成一个 Action 对象。Action 对象包含了需要执行的工作函数、状态数据以及执行延迟等信息。
  2. 执行上下文:定义任务在何时何地被执行,不同类型的调度器(如异步、队列等)会有不同的执行策略。例如,asyncScheduler 使用 setTimeoutsetInterval 来异步执行任务,而 queueScheduler 则是递归同步执行任务。
  3. 虚拟时钟:通过 now() 方法提供时间概念,使得调度器可以处理基于时间的操作,如延迟执行、定时执行等。虚拟时钟使得调度器能够模拟时间流逝,从而支持复杂的时间操作和测试。

这三个组成部分协同工作,使得 Scheduler 能够精确控制任务的执行时机和方式,是 RxJS 中进行时间和并发管理的核心机制。

核心组件

让我们深入分析 Scheduler 的两个核心组件及其源码实现。

类图

Scheduler 类

首先看 Scheduler 的基础实现:

export class Scheduler implements SchedulerLike {
  public static now: () => number = dateTimestampProvider.now;

  constructor(
    private schedulerActionCtor: typeof Action,
    now: () => number = Scheduler.now
  ) {
    this.now = now;
  }

  public now: () => number;

  public schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay: number = 0,
    state?: T
  ): Subscription {
    return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
  }
}

这段代码展示了 Scheduler 的核心设计:

  1. 时间管理:
    • 通过静态的 now 方法提供时间概念
    • 默认使用 dateTimestampProvider.now 获取真实时间
    • 可以在构造函数中传入自定义的 now 方法,实现虚拟时间
  2. 任务创建:
    • schedulerActionCtor 是一个 Action 构造器,决定了如何创建任务
    • 这种设计允许不同的调度器使用不同的 Action 实现
  3. 调度接口:
    • schedule 方法是核心调度接口
    • 接收工作函数(work)、延迟时间(delay)和状态(state)
    • 返回 Subscription 对象,允许取消调度的任务

Action 类层次结构

基础 Action 类

export class Action<T> extends Subscription {
  constructor(
    scheduler: Scheduler,
    work: (this: SchedulerAction<T>, state?: T) => void
  ) {
    super();
  }

  public schedule(state?: T, delay: number = 0): Subscription {
    return this;
  }
}

Action 类提供了最基础的抽象:

  • 继承自 Subscription,具备可取消的能力
  • 定义了基本的调度接口
  • 作为所有具体 Action 实现的基类

异步 Action 实现

export class AsyncAction<T> extends Action<T> {
  public id: TimerHandle | undefined;  // 用于存储定时器ID
  public state?: T;                    // 存储任务状态
  public delay!: number;               // 延迟时间
  protected pending: boolean = false;   // 任务是否待执行

  constructor(
    protected scheduler: AsyncScheduler,
    protected work: (this: SchedulerAction<T>, state?: T) => void
  ) {
    super(scheduler, work);
  }

  public schedule(state?: T, delay: number = 0): Subscription {
    if (this.closed) {
      return this;
    }

    // 更新状态
    this.state = state;
    const id = this.id;
    const scheduler = this.scheduler;

    // 如果已有定时器ID,先回收
    if (id != null) {
      this.id = this.recycleAsyncId(scheduler, id, delay);
    }

    // 设置任务为待执行状态
    this.pending = true;
    this.delay = delay;
    // 请求新的定时器ID
    this.id = this.requestAsyncId(scheduler, this.id, delay);

    return this;
  }
}

AsyncAction 实现了异步执行的核心逻辑:

  • 通过 id 管理定时器
  • 使用 pending 标记任务状态
  • schedule 方法实现了任务的异步调度
  • 提供了定时器管理机制(requestAsyncId/recycleAsyncId)

队列 Action 实现

export class QueueAction<T> extends AsyncAction<T> {
  public schedule(state?: T, delay: number = 0): Subscription {
    // 有延迟时使用异步方式
    if (delay > 0) {
      return super.schedule(state, delay);
    }

    // 无延迟时同步执行
    this.delay = delay;
    this.state = state;
    this.scheduler.flush(this);
    return this;
  }
}

QueueAction 展示了特殊的执行策略:

  • 继承自 AsyncAction,保留异步能力
  • 当 delay 为 0 时,通过 flush 立即同步执行
  • 当 delay 大于 0 时,回退到异步执行模式

工作流程

任务创建阶段

在 Scheduler 中创建一个新的 Action。Action 是调度任务的基本单元,通过封装待执行的工作函数和相关状态信息来实现。

new this.schedulerActionCtor<T>(this, work)

这就像你去餐厅点餐:

  • work 是你要点的菜(要执行的任务)
  • schedulerActionCtor 是厨师(决定如何处理这个任务)
  • 创建的 Action 实例就像一张订单

任务调度阶段

每个 Action 都有自己的 schedule 方法,用于安排具体的调度逻辑。此方法会确定任务何时执行,以及使用何种策略(例如,立即执行、延迟执行等)。

.schedule(state, delay)

相当于确定这个订单的具体细节:

  • state 是配菜要求(任务需要的数据)
  • delay 是预约时间(任务延迟多久执行)

执行控制阶段

根据调度策略,任务可以异步或同步地执行。

// 异步执行
this.id = this.requestAsyncId(scheduler, this.id, delay);

// 同步执行
this.scheduler.flush(this);

就像餐厅处理订单的两种方式:

  • 异步执行:像预约订单,到点再做
  • 同步执行:像堂食订单,立即开始做

资源清理阶段

执行完任务后,清理相关资源,例如取消或回收定时器,以确保不留有多余的定时器引用,从而避免内存泄漏。

this.recycleAsyncId(scheduler, id, delay);

就像完成订单后的清理工作:

  • 清理定时器(收拾餐桌)
  • 释放资源(回收餐具)

流程图

内置调度器类型

AsyncScheduler(异步调度器)

简介

最基础的调度器,使用 JavaScript 的 setTimeout/setInterval 机制来调度任务。它像一个普通的定时任务系统,可以处理延迟执行的任务。

export class AsyncScheduler extends Scheduler {
  // 存储待执行的任务队列
  public actions: Array<AsyncAction<any>> = [];
  // 标记是否有任务正在执行
  public _active: boolean = false;
  // 存储当前的定时器句柄
  public _scheduled: TimerHandle | undefined;

  public flush(action: AsyncAction<any>): void {
    const { actions } = this;
    if (this._active) {
      // 如果当前有任务在执行,将新任务加入队列
      actions.push(action);
      return;
    }
    // ... 执行任务
  }
}

特点

  1. 使用 setTimeout/setInterval 执行任务
  2. 支持延迟执行
  3. 任务按照队列顺序执行
  4. 适合处理异步操作和定时任务

使用场景

延时操作

// 延迟5秒后发出值
of(1, 2, 3).pipe(
  delay(5000, asyncScheduler)
).subscribe(console.log);

定时轮询

// 每2秒轮询一次
interval(2000, asyncScheduler).pipe(
  switchMap(() => this.api.getData())
).subscribe(data => console.log(data));

控制操作符的执行时机

// 强制 observeOn 使用异步调度
observable.pipe(
  observeOn(asyncScheduler)
).subscribe(console.log);

延迟订阅

// 5秒后才开始订阅
const subscription = observable
  .pipe(subscribeOn(asyncScheduler, 5000))
  .subscribe(console.log);

节流控制

// 使用 asyncScheduler 实现防抖
fromEvent(button, 'click').pipe(
  debounceTime(1000, asyncScheduler)
).subscribe(() => console.log('Clicked!'));

QueueScheduler(队列调度器)

简介

一个特殊的调度器,继承自 AsyncScheduler,但通过 QueueAction 实现了同步执行的能力。它能够立即执行没有延迟的任务,适合处理同步递归操作。

// QueueScheduler.ts
export class QueueScheduler extends AsyncScheduler {}

// QueueAction.ts
export class QueueAction<T> extends AsyncAction<T> {
  public schedule(state?: T, delay: number = 0): Subscription {
    // 有延迟时使用异步调度
    if (delay > 0) {
      return super.schedule(state, delay);
    }
    // 无延迟时立即执行
    this.delay = delay;
    this.state = state;
    this.scheduler.flush(this);
    return this;
  }

  public execute(state: T, delay: number): any {
    // 有延迟或已关闭时使用父类执行方式
    return delay > 0 || this.closed ? 
      super.execute(state, delay) : 
      this._execute(state, delay);
  }

  protected requestAsyncId(
    scheduler: QueueScheduler,
    id?: TimerHandle,
    delay: number = 0
  ): TimerHandle {
    // 有延迟时使用异步方式
    if ((delay != null && delay > 0) || 
        (delay == null && this.delay > 0)) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    // 否则立即执行
    scheduler.flush(this);
    return 0; // 返回 0 作为有效的 TimerHandle
  }
}

特点

  1. 继承自 AsyncScheduler,保留异步能力
  2. 无延迟任务立即同步执行
  3. 有延迟任务使用异步模式
  4. 通过 flush 方法直接执行任务
  5. 适合处理递归操作

使用场景

递归操作

// 使用 queueScheduler 处理递归操作
const source = of(1, 2, 3).pipe(
  observeOn(queueScheduler),
  map(x => {
    console.log('Processing:', x);
    return x * x;
  })
);

source.subscribe(console.log);
// 输出:
// Processing: 1
// 1
// Processing: 2
// 4
// Processing: 3
// 9

同步队列处理

// 确保操作按顺序同步执行
const queue = new Subject();
queue.pipe(
  observeOn(queueScheduler),
  tap(x => console.log('Processing:', x)),
  map(x => x * 2)
).subscribe(console.log);

queue.next(1); // 立即处理
queue.next(2); // 立即处理
queue.next(3); // 立即处理

控制并发操作

// 使用 queueScheduler 控制操作的执行顺序
merge(
  of('A', 'B', 'C'),
  of(1, 2, 3)
).pipe(
  observeOn(queueScheduler)
).subscribe(console.log);
// 输出会按顺序执行,不会交错

确保同步执行

// 强制同步执行,即使是异步操作
from(Promise.resolve('Hello')).pipe(
  subscribeOn(queueScheduler)
).subscribe(console.log);

AsapScheduler(微任务调度器)

简介

使用微任务机制的调度器,通过 Immediate 工具类实现,它会在当前同步代码执行完成后立即执行任务。

// AsapAction.ts
export class AsapAction<T> extends AsyncAction<T> {
  protected requestAsyncId(scheduler: AsapScheduler, id?: TimerHandle, delay: number = 0): TimerHandle {
    // 有延迟时使用普通的异步调度
    if (delay !== null && delay > 0) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    // 将动作推入调度器队列
    scheduler.actions.push(this);
    // 使用 immediateProvider 调度微任务
    return scheduler._scheduled || (
      scheduler._scheduled = immediateProvider.setImmediate(
        scheduler.flush.bind(scheduler, undefined)
      )
    );
  }
}

// Immediate.ts
export const Immediate = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    activeHandles[handle] = true;
    if (!resolved) {
      resolved = Promise.resolve();
    }
    // 使用 Promise 实现微任务调度
    resolved.then(() => findAndClearHandle(handle) && cb());
    return handle;
  },
  clearImmediate(handle: number): void {
    findAndClearHandle(handle);
  }
};

特点

  1. 使用 Promise 实现微任务机制
  2. 执行时机比 setTimeout 更早
  3. 在当前同步代码执行完立即执行
  4. 通过 Immediate 工具类管理微任务

使用场景

快速响应的异步操作

// 使用 asapScheduler 实现快速的异步响应
of(1, 2, 3).pipe(
  observeOn(asapScheduler)
).subscribe(console.log);
// 会在当前同步代码执行完后立即执行

微任务级别的状态更新

// 用于状态更新,确保在当前同步代码执行完后立即更新
const state$ = new BehaviorSubject(0);

state$.pipe(
  observeOn(asapScheduler),
  tap(state => updateUI(state))
).subscribe();

// 多个状态更新会在同一个微任务中批量处理
state$.next(1);
state$.next(2);
state$.next(3);

DOM 更新优化

// 在 DOM 操作中使用,确保更新及时但不阻塞主线程
fromEvent(input, 'input').pipe(
  map(e => e.target.value),
  observeOn(asapScheduler),
  tap(value => updateSearchResults(value))
).subscribe();

Promise 链中的操作

// 与 Promise 配合使用,保持相似的执行时序
from(Promise.resolve('data')).pipe(
  observeOn(asapScheduler),
  map(data => processData(data))
).subscribe(console.log);

AnimationFrameScheduler(动画帧调度器)

简介

专门用于处理动画相关操作的调度器,使用浏览器的 requestAnimationFrame API,确保任务在下一帧渲染前执行。

// AnimationFrameAction.ts
export class AnimationFrameAction<T> extends AsyncAction<T> {
  protected requestAsyncId(
    scheduler: AnimationFrameScheduler,
    id?: TimerHandle,
    delay: number = 0
  ): TimerHandle {
    // 有延迟时使用普通的异步调度
    if (delay !== null && delay > 0) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    // 将动作推入调度器队列
    scheduler.actions.push(this);
    // 使用 animationFrameProvider 请求动画帧
    return scheduler._scheduled || (
      scheduler._scheduled = animationFrameProvider.requestAnimationFrame(
        () => scheduler.flush(undefined)
      )
    );
  }

  protected recycleAsyncId(
    scheduler: AnimationFrameScheduler,
    id?: TimerHandle,
    delay: number = 0
  ): TimerHandle | undefined {
    if (delay != null ? delay > 0 : this.delay > 0) {
      return super.recycleAsyncId(scheduler, id, delay);
    }
    // 清理不需要的动画帧请求
    const { actions } = scheduler;
    if (id != null && actions[actions.length - 1]?.id !== id) {
      animationFrameProvider.cancelAnimationFrame(id as number);
      scheduler._scheduled = undefined;
    }
    return undefined;
  }
}

// animationFrameProvider.ts
export const animationFrameProvider: AnimationFrameProvider = {
  schedule(callback) {
    let request = requestAnimationFrame;
    let cancel = cancelAnimationFrame;
    const { delegate } = animationFrameProvider;
    
    if (delegate) {
      request = delegate.requestAnimationFrame;
      cancel = delegate.cancelAnimationFrame;
    }
    
    const handle = request((timestamp) => {
      cancel = undefined;
      callback(timestamp);
    });
    
    return new Subscription(() => cancel?.(handle));
  },
  // ... 其他方法
};

特点

  1. 使用浏览器的 requestAnimationFrame API
  2. 任务在下一帧渲染前执行
  3. 支持动画帧的取消和重新调度
  4. 提供可定制的动画帧处理机制(通过 delegate)
  5. 适合处理视觉相关的操作

使用场景

平滑动画

// 创建平滑的计数动画
interval(0, animationFrameScheduler).pipe(
  map(frame => Math.min(frame, 100)),
  takeWhile(v => v <= 100)
).subscribe(value => {
  element.style.width = `${value}%`;
});

视觉效果过渡

// 实现平滑的颜色过渡
const startColor = [0, 0, 0];
const endColor = [255, 255, 255];

interval(0, animationFrameScheduler).pipe(
  map(frame => {
    const progress = Math.min(frame / 60, 1); // 假设60帧完成过渡
    return startColor.map((start, i) => 
      Math.round(start + (endColor[i] - start) * progress)
    );
  }),
  takeWhile(([r]) => r <= endColor[0])
).subscribe(([r, g, b]) => {
  element.style.backgroundColor = `rgb(${r},${g},${b})`;
});

滚动动画

// 实现平滑滚动
const scroll$ = fromEvent(window, 'scroll').pipe(
  observeOn(animationFrameScheduler),
  map(() => window.scrollY)
);

// 平滑滚动到指定位置
function smoothScrollTo(target: number) {
  const start = window.scrollY;
  interval(0, animationFrameScheduler).pipe(
    map(frame => {
      const progress = Math.min(frame / 30, 1); // 30帧完成滚动
      return start + (target - start) * easeInOut(progress);
    }),
    takeWhile(current => Math.abs(current - target) > 1)
  ).subscribe(y => window.scrollTo(0, y));
}

Canvas 动画

// Canvas 动画绘制
const canvas = document.querySelector('canvas');
const ctx = canvas.getContext('2d');

interval(0, animationFrameScheduler).pipe(
  map(frame => ({
    x: Math.cos(frame / 10) * 100,
    y: Math.sin(frame / 10) * 100
  }))
).subscribe(pos => {
  ctx.clearRect(0, 0, canvas.width, canvas.height);
  ctx.beginPath();
  ctx.arc(pos.x + canvas.width/2, pos.y + canvas.height/2, 10, 0, Math.PI * 2);
  ctx.fill();
});

内置调度器工作流程图

调度器选择指南

执行机制对比

调度器类型执行机制执行时机任务队列
AsyncSchedulersetTimeout/setInterval宏任务,最后执行异步队列
QueueScheduler同步执行立即执行同步队列
AsapSchedulerPromise(微任务)当前同步代码后立即执行微任务队列
AnimationFrameSchedulerrequestAnimationFrame下一帧渲染前执行动画帧队列

使用场景对比

调度器类型主要场景适用情况不适用情况
AsyncScheduler+ 延时操作
+ 定时轮询
+ 延迟订阅
+ 需要特定延迟的操
+ 周期性任务
+ 常规异步操作
+ 需要立即响应的场
+ 动画相关操作
QueueScheduler+ 递归操作
+ 同步队列处理
+ 控制执行顺序
+ 需要立即执行的操
+ 需要保证执行顺序
+ 处理同步递归
+ 有延迟需求的场景
+ 耗时操作
AsapScheduler+ 快速响应的异步操作
+ 状态更新
+ DOM更新
+ 需要快速异步响应
+ UI状态更新
+ Promise相关操作
+ 需要特定延迟的场景
+ 动画效果
AnimationFrameScheduler+ 动画效果
+ 视觉更新
+ Canvas绘制
+ 平滑动画
+ 视觉效果过渡
+ 性能敏感的UI更新
+ 非视觉相关的操作
+ 需要精确延时的场景

性能特征对比

调度器类型响应速度CPU消耗内存占用适用规模
AsyncScheduler最慢大规模任务
QueueScheduler最快小规模任务
AsapScheduler较快中等规模任务
AnimationFrameScheduler适中较高动画相关人物

选择建议

根据不同的场景选择合适的调度器:

  1. 根据任务类型选择:
    • 普通异步任务 → AsyncScheduler
    • 同步/递归任务 → QueueScheduler
    • UI状态更新 → AsapScheduler
    • 动画/视觉效果 → AnimationFrameScheduler
  2. 根据性能需求选择:
    • 需要低CPU消耗 → AsyncScheduler
    • 需要快速响应 → QueueScheduler/AsapScheduler
    • 需要流畅动画 → AnimationFrameScheduler
  3. 根据执行时机选择:
    • 需要延迟执行 → AsyncScheduler
    • 需要立即执行 → QueueScheduler
    • 需要微任务执行 → AsapScheduler
    • 需要帧同步 → AnimationFrameScheduler
  4. 实际应用建议:
    • 可以在不同场景组合使用多个调度器
    • 优先考虑默认行为,不要过度优化
    • 根据实际性能表现选择合适的调度器
    • 注意监控和测试不同调度器的效果

总结

通过对 RxJS Scheduler 的深入探讨,我们可以得出以下关键结论:

  • 架构设计
    • 分层设计:Scheduler 采用分层设计,包含基础的调度器类和特定的实现类。
    • 灵活机制:Action 类体系提供了灵活的任务执行机制。
    • 解耦设计:通过组合模式实现了调度器和动作的解耦。
  • 性能考虑
    • 调度器选择:选择合适的调度器对应用性能有重要影响。
    • 响应与资源平衡:需要在响应速度和资源消耗之间找到平衡。
    • 任务规模适配:不同规模的任务适合不同的调度器。
  • 最佳实践
    • 场景适配:根据具体场景选择合适的调度器。
    • 避免过度优化:避免过度优化和不必要的调度器使用。
    • 资源管理:注意资源管理和避免内存泄漏问题。
  • 未来展望
    • 新调度机制:随着 Web 平台的发展,可能会出现新的调度机制。
    • 性能优化:调度器的性能优化空间仍然存在。

通过合理使用这些调度器,我们可以更好地控制异步操作的执行时序,提供更好的用户体验。理解调度器的工作原理和适用场景,是构建高性能 RxJS 应用的关键。在实际开发中,应该根据具体需求选择合适的调度器,同时关注性能指标和资源使用情况,以确保应用的最佳表现。