Skip to content
Linbudu's Blog

【draft】RxJS学习记录

4 min read

前言

整理进度

  • 创建
  • 转换
  • 过滤
  • 组合
  • 多播(multicast)
  • 错误处理
  • 工具
  • 条件/布尔
  • 数学/聚合
  • Subject
  • Scheduler
  • 太狼老师的教程

不愧是"海量 API", 我人看傻了, 一边学一边记录常用的好了.

会先把大致的作用都过一遍再回过头来补充代码示例, 这样可以互相比较嘛.

创建操作符

  • of 接收任意参数并转换为 ob

    • scheduler 已被弃用,应当将 of 替换为 scheduled 操作符
  • ajax 基于 Ajax 请求创建一个 ob (导出自 rxjs/ajax)

    • 使用方式:

      1import { ajax } from "rxjs/ajax";
      2import { map, catchError } from "rxjs/operators";
      3import { of } from "rxjs";
      4
      5const obs$ = ajax(`https://api.github.com/users?per_page=5`).pipe(
      6 map((userResponse) => console.log("users: ", userResponse)),
      7 catchError((error) => {
      8 console.log("error: ", error);
      9 return of(error);
      10 })
      11);
    • 类似于 axios,ajax 方法也可以接受一个由 url、method、headers、body 等属性组成的对象

  • from 接收一组参数并转为一组 ob

    • scheduler 已被弃用,应当将 of 替换为 scheduled 操作符

    • 可被转化为 Observable 的类型一览

      1export type ObservableInput<T> =
      2 | Observable<T>
      3 | InteropObservable<T>
      4 | AsyncIterable<T>
      5 | PromiseLike<T>
      6 | ArrayLike<T>
      7 | Iterable<T>
      8 | ReadableStreamLike<T>;
  • fromEvent 从 DOM 事件产生 ob

  • fromEventPattern 使用 addHandler 与 removeHanler 创建 ob

  • defer 惰性创建,在被订阅时才会调用工厂函数创建一个 Ob

    • 相关:
      • 条件操作符 iif:(condition,trueRes=EMPTY,fasleRes=EMPTY),根据条件动态的判断订阅哪个 Ob,常和 mergeMap 一起使用(打平内部 Ob)
  • generate 基于条件与迭代器不断产生值, (initial, continueConditionFunc, iteratorFunc, resultSelector), 在条件为 true 时不断进行迭代

  • empty 抛出一个直接 complete 的 ob,不返回值,应使用 EMPTY 替代。

    • 相当于它直接调用subscript.complete()
    • 相关:
      • isEmpty,如果输入的 Observable 产生了值,那么它就会返回 false。
      • defaultEmpty,接受一个默认值,如果输入 Observable 没有产生值,则使用这一操作符的默认值。
  • throwError 抛出一个直接 error 的 ob,接受一个工厂函数,用于创建错误实例。

    • 相当于它直接调用subscript.error()
    • 相关:
      • throwIfEmpty:如果源 Ob 没有产生值,就会产生一个错误。如果没有指定工厂函数,则使用 EmptyError。
      • catchError:通过返回一个新的 ob 或者抛出错误来 catch 管道中的错误。
  • never 不会产生值也不会 complete 的 ob。已废弃,使用 NEVER 替代。

  • interval(period)定时器流, 从 0 开始产生值。

    • 相关:
      • timeInterval:发出一个对象,包含当前值以及上一个值到当前值的时间间隔
  • timer (initialDelay,period)可延时的 interval,从 0 开始产生值。

  • range (start, count)产生范围 ob。

  • repeat (count = Infinity)重复源 ob 产生的流。

