0%

RxJS 快速入门——响应式编程最佳实践

最近在优化一个搜索功能,用到了 RxJS 的防抖和合并请求功能,效果相当不错!以前用 setTimeout 和 Promise 处理异步逻辑总觉得不够优雅,RxJS 简直是异步编程的救星。

RxJS 简介

  RxJS(Reactive Extensions for Javascript)是一个基于可观测数据流(Observables)的库,用于处理异步事件和数据变化。它实现了观察者模式,使开发者能够以声明式的方式处理异步数据流。RxJS 广泛应用于 Angular、React 以及其他 Javascript 应用中,是现代前端开发中不可或缺的工具之一。

  RxJS 的核心思想是将一切视为数据流——用户输入、网络请求、定时器、DOM 事件等都可以看作是随时间推移产生的数据序列。通过一系列的操作符(Operators),我们可以转换、过滤、合并这些数据流,从而构建出复杂而强大的异步应用逻辑。

RxJS 的核心概念

  1. Observable(可观察对象): 代表一个可随时间推移发送值的概念
  2. Observer(观察者): 一个回调函数的集合,用来接收 Observable 的通知
  3. Subscription(订阅): 表示 Observable 的执行,主要用于取消执行
  4. Operators(操作符): 纯函数,用于转换 Observables
  5. Subject(主体): 既是 Observable 又是 Observer 的对象
  6. Schedulers(调度器): 控制何时执行订阅相关的计算

安装和基本使用

安装 RxJS

1
2
3
4
5
6
7
8
# 使用 npm
npm install rxjs

# 使用 yarn
yarn add rxjs

# 使用 pnpm
pnpm add rxjs

基本导入方式

1
2
3
4
5
// 导入特定功能 import { Observable, of, from } from 'rxjs';
import { map, filter, mergeMap } from 'rxjs/operators';

// 或者导入整个库(不推荐,会增加包体积)
import * as Rx from 'rxjs';

核心概念详解

Observable(可观察对象)

  Observable 是 RxJS 中最基础的概念,它代表一个可随时间推移发送值的集合。可以把它想象成一个随着时间推移产生值的函数。Observable 是惰性的,只有当被订阅时才会开始执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { Observable } from 'rxjs';

// 创建一个简单的 Observable
const observable = new Observable(observer => {
// 发送数据 observer.next(1);
observer.next(2);
observer.next(3);

// 模拟异步操作 setTimeout(() => {
observer.next(4);
observer.complete(); // 标记完成
}, 1000);
});

// 订阅 Observable
const subscription = observable.subscribe({
next: value => console.log(value), // 接收数据 error: error => console.error(error), // 处理错误 complete: () => console.log('完成') // 完成通知
});

// 取消订阅
// subscription.unsubscribe();

创建 Observable 的常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { of, from, interval, timer, fromEvent } from 'rxjs';

// of: 创建发出指定值的 Observable
const of$ = of(1, 2, 3, 4, 5);
of$.subscribe(console.log); // 输出: 1, 2, 3, 4, 5

// from: 从数组、promise、iterable 等创建 Observable
const from$ = from([1, 2, 3, 4, 5]);
from$.subscribe(console.log); // 输出: 1, 2, 3, 4, 5

// interval: 创建定期发出递增数字的 Observable
const interval$ = interval(1000); // 每秒发送一个数字 const intervalSub = interval$.subscribe(console.log);

// timer: 延迟后发送值或定期发送值 const timer$ = timer(2000, 1000); // 2秒后开始,之后每秒发送 const timerSub = timer$.subscribe(console.log);

// fromEvent: 从 DOM 事件创建 Observable
const button = document.getElementById('myButton');
const click$ = fromEvent(button, 'click');
click$.subscribe(event => console.log('按钮被点击:', event));

Observer(观察者)

  Observer 是一个包含三个回调函数的对象:next、error 和 complete。这些回调函数定义了如何处理 Observable 发送的不同类型的通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { Observable } from 'rxjs';

const observer = {
next: (value) => console.log('收到值:', value),
error: (error) => console.error('发生错误:', error),
complete: () => console.log('完成!')
};

const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
});

observable.subscribe(observer);

Subscription(订阅)

  Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 最重要的功能是 unsubscribe,它释放掉所持有的资源。

1
2
3
4
5
6
7
8
9
import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(value => console.log(value));

