Rxjs - 常用操作符
2021-04-30 02:29
标签:obs with 转换 rgba 迭代器 selector git ott duration 本文总结Rxjs中的常用Operators pipe()的参数可以放任意的operators, operator在pipe中依次执行 map 对源 observable 的每个值应用投射函数。 mapTo将每个发出值映射成常量。 如果想基于某个逻辑或另一个 observable 来取任意数量的值,你可以 takeUntil 或 takeWhile! 按顺序发出任意数量的值 from: 将数组、promise 或迭代器转换成 observable fromEvent: 从event生成observable fromPromise:从promise生成observable 透明地执行操作或副作用,比如打印日志。 delay: 根据给定时间延迟发出值。 throttle:以某个时间间隔为阈值,在 debounce:根据一个选择器函数,舍弃掉在两次输出之间小于指定时间的发出值。 这两个Operators可用于节流 observable 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值。 将多个 observables 转换成单个 observable 。 concatMap: 组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。 mergeMap: 将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。 注意 switchMap: 将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。 exhaustMap: 映射成内部 observable,忽略其他值直到该 observable 完成 Rxjs - 常用操作符 标签:obs with 转换 rgba 迭代器 selector git ott duration 原文地址:https://www.cnblogs.com/Asp1rant/p/14714776.htmlPipe
map, mapTo
1 import { from } from ‘rxjs‘;
2 import { map, mapTo } from ‘rxjs/operators‘;
3
4 // 发出 (1,2,3,4,5)
5 const source = from([1, 2, 3, 4, 5]);
6 // 每个数字加10
7 const example = source.pipe(map(val => val + 10));
8 // 输出: 11,12,13,14,15
9 const subscribe = example.subscribe(val => console.log(val));
10
11 // 发出每个页面点击
12 const source = fromEvent(document, ‘click‘);
13 // 将所有发出值映射成同一个值
14 const example = source.pipe(mapTo(‘GOODBYE WORLD!‘));
15 // 输出: (click)‘GOODBYE WORLD!‘...
16 const subscribe = example.subscribe(val => console.log(val));
take, skip
take
与 skip
是相反的,它接收起始的N个值,而 skip
会跳过起始的N个值。1 import { interval } from ‘rxjs‘;
2 import { take } from ‘rxjs/operators‘;
3
4 // 每1秒发出值
5 const interval$ = interval(1000);
6 // 取前5个发出的值
7 const example = interval$.pipe(take(5));
8 // 输出: 0,1,2,3,4
9 const subscribe = example.subscribe(val => console.log(val));
1 import { skip } from ‘rxjs/operators‘;
2
3 // 每1秒发出值
4 const source = interval(1000);
5 // 跳过前5个发出值
6 const example = source.pipe(skip(5));
7 // 输出: 5...6...7...8........
8 const subscribe = example.subscribe(val => console.log(val));
of
1 import { of } from ‘rxjs‘;
2 // 依次发出提供的任意数量的值
3 const source = of(1, 2, 3, 4, 5);
4 // 输出: 1,2,3,4,5
5 const subscribe = source.subscribe(val => console.log(val));
from, fromEvent, fromPromise
示例 1: 数组转换而来的 observable
// RxJS v6+
import { from } from ‘rxjs‘;
// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
示例 2: promise 转换而来的 observable
// RxJS v6+
import { from } from ‘rxjs‘;
// 发出 promise 的结果
const promiseSource = from(new Promise(resolve => resolve(‘Hello World!‘)));
// 输出: ‘Hello World‘
const subscribe = promiseSource.subscribe(val => console.log(val));
示例 3: 集合转换而来的 observable
// RxJS v6+
import { from } from ‘rxjs‘;
// 使用 js 的集合
const map = new Map();
map.set(1, ‘Hi‘);
map.set(2, ‘Bye‘);
const mapSource = from(map);
// 输出: [1, ‘Hi‘], [2, ‘Bye‘]
const subscribe = mapSource.subscribe(val => console.log(val));
示例 4: 字符串转换而来的 observable
// RxJS v6+
import { from } from ‘rxjs‘;
// 将字符串作为字符序列发出
const source = from(‘Hello World‘);
// 输出: ‘H‘,‘e‘,‘l‘,‘l‘,‘o‘,‘ ‘,‘W‘,‘o‘,‘r‘,‘l‘,‘d‘
const subscribe = source.subscribe(val => console.log(val));
tap
1 import { of } from ‘rxjs‘;
2 import { tap, map } from ‘rxjs/operators‘;
3
4 const source = of(1, 2, 3, 4, 5);
5 // 使用 tap 透明地打印 source 中的值
6 const example = source.pipe(
7 tap(val => console.log(`BEFORE MAP: ${val}`)),
8 map(val => val + 10),
9 tap(val => console.log(`AFTER MAP: ${val}`))
10 );
11
12 // ‘tap‘ 并不转换值
13 // 输出: 11...12...13...14...15
14 const subscribe = example.subscribe(val => console.log(val));
delay
1 import { of, merge } from ‘rxjs‘;
2 import { mapTo, delay } from ‘rxjs/operators‘;
3
4 // 发出一项
5 const example = of(null);
6 // 每延迟一次输出便增加1秒延迟时间
7 const message = merge(
8 example.pipe(mapTo(‘Hello‘)),
9 example.pipe(
10 mapTo(‘World!‘),
11 delay(1000)
12 ),
13 example.pipe(
14 mapTo(‘Goodbye‘),
15 delay(2000)
16 ),
17 example.pipe(
18 mapTo(‘World!‘),
19 delay(3000)
20 )
21 );
22 // 输出: ‘Hello‘...‘World!‘...‘Goodbye‘...‘World!‘
23 const subscribe = message.subscribe(val => console.log(val));
throttle, debounce
durationSelector
完成前将抑制新值的发出 1 import { interval } from ‘rxjs‘;
2 import { throttle, debounce } from ‘rxjs/operators‘;
3
4 // 每1秒发出值
5 const source = interval(1000);
6 // 节流2秒后才发出最新值
7 const example = source.pipe(throttle(val => interval(2000)));
8 // 输出: 0...3...6...9
9 const subscribe = example.subscribe(val => console.log(val));
10
11
12 // 发出四个字符串
13 const example 2= of(‘WAIT‘, ‘ONE‘, ‘SECOND‘, ‘Last will display‘);
14 // 只有在最后一次发送后再经过一秒钟,才会发出值,并抛弃在此之前的所有其他值
15 const debouncedExample = example2.pipe(debounce(() => timer(1000)));
16 // 在这个示例中,所有的值都将被忽略,除了最后一个
17 // 输出: ‘Last will display‘
18 const subscribe = debouncedExample.subscribe(val => console.log(val));
concat, concatMap
1 import { delay, concat } from ‘rxjs/operators‘;
2 import { of } from ‘rxjs‘;
3
4 // 发出 1,2,3
5 const sourceOne = of(1, 2, 3);
6 // 发出 4,5,6
7 const sourceTwo = of(4, 5, 6);
8
9 // 延迟3秒,然后发出
10 const sourceThree = sourceOne.pipe(delay(3000));
11 // sourceTwo 要等待 sourceOne 完成才能订阅
12 const example = sourceThree.pipe(concat(sourceTwo));
13 // 输出: 1,2,3,4,5,6
14 const subscribe = example.subscribe(val =>
15 console.log(‘Example: Delayed source one:‘, val)
16 );
merge, mergeMap
1 import { mapTo } from ‘rxjs/operators‘;
2 import { interval, merge } from ‘rxjs‘;
3
4 // 每2.5秒发出值
5 const first = interval(2500);
6 // 每2秒发出值
7 const second = interval(2000);
8 // 每1.5秒发出值
9 const third = interval(1500);
10 // 每1秒发出值
11 const fourth = interval(1000);
12
13 // 从一个 observable 中发出输出值
14 const example = merge(
15 first.pipe(mapTo(‘FIRST!‘)),
16 second.pipe(mapTo(‘SECOND!‘)),
17 third.pipe(mapTo(‘THIRD‘)),
18 fourth.pipe(mapTo(‘FOURTH‘))
19 );
20 // 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
21 const subscribe = example.subscribe(val => console.log(val));
concatMap, mergeMap, switchMap, exhaustMap
concatMap
和 mergeMap
之间的区别。 因为 concatMap
之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。 对比的话, mergeMap
会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。 1 import { of } from ‘rxjs‘;
2 import { concatMap, delay, mergeMap } from ‘rxjs/operators‘;
3
4 // 发出延迟值
5 const source = of(2000, 1000);
6 // 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
7 const example = source.pipe(
8 concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
9 );
10 // 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
11 const subscribe = example.subscribe(val =>
12 console.log(`With concatMap: ${val}`)
13 );
14
15 // 展示 concatMap 和 mergeMap 之间的区别
16 const mergeMapExample = source
17 .pipe(
18 // 只是为了确保 meregeMap 的日志晚于 concatMap 示例
19 delay(5000),
20 mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
21 )
22 .subscribe(val => console.log(`With mergeMap: ${val}`));
1 import { interval } from ‘rxjs‘;
2 import { exhaustMap, tap, take } from ‘rxjs/operators‘;
3
4 const firstInterval = interval(1000).pipe(take(10));
5 const secondInterval = interval(1000).pipe(take(2));
6
7 const exhaustSub = firstInterval
8 .pipe(
9 exhaustMap(f => {
10 console.log(`Emission Corrected of first interval: ${f}`);
11 return secondInterval;
12 })
13 )
14 /*
15 当我们订阅第一个 interval 时,它开始发出值(从0开始)。
16 这个值会映射成第二个 interval,然后它开始发出值(从0开始)。
17 当第二个 interval 出于激活状态时,第一个 interval 的值会被忽略。
18 我们可以看到 firstInterval 发出的数字为3,6,等等...
19
20 输出:
21 Emission of first interval: 0
22 0
23 1
24 Emission of first interval: 3
25 0
26 1
27 Emission of first interval: 6
28 0
29 1
30 Emission of first interval: 9
31 0
32 1
33 */
34 .subscribe(s => console.log(s));