Published on

RxJS(二):掌握 RxJS 操作符与订阅管理

Authors
  • avatar
    Name
    青雲
    Twitter

在上一篇文章中,我们介绍了 RxJS 的基础知识和核心概念,讨论了 Observables、Observers 和 Subscriptions 如何在响应式编程中发挥作用。这篇文章将深入探讨 RxJS 的操作符和订阅管理,帮助你更好地掌握 RxJS 的强大功能,实现复杂的响应式编程逻辑。

引言

回顾 RxJS 基础概念

在继续深入之前,简要回顾一下 RxJS 的基础概念,包括 Observable(可观察对象)、Observer(观察者)、Subscription(订阅)等。通过这些概念,我们能够创建、管理和订阅数据流。

操作符的重要性

操作符(Operators)在 RxJS 中起着至关重要的作用。它们是对 Observable 进行转换、过滤、组合等操作的核心工具,使得处理复杂的数据流变得更加简洁和高效。

操作符详解

转换操作符

转换操作符是将一个 Observable 发出的值通过某种方式转换,并发出新的 Observable。这些操作符是 RxJS 的核心功能之一,通过变化数据流来实现异步编程的复杂逻辑。

map

map 操作符对 Observable 发出的每个值应用一个函数,并发出函数的结果。它类似于数组的 map 方法,但适用于 Observable。

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3);  // 创建一个发出数字的 Observable
const squaredNumbers$ = numbers$.pipe(map(x => x * x)); // 对每个值应用平方函数

squaredNumbers$.subscribe(value => console.log(value)); // 输出: 1, 4, 9

mapTo

mapTo 操作符将每个发出的值映射为同一个常量值。

import { of } from 'rxjs';
import { mapTo } from 'rxjs/operators';

const numbers$ = of(1, 2, 3);  // 创建一个发出数字的 Observable
const mappedNumbers$ = numbers$.pipe(mapTo('Map')); // 将每个值映射为字符串 "Map"

mappedNumbers$.subscribe(value => console.log(value)); // 输出: Map, Map, Map

switchMap

switchMap 操作符将发出的每个值映射为一个新的 Observable,并取消订阅之前的内部 Observable,当源 Observable 发出新值时。

import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const clicks$ = fromEvent(document, 'click'); // 创建一个从点击事件中发出值的 Observable
const result$ = clicks$.pipe(switchMap(() => interval(1000))); // 将每个点击事件映射为新的 interval Observable

result$.subscribe(value => console.log(value)); // 每次点击后,开始新的计时器并取消上一个计时器

mergeMap

mergeMap 操作符将每个发出的值映射成一个新的 Observable,并合并所有这些内部 Observables,即并发地处理它们。

import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const letters$ = of('a', 'b', 'c');  // 创建一个发出字母的 Observable
const result$ = letters$.pipe(mergeMap(letter => of(letter + '1', letter + '2'))); // 将每个值映射为两个发出新值的 Observables

result$.subscribe(value => console.log(value)); // 输出: a1, a2, b1, b2, c1, c2

concatMap

concatMap 操作符将每个发出的值映射成一个新的 Observable,并按顺序订阅这些内部 Observables。

import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

const letters$ = of('a', 'b', 'c');  // 创建一个发出字母的 Observable
const result$ = letters$.pipe(concatMap(letter => of(letter).pipe(delay(1000)))); // 将每个值映射为一个延迟发出的 Observable

result$.subscribe(value => console.log(value)); // 每隔1秒依次输出: a, b, c

过滤操作符

过滤操作符用于根据条件筛选 Observable 中的数据流。通过这些操作符,我们可以控制哪些数据可以通过流动,而哪些数据会被丢弃。

filter

filter 操作符基于一个谓词函数过滤Observable 发出的值,只发出通过过滤条件的值。

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);  // 创建一个发出数字的 Observable
const filteredNumbers$ = numbers$.pipe(filter(x => x % 2 === 0)); // 过滤掉奇数,只发出偶数

filteredNumbers$.subscribe(value => console.log(value)); // 输出: 2, 4

first

first 操作符只发出 Observable 发出的第一个值,并完成。如果指定了谓词,则发出第一个符合条件的值。

import { of } from 'rxjs';
import { first } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);  // 创建一个发出数字的 Observable
const firstNumber$ = numbers$.pipe(first()); // 只发出第一个值

firstNumber$.subscribe(value => console.log(value)); // 输出: 1

last

last 操作符只发出 Observable 发出的最后一个值。如果指定了谓词,则发出最后一个符合条件的值。