// 5秒后取消订阅 setTimeout(() => {
subscription.unsubscribe();
console.log('已取消订阅');
}, 5000);

Operators(操作符)

  Operators 是 RxJS 中最重要的概念之一。它们是纯函数,用于转换 Observables。操作符允许我们对数据流进行各种操作,如映射、过滤、合并等。

常用的 Creation Operators

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { of, from, interval, timer, range, iif } from 'rxjs';

// of: 从多个参数创建 Observable
const of$ = of(1, 2, 3);

// from: 从数组、promise 等创建 Observable
const from$ = from(Promise.resolve('resolved'));

// range: 创建一定范围内的数字序列 const range$ = range(1, 5); // 发出 1, 2, 3, 4, 5

// iif: 条件性地创建 Observable
const result$ = iif(
() => Math.random() > 0.5,
of('大于0.5'),
of('小于等于0.5')
);

常用的 Pipeable Operators

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import { of, from } from 'rxjs';
import {
map,
filter,
scan,
take,
takeUntil,
takeWhile,
skip,
skipWhile,
debounceTime,
distinctUntilChanged,
mergeMap,
switchMap,
concatMap,
catchError,
retry,
tap
} from 'rxjs/operators';

// map: 映射每个值 of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log); // 输出: 2, 4, 6

// filter: 过滤值 of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log); // 输出: 2, 4

// scan: 累积计算(类似 reduce,但每次都会输出)
of(1, 2, 3, 4).pipe(
scan((acc, curr) => acc + curr, 0)
).subscribe(console.log); // 输出: 1, 3, 6, 10

// take: 取前 n 个值 range(1, 10).pipe(
take(3)
).subscribe(console.log); // 输出: 1, 2, 3

// debounceTime: 防抖,延迟指定时间后发出最后一个值 fromEvent(input, 'input').pipe(
debounceTime(300),
map(event => event.target.value)
).subscribe(console.log);

// distinctUntilChanged: 过滤连续重复的值 of(1, 1, 2, 2, 3, 1, 4).pipe(
distinctUntilChanged()
).subscribe(console.log); // 输出: 1, 2, 3, 1, 4

高级组合操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import {
mergeMap,
switchMap,
concatMap,
exhaustMap,
combineLatest,
zip,
forkJoin
} from 'rxjs/operators';

// mergeMap: 扁平化内部 Observable,允许并发 of('a', 'b').pipe(
mergeMap(letter => interval(1000).pipe(
take(3),
map(num => `${letter}-${num}`)
))
).subscribe(console.log);
// 输出: a-0, b-0, a-1, b-1, a-2, b-2

// switchMap: 扁平化内部 Observable,但会取消前一个 fromEvent(input, 'input').pipe(
debounceTime(300),
switchMap(query => this.http.get(`/API/search?q=${query}`))
).subscribe(results => {
// 当新查询到来时,取消之前的请求 this.searchResults = results;
});

// combineLatest: 合并多个 Observable,任一发出时都会发出最新的值组合 combineLatest([
of(1, 2, 3),
of('a', 'b', 'c')
]).subscribe(console.log);
// 输出: [1, 'a'], [2, 'a'], [3, 'a'], [3, 'b'], [3, 'c']

// forkJoin: 等待所有 Observable 完成,然后发出最后一个值 forkJoin({
user: this.http.get('/API/user'),
posts: this.http.get('/API/posts')
}).subscribe(result => {
console.log('用户和帖子都加载完成:', result);
});

Subject(主题)

  Subject 是 RxJS 中的特殊类型,它既可以作为 Observable,也可以作为 Observer。这意味着 Subject 可以发出值,同时也可以订阅其他 Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject: 基础 Subject,只能发出给定时刻之后的值 const subject = new Subject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3); // 只有 observerA 会收到
// 输出: observerA: 1, observerA: 2, observerA: 3, observerB: 3

// BehaviorSubject: 需要初始值,新订阅者会立即收到最近的值 const behaviorSubject = new BehaviorSubject(0);
behaviorSubject.subscribe(val => console.log('A:', val)); // A: 0

behaviorSubject.next(1); // A: 1
behaviorSubject.next(2); // A: 2

behaviorSubject.subscribe(val => console.log('B:', val)); // B: 2 (立即收到最新值)

// ReplaySubject: 可以缓存一定数量的值,新订阅者会收到缓存的值 const replaySubject = new ReplaySubject(3); // 缓存最后3个值 replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.next(4);