转换操作符

  • buffer 当 closingNotifier 有输出时才会输出缓冲区数据,以数组的形式

    • 相关:
      • bufferCount (size,startBufferEvery) 使用固定长度的缓冲区,而非 closingNotifier。第二个参数 startBufferEvery 指定何时开始缓存下一个 buffer,如指定为 1,则结果如下(size 为 5):
        • 1s [0] -> 2s [0,1][1] -> 3s [0,1,2][1,2] [2] ...
      • bufferTime (bufferTimeSpan,bufferCreationInterval)指定缓冲时间,每隔 span 秒发送一次缓冲区,每隔 interval 开启一个缓冲区
      • bufferToggle (opening:Ob, closingSelector: Func)使用 opening+closing 控制,opening Ob 有值产生时,开启一个新的缓冲区,并在 closingSelector 返回 Ob 有值产生时,输出缓冲区内容。
      • bufferWhen (closingSelector: Func) 当 closing 返回的 ob 发出数据时,关闭当前缓冲区, 并立刻准备好下一个
  • concatMap (project,resultSelector),相当于在源 ob 的每个值后附加一连串串行 ob(处理完一串才会到下一串, 就像并发为 1 的 mergeMap)。

    • 用于产生高阶 Ob(concat、concatAll 是组合操作符,一个接收 Ob,一个无参数,concatAll 在管道中的上一级通常会 map 回来新的 Ob 或是,然后 concatAll 会依次订阅这些 Ob)

    • 实例

      1const source = of(2000, 1000);
      2// map value from source into inner observable, when complete emit result and move to next
      3const example = source.pipe(
      4 concatMap((val) => of(`Delayed by: ${val}ms`).pipe(delay(val)))
      5);
      6
      7//output: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
      8const subscribe = example.subscribe((val) =>
      9 console.log(`With concatMap: ${val}`)
      10);
    • concatMapTo 接收 innerOb 与 resultSelector,源值会被直接替换 并且最后会展平为单个 ob 来输出

  • map mapTo 映射处理

  • merge 合并多个 ob 源(可控制并发)

    • mergeMap 对每个值进行一次映射, 并且会使用 mergeAll 展平, 最后只会输出 1 个
    • mergeMapTo
    • mergeAll 将 ob 内的 ob 拿出来展平, 可控制并发
  • scan 对每个源值应用累加器, 返回中间 ob 与最终 ob

    • mergeScan 将累加器的中间与返回 ob 合并到输出 ob
  • switchMap 接收 project 与 resultSelector 将源值映射后合并到最终 ob 最终只会发出最新投射的值

    • 能够取消副作用
    • 切换到一个全新的 Ob
    • switchMapTo
    • switch 通过只发出 ob 内部的最新 ob 来展平一个高阶 ob
  • groupBy 基于 keySelector/eleSelector 进行分组

  • pairwise 从第 2 项开始成对的发送当前值与前一个值 [1,2]

  • partition 将源一分为二: 满足与不满足的

  • pluck 将源值映射为源值的键值

  • window 在内部 ob 发出项时, 将源 ob 的值分割为嵌套的 ob

    • 就像 buffer, 但这里是将多个值组合成嵌套的 ob
    • windowCount 每个窗口值有上限版本
    • windowToggle 以 opening 作为窗口开始, 以 closing 作为结束
    • windowWhen 每当 closing 有值产生时发出当前的窗口并开始下一个

过滤操作符

  • delay 为源的所有值进行延时
  • distinct 返回与之前源 ob 发出的值都不同的值(keySelector + Set)
    • distinctUntilChanged 返回所有与前一项不同的值
    • distinctUntilKeyChanged 可接收 keySelector 版本的 distinctUntilChanged
  • elementAt 发出发送序列的第 N 个值
  • filter
  • first 返回第一个值(或是第一个通过筛选的值)
    • last
  • ignoreElements 忽略所有项, 直接调用源的 complete/error
  • single 返回单个匹配项
  • skip 跳过前 N 个值
    • skipLast
    • skipUntil 跳过直到 notifierOb 开始发送值
  • take
    • takeLast
    • takeUntil 持续发送直到 notifierOb 开始发送值
    • takeWhile 发出满足条件的每个值, 并在出现不满足的值时立即完成
  • debounce 由 durationOb 决定的一段时间内都没有一个新的源值发送, 才会发出下一个值
    • debounceTime 源 ob 发送一个值后, 在指定时间内都没有下一个值, 才会发出当前的这个值
  • throttle 发出一个值 沉默 直到第二个 ob 发送(或者完成) 继续下一个值 重复
    • throttleTime 发出一个值后会沉默指定时间 在此过程中源 ob 的输出都将被忽略 指定时间结束后才能够发送下一个值
  • audit debounce 返回沉默期间的第一个值, audit 返回最后一个, durationOb 持续的时间内会持续忽略源值. 最开始时 durationOb 是禁用的, 第一个源值到达, 启用 durationOb, 持续忽略接下来的源值, durationOb 到期禁用, 返回这段时间的最后一个值
    • auditTime
  • sample 同步版本的 audit/debounce? durationOb 发出时发送最新的源值
    • sampleTime