import { of } from 'rxjs';
import { last } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);  // 创建一个发出数字的 Observable
const lastNumber$ = numbers$.pipe(last()); // 只发出最后一个值

lastNumber$.subscribe(value => console.log(value)); // 输出: 5

take

take 操作符只发出前 n 个值,然后完成

import { of } from 'rxjs';
import { take } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);  // 创建一个发出数字的 Observable
const firstThree$ = numbers$.pipe(take(3)); // 只发出前3个值

firstThree$.subscribe(value => console.log(value)); // 输出: 1, 2, 3

skip

skip 操作符跳过前 n 个值,然后发出后续值。

import { of } from 'rxjs';
import { skip } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);  // 创建一个发出数字的 Observable
const skippedFirstTwo$ = numbers$.pipe(skip(2)); // 跳过前2个值

skippedFirstTwo$.subscribe(value => console.log(value)); // 输出: 3, 4, 5

组合操作符

组合操作符用于组合多个 Observable,从而创建一个新的 Observable。这些操作符允许你灵活地处理多个数据流,使得你可以更高效地处理复杂的异步操作。

merge

merge 操作符将多个 Observables 合并为一个 Observable,以并发的方式发出值。它会订阅所有的输入 Observable,并将它们发出的所有值作为单个 Observable 发出。

import { of, merge } from 'rxjs';
import { delay } from 'rxjs/operators';

// 创建两个 Observables,第二个 Observable 会延迟发出值
const observable1 = of('a', 'b', 'c');  // 第一个源 Observable
const observable2 = of('1', '2', '3').pipe(delay(1000));  // 第二个源 Observable,每个值延迟1秒发出

// 使用 merge 将两个 Observables 合并起来
const merged$ = merge(observable1, observable2);

// 订阅并输出值
merged$.subscribe(value => console.log(value)); 
// 输出: a, b, c  (来自于observable1) 
// 输出: 1, 2, 3 (在1秒后, 来自于observable2,且并发发出)

concat

concat 操作符将多个 Observables 串联到一起来发出值,这些 Observables 会按顺序触发,前一个 Observable 完成后才会开始发出下一个 Observable 的值。

import { of, concat } from 'rxjs';

// 创建两个 Observables
const observable1 = of('a', 'b', 'c');  // 第一个源 Observable
const observable2 = of('1', '2', '3');  // 第二个源 Observable

// 使用 concat 将两个 Observables 串联起来
const concatenated$ = concat(observable1, observable2);

// 订阅并输出值
concatenated$.subscribe(value => console.log(value)); 
// 输出: a, b, c, 1, 2, 3,只有在observable1的所有值都发出后, observable2才会发出它的值

combineLatest

combineLatest 操作符会等待所有输入 Observable 都发出至少一个值,然后每当其中任意一个 Observable 发出值时,将最新的每个 Observable 发出的值作为数组组合并发出。

