Published on

RxJS(一):深入理解 RxJS 基础与核心概念

Authors
  • avatar
    Name
    青雲
    Twitter

在现代前端开发中,响应式编程越来越流行,而 RxJS(Reactive Extensions for JavaScript)作为响应式编程的基础库,被广泛应用于处理异步事件流和操控异步数据。

为了帮助读者全面掌握 RxJS 的强大功能,我将通过三篇文章系统地介绍 RxJS。这是第一篇,围绕 RxJS 的基础知识和核心概念展开,帮助读者理解和掌握其基本用法。

引言

什么是RxJS?

在日常生活中,我们经常会遇到需要同时处理多个事务的情况,比如在做饭时监控着锅里的食物,同时又在等待微波炉里的食物加热完成,甚至还要时不时去看一下洗衣机的进度。这些活动类似于我们在软件开发中处理异步事件的任务,我们需要同时监控和响应多个数据流和事件。在开发中,这种情况非常普遍,比如用户界面上同时监听多个按钮的点击事件、网络请求的返回数据以及其他用户交互行为。

传统的编程方式可能会使得这些任务变得复杂而难以管理。想象一下,如果你要同时在多个锅里做饭,而每个锅的食物烹饪时间和温度都不同,若没有一个好的机制来同时管理它们,最终可能会导致食物烧焦或者半生不熟。

这就是 RxJS 发挥作用的地方。RxJS是一个强大的库,它提供了一种处理多个异步事件和数据流的优雅方式。通过引入 Observable(可观察对象)和 Observer(观察者)的概念,RxJS 允许开发者以声明式和函数式编程的方式,轻松创建、组合和订阅数据流。就像在厨房里,你有了一套完整的工具和监控系统,无论是管理多个烹饪过程还是等待多个定时器完成,你都能高效且准确地完成任务,确保每道菜都能完美出炉。

通过使用 RxJS,开发者可以在他们的应用中实现更加复杂和动态的响应式用户界面,更好地处理并发数据,以及以更干净、更模块化的方式解决异步编程的挑战。无论是处理 Web 应用中的用户输入,还是实时更新的数据展示,RxJS 都能让这些任务变得更加简单和高效。

RxJS 的应用场景和优势

  • 处理异步操作:RxJS 可以简化处理多个并发异步操作的逻辑,例如 HTTP 请求、事件处理等。
  • 数据流转换:通过操作符将数据流转换和处理,简化数据流的操作。
  • 事件处理:RxJS 使得处理复杂事件流变得更加简洁和强大。
  • 高效流控:用户可以通过 RxJS 轻松地控制大量数据流,以避免内存泄漏和性能问题。

前置知识:函数式编程和响应式编程

我在深入了解 JavaScript 中的函数式编程一文中介绍过函数式编程。函数式编程是一种编程范式,强调使用纯函数和函数组合来构建程序,并避免全局状态和副作用。简单来说,它强调数据的不变性和利用函数来表达计算逻辑。

举个例子,假设我们有一个数组,我们想将每个元素平方后再求和,传统编程可能这样写:

let numbers = [1, 2, 3, 4];
let squares = [];
for (let i = 0; i < numbers.length; i++) {
  squares.push(numbers[i] * numbers[i]);
}
let sumOfSquares = 0;
for (let i = 0; i < squares.length; i++) {
  sumOfSquares += squares[i];
}
console.log(sumOfSquares);

使用函数式编程,我们可以更简洁地实现同样的功能:

const numbers = [1, 2, 3, 4];
const sumOfSquares = numbers.map(n => n * n).reduce((a, b) => a + b, 0);
console.log(sumOfSquares);

响应式编程则是一种关注异步数据流和变化传播的编程范式。它允许我们以声明性的方式构建和操作数据流。

想象你正在开发一个实时搜索功能,每次用户输入一个字符,你都需要根据新的输入去更新搜索结果。键入事件本身可以被看作是一个数据流,你可以对这个数据流进行转换和操作。

使用响应式编程的方法,我们可以通过以下方式处理用户输入:以声明的方式处理异步操作和数据流,从而使代码更加清晰和易于维护。