组合操作符

  • combineLatest 接收多个 ob 在任意一个输入 ob 产生值时发出源 ob 与所有输入 ob 最新值(最新值是相对于单个 Ob 来说的)的组合

    • 会在每个 ob 都至少发出一个值时才输出第一个值(与 withLatestFrom 一致)
    • 如果每个 ob 都只发送一个值, 或者计算需要的只是每个 ob 的最后一个值, 使用 forkJoin 更好
  • combineAll 接收一个高阶 ob 收集所有内部 ob 在最外部 ob 完成时订阅所有已经收集的 ob 并通过 combineLatest 打平。

    • 具体地说,在源 Ob 完成后,对内部 ob 应用 combineLatest

    • 实例:

      1import { take, map, combineAll } from "rxjs/operators";
      2import { interval } from "rxjs";
      3
      4const source$ = interval(1000).pipe(take(2));
      5
      6const example$ = source$.pipe(
      7 map((val) =>
      8 interval(1000).pipe(
      9 map((i) => `Result (${val}): ${i}`),
      10 take(5)
      11 )
      12 )
      13);
      14
      15// source$2 比 source$1晚产生1s
      16example$
      17 .pipe(combineAll())
      18 /*
      19 output:
      20 ["Result (0): 0", "Result (1): 0"]
      21 ["Result (0): 1", "Result (1): 0"]
      22 ["Result (0): 1", "Result (1): 1"]
      23 ["Result (0): 2", "Result (1): 1"]
      24 ["Result (0): 2", "Result (1): 2"]
      25 ["Result (0): 3", "Result (1): 2"]
      26 ["Result (0): 3", "Result (1): 3"]
      27 ["Result (0): 4", "Result (1): 3"]
      28 ["Result (0): 4", "Result (1): 4"]
      29*/
      30 .subscribe(console.log);
  • concat (...obs)顺序的连结多个 ob,但是在一个结束后才会开始下一个

    • concatAll,同样用于处理高阶 Ob。它的处理方式是对于返回的内部 Ob,一个个的进行订阅。就像组合版本的 combineAll(combineAll 是[a, 1], [b, 2]的"组合", 而 concatAll 是 a,b,1,2 这样的"连结")

      • 实例

        1import { take, concatAll } from "rxjs/operators";
        2import { interval, of } from "rxjs";
        3
        4// interval返回的还是Ob,这里之前是个误区,包括of等返回的都是Ob
        5const obs1 = interval(1000).pipe(take(5));
        6const obs2 = interval(500).pipe(take(2));
        7const obs3 = interval(2000).pipe(take(1));
        8//emit three observables
        9const source = of(obs1, obs2, obs3);
        10
        11const example = source.pipe(concatAll());
        12
        13/*
        14 output: 0,1,2,3,4,0,1,0
        15*/
        16const subscribe = example.subscribe((val) => console.log(val));
    • 如果对 ob 的执行顺序无要求, 可以使用 merge

  • merge 将多个 ob 组合到一个(不是 combineLatest 那种将多个产生值合并为一个再输出, 也不是 concat 那种一个完了再下一个), 就像是字面意思...的合并到一个 Ob。

    • 有点像 zip,但 zip 是等所有 Ob 都产生一次值后再以数组发出收集的值。
    • 如果对顺序有要求, 应当使用 concat
    • mergeAll (concurrent),接收一个产生 ob 的源 ob,通过同时发出高阶 ob 内部 ob 发出的值将高阶 ob 打平。同样会等源 ob 完成生产所有内部 ob 后,再订阅这些内部 ob?
  • exhaustAll 专一版本的 mergeAll 会在当前专注的内部 ob 未完成时丢弃掉其他 ob 发出的值

  • switchAll 花心版本的 exhaust 会丢弃掉当前专注的 ob 订阅新的有值发出的 ob, 常用于请求竞态

  • forkJoin(属于静态方法,不会被用在 pipe 中) 在接受的所有 ob 完成时输出每个 ob 最后的结果组成的值

    • 适用于只关心每个 ob 的最后一个值的情况
    • 如果有一个 ob 没有完成, 那么 forkJoin 永远不会产生值
    • 如果有 ob 失败了, 将失去其他 ob 的值, 此时应当使用 catchError 进行兜底
    • 如果需要正确的得到每个 ob 与值的相对应关系, 应该使用 zip
  • race 发出最新的值,在遇到错误时,不会做出响应(所以不会被 catchError catch 到)

  • startWith 在 pipe 中存在时, 会先发出其内部的 ob 再发出源 ob

    • endWith 在源 ob 完成时,发出一个值
    • 如果想要在计算完成时执行操作, 但不想要产生一个新值, 应该使用 finalize
  • withLatestFrom 在源 ob 发出值时使用此值和输入 ob 的最新值计算输出值

  • zip 组合多个 ob 最后得到一个 ob 值来自于输入的 ob 按顺序计算而来

多播

  • multicast 将一个 ob 在多个订阅者之间共享, 通常会先将订阅者添加到 Subject 上, 再由 Subject 监听多播 ob, 见下方 Subject 相关
  • share 就像自带 refCount 的 multicast
  • shareReplay 在 share 的基础上还能够缓存最后 N 个值
    • 通常在有副作用或者复杂计算时, 为了避免其在多个订阅者中都进行执行, 或者后续加入的订阅者需要能够访问先前的值.
  • publish 需要手动调用 connect 的 multicast/share, 就像是先注册到订阅者列表中, 手动决定何时开始发布值

