— 4 min read
不愧是"海量 API", 我人看傻了, 一边学一边记录常用的好了.
会先把大致的作用都过一遍再回过头来补充代码示例, 这样可以互相比较嘛.
of 接收任意参数并转换为 ob
ajax 基于 Ajax 请求创建一个 ob (导出自 rxjs/ajax)
使用方式:
1import { ajax } from "rxjs/ajax";2import { map, catchError } from "rxjs/operators";3import { of } from "rxjs";4
5const obs$ = ajax(`https://api.github.com/users?per_page=5`).pipe(6 map((userResponse) => console.log("users: ", userResponse)),7 catchError((error) => {8 console.log("error: ", error);9 return of(error);10 })11);
类似于 axios,ajax 方法也可以接受一个由 url、method、headers、body 等属性组成的对象
from 接收一组参数并转为一组 ob
scheduler 已被弃用,应当将 of 替换为 scheduled 操作符
可被转化为 Observable 的类型一览
1export type ObservableInput<T> =2 | Observable<T>3 | InteropObservable<T>4 | AsyncIterable<T>5 | PromiseLike<T>6 | ArrayLike<T>7 | Iterable<T>8 | ReadableStreamLike<T>;
fromEvent 从 DOM 事件产生 ob
fromEventPattern 使用 addHandler 与 removeHanler 创建 ob
defer 惰性创建,在被订阅时才会调用工厂函数创建一个 Ob
generate 基于条件与迭代器不断产生值, (initial, continueConditionFunc, iteratorFunc, resultSelector), 在条件为 true 时不断进行迭代
empty 抛出一个直接 complete 的 ob,不返回值,应使用 EMPTY 替代。
subscript.complete()
throwError 抛出一个直接 error 的 ob,接受一个工厂函数,用于创建错误实例。
subscript.error()
never 不会产生值也不会 complete 的 ob。已废弃,使用 NEVER 替代。
interval(period)定时器流, 从 0 开始产生值。
timer (initialDelay,period)可延时的 interval,从 0 开始产生值。
range (start, count)产生范围 ob。
repeat (count = Infinity)重复源 ob 产生的流。
buffer 当 closingNotifier 有输出时才会输出缓冲区数据,以数组的形式
concatMap (project,resultSelector),相当于在源 ob 的每个值后附加一连串串行 ob(处理完一串才会到下一串, 就像并发为 1 的 mergeMap)。
用于产生高阶 Ob(concat、concatAll 是组合操作符,一个接收 Ob,一个无参数,concatAll 在管道中的上一级通常会 map 回来新的 Ob 或是,然后 concatAll 会依次订阅这些 Ob)
实例
1const source = of(2000, 1000);2// map value from source into inner observable, when complete emit result and move to next3const example = source.pipe(4 concatMap((val) => of(`Delayed by: ${val}ms`).pipe(delay(val)))5);6
7//output: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms8const subscribe = example.subscribe((val) =>9 console.log(`With concatMap: ${val}`)10);
concatMapTo 接收 innerOb 与 resultSelector,源值会被直接替换 并且最后会展平为单个 ob 来输出
map mapTo 映射处理
merge 合并多个 ob 源(可控制并发)
scan 对每个源值应用累加器, 返回中间 ob 与最终 ob
switchMap 接收 project 与 resultSelector 将源值映射后合并到最终 ob 最终只会发出最新投射的值
groupBy 基于 keySelector/eleSelector 进行分组
pairwise 从第 2 项开始成对的发送当前值与前一个值 [1,2]
partition 将源一分为二: 满足与不满足的
pluck 将源值映射为源值的键值
window 在内部 ob 发出项时, 将源 ob 的值分割为嵌套的 ob
combineLatest 接收多个 ob 在任意一个输入 ob 产生值时发出源 ob 与所有输入 ob 最新值(最新值是相对于单个 Ob 来说的)的组合
combineAll 接收一个高阶 ob 收集所有内部 ob 在最外部 ob 完成时订阅所有已经收集的 ob 并通过 combineLatest 打平。
具体地说,在源 Ob 完成后,对内部 ob 应用 combineLatest
实例:
1import { take, map, combineAll } from "rxjs/operators";2import { interval } from "rxjs";3
4const source$ = interval(1000).pipe(take(2));5
6const example$ = source$.pipe(7 map((val) =>8 interval(1000).pipe(9 map((i) => `Result (${val}): ${i}`),10 take(5)11 )12 )13);14
15// source$2 比 source$1晚产生1s16example$17 .pipe(combineAll())18 /*19 output:20 ["Result (0): 0", "Result (1): 0"]21 ["Result (0): 1", "Result (1): 0"]22 ["Result (0): 1", "Result (1): 1"]23 ["Result (0): 2", "Result (1): 1"]24 ["Result (0): 2", "Result (1): 2"]25 ["Result (0): 3", "Result (1): 2"]26 ["Result (0): 3", "Result (1): 3"]27 ["Result (0): 4", "Result (1): 3"]28 ["Result (0): 4", "Result (1): 4"]29*/30 .subscribe(console.log);
concat (...obs)顺序的连结多个 ob,但是在一个结束后才会开始下一个
concatAll,同样用于处理高阶 Ob。它的处理方式是对于返回的内部 Ob,一个个的进行订阅。就像组合版本的 combineAll(combineAll 是[a, 1], [b, 2]的"组合", 而 concatAll 是 a,b,1,2 这样的"连结")
实例
1import { take, concatAll } from "rxjs/operators";2import { interval, of } from "rxjs";3
4// interval返回的还是Ob,这里之前是个误区,包括of等返回的都是Ob5const obs1 = interval(1000).pipe(take(5));6const obs2 = interval(500).pipe(take(2));7const obs3 = interval(2000).pipe(take(1));8//emit three observables9const source = of(obs1, obs2, obs3);10
11const example = source.pipe(concatAll());12
13/*14 output: 0,1,2,3,4,0,1,015*/16const subscribe = example.subscribe((val) => console.log(val));
如果对 ob 的执行顺序无要求, 可以使用 merge
merge 将多个 ob 组合到一个(不是 combineLatest 那种将多个产生值合并为一个再输出, 也不是 concat 那种一个完了再下一个), 就像是字面意思...的合并到一个 Ob。
exhaustAll 专一版本的 mergeAll 会在当前专注的内部 ob 未完成时丢弃掉其他 ob 发出的值
switchAll 花心版本的 exhaust 会丢弃掉当前专注的 ob 订阅新的有值发出的 ob, 常用于请求竞态
forkJoin(属于静态方法,不会被用在 pipe 中) 在接受的所有 ob 完成时输出每个 ob 最后的结果组成的值
race 发出最新的值,在遇到错误时,不会做出响应(所以不会被 catchError catch 到)
startWith 在 pipe 中存在时, 会先发出其内部的 ob 再发出源 ob
withLatestFrom 在源 ob 发出值时使用此值和输入 ob 的最新值计算输出值
zip 组合多个 ob 最后得到一个 ob 值来自于输入的 ob 按顺序计算而来
tap(原先的 do) 在每次源 ob 发送值时 执行一次操作 不影响返回的 ob
delay 延迟源 ob 发送
materialize 将 ob 包装为 Notification 类型, 此对象是{kind:"", value:"", error:undefined, hasValue: true}
的形式
observerOn: 基于指定的调度器重新发出通知, 如Scheduler.animationFrame
timeInterval 发出包含当前值以及与前一次发出值间隔的时间(基于调度器的 now 方法获取每次的当前时间, 再计算时间差)
timestamp 在产生值时 为其附加一个时间戳
timeout 在指定时间内没有 ob 产生时抛出错误
toArray 将源 ob 的所有值收集到数组中
finalize 在 ob 结束后执行操作
Subject 是特殊的 ob, 它能够在多个观察者之间共享一个 ob(在正常情况下, 一个 ob 的多个观察者, 每一个都会重新执行这个 ob)
Subject 实际上也是 EventEmitter, 它将维护多个观察者的注册信息
multicast 操作符在底层实际上使用了 Subject
Subject 可以被订阅, 并且订阅者并不能区分自己订阅的是 ob 还是 sub, 但 subscribe 方法实际上不会直接调用 ob, 而是像 addEventListener 那样新增一个订阅者到注册信息中
Subject 可以通过 next error complete 方法来将值/状态传递给所有观察者们
1const sub = new Subject();2
3sub.subscribe((x) => console.log(`1-${x}`));4sub.subscribe((x) => console.log(`2-${x}`));5
6sub.next("item1");7sub.next("item2");8sub.next("item3");9
10sub.complete();11
121 - item1;132 - item1;141 - item2;152 - item2;161 - item3;172 - item3;
Subject 同时还可以作为观察者, 也就是直接被传给 ob 的 subscribe 方法
1const sourceOb = from([1, 2, 3]);2
3const sub = new Subject();4
5sub.subscribe((x) => console.log(`1-${x}`));6sub.subscribe((x) => console.log(`2-${x}`));7
8sourceOb.subscribe(sub);9
101 - 1;112 - 1;121 - 2;132 - 2;141 - 3;152 - 3;
这里可以理解为, sub 上注册的订阅者现在将多播的订阅 sourceOb
Subject 是将任意 ob 执行同时在多个订阅者之间共享的唯一方式
多播的 ob 会将通知通过一个可能拥有多个订阅者的 subject 发送出去
multicast 的底层原理: 观察者订阅 subject, 由 subject 进行注册, 然后再由 subject 去订阅源 ob, 将源 ob 的值多播给多个观察者:
1const sourceOb = from([1, 2, 3]);2
3const sub = new Subject();4
5const multicasted = sourceOb.pipe(multicast(sub));6
7// 实际上即是sub.subscribe8multicasted.subscribe((x) => console.log(`1-${x}`));9multicasted.subscribe((x) => console.log(`2-${x}`));10
11sourceOb.subscribe(sub);
假设要实现第一个观察者到达时自动连结, 最后一个观察者取消订阅时自动取消共享, 使用手动订阅需要自己一个个处理 subscribe 和 unsubscribe, 在这种情况下, 可以将 refCount 方法加入到管道中, 它会在第一个订阅者出现时让多播 ob 自动启动(共享), 在最后一个订阅者离开时取消共享.
1const source = interval(500);2const subject = new Subject();3const refCounted = source.pipe(multicast(subject), refCount());4
5let subscription1: Subscription, subscription2: Subscription;6
7console.log("observerA subscribed");8// 会自动开始共享ob 因为第一个订阅者出现了9subscription1 = refCounted.subscribe({10 next: (v) => console.log(`observerA: ${v}`),11});12
13setTimeout(() => {14 console.log("observerB subscribed");15 subscription2 = refCounted.subscribe({16 next: (v) => console.log(`observerB: ${v}`),17 });18}, 600);19
20setTimeout(() => {21 console.log("observerA unsubscribed");22 subscription1.unsubscribe();23}, 1200);24
25// 取消共享 因为没有观察者了26setTimeout(() => {27 console.log("observerB unsubscribed");28 subscription2.unsubscribe();29}, 2000);
新增了"当前值"的概念, 会保存被发送的最新值, 并且当新的观察者参与订阅时会立即接收到当前值.
1const subject = new BehaviorSubject(0); // 0 is the initial value2
3subject.subscribe({4 next: (v) => console.log(`observerA: ${v}`),5});6
7subject.next(1);8subject.next(2);9
10subject.subscribe({11 next: (v) => console.log(`observerB: ${v}`),12});13
14subject.next(3);15
16observerA: 0;17observerA: 1;18observerA: 2;19observerB: 2;20observerA: 3;21observerB: 3;
第二个观察者接收到的首个值就是 2
类似前一个, 但它可以发送旧的值给订阅者(根据实例化时的缓冲长度决定)
1subject.subscribe({2 next: (v) => console.log(`observerA: ${v}`),3});4
5subject.next(1);6subject.next(2);7subject.next(3);8subject.next(4);9
10subject.subscribe({11 next: (v) => console.log(`observerB: ${v}`),12});13
14subject.next(5);15
16// observerA: 117// observerA: 218// observerA: 319// observerA: 420// observerB: 221// observerB: 322// observerB: 423// observerA: 524// observerB: 5
B 加入时, 最新的 3 个值: 2 3 4 会被发送给 B
除了指定缓冲长度, 还可以指定缓存时间
只有当当前共享的 Ob 完成时, 才会将最后一个值发送给所有订阅者
只发送一个值, 类似 last 操作符
1const subject = new AsyncSubject();2
3subject.subscribe({4 next: (v) => console.log(`observerA: ${v}`),5});6
7subject.next(1);8subject.next(2);9subject.next(3);10subject.next(4);11
12subject.subscribe({13 next: (v) => console.log(`observerB: ${v}`),14});15
16subject.next(5);17subject.complete();18
19// Logs:20// observerA: 521// observerB: 5
要实现的功能:
这篇文章使用的 RxJS 版本是 5.x, 也就是说在 6.x 使用需要把链式调用转换为
.pipe
调用
1import { Observable } from "rxjs";2import { createTodoItem } from "./lib";3
4const $input = <HTMLInputElement>document.querySelector(".todo-val");5const $list = <HTMLUListElement>document.querySelector(".list-group");6const $add = document.querySelector(".button-add");7
8const enter$ = Observable.fromEvent<KeyboardEvent>($input, "keydown").filter(9 // 监听enter键作为一个单独的流10 (r) => r.keyCode === 1311);12
13// add键的点击同理14const clickAdd$ = Observable.fromEvent<MouseEvent>($add, "click");15
16// 将enter与add点击监听合并为一个流17const input$ = enter$.merge(clickAdd$);18
19const item$ = input$20 // 提取输入值21 .map(() => $input.value)22 // 去空23 .filter((r) => r !== "")24 // 返回一个DOM片段25 .map(createTodoItem)26 // 使用do来执行副作用/DOM操作等27 .do((ele: HTMLLIElement) => {28 $list.appendChild(ele);29 // 清空输入框30 $input.value = "";31 })32 .publishReplay(1)33 .refCount();34
35const toggle$ = item$36 // mergeMap 对每个值进行一次映射 并使用mergeAll展平最后的结果37 .mergeMap(($todoItem) =>38 // 将item映射到item的点击事件39 Observable.fromEvent<MouseEvent>($todoItem, "click")40 // 仅关心item被点击41 .filter((e) => e.target === $todoItem)42 // 再将每次点击映射回item43 .mapTo($todoItem)44 )45 .do(($todoItem: HTMLElement) => {46 // 处理样式(标识状态)47 if ($todoItem.classList.contains("done")) {48 $todoItem.classList.remove("done");49 } else {50 $todoItem.classList.add("done");51 }52 });53
54const remove$ = item$55 .mergeMap(($todoItem) => {56 const $removeButton = $todoItem.querySelector(".button-remove");57 return Observable.fromEvent($removeButton, "click").mapTo($todoItem);58 })59 .do(($todoItem: HTMLElement) => {60 // 从 DOM 上移掉 todo item61 const $parent = $todoItem.parentNode;62 $parent.removeChild($todoItem);63 });64
65const app$ = toggle$.merge(remove$).do((r) => console.log(r));66
67app$.subscribe();68
69// lib70export const createTodoItem = (val: string) => {71 const result = <HTMLLIElement>document.createElement("LI");72 result.classList.add("list-group-item");73 const innerHTML = `74 ${val}75 <button type="button" class="btn btn-default button-remove" aria-label="right Align">76 <span class="glyphicon glyphicon-remove" aria-hidden="true"></span>77 </button>78 `;79 result.innerHTML = innerHTML;80 return result;81};
主要关注:
由于 add 键点击与 enter 的效果相同, 所以可以将这两个流合并(merge)
为每一个 item 进行一次映射, 然后展平内部的所有 ob >>> mergeMap
将 item 映射到 item 的点击事件, 过滤 target 不为对应 item 的点击事件, 再将满足条件的点击事件映射回(mapTo)item, 然后在 do 中对这个 item 进行操作
由于 ob 默认是单播的, 即会为每一个订阅者进行一次独特的执行. 这里假设先点击了 toggle, 执行完毕后点击 remove, remove 会重新去订阅, 但此时已经没有流会产生(因为没有输入事件了), 因此需要将此 ob 多播, 猜测 publishReplay 即为 shareReplay, 思路类似, refCount 则用于自动启用?
实现功能:
1import { Observable, Subject } from "rxjs";2import {3 createTodoItem,4 mockToggle,5 mockHttpPost,6 search,7 HttpResponse,8} from "./lib";9
10const $input = <HTMLInputElement>document.querySelector(".todo-val");11const $list = <HTMLUListElement>document.querySelector(".list-group");12const $add = document.querySelector(".button-add");13
14// 后面的 search$ 与 enter 应该时从同一个 Observable 中转换出来,这里将 input 事件的 Observable publish 成 muticast15const type$ = Observable.fromEvent<KeyboardEvent>($input, "keydown")16 .publish()17 .refCount();18
19const search$ = type$20 .debounceTime(200)21 .filter((evt) => evt.keyCode !== 13)22 .map((result) => (<HTMLInputElement>result.target).value)23 .switchMap(search)24 .do((result: HttpResponse | null) => {25 const actived = document.querySelectorAll(".active");26 Array.prototype.forEach.call(actived, (item: HTMLElement) => {27 item.classList.remove("active");28 });29 if (result) {30 const item = document.querySelector(`.todo-item-${result._id}`);31 item.classList.add("active");32 }33 });34
35const enter$ = type$.filter((r) => r.keyCode === 13);36
37const clickAdd$ = Observable.fromEvent<MouseEvent>($add, "click");38
39const input$ = enter$.merge(clickAdd$);40
41const clearInputSubject$ = new Subject<any>();42
43const item$ = input$44 .map(() => $input.value)45 .filter((r) => r !== "")46 // 使输入框值不变时无法走到请求取消的环节47 // 返回与之前源ob都不同的值48 // .distinct()49 // 避免在输入值相同时也过滤掉ob50 // flushes会清空缓存:flushes会被订阅,并在完成时清空缓存51 .distinct(null, clearInputSubject$)52 // 当ob内部流动的值同样是ob时 订阅最新的一个 将其的值传给下一个操作符 并取消对上一个的订阅53 // 实际上就是先switch然后map54 .switchMap(mockHttpPost)55 .map(createTodoItem)56 .do((ele: HTMLLIElement) => {57 $list.appendChild(ele);58 $input.value = "";59 })60 .publishReplay(1)61 .refCount();62
63const toggle$ = item$64 .mergeMap(($todoItem) => {65 return (66 Observable.fromEvent<MouseEvent>($todoItem, "click")67 // 300ms内只会发出一次68 .debounceTime(300)69 .filter((e) => e.target === $todoItem)70 .mapTo({71 data: {72 _id: $todoItem.dataset["id"],73 isDone: $todoItem.classList.contains("done"),74 },75 $todoItem,76 })77 );78 })79 .switchMap((result) =>80 mockToggle(result.data._id, result.data.isDone)81 // 映射回原对象82 .mapTo(result.$todoItem)83 );84
85const remove$ = item$86 .mergeMap(($todoItem) => {87 const $removeButton = $todoItem.querySelector(".button-remove");88 return Observable.fromEvent($removeButton, "click").mapTo($todoItem);89 })90 .do(($todoItem: HTMLElement) => {91 // 从 DOM 上移掉 todo item92 const $parent = $todoItem.parentNode;93 $parent.removeChild($todoItem);94 });95
96const app$ = toggle$.merge(remove$, search$).do((r) => {97 console.log(r);98});99
100app$.subscribe();
注意点:
后面会用框架重构
1import { Observable, Subscriber, Subject } from "rxjs";2// spark-md5 没有第三方 .d.ts 文件,这里用 commonjs 风格的 require 它3// 如果未再 tsconfig.json 中设置 noImplicitAny: true 且 TypeScript 版本大于 2.1 则也可以用4// import * as SparkMD5 from 'spark-md5' 的方式引用5const SparkMD5 = require("spark-md5");6// @warn memory leak7const $attachment = document.querySelector(".attachment");8const $progressBar = document.querySelector(".progress-bar") as HTMLElement;9const apiHost = "http://127.0.0.1:5000/api";10
11interface FileInfo {12 fileSize: number;13 fileMD5: string;14 lastUpdated: string;15 fileName: string;16}17
18interface ChunkMeta {19 fileSize: number;20 chunkSize: number;21 chunks: number;22 fileKey: string;23}24
25type Action = "pause" | "resume" | "progress" | "complete";26
27export class FileUploader {28 // 有文件进入时触发29 // 原始流30 private file$ = Observable.fromEvent($attachment, "change")31 .map((r: Event) => (r.target as HTMLInputElement).files[0])32 .filter((f) => !!f);33
34 private click$ = Observable.fromEvent($attachment, "click")35 // 过滤子节点冒泡36 .map((e: Event) => e.target)37 .filter((e: HTMLElement) => e === $attachment)38 // 1-2-3 上传-暂停-继续39 // scan会不断生成这几个状态40 .scan((acc: number, val: HTMLElement) => {41 if (val.classList.contains("glyphicon-paperclip")) {42 return 1;43 }44 if (acc === 2) {45 return 3;46 }47 return 2;48 }, 3)49 .filter((v) => v !== 1)50 // 根据状态改变icon样式51 .do((v) => {52 console.log(v);53 if (v === 2) {54 this.action$.next({ name: "pause" });55 $attachment.classList.remove("glyphicon-pause");56 $attachment.classList.add("glyphicon-play");57 } else {58 this.action$.next({ name: "resume" });59 this.buildPauseIcon();60 }61 })62 .map((v) => ({ action: v === 2 ? "PAUSE" : "RESUME", payload: null }));63
64 private action$ = new Subject<{65 name: Action;66 payload?: any;67 }>();68
69 private pause$ = this.action$.filter((ac) => ac.name === "pause");70 private resume$ = this.action$.filter((ac) => ac.name === "resume");71
72 // 进度流73 private progress$ = this.action$74 .filter((action) => action.name === "progress")75 .map((action) => action.payload)76 .distinctUntilChanged((x: number, y: number) => x - y >= 0)77 .do((r) => {78 const percent = Math.round(r * 100);79 $progressBar.style.width = `${percent}%`;80 $progressBar.firstElementChild.textContent = `${81 percent > 1 ? percent - 1 : percent82 } %`;83 })84 .map((r) => ({ action: "PROGRESS", payload: r }));85
86 public uploadStream$ = this.file$87 .switchMap(this.readFileInfo)88 .switchMap((i) =>89 Observable.ajax90 // 获取分片信息(并不是实际上传)91 .post(`${apiHost}/upload/chunk`, i.fileinfo)92 .map((r) => {93 // 按照返回的分片信息进行分片94 const blobs = this.slice(95 i.file,96 r.response.chunks,97 r.response.chunkSize98 );99 return { blobs, chunkMeta: r.response };100 })101 )102 // 创建暂停按钮103 .do(() => this.buildPauseIcon())104 .switchMap(({ blobs, chunkMeta }) => {105 const uploaded: number[] = [];106 const dists = blobs.map((blob, index) => {107 let currentLoaded = 0;108 return this.uploadChunk(chunkMeta, index, blob).do((r) => {109 currentLoaded = r.loaded / chunkMeta.fileSize;110 uploaded[index] = currentLoaded;111 const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0));112 // 计算进度113 this.action$.next({ name: "progress", payload: percent });114 });115 });116
117 // 并发上传所有分片(并发度3)118 const uploadStream = Observable.from(dists).mergeAll(this.concurrency);119
120 // 所有分片上传完毕后输出值121 // 再映射到本次切片的元数据chunkMeta122 return Observable.forkJoin(uploadStream).mapTo(chunkMeta);123 })124 // 上传分片125 .switchMap((r: ChunkMeta) =>126 Observable.ajax127 .post(`${apiHost}/upload/chunk/${r.fileKey}`)128 // 将请求映射到上传状态129 .mapTo({130 action: "UPLOAD_SUCCESS",131 payload: r,132 })133 )134 .do(() => {135 $progressBar.firstElementChild.textContent = "100 %";136 // restore icon137 $attachment.classList.remove("glyphicon-pause");138 $attachment.classList.add("glyphicon-paperclip");139 ($attachment.firstElementChild as HTMLInputElement).disabled = false;140 })141 // 与过程流 点击流合并142 .merge(this.progress$, this.click$);143
144 constructor(private concurrency = 3) {}145
146 // side effect147 private buildPauseIcon() {148 $attachment.classList.remove("glyphicon-paperclip");149 $attachment.classList.add("glyphicon-pause");150 ($attachment.firstElementChild as HTMLInputElement).disabled = true;151 }152
153 // 读取文件信息154 // 拿到文件流后会附带文件信息与MD5信息155 // 用于uploadSream$的第一次switchMap156 private readFileInfo(157 file: File158 ): Observable<{ file: File; fileinfo: FileInfo }> {159 const reader = new FileReader();160 const spark = new SparkMD5.ArrayBuffer();161 reader.readAsArrayBuffer(file);162 return Observable.create(163 (observer: Subscriber<{ file: File; fileinfo: FileInfo }>) => {164 reader.onload = (e: Event) => {165 spark.append((e.target as FileReader).result);166 const fileMD5 = spark.end();167 observer.next({168 file,169 fileinfo: {170 fileMD5,171 fileSize: file.size,172 lastUpdated: file.lastModifiedDate.toISOString(),173 fileName: file.name,174 },175 });176 observer.complete();177 };178 return () => {179 if (!reader.result) {180 console.warn("read file aborted");181 reader.abort();182 }183 };184 }185 );186 }187
188 private slice(file: File, n: number, chunkSize: number): Blob[] {189 const result: Blob[] = [];190 for (let i = 0; i < n; i++) {191 const startSize = i * chunkSize;192 const slice = file.slice(193 startSize,194 i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize195 );196 result.push(slice);197 }198 return result;199 }200
201 private uploadChunk(202 meta: ChunkMeta,203 index: number,204 blob: Blob205 ): Observable<ProgressEvent> {206 const host = `${apiHost}/upload/chunk/${meta.fileKey}?chunk=${207 index + 1208 }&chunks=${meta.chunks}`;209 return Observable.create((subscriber: Subscriber<ProgressEvent>) => {210 const ajax$ = Observable.ajax({211 url: host,212 body: blob,213 method: "post",214 crossDomain: true,215 headers: { "Content-Type": "application/octet-stream" },216 // 进度获取217 progressSubscriber: subscriber,218 })219 // 在暂停流有输出时 停止输出值220 .takeUntil(this.pause$)221 // 在恢复流有输出时 重复222 .repeatWhen(() => this.resume$);223 const subscription = ajax$.subscribe();224 return () => subscription.unsubscribe();225 }).retryWhen(() => this.resume$);226 }227}