fromEvent(inputElement, 'input')
  .pipe(
    debounceTime(300),
    map(event => event.target.value),
    switchMap(query => fetch(`https://api.example.com/search?q=${query}`)),
    map(response => response.json())
  )
  .subscribe(results => displayResults(results));

基础知识

安装和设置

在 Node.js 中安装 RxJS

首先,我们将通过 NPM 来安装 RxJS:

npm install rxjs

然后,可以在项目中引入 RxJS:

import { Observable, of } from 'rxjs';

在浏览器中使用 RxJS

以下是在浏览器中引入 RxJS 的方法:

<!-- 引入 CDN -->
<script src="https://unpkg.com/[email protected]/dist/bundles/rxjs.umd.min.js"></script>

<script>
  const { of } = rxjs;
  const source$ = of(1, 2, 3);
  source$.subscribe(console.log);
</script>

核心概念

Observable(可观察对象)

Observable 是 RxJS 及响应式编程的核心概念之一。它代表一个可以同步或异步推送数据流的对象。

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next('Hello');
  subscriber.next('World');
  subscriber.complete();
});

observable.subscribe(value => console.log(value));

Observer(观察者)

Observer 是接收和处理 Observable 推送的数据的类。它定义了 next, errorcomplete 三个回调函数。

const observer = {
  next: x => console.log('Next:', x),
  error: err => console.error('Error:', err),
  complete: () => console.log('Completed')
};

observable.subscribe(observer);

Subscription(订阅)

Subscription 是表示 Observable 和 Observer 之间订阅关系的对象。它允许我们取消订阅以停止接收数据流。

const subscription = observable.subscribe(observer);

// 取消订阅
subscription.unsubscribe();

Operators(操作符)

Operators 是对 Observable 进行转换、过滤、组合等操作的函数。RxJS 提供了丰富的操作符,用于操作数据流。

import { map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3);
const squaredNumbers$ = numbers$.pipe(map(x => x * x));

squaredNumbers$.subscribe(value => console.log(value));
// 输出: 1, 4, 9

Scheduler(调度器)

Scheduler 用于控制 Observable 的执行上下文,例如异步任务的调度和队列管理。

import { asyncScheduler, of } from 'rxjs';

const observable = of('Hello', asyncScheduler);

console.log('Before subscription');
observable.subscribe(console.log);
console.log('After subscription');

创建 Observable

创建简单 Observable

创建 Observable 是 RxJS 的核心操作之一。在 RxJS 中,可以通过多种方式方便地创建 Observable,每种方式适用于不同的场景。

使用 of

of 操作符可以用来创建一个发出一组特定值的 Observable。它适用于你有一组有限的值,而且这些值是已知的情况。

import { of } from 'rxjs';

// 创建发出1、2、3的Observable
const observable = of(1, 2, 3);

// 订阅它
observable.subscribe(value => console.log(value)); // 输出: 1, 2, 3

使用from

from 操作符可以将一个数组、Promise 或可迭代对象转换为 Observable。它适用于你已经有一个数组或其他可以遍历的数据结构。

import { from } from 'rxjs';

// 将数组转换为Observable
const array = [10, 20, 30];
const observable = from(array);

// 订阅它
observable.subscribe(value => console.log(value)); // 输出: 10, 20, 30

使用fromEvent

fromEvent 操作符可以将 DOM 事件转换为 Observable。例如,当你需要处理用户交互事件(如点击、输入等)时,你可以使用 fromEvent 来创建 Observable。

import { fromEvent } from 'rxjs';

// 获取按钮元素
const button = document.getElementById('myButton');

// 将点击事件转换为Observable
const observable = fromEvent(button, 'click');

// 订阅它
observable.subscribe(event => console.log('Button clicked:', event));

使用 interval

interval 操作符可以创建一个定时发出序列值的 Observable。它适用于需要定时触发事件的情况,例如轮询操作。

import { interval } from 'rxjs';

// 创建一个每1000ms发出数值的Observable
const observable = interval(1000);