错误处理

  • catchError 通过返回一个新的 ob 来捕获错误
  • retry(count) 返回源 ob 在发生错误时不断重新从头尝试直到最大重试次数的 ob
  • retryWhen((errors)=>ob) 在源 ob 发生 error 时 将 error 传递给 notifier 进行判断 当 notifier 进入 complete 或者 error 时 对源 ob 的订阅也将进入 complete 或者 error 如果 notifier 不断继续 则会从头开始订阅源 ob

工具操作符

  • tap(原先的 do) 在每次源 ob 发送值时 执行一次操作 不影响返回的 ob

  • delay 延迟源 ob 发送

    • delayWhen 由接收的 ob 决定延迟时间
  • materialize 将 ob 包装为 Notification 类型, 此对象是{kind:"", value:"", error:undefined, hasValue: true}的形式

    • dematerialize: 相反, 从 Notification 对象到 ob 代表着的发送(即 next complete error)
  • observerOn: 基于指定的调度器重新发出通知, 如Scheduler.animationFrame

    • subscribeOn: 基于指定的调度器订阅
  • timeInterval 发出包含当前值以及与前一次发出值间隔的时间(基于调度器的 now 方法获取每次的当前时间, 再计算时间差)

  • timestamp 在产生值时 为其附加一个时间戳

  • timeout 在指定时间内没有 ob 产生时抛出错误

    • timeoutWith 在指定时间内没有 ob 产生时订阅另一个源 ob
  • toArray 将源 ob 的所有值收集到数组中

  • finalize 在 ob 结束后执行操作

条件/布尔

  • defaultIfEmpty 为直到完成时也没有值产生的源 ob 指定一个默认值
  • every 判断源 ob 发出的每个值是否都满足指定条件, 如果是, 则返回 true
  • find 发出源 ob 中第一个满足条件的值
    • findIndex
  • isEmpty 在源 ob 为空的情况下发出一个发出 true 的 ob
  • sequenceEqual 依次比较两个源 ob 产生的每个值, 在完全相等时返回 true
  • iif 在订阅将被发起时才决定订阅哪一个 ob(三元表达式)

数学/聚合

  • count 在源 ob 完成时告知发送值的数量
  • max 通过比较函数找到源 ob 发出值中最大的一项
    • min
  • reduce 在源 ob 发出的值上应用累加器函数 并返回最终的累加值

调度器

Subject

  • Subject 是特殊的 ob, 它能够在多个观察者之间共享一个 ob(在正常情况下, 一个 ob 的多个观察者, 每一个都会重新执行这个 ob)

  • Subject 实际上也是 EventEmitter, 它将维护多个观察者的注册信息

  • multicast 操作符在底层实际上使用了 Subject

  • Subject 可以被订阅, 并且订阅者并不能区分自己订阅的是 ob 还是 sub, 但 subscribe 方法实际上不会直接调用 ob, 而是像 addEventListener 那样新增一个订阅者到注册信息中

  • Subject 可以通过 next error complete 方法来将值/状态传递给所有观察者们

    1const sub = new Subject();
    2
    3sub.subscribe((x) => console.log(`1-${x}`));
    4sub.subscribe((x) => console.log(`2-${x}`));
    5
    6sub.next("item1");
    7sub.next("item2");
    8sub.next("item3");
    9
    10sub.complete();
    11
    121 - item1;
    132 - item1;
    141 - item2;
    152 - item2;
    161 - item3;
    172 - item3;
  • Subject 同时还可以作为观察者, 也就是直接被传给 ob 的 subscribe 方法

    1const sourceOb = from([1, 2, 3]);
    2
    3const sub = new Subject();
    4
    5sub.subscribe((x) => console.log(`1-${x}`));
    6sub.subscribe((x) => console.log(`2-${x}`));
    7
    8sourceOb.subscribe(sub);
    9
    101 - 1;
    112 - 1;
    121 - 2;
    132 - 2;
    141 - 3;
    152 - 3;

    这里可以理解为, sub 上注册的订阅者现在将多播的订阅 sourceOb

    Subject 是将任意 ob 执行同时在多个订阅者之间共享的唯一方式

多播的 ob

多播的 ob 会将通知通过一个可能拥有多个订阅者的 subject 发送出去

multicast 的底层原理: 观察者订阅 subject, 由 subject 进行注册, 然后再由 subject 去订阅源 ob, 将源 ob 的值多播给多个观察者:

1const sourceOb = from([1, 2, 3]);
2
3const sub = new Subject();
4
5const multicasted = sourceOb.pipe(multicast(sub));
6
7// 实际上即是sub.subscribe
8multicasted.subscribe((x) => console.log(`1-${x}`));
9multicasted.subscribe((x) => console.log(`2-${x}`));
10
11sourceOb.subscribe(sub);

引用计数

假设要实现第一个观察者到达时自动连结, 最后一个观察者取消订阅时自动取消共享, 使用手动订阅需要自己一个个处理 subscribe 和 unsubscribe, 在这种情况下, 可以将 refCount 方法加入到管道中, 它会在第一个订阅者出现时让多播 ob 自动启动(共享), 在最后一个订阅者离开时取消共享.