replaySubject.subscribe(val => console.log('C:', val)); // C: 2, C: 3, C: 4

实际应用场景

搜索功能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import { Component, OnInit, OnDestroy } from '@Angular/core';
import { FormControl } from '@Angular/forms';
import { Subject } from 'rxjs';
import {
debounceTime,
distinctUntilChanged,
switchMap,
catchError,
takeUntil
} from 'rxjs/operators';

@Component({
selector: 'app-search',
template: `
<input [formControl]="searchControl" placeholder="搜索...">
<div *ngIf="loading">搜索中...</div>
<div *ngFor="let result of results">
{{ result.name }}
</div>
`
})
export class SearchComponent implements OnInit, OnDestroy {
searchControl = new FormControl();
results: any[] = [];
loading = false;
private destroy$ = new Subject<void>();

ngOnInit() {
this.searchControl.valueChanges.pipe(
debounceTime(300), // 防抖300ms
distinctUntilChanged(), // 只有值真正改变时才发出 switchMap(query => {
if (!query) {
return []; // 如果查询为空,返回空数组
}

this.loading = true;
return this.searchService.search(query).pipe(
catchError(error => {
console.error('搜索失败:', error);
return [];
})
);
}),
takeUntil(this.destroy$) // 组件销毁时取消订阅
).subscribe(results => {
this.results = results;
this.loading = false;
});
}

ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}

表单验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import { FormBuilder, FormGroup, Validators } from '@Angular/forms';
import { combineLatest, of } from 'rxjs';
import {
map,
startWith,
debounceTime,
distinctUntilChanged
} from 'rxjs/operators';

@Component({
selector: 'app-user-form',
template: `
<form [formGroup]="userForm">
<input formControlName="email" placeholder="邮箱">
<div *ngIf="emailValid$ | async" class="valid">✓ 邮箱有效</div>
<div *ngIf="(emailValid$ | async) === false" class="invalid">✗ 邮箱无效</div>

<input formControlName="password" type="password" placeholder="密码">
<div *ngIf="passwordStrong$ | async" class="valid">✓ 密码强度高</div>
</form>
`
})
export class UserFormComponent {
userForm: FormGroup;

constructor(private fb: FormBuilder) {
this.userForm = this.fb.group({
email: ['', [Validators.required, Validators.email]],
password: ['', [Validators.required, Validators.minLength(8)]]
});
}

ngOnInit() {
// 邮箱验证状态流 this.emailValid$ = this.userForm.get('email').valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
map(email => this.validateEmail(email)),
startWith(this.validateEmail(this.userForm.get('email').value))
);

// 密码强度流 this.passwordStrong$ = this.userForm.get('password').valueChanges.pipe(
map(password => this.checkPasswordStrength(password)),
startWith(this.checkPasswordStrength(this.userForm.get('password').value))
);

// 表单整体有效性 this.formValid$ = combineLatest([
this.emailValid$,
this.passwordStrong$
]).pipe(
map(([emailValid, passwordStrong]) => emailValid && passwordStrong)
);
}

private validateEmail(email: string): boolean {
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
return emailRegex.test(email);
}

private checkPasswordStrength(password: string): boolean {
// 简单的密码强度检查 return password.length >= 8 && /[A-Z]/.test(password) && /[0-9]/.test(password);
}
}

WebSocket 连接管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import { Injectable } from '@Angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
BehaviorSubject,
Observable,
merge,
interval
} from 'rxjs';
import {
map,
filter,
tap,
retryWhen,
delayWhen,
scan
} from 'rxjs/operators';

@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private wsSubject$: WebSocketSubject<any>;
private connectionStatus$ = new BehaviorSubject<boolean>(false);
private reconnectInterval = 3000;
private maxRetries = 5;

connect(url: string): Observable<any> {
this.wsSubject$ = webSocket({
url: url,
openObserver: {
next: () => {
console.log('WebSocket 连接已建立');
this.connectionStatus$.next(true);
}
},
closeObserver: {
next: () => {
console.log('WebSocket 连接已关闭');
this.connectionStatus$.next(false);
}
}
});

// 连接心跳检测 const heartbeat$ = interval(30000).pipe(
filter(() => this.isConnected()),
tap(() => this.sendHeartbeat())
);