// 订阅它
observable.subscribe(value => console.log(value)); // 每隔1000ms输出递增的数值:0, 1, 2, ...

创建自定义 Observable

除了使用内置的操作符来创建 Observable 之外,RxJS 还允许我们自定义 Observable,以便在需要的时候灵活地处理任意的异步操作。下面将介绍如何使用 Observable.create 方法来创建自定义 Observable。

使用 Observable.create

Observable.create 方法允许你定义自定义的 Observable 逻辑,可以控制何时发出值(next)、处理错误(error)和完成(complete)通知。这特别适合处理复杂的异步任务。

import { Observable } from 'rxjs';

// 使用 Observable.create 方法创建自定义 Observable
const customObservable = new Observable(subscriber => {
  // 发出一些值
  subscriber.next('Custom data 1');
  subscriber.next('Custom data 2');

  // 模拟异步操作
  setTimeout(() => {
    subscriber.next('Custom data 3');
    subscriber.complete(); // 完成通知
  }, 2000);

  // 错误通知(根据需要使用)
  // subscriber.error('Something went wrong');
});

// 订阅自定义 Observable
customObservable.subscribe({
  next: value => console.log('Received value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Completed')
});

// 输出:
// Received value: Custom data 1
// Received value: Custom data 2
// 2秒后
// Received value: Custom data 3
// Complete

在这个示例中,我们创建了一个自定义 Observable,并在创建逻辑中发出了一些同步值(Custom data 1Custom data 2),然后通过 setTimeout 模拟了一个异步操作,2秒后发出了一个新的值(Custom data 3)并完成了 Observable(通过调用 subscriber.complete())。在订阅者中,我们通过 nexterrorcomplete 方法来处理接收到的值、错误和完成通知。

通过自定义 Observable,可以灵活地创建和管理任意复杂的异步操作,满足不同的需求。

订阅和取消订阅

在 RxJS 中,订阅和取消订阅是使用 Observable 的关键操作。当我们订阅一个 Observable 时,它开始发出值,并通过观察者处理这些值。我们还可以随时取消订阅,以停止接收数据流。

subscribe 方法详解

subscribe 方法会创建一个 Observer 并将其订阅到 Observable 上。当 Observable 推送数据时,Observer 的 nexterrorcomplete 方法会被调用,分别处理Observable 的数据、错误和完成通知。

const subscription = customObservable.subscribe({
  next: value => console.log('Received value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Completed')
});

// 输出:
// Received value: Custom data 1
// Received value: Custom data 2
// 2秒后
// Received value: Custom data 3
// Complete

管理订阅

Subscriptionunsubscribe

Subscription 表示一个订阅关系,调用 unsubscribe 方法可以取消订阅,停止接收数据。

import { interval } from 'rxjs';

const subscription = interval(1000).subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed')
});

// 5秒后取消订阅
setTimeout(() => {
  subscription.unsubscribe();
  console.log('Unsubscribed');
}, 5000);

addremove 子订阅

可以将多个订阅组合成一个父订阅,通过 add 和 remove 方法管理子订阅。

import { Subscription, interval } from 'rxjs';

const parentSubscription = new Subscription();

const childSubscription1 = interval(1000).subscribe(val => console.log('First:', val));
const childSubscription2 = interval(1000).subscribe(val => console.log('Second:', val));

parentSubscription.add(childSubscription1);
parentSubscription.add(childSubscription2);

// 5秒后取消所有子订阅
setTimeout(() => {
  parentSubscription.unsubscribe();
  console.log('Unsubscribed all');
}, 5000);

在这个示例中,我们创建了一个父订阅,并添加了两个子订阅。5秒后,父订阅取消,所有子订阅也随之取消。

结论

通过本文,我们深入介绍了 RxJS 的基础概念和核心功能。读者现在应该对 RxJS 的基本使用有了一定的了解,掌握了 Observable、Observer、Subscription、Operators 和 Scheduler 的基础知识。

在下一篇文章中,我们将深入探讨 RxJS 的操作符和订阅管理,帮助大家更好地掌握 RxJS 的强大功能,实现复杂的响应式编程逻辑。