1const source = interval(500);
2const subject = new Subject();
3const refCounted = source.pipe(multicast(subject), refCount());
4
5let subscription1: Subscription, subscription2: Subscription;
6
7console.log("observerA subscribed");
8// 会自动开始共享ob 因为第一个订阅者出现了
9subscription1 = refCounted.subscribe({
10 next: (v) => console.log(`observerA: ${v}`),
11});
12
13setTimeout(() => {
14 console.log("observerB subscribed");
15 subscription2 = refCounted.subscribe({
16 next: (v) => console.log(`observerB: ${v}`),
17 });
18}, 600);
19
20setTimeout(() => {
21 console.log("observerA unsubscribed");
22 subscription1.unsubscribe();
23}, 1200);
24
25// 取消共享 因为没有观察者了
26setTimeout(() => {
27 console.log("observerB unsubscribed");
28 subscription2.unsubscribe();
29}, 2000);

BehaviorSubject

新增了"当前值"的概念, 会保存被发送的最新值, 并且当新的观察者参与订阅时会立即接收到当前值.

1const subject = new BehaviorSubject(0); // 0 is the initial value
2
3subject.subscribe({
4 next: (v) => console.log(`observerA: ${v}`),
5});
6
7subject.next(1);
8subject.next(2);
9
10subject.subscribe({
11 next: (v) => console.log(`observerB: ${v}`),
12});
13
14subject.next(3);
15
16observerA: 0;
17observerA: 1;
18observerA: 2;
19observerB: 2;
20observerA: 3;
21observerB: 3;

第二个观察者接收到的首个值就是 2

ReplaySubject

类似前一个, 但它可以发送旧的值给订阅者(根据实例化时的缓冲长度决定)

1subject.subscribe({
2 next: (v) => console.log(`observerA: ${v}`),
3});
4
5subject.next(1);
6subject.next(2);
7subject.next(3);
8subject.next(4);
9
10subject.subscribe({
11 next: (v) => console.log(`observerB: ${v}`),
12});
13
14subject.next(5);
15
16// observerA: 1
17// observerA: 2
18// observerA: 3
19// observerA: 4
20// observerB: 2
21// observerB: 3
22// observerB: 4
23// observerA: 5
24// observerB: 5

B 加入时, 最新的 3 个值: 2 3 4 会被发送给 B

除了指定缓冲长度, 还可以指定缓存时间

AsyncSubject

只有当当前共享的 Ob 完成时, 才会将最后一个值发送给所有订阅者

只发送一个值, 类似 last 操作符

1const subject = new AsyncSubject();
2
3subject.subscribe({
4 next: (v) => console.log(`observerA: ${v}`),
5});
6
7subject.next(1);
8subject.next(2);
9subject.next(3);
10subject.next(4);
11
12subject.subscribe({
13 next: (v) => console.log(`observerB: ${v}`),
14});
15
16subject.next(5);
17subject.complete();
18
19// Logs:
20// observerA: 5
21// observerB: 5

太狼老师的教程

要实现的功能:

  • 输入字符, 敲击回车/点击 ADD, 创建一个 Todo Item 并清空输入框
  • 点击一个 TodoItem 来完成它
  • 移除 TodoItem

这篇文章使用的 RxJS 版本是 5.x, 也就是说在 6.x 使用需要把链式调用转换为.pipe调用