// 合并消息流和心跳流 return merge(
this.wsSubject$.pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= this.maxRetries) {
throw error;
}
return retryCount + 1;
}, 0),
delayWhen(() => interval(this.reconnectInterval))
)
)
),
heartbeat$
);
}

sendMessage(message: any) {
if (this.wsSubject$ && this.isConnected()) {
this.wsSubject$.next(message);
}
}

private sendHeartbeat() {
this.sendMessage({ type: 'heartbeat', timestamp: Date.now() });
}

private isConnected(): boolean {
return this.connectionStatus$.value;
}

disconnect() {
if (this.wsSubject$) {
this.wsSubject$.complete();
}
}
}

请求合并和缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import { Injectable } from '@Angular/core';
import { HttpClient } from '@Angular/common/http';
import { Observable, of, BehaviorSubject } from 'rxjs';
import {
shareReplay,
finalize,
map,
mergeMap,
multicast
} from 'rxjs/operators';

@Injectable({
providedIn: 'root'
})
export class DataService {
private loading$ = new BehaviorSubject<boolean>(false);
private cache = new Map<string, Observable<any>>();

constructor(private http: HttpClient) {}

// 带缓存的数据请求 getDataWithCache<T>(url: string, ttl = 5 * 60 * 1000): Observable<T> {
const cached = this.cache.get(url);

if (cached) {
console.log('从缓存获取数据:', url);
return cached;
}

console.log('发起新请求:', url);

const fresh$ = this.http.get<T>(url).pipe(
tap(() => this.loading$.next(true)),
finalize(() => this.loading$.next(false)),
shareReplay({ bufferSize: 1, refCount: true }) // 共享订阅,缓存最新值
);

this.cache.set(url, fresh$);

// TTL 缓存失效 setTimeout(() => {
this.cache.delete(url);
}, ttl);

return fresh$;
}

// 批量请求合并 batchRequest(urls: string[]): Observable<any[]> {
return of(urls).pipe(
mergeMap(urlList =>
Promise.all(
urlList.map(url => this.getDataWithCache(url).toPromise())
)
)
);
}

isLoading$(): Observable<boolean> {
return this.loading$.asObservable();
}
}

性能优化技巧

1. 使用 shareReplay 避免重复请求

1
2
3
4
5
6
7
8
9
10
import { shareReplay, map } from 'rxjs/operators';

// 不好的做法 - 每次订阅都会发起请求 const badApiCall$ = this.http.get('/API/users');

// 好的做法 - 共享订阅结果 const goodApiCall$ = this.http.get('/API/users').pipe(
shareReplay({ bufferSize: 1, refCount: true })
);

// 多个组件同时订阅,只会发起一个 HTTP 请求 goodApiCall$.subscribe(users => console.log('组件1:', users));
goodApiCall$.subscribe(users => console.log('组件2:', users));

2. 使用 takeUntil 管理订阅生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';

export class MyComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();

ngOnInit() {
// 正确的订阅方式 this.dataService.someDataStream$.pipe(
takeUntil(this.destroy$) // 组件销毁时自动取消订阅
).subscribe(data => {
// 处理数据
});
}

ngOnDestroy() {
this.destroy$.next(); // 触发取消订阅 this.destroy$.complete();
}
}

3. 合理使用不同的 Subject 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import { BehaviorSubject, ReplaySubject } from 'rxjs';

// 对于需要立即获取当前状态的场景,使用 BehaviorSubject
class AuthService {
private currentUser$ = new BehaviorSubject<User | null>(null);

getCurrentUser(): Observable<User | null> {
return this.currentUser$.asObservable(); // 新订阅者立即收到当前用户
}

setCurrentUser(user: User | null) {
this.currentUser$.next(user);
}
}

// 对于需要获取历史数据的场景,使用 ReplaySubject
class EventLogService {
private events$ = new ReplaySubject<Event[]>(10); // 缓存最后10个事件 logEvent(event: Event) {
this.events$.next(event);
}

getRecentEvents(): Observable<Event[]> {
return this.events$.asObservable();
}
}

最佳实践

1. 错误处理策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import { catchError, retry, of } from 'rxjs/operators';

// 简单的错误处理 apiCall$.pipe(
catchError(error => {
console.error('API 调用失败:', error);
return of([]); // 返回默认值
})
).subscribe();

// 带重试的错误处理 apiCall$.pipe(
retry(3), // 重试3次 catchError(error => {
console.error('API 调用最终失败:', error);
return of(null);
})
).subscribe();