import { timer, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

// 创建两个 timer Observable
const timer1$ = timer(0, 1000).pipe(map(value => `Timer1: ${value}`)); // 第一个值立即发出,然后每隔 1 秒继续发出递增的值,并标记是 Timer1 的值
const timer2$ = timer(500, 1000).pipe(map(value => `Timer2: ${value}`)); // 第一个值在 500 毫秒后发出,然后每隔 1 秒继续发出递增的值,并标记是 Timer2 的值

// 组合两个 Observable,发出最新的组合值
const combined$ = combineLatest([timer1$, timer2$]);

// 订阅并输出组合后的值
combined$.subscribe(value => console.log(value)); 
// 初次组合值输出: ['Timer1: 0', 'Timer2: 0']
// 每次 Timer1 或 Timer2 发出新值时,输出组合后的最新值: ['Timer1: 1', 'Timer2: 0'], ['Timer1: 1', 'Timer2: 1'], ['Timer1: 2', 'Timer2: 1'], ...
  1. t = 0 ms:
    1. timer1$ 发出 Timer1: 0
    2. timer2$ 还未发出值,combineLatest$ 不会发出值
  2. t = 500 ms:
    1. timer1$ 保持不变,但 timer2$ 发出 Timer2: 0
    2. combineLatest$ 输出 [Timer1: 0, Timer2: 0]
  3. t = 1000 ms:
    1. timer1$ 发出 Timer1: 1
    2. timer2$ 保持 Timer2: 0
    3. combineLatest$ 输出 [Timer1: 1, Timer2: 0]
  4. t = 1500 ms:
    1. timer1$ 保持 Timer1: 1
    2. timer2$ 发出 Timer2: 1
    3. combineLatest$ 输出 [Timer1: 1, Timer2: 1]
  5. t = 2000 ms:
    1. timer1$ 发出 Timer1: 2
    2. timer2$ 保持 Timer2: 1
    3. combineLatest$ 输出 [Timer1: 2, Timer2: 1]

withLatestFrom

withLatestFrom 操作符会结合主 Observable 的值和其他 Observable 最新发出的值,并作为数组发出。

import { fromEvent, interval } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const clicks$ = fromEvent(document, 'click'); // 点击事件 Observable
const timer$ = interval(1000); // 每秒递增的 Observable

const result$ = clicks$.pipe(withLatestFrom(timer$)); // 结合点击事件和最新的计时值

result$.subscribe(value => console.log(value)); // 每次点击时输出:[MouseEvent, timer的最新值]

创建操作符

创建操作符用于生成新的 Observable,它们是 RxJS 中非常重要的一部分。这些操作符能够帮助我们轻松创建 Observable,以适应不同的需求。

startWith

startWith 操作符在 Observable 发出任何值之前首先发出一个特定的值。它可以用于提供初始值或默认值。

import { of } from 'rxjs';
import { startWith } from 'rxjs/operators';

// 创建一个发出1、2、3的Observable,并在开始时先发出0
const numbers$ = of(1, 2, 3).pipe(startWith(0));

// 订阅并输出值
numbers$.subscribe(value => console.log(value)); // 输出: 0, 1, 2, 3

timer

timer 操作符会在指定时间后发出一个值。如果提供两个参数,则它还会接着以指定的时间间隔发出值。

import { timer } from 'rxjs';

// 创建一个在2秒后发出第一个值的Observable
const singleTimer$ = timer(2000);

// 订阅并输出值
singleTimer$.subscribe(value => console.log('Single Timer:', value)); // 2秒后输出: Single Timer: 0

// 创建一个在2秒后发出第一个值,然后每隔1秒发出值的Observable
const intervalTimer$ = timer(2000, 1000);

// 订阅并输出值
intervalTimer$.subscribe(value => console.log('Interval Timer:', value)); // 2秒后第一次输出: Interval Timer: 0, 然后每隔1秒递增: Interval Timer: 1, 2, 3, ...

Marble Diagrams:

时间(s):                 0-1-2-3-4-5-6-7-
timer(2000):            ----0--|
timer(2000, 1000):      ----0--1--2--3--|

range

range 操作符创建一个发出特定范围值序列的 Observable。它允许我们指定起始值和数量。

import { range } from 'rxjs';

// 创建一个从5开始,发出5个连续整数的Observable
const numbers$ = range(5, 5);

// 订阅并输出值
numbers$.subscribe(value => console.log(value)); // 输出: 5, 6, 7, 8, 9

Marble Diagrams

range(5, 5):            --5--6--7--8--9--|

错误处理操作符

在处理 Observable 的过程中,错误处理是一个重要的部分。RxJS 提供了几种错误处理操作符,帮助我们优雅地处理和恢复来自 Observable 的错误流。

catchError

catchError 操作符用于捕获源 Observable 中的错误,并返回一个新的 Observable 或抛出一个错误。这使我们能够从错误中恢复并继续流的处理。

import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

const source$ = throwError('An error occurred!'); // 创建一个发出错误的Observable

const handled$ = source$.pipe(
  catchError(error => {
    console.error('Caught error:', error);
    return of('Default value'); // 返回一个新的 Observable 作为替换
  })
);

// 订阅并输出值
handled$.subscribe(value => console.log(value)); // 输出: Caught error: An error occurred!  默认值: Default value

Marble Diagrams

Source Observable:   ----#|
catchError:          ----(Default value)|

retry

retry 操作符用于在源 Observable 遇到错误时自动重新订阅。可以指定重新尝试的次数,超过次数后如果仍然发生错误,则错误将被最终传递出去。

import { of, Observable } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

let count = 0;
const source$ = new Observable((observer) => {
  if (count++ < 2) {
    observer.error('Error occurred');
  } else {
    observer.next('Success');
    observer.complete();
  }
});

const retrying$ = source$.pipe(
  retry(2), // 重新尝试2次
  catchError((error) => of(`Caught final error after 2 retries: ${error}`))
);

// 订阅并输出值
retrying$.subscribe((value) => console.log(value));
// 第三次应该成功,因此输出: "Success"

Marble Diagrams

Source Observable:
----#        (第一次: 发射错误)
    ----#    (第二次: 重试后发射错误)
        ----(Success)| (第三次: 重试后成功,并完成)

retry 操作符通过 retryOptions 来配置重试行为,包括最大重试次数和重试通知等。我们可以使用它来实现自定义重试逻辑。

import { of, timer, throwError, Observable } from 'rxjs';
import { retry, catchError, tap } from 'rxjs/operators';

// 假设我们有一个可能失败的过程
let failureCount = 0;
const unreliableObservable = new Observable(observer => {
  if (failureCount < 2) {
    observer.error('Simulated failure');
    failureCount++;
  } else {
    observer.next('Success');
    observer.complete();
  }
});

// 使用retryOptions进行重试控制
const retrying$ = unreliableObservable.pipe(
  retry({
    count: 3, // 最大重试次数
    delay: (error, retryCount) => {
      console.log(`Retry attempt ${retryCount}, error: ${error}`);
      return timer(1000 * retryCount); // 重试时延时间
    }
  }),
  catchError(error => of(`Final error after retries: ${error}`))
);

// 订阅并输出值
retrying$.subscribe(value => console.log(value));
// 输出:
// Retry attempt 1, error: Simulated failure
// Retry attempt 2, error: Simulated failure
// Success

Marble Diagrams

Source Observable:        ----#|
1st retry (with delay):       ----1s----#|
2nd retry (with delay):             ----2s----#|
3rd retry (with delay):                   ----3s----(Success)|

20240712161012_rec_.gif

辅助操作符

辅助操作符(Utility Operators)用于辅助其他操作符或提供便利的方法。它们通常不改变数据流的内容,而是增强可观察对象(Observable)的行为。

tap

tap 操作符在 Observable 的每个值被发出时执行副作用操作。它主要用于调试和日志记录,帮助你观察数据流而不改变数据本身。

import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

// 创建一个发出数字的 Observable,并记录每个值
const numbers$ = of(1, 2, 3).pipe(
  tap(value => console.log('Before map:', value)), // 输出: 'Before map: 1', 'Before map: 2', 'Before map: 3'
  map(value => value * 10), // 转换数据流的值
  tap(value => console.log('After map:', value)) // 打印转换后的值: 'After map: 10', 'After map: 20', 'After map: 30'
);

// 订阅并输出值
numbers$.subscribe(value => console.log('Final value:', value)); // 输出: 10, 20, 30

delay

delay 操作符将延迟 Observable 的发射时间。它用于控制发射的时序,可以为基于时间的操作提供便利。

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

// 创建一个延迟发出值的 Observable
const delayed$ = of('Hello').pipe(delay(2000)); // 延迟 2 秒发出值

delayed$.subscribe(value => console.log(value)); // 2 秒后输出: 'Hello'

Marble Diagrams

Source Observable:      --(Hello)--|
delay(2000):            ----(Hello)--|

timeout

timeout 操作符如果在指定时间内没有发出值,将会抛出一个超时错误。这对于防止数据流卡住并提供超时处理非常有用。

import { of, timer } from 'rxjs';
import { timeout, catchError } from 'rxjs/operators';

// 创建一个发出延迟值的 Observable
const lateObservable$ = timer(3000).pipe(
  timeout(2000), // 如果超过 2 秒没有发出值,则抛出超时错误
  catchError(error => of(`Error: ${error.message}`)) // 捕获并处理错误
);

lateObservable$.subscribe(value => console.log(value)); // 2 秒后输出: 'Error: Timeout has occurred'

Marble Diagrams

Source Observable:      -------(value)|
timeout(2000):          -----(TimeoutError)--|

多播和分享 Observable

多播和分享 Observable 是 RxJS 中非常重要的概念,它们允许多个订阅者共享一个单独的 Observable 执行,避免重复订阅和重复执行。通过多播和分享操作符,我们可以优化性能,提高代码的可读性和可维护性。

connect

connect 操作符通过在 Observable 中多播数据流的方式,允许开发者定义在连接前如何使用多播的 Observable。适用于需要手动多播的场景。

import { of, tap, connect, merge, map, filter } from 'rxjs';

// 源 Observable
const source$ = of(1, 2, 3, 4, 5).pipe(
  tap({
    subscribe: () => console.log('subscription started'),
    next: n => console.log(`source emitted ${n}`)
  })
);

// 使用 connect 进行多播
source$.pipe(
  connect(shared$ =>
    merge(
      shared$.pipe(map(n => `all ${n}`)),
      shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${n}`)),
      shared$.pipe(filter(n => n % 2 !== 0), map(n => `odd ${n}`))
    )
  )
).subscribe(console.log);

share

share 操作符是使用最广泛的操作符之一。它是 multicast 和 refCount 的简写。在有第一个订阅者时开始订阅原始 Observable,并在所有订阅者取消订阅时停止订阅。

import { interval } from 'rxjs';
import { share, tap, take, map } from 'rxjs/operators';

// 创建一个每秒发出一个值的 Observable
const source$ = interval(1000).pipe(
  tap(x => console.log('Processing:', x)),
  map(x => `Stream ${x}`),
  take(5),
  share()
);

// 第一个订阅者
source$.subscribe(x => console.log('Subscription 1:', x));

// 延迟 2 秒后的第二个订阅者开始订阅
setTimeout(() => {
  source$.subscribe(x => console.log('Subscription 2:', x));
}, 2000);

shareReplay

shareReplay 操作符不仅能多播数据流,还能重播指定数量的最近发出的值给新的订阅者。这对于需要新订阅者获取以前发出的值的场景非常有用。

import { interval } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';

// 创建一个每秒发出一个值的 Observable
const source$ = interval(1000).pipe(
  take(6),
  shareReplay(3)  // 重播最近的3个值
);

source$.subscribe(x => console.log('Sub A:', x));
source$.subscribe(x => console.log('Sub B:', x));

// 延迟后第三个订阅者开始订阅
setTimeout(() => {
  source$.subscribe(x => console.log('Sub C:', x));
}, 11000);

Subject

Subject 是一种特殊类型的 Observable,它允许多播值给多个观察者。Subject 是观察者,也是可观察对象。

Subject

Subject 是最基础的 Subject 类型,它既是一个 Observable,也是一个 Observer,可以多播值给多个订阅者。

import { Subject } from 'rxjs';

const subject = new Subject<number>();

// 订阅者 1
subject.subscribe({
  next: (v) => console.log(`Observer 1: ${v}`)
});

// 订阅者 2
subject.subscribe({
  next: (v) => console.log(`Observer 2: ${v}`)
});

// 通过 subject 发出值
subject.next(1);
subject.next(2);

image.png Marble Diagrams:

Subject:             --1--2--|
Observer 1:      ------1--2--|
Observer 2:      ------1--2--|

BehaviorSubject

BehaviorSubject 是一种特殊的 Subject,它总是保存最新的值,并将在新的观察者订阅时立即发出这个值。

import { BehaviorSubject } from 'rxjs';

const behaviorSubject = new BehaviorSubject<number>(0); // 初始值为 0

// 订阅者 1
behaviorSubject.subscribe({
  next: (v) => console.log(`Observer 1: ${v}`)
});

// 通过 BehaviorSubject 发出值
behaviorSubject.next(1);
behaviorSubject.next(2);

// 订阅者 2
behaviorSubject.subscribe({
  next: (v) => console.log(`Observer 2: ${v}`)
});

behaviorSubject.next(3);

image.png

ReplaySubject

ReplaySubject 缓存所有发出的值,并将这些值重播给新的观察者。可以指定缓冲区大小来限制重放的值个数。

import { ReplaySubject } from 'rxjs';

const replaySubject = new ReplaySubject<number>(2); // 缓存最近 2 个值

// 订阅者 1
replaySubject.subscribe({
  next: (v) => console.log(`Observer 1: ${v}`)
});

// 通过 ReplaySubject 发出值
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);

// 订阅者 2
replaySubject.subscribe({
  next: (v) => console.log(`Observer 2: ${v}`)
});

replaySubject.next(4);

image.png

AsyncSubject

AsyncSubject 只会在完成时发出最后一个值,然后完成。

import { AsyncSubject } from 'rxjs';

const asyncSubject = new AsyncSubject<number>();

// 订阅者 1
asyncSubject.subscribe({
  next: (v) => console.log(`Observer 1: ${v}`)
});

// 通过 AsyncSubject 发出值
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);

// 订阅者 2
asyncSubject.subscribe({
  next: (v) => console.log(`Observer 2: ${v}`)
});

// 发出 complete
asyncSubject.complete();

image.png

结论

我们在本章中详细探讨了 RxJS 中的重要操作符和管理订阅的最佳实践。通过多个小节逐步深入,我们覆盖了变换、过滤、组合、创建、错误处理、辅助操作符以及多播与分享 Observable 和 Subject 的详细知识。 下一篇文章将深入探讨 RxJS 的高级应用和实战案例,如果你有任何问题或想法,欢迎交流和探讨!