1import { Observable } from "rxjs";
2import { createTodoItem } from "./lib";
3
4const $input = <HTMLInputElement>document.querySelector(".todo-val");
5const $list = <HTMLUListElement>document.querySelector(".list-group");
6const $add = document.querySelector(".button-add");
7
8const enter$ = Observable.fromEvent<KeyboardEvent>($input, "keydown").filter(
9 // 监听enter键作为一个单独的流
10 (r) => r.keyCode === 13
11);
12
13// add键的点击同理
14const clickAdd$ = Observable.fromEvent<MouseEvent>($add, "click");
15
16// 将enter与add点击监听合并为一个流
17const input$ = enter$.merge(clickAdd$);
18
19const item$ = input$
20 // 提取输入值
21 .map(() => $input.value)
22 // 去空
23 .filter((r) => r !== "")
24 // 返回一个DOM片段
25 .map(createTodoItem)
26 // 使用do来执行副作用/DOM操作等
27 .do((ele: HTMLLIElement) => {
28 $list.appendChild(ele);
29 // 清空输入框
30 $input.value = "";
31 })
32 .publishReplay(1)
33 .refCount();
34
35const toggle$ = item$
36 // mergeMap 对每个值进行一次映射 并使用mergeAll展平最后的结果
37 .mergeMap(($todoItem) =>
38 // 将item映射到item的点击事件
39 Observable.fromEvent<MouseEvent>($todoItem, "click")
40 // 仅关心item被点击
41 .filter((e) => e.target === $todoItem)
42 // 再将每次点击映射回item
43 .mapTo($todoItem)
44 )
45 .do(($todoItem: HTMLElement) => {
46 // 处理样式(标识状态)
47 if ($todoItem.classList.contains("done")) {
48 $todoItem.classList.remove("done");
49 } else {
50 $todoItem.classList.add("done");
51 }
52 });
53
54const remove$ = item$
55 .mergeMap(($todoItem) => {
56 const $removeButton = $todoItem.querySelector(".button-remove");
57 return Observable.fromEvent($removeButton, "click").mapTo($todoItem);
58 })
59 .do(($todoItem: HTMLElement) => {
60 // 从 DOM 上移掉 todo item
61 const $parent = $todoItem.parentNode;
62 $parent.removeChild($todoItem);
63 });
64
65const app$ = toggle$.merge(remove$).do((r) => console.log(r));
66
67app$.subscribe();
68
69// lib
70export const createTodoItem = (val: string) => {
71 const result = <HTMLLIElement>document.createElement("LI");
72 result.classList.add("list-group-item");
73 const innerHTML = `
74 ${val}
75 <button type="button" class="btn btn-default button-remove" aria-label="right Align">
76 <span class="glyphicon glyphicon-remove" aria-hidden="true"></span>
77 </button>
78 `;
79 result.innerHTML = innerHTML;
80 return result;
81};

主要关注:

  • 由于 add 键点击与 enter 的效果相同, 所以可以将这两个流合并(merge)

  • 为每一个 item 进行一次映射, 然后展平内部的所有 ob >>> mergeMap

  • 将 item 映射到 item 的点击事件, 过滤 target 不为对应 item 的点击事件, 再将满足条件的点击事件映射回(mapTo)item, 然后在 do 中对这个 item 进行操作

  • 由于 ob 默认是单播的, 即会为每一个订阅者进行一次独特的执行. 这里假设先点击了 toggle, 执行完毕后点击 remove, remove 会重新去订阅, 但此时已经没有流会产生(因为没有输入事件了), 因此需要将此 ob 多播, 猜测 publishReplay 即为 shareReplay, 思路类似, refCount 则用于自动启用?

实现功能:

  • 回车/add 后发送请求, 在请求返回后清空输入框并基于结果生成 todo, 在请求返回前回车/add 时, 比对输入框当前值与上一个值是否相同, 仅在不想同时才会取消掉上次请求并发送新请求(竞态)
  • 点击 item 时发送请求, 间隔 300ms 内的点击只会发出一次
  • 每次输入字符会在停止 200ms 后发送一个请求, 搜索是否有匹配的 todo, 若有则高亮匹配项. 如果在上一次搜索返回前输入了新字符, 则取消掉上一个
1import { Observable, Subject } from "rxjs";
2import {
3 createTodoItem,
4 mockToggle,
5 mockHttpPost,
6 search,
7 HttpResponse,
8} from "./lib";
9
10const $input = <HTMLInputElement>document.querySelector(".todo-val");
11const $list = <HTMLUListElement>document.querySelector(".list-group");
12const $add = document.querySelector(".button-add");
13
14// 后面的 search$ 与 enter 应该时从同一个 Observable 中转换出来,这里将 input 事件的 Observable publish 成 muticast
15const type$ = Observable.fromEvent<KeyboardEvent>($input, "keydown")
16 .publish()
17 .refCount();
18
19const search$ = type$
20 .debounceTime(200)
21 .filter((evt) => evt.keyCode !== 13)
22 .map((result) => (<HTMLInputElement>result.target).value)
23 .switchMap(search)
24 .do((result: HttpResponse | null) => {
25 const actived = document.querySelectorAll(".active");
26 Array.prototype.forEach.call(actived, (item: HTMLElement) => {
27 item.classList.remove("active");
28 });
29 if (result) {
30 const item = document.querySelector(`.todo-item-${result._id}`);
31 item.classList.add("active");
32 }
33 });
34
35const enter$ = type$.filter((r) => r.keyCode === 13);
36
37const clickAdd$ = Observable.fromEvent<MouseEvent>($add, "click");
38
39const input$ = enter$.merge(clickAdd$);
40
41const clearInputSubject$ = new Subject<any>();
42
43const item$ = input$
44 .map(() => $input.value)
45 .filter((r) => r !== "")
46 // 使输入框值不变时无法走到请求取消的环节
47 // 返回与之前源ob都不同的值
48 // .distinct()
49 // 避免在输入值相同时也过滤掉ob
50 // flushes会清空缓存:flushes会被订阅,并在完成时清空缓存
51 .distinct(null, clearInputSubject$)
52 // 当ob内部流动的值同样是ob时 订阅最新的一个 将其的值传给下一个操作符 并取消对上一个的订阅
53 // 实际上就是先switch然后map
54 .switchMap(mockHttpPost)
55 .map(createTodoItem)
56 .do((ele: HTMLLIElement) => {
57 $list.appendChild(ele);
58 $input.value = "";
59 })
60 .publishReplay(1)
61 .refCount();
62
63const toggle$ = item$
64 .mergeMap(($todoItem) => {
65 return (
66 Observable.fromEvent<MouseEvent>($todoItem, "click")
67 // 300ms内只会发出一次
68 .debounceTime(300)
69 .filter((e) => e.target === $todoItem)
70 .mapTo({
71 data: {
72 _id: $todoItem.dataset["id"],
73 isDone: $todoItem.classList.contains("done"),
74 },
75 $todoItem,
76 })
77 );
78 })
79 .switchMap((result) =>
80 mockToggle(result.data._id, result.data.isDone)
81 // 映射回原对象
82 .mapTo(result.$todoItem)
83 );
84
85const remove$ = item$
86 .mergeMap(($todoItem) => {
87 const $removeButton = $todoItem.querySelector(".button-remove");
88 return Observable.fromEvent($removeButton, "click").mapTo($todoItem);
89 })
90 .do(($todoItem: HTMLElement) => {
91 // 从 DOM 上移掉 todo item
92 const $parent = $todoItem.parentNode;
93 $parent.removeChild($todoItem);
94 });
95
96const app$ = toggle$.merge(remove$, search$).do((r) => {
97 console.log(r);
98});
99
100app$.subscribe();