// 更复杂的错误处理 apiCall$.pipe(
retry(2),
catchError(error => {
if (error.status === 401) {
// 401错误,重定向到登录页 this.router.navigate(['/login']);
return EMPTY;
} else if (error.status === 500) {
// 500错误,返回默认数据 return of(defaultData);
} else {
// 其他错误,抛出异常 throw error;
}
})
).subscribe();

2. 内存泄漏防护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { Component, OnInit, OnDestroy } from '@Angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
selector: 'app-example',
template: `<div>示例组件</div>`
})
export class ExampleComponent implements OnInit, OnDestroy {
private destroy$ = new Subject<void>();

ngOnInit() {
// 正确使用 takeUntil
this.observable$.pipe(
takeUntil(this.destroy$)
).subscribe(value => {
// 处理逻辑
});

// 正确处理 interval 等持续发射的 Observable
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(count => {
console.log(count);
});
}

ngOnDestroy() {
this.destroy$.next(); // 触发取消订阅 this.destroy$.complete(); // 完成 Subject
}
}

3. 与现代框架的集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// React 中使用 RxJS (使用 rxjs-React-hooks)
import { useObservable } from 'rxjs-React-hooks';
import { useState, useEffect } from 'React';

function SearchComponent() {
const [query, setQuery] = useState('');

const results = useObservable(() =>
from(query).pipe(
debounceTime(300),
switchMap(q =>
q ? fetch(`/API/search?q=${q}`).then(r => r.Json()) : of([])
),
startWith([])
),
[query]
);

return (
<div>
<input
value={query}
onChange={e => setQuery(e.target.value)}
placeholder="搜索..."
/>
<ul>
{results.map(item => <li key={item.id}>{item.name}</li>)}
</ul>
</div>
);
}

常见陷阱和解决方案

1. 避免多次订阅相同 Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 错误做法 - 每次渲染都会创建新订阅 function BadComponent() {
useEffect(() => {
const subscription = someObservable$.subscribe(setData);
return () => subscription.unsubscribe();
}, [someObservable$]); // 依赖项变化会重新订阅

// 正确做法 - 使用 shareReplay 确保单一订阅 const sharedData$ = useMemo(() =>
someObservable$.pipe(shareReplay(1)),
[]
);

function GoodComponent() {
useEffect(() => {
const subscription = sharedData$.subscribe(setData);
return () => subscription.unsubscribe();
}, [sharedData$]); // 依赖项不会变化
}

2. 正确处理异步数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 复杂的数据流组合 combineLatest([
userId$,
filterParams$,
sortBy$
]).pipe(
debounceTime(100),
switchMap(([userId, filters, sortBy]) =>
forkJoin({
user: this.userService.getUser(userId),
posts: this.postService.getPosts({ userId, filters, sortBy }),
stats: this.statsService.getUserStats(userId)
})
),
catchError(error => {
console.error('加载用户数据失败:', error);
return of({ user: null, posts: [], stats: null });
})
).subscribe(({ user, posts, stats }) => {
this.userData = { user, posts, stats };
});

总结

  • RxJS 提供了强大的响应式编程能力,特别适合处理异步数据流
  • Observable、Observer、Subscription 是核心概念,需要深入理解
  • Operators 是数据转换的关键,合理使用能大幅提升代码可读性
  • Subject 在需要双向数据流时很有用,但要注意选择合适的类型
  • 错误处理和内存泄漏防护是使用 RxJS 时必须考虑的问题
  • 与现代框架的集成需要注意性能和生命周期管理

学习 RxJS 的过程有点像学开车——刚开始觉得很复杂,但一旦掌握了就再也回不去了。现在写异步逻辑时,第一反应就是想想能不能用 RxJS 来简化,感觉整个编程思维都被重塑了。

扩展阅读

  • RxJS 官方文档
  • ReactiveX.io
  • Learn RxJS
  • Angular 官方文档 - RxJS
  • RxJS 最佳实践指南

参考资料

  • RxJS GitHub Repository: https://github.com/ReactiveX/rxjs
  • Reactive Extensions Specification: https://github.com/ReactiveX/RxJava/wiki
  • Angular Documentation: https://Angular.io/
  • Javascript Promises: https://developer.mozilla.org/en-US/docs/Web/Javascript/Reference/Global_Objects/Promise
bulb