注意点:

  • 多播 ob 的应用场景, 及派生 ob
  • distinct 的 flushes

  • 文件上传: 断点续传与恢复/分片/进度展示

后面会用框架重构

1import { Observable, Subscriber, Subject } from "rxjs";
2// spark-md5 没有第三方 .d.ts 文件,这里用 commonjs 风格的 require 它
3// 如果未再 tsconfig.json 中设置 noImplicitAny: true 且 TypeScript 版本大于 2.1 则也可以用
4// import * as SparkMD5 from 'spark-md5' 的方式引用
5const SparkMD5 = require("spark-md5");
6// @warn memory leak
7const $attachment = document.querySelector(".attachment");
8const $progressBar = document.querySelector(".progress-bar") as HTMLElement;
9const apiHost = "http://127.0.0.1:5000/api";
10
11interface FileInfo {
12 fileSize: number;
13 fileMD5: string;
14 lastUpdated: string;
15 fileName: string;
16}
17
18interface ChunkMeta {
19 fileSize: number;
20 chunkSize: number;
21 chunks: number;
22 fileKey: string;
23}
24
25type Action = "pause" | "resume" | "progress" | "complete";
26
27export class FileUploader {
28 // 有文件进入时触发
29 // 原始流
30 private file$ = Observable.fromEvent($attachment, "change")
31 .map((r: Event) => (r.target as HTMLInputElement).files[0])
32 .filter((f) => !!f);
33
34 private click$ = Observable.fromEvent($attachment, "click")
35 // 过滤子节点冒泡
36 .map((e: Event) => e.target)
37 .filter((e: HTMLElement) => e === $attachment)
38 // 1-2-3 上传-暂停-继续
39 // scan会不断生成这几个状态
40 .scan((acc: number, val: HTMLElement) => {
41 if (val.classList.contains("glyphicon-paperclip")) {
42 return 1;
43 }
44 if (acc === 2) {
45 return 3;
46 }
47 return 2;
48 }, 3)
49 .filter((v) => v !== 1)
50 // 根据状态改变icon样式
51 .do((v) => {
52 console.log(v);
53 if (v === 2) {
54 this.action$.next({ name: "pause" });
55 $attachment.classList.remove("glyphicon-pause");
56 $attachment.classList.add("glyphicon-play");
57 } else {
58 this.action$.next({ name: "resume" });
59 this.buildPauseIcon();
60 }
61 })
62 .map((v) => ({ action: v === 2 ? "PAUSE" : "RESUME", payload: null }));
63
64 private action$ = new Subject<{
65 name: Action;
66 payload?: any;
67 }>();
68
69 private pause$ = this.action$.filter((ac) => ac.name === "pause");
70 private resume$ = this.action$.filter((ac) => ac.name === "resume");
71
72 // 进度流
73 private progress$ = this.action$
74 .filter((action) => action.name === "progress")
75 .map((action) => action.payload)
76 .distinctUntilChanged((x: number, y: number) => x - y >= 0)
77 .do((r) => {
78 const percent = Math.round(r * 100);
79 $progressBar.style.width = `${percent}%`;
80 $progressBar.firstElementChild.textContent = `${
81 percent > 1 ? percent - 1 : percent
82 } %`;
83 })
84 .map((r) => ({ action: "PROGRESS", payload: r }));
85
86 public uploadStream$ = this.file$
87 .switchMap(this.readFileInfo)
88 .switchMap((i) =>
89 Observable.ajax
90 // 获取分片信息(并不是实际上传)
91 .post(`${apiHost}/upload/chunk`, i.fileinfo)
92 .map((r) => {
93 // 按照返回的分片信息进行分片
94 const blobs = this.slice(
95 i.file,
96 r.response.chunks,
97 r.response.chunkSize
98 );
99 return { blobs, chunkMeta: r.response };
100 })
101 )
102 // 创建暂停按钮
103 .do(() => this.buildPauseIcon())
104 .switchMap(({ blobs, chunkMeta }) => {
105 const uploaded: number[] = [];
106 const dists = blobs.map((blob, index) => {
107 let currentLoaded = 0;
108 return this.uploadChunk(chunkMeta, index, blob).do((r) => {
109 currentLoaded = r.loaded / chunkMeta.fileSize;
110 uploaded[index] = currentLoaded;
111 const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0));
112 // 计算进度
113 this.action$.next({ name: "progress", payload: percent });
114 });
115 });
116
117 // 并发上传所有分片(并发度3)
118 const uploadStream = Observable.from(dists).mergeAll(this.concurrency);
119
120 // 所有分片上传完毕后输出值
121 // 再映射到本次切片的元数据chunkMeta
122 return Observable.forkJoin(uploadStream).mapTo(chunkMeta);
123 })
124 // 上传分片
125 .switchMap((r: ChunkMeta) =>
126 Observable.ajax
127 .post(`${apiHost}/upload/chunk/${r.fileKey}`)
128 // 将请求映射到上传状态
129 .mapTo({
130 action: "UPLOAD_SUCCESS",
131 payload: r,
132 })
133 )
134 .do(() => {
135 $progressBar.firstElementChild.textContent = "100 %";
136 // restore icon
137 $attachment.classList.remove("glyphicon-pause");
138 $attachment.classList.add("glyphicon-paperclip");
139 ($attachment.firstElementChild as HTMLInputElement).disabled = false;
140 })
141 // 与过程流 点击流合并
142 .merge(this.progress$, this.click$);
143
144 constructor(private concurrency = 3) {}
145
146 // side effect
147 private buildPauseIcon() {
148 $attachment.classList.remove("glyphicon-paperclip");
149 $attachment.classList.add("glyphicon-pause");
150 ($attachment.firstElementChild as HTMLInputElement).disabled = true;
151 }
152
153 // 读取文件信息
154 // 拿到文件流后会附带文件信息与MD5信息
155 // 用于uploadSream$的第一次switchMap
156 private readFileInfo(
157 file: File
158 ): Observable<{ file: File; fileinfo: FileInfo }> {
159 const reader = new FileReader();
160 const spark = new SparkMD5.ArrayBuffer();
161 reader.readAsArrayBuffer(file);
162 return Observable.create(
163 (observer: Subscriber<{ file: File; fileinfo: FileInfo }>) => {
164 reader.onload = (e: Event) => {
165 spark.append((e.target as FileReader).result);
166 const fileMD5 = spark.end();
167 observer.next({
168 file,
169 fileinfo: {
170 fileMD5,
171 fileSize: file.size,
172 lastUpdated: file.lastModifiedDate.toISOString(),
173 fileName: file.name,
174 },
175 });
176 observer.complete();
177 };
178 return () => {
179 if (!reader.result) {
180 console.warn("read file aborted");
181 reader.abort();
182 }
183 };
184 }
185 );
186 }
187
188 private slice(file: File, n: number, chunkSize: number): Blob[] {
189 const result: Blob[] = [];
190 for (let i = 0; i < n; i++) {
191 const startSize = i * chunkSize;
192 const slice = file.slice(
193 startSize,
194 i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize
195 );
196 result.push(slice);
197 }
198 return result;
199 }
200
201 private uploadChunk(
202 meta: ChunkMeta,
203 index: number,
204 blob: Blob
205 ): Observable<ProgressEvent> {
206 const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${
207 index + 1
208 }&chunks=${meta.chunks}`;
209 return Observable.create((subscriber: Subscriber<ProgressEvent>) => {
210 const ajax$ = Observable.ajax({
211 url: host,
212 body: blob,
213 method: "post",
214 crossDomain: true,
215 headers: { "Content-Type": "application/octet-stream" },
216 // 进度获取
217 progressSubscriber: subscriber,
218 })
219 // 在暂停流有输出时 停止输出值
220 .takeUntil(this.pause$)
221 // 在恢复流有输出时 重复
222 .repeatWhen(() => this.resume$);
223 const subscription = ajax$.subscribe();
224 return () => subscription.unsubscribe();
225 }).retryWhen(() => this.resume$);
226 }
227}