Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RxJS 操作符指南 #12

Open
bigggge opened this issue Sep 27, 2017 · 0 comments
Open

RxJS 操作符指南 #12

bigggge opened this issue Sep 27, 2017 · 0 comments
Labels

Comments

@bigggge
Copy link
Member

bigggge commented Sep 27, 2017

RxJS Operators

rxmarbles
learn-rxjs-operators
RxJS

CREATION OBSERVABLES 创建

form

将数组、promise 或迭代器转换成 observable

var array = [10, 20, 30];
var result = Rx.Observable.from(array);
result.subscribe(x => console.log(x));

// 结果如下:
// 10 20 30

interval

基于给定时间间隔发出数字序列。

// 每1秒发出数字序列中的值
const source = Rx.Observable.interval(1000);
// 数字: 0,1,2,3,4,5....
const subscribe = source.subscribe(val => console.log(val));

of

按顺序发出任意数量的值。

// 发出任意类型值
const source = Rx.Observable.of({name: 'Brian'}, [1,2,3], function hello(){ return 'Hello'});
// 输出: {name: 'Brian}, [1,2,3], function hello() { return 'Hello' }
const subscribe = source.subscribe(val => console.log(val));

timer

给定持续时间后,再按照指定间隔时间依次发出数字。

/*
  timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
  然后每2秒发出序列值
*/
const source = Rx.Observable.timer(1000, 2000);
// 输出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));

CONDITIONAL OPERATORS 条件

defaultIfEmpty

如果在完成前没有发出任何通知,那么发出给定的值

var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));

every

如果完成时所有的值都能通过断言,那么发出 true,否则发出 false 。

 Observable.of(1, 2, 3, 4, 5, 6)
    .every(x => x < 5)
    .subscribe(x => console.log(x)); // -> false

sequenceEqual

使用可选的比较函数,按顺序比较两个 Observables 的所有值,然后返回单个布尔值的 Observable, 以表示两个序列是否相等。

var code = Rx.Observable.from([
 "ArrowUp",
 "ArrowUp",
 "ArrowDown",
 "ArrowDown",
 "ArrowLeft",
 "ArrowRight",
 "ArrowLeft",
 "ArrowRight",
 "KeyB",
 "KeyA",
 "Enter" // no start key, clearly.
]);

var keys = Rx.Observable.fromEvent(document, 'keyup')
 .map(e => e.code);
var matches = keys.bufferCount(11, 1)
 .mergeMap(
   last11 =>
     Rx.Observable.from(last11)
       .sequenceEqual(code)
  );
matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));

COMBINATION OPERATORS 组合

combineLatest

当任意 observable 发出值时,发出每个 observable 的最新值。

var a = Rx.Observable.of(1, 2);
var b = Rx.Observable.of('a', 'b', 'c');
var observable = a.combineLatest(b, (a, b) => '' + a + b);
observable.subscribe(x => console.log(x));

// With output to console:
// 2a
// 2b
// 2c

concat

按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值。

// 发出 1,2,3
const sourceOne = Rx.Observable.of(1,2,3);
// 发出 4,5,6
const sourceTwo = Rx.Observable.of(4,5,6);
// 先发出 sourceOne 的值,当完成时订阅 sourceTwo
const example = sourceOne.concat(sourceTwo);
// 输出: 1,2,3,4,5,6
const subscribe = example.subscribe(val => console.log('Example: Basic concat:', val));

merge

将多个 observables 转换成单个 observable 。

// 每2.5秒发出值
const first = Rx.Observable.interval(2500);
// 每1秒发出值
const second = Rx.Observable.interval(1000);
// 作为实例方法使用
const example = first.merge(second);
// 输出: 0,1,0,2....
const subscribe = example.subscribe(val => console.log(val));

race

使用首先发出值的 observable 。

// 接收第一个发出值的 observable
const example = Rx.Observable.race(
  // 每1.5秒发出值
  Rx.Observable.interval(1500),
  // 每1秒发出值
  Rx.Observable.interval(1000).mapTo('1s won!'),
  // 每2秒发出值
  Rx.Observable.interval(2000),
  // 每2.5秒发出值
  Rx.Observable.interval(2500)
);
//输出: "1s won!"..."1s won!"...etc
const subscribe = example.subscribe(val => console.log(val));

startWith

发出给定的第一个值

// 发出 (1,2,3)
const source = Rx.Observable.of(1,2,3);
// 从0开始
const example =  source.startWith(0);
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));

withLatestFrom

结合源 Observable 和另外的 Observables 以创建新的 Observable, 该 Observable 的值由每 个 Observable 最新的值计算得出,当且仅当源发出的时候。

// 对于每个点击事件,发出一个包含最新时间和点击事件的数组。
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var result = clicks.withLatestFrom(timer);
result.subscribe(x => console.log(x));

zip

将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。

// 从不同的源头结合年龄和名称
let age$ = Observable.of(27, 25, 29);
let name$ = Observable.of('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of(true, true, false);

Observable
  .zip(age$,
    name$,
    isDev$,
    (age, name, isDev) => ({ age, name, isDev }))
  .subscribe(x => console.log(x));

// 输出:
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }

FILTERING OPERATORS 过滤

debounceTime

只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。

// 在一顿狂点后只发出最新的点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));

debounce

只有在另一个 Observable 决定的一段特定时间经过后并且没有发出另一个源值之后,才从源 Observable 中发出一个值。

var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

distinct

返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。

Observable.of(1, 1, 2, 2, 2, 3, 2, 1)
  .distinct()
  .subscribe(x => console.log(x)); // 1, 2, 3

distinctUntilChanged

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。

Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3)
  .distinctUntilChanged()
  .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3

elementAt

只发出单个值,这个值位于源 Observable 的发送序列中的指定 index 处。

// 只发出第三次的点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));

// 结果:
// click 1 = nothing
// click 2 = nothing
// click 3 = 打印到控制台的 MouseEvent 对象

filter

通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。

// 只发出目标是 DIV 元素的点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));

find

只发出源 Observable 所发出的值中第一个满足条件的值。

var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

findIndex

first

ignoreElements

last

sample

当提供的 observable 发出时从源 observable 中取样。

// 每次点击, 取样最新的 "seconds" 时间器
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = seconds.sample(clicks);
result.subscribe(x => console.log(x));

skip

跳过N个(由参数提供)发出值。

// 每1秒发出值
const source = Rx.Observable.interval(1000);
// 跳过前5个发出值
const example = source.skip(5);
// 输出: 5...6...7...8........
const subscribe = example.subscribe(val => console.log(val));

skipUntil

skipWhile

take

只发出源 Observable 最初发出的的N个值 (N = count)。

var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));

takeLast

takeUntil

发出源 Observable 发出的值,直到 notifier Observable 发出值。

var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));

takeWhile

throttle

仅当由提供的函数所确定的持续时间已经过去时才发出值。

// 每1秒发出值
const source = Rx.Observable.interval(1000);
// throttle 3秒后才发出最新值
const example = source.throttle(val => Rx.Observable.interval(3000));
// 输出: 0...3...6...9
const subscribe = example.subscribe(val => console.log(val));

throttleTime

当指定的持续时间经过后发出最新值。

// 以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));

MATHEMATICAL OPERATORS 数学

count

max

min

reduce

TRANSFORMATION OPERATORS 转换

buffer

bufferCount

bufferTime

收集发出的值,直到经过了提供的时间才将其作为数组发出。

// 每一秒都发出最新点击事件的数组
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));

bufferToggle

bufferWhen

concatMap

将值映射成内部 observable,并按顺序订阅和发出。

// 发出 'Hello' 和 'Goodbye'
const source = Rx.Observable.of('Hello', 'Goodbye');
// 使用 promise 的示例
const examplePromise = val => new Promise(resolve => resolve(`${val} World!`));
// 将 source 的值映射成内部 observable,当一个完成发出结果后再继续下一个
const example = source.concatMap(val => examplePromise(val))
// 输出: 'Example w/ Promise: 'Hello World', Example w/ Promise: 'Goodbye World'
const subscribe = example.subscribe(val => console.log('Example w/ Promise:', val));

concatMapTo

map

将给定的 project 函数应用于源 Observable 发出的每个值,并将结果值作为 Observable 发出。

var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

mapTo

mergeMap

映射成 observable 并发出值。

// 将每个字母映射并打平成一个 Observable ,每1秒钟一次
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));

// 结果如下:
// a0
// b0
// c0
// a1
// b1
// c1
// 继续列出a、b、c加上各自的自增数列

mergeMapTo

pairwise

pluck

将每个源值(对象)映射成它指定的嵌套属性。

const source = Rx.Observable.from([
  {name: 'Joe', age: 30, job: {title: 'Developer', language: 'JavaScript'}},
  // 当找不到 job 属性的时候会返回 undefined
  {name: 'Sarah', age:35}
]);
// 提取 job 中的 title 属性
const example = source.pluck('job', 'title');
// 输出: "Developer" , undefined
const subscribe = example.subscribe(val => console.log(val));

repeat

scan

随着时间的推移进行归并。

// 计数点击次数
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(x => console.log(x));

switchMap

switchMapTo

UTILITY OPERATORS 工具

delay

根据给定时间延迟发出值。

// 发出一项
const example = Rx.Observable.of(null);
// 每延迟一次输出便增加1秒延迟时间
const message = Rx.Observable.merge(
    example.mapTo('Hello'),
    example.mapTo('World!').delay(1000),
    example.mapTo('Goodbye').delay(2000),
    example.mapTo('World!').delay(3000)
  );
// 输出: 'Hello'...'World!'...'Goodbye'...'World!'
const subscribe = message.subscribe(val => console.log(val));

delayWhen

延迟发出值,延迟时间由提供函数决定。

// 将每次点击延迟0到5秒的随机时间
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
  Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));

do

透明地执行操作或副作用,比如打印日志。

var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
  .do(ev => console.log(ev))
  .map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

toPromise

将 observable 转换成 promise 。

// 返回基础的 observable
const sample = val => Rx.Observable.of(val).delay(5000);
// 将基础的 observable 转换成 promise
const example = sample('First Example')
  .toPromise()
  // 输出: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

ERROR HANDLING OPERATORS 错误处理

catch

处理 observable 序列中的错误。

// 创建立即拒绝的 Promise
const myBadPromise = () => new Promise((resolve, reject) => reject('Rejected!'));
// 1秒后发出单个值
const source = Rx.Observable.timer(1000);
// 捕获拒绝的 promise,并返回包含错误信息的 observable
const example = source.flatMap(() => Rx.Observable
                                       .fromPromise(myBadPromise())
                                       .catch(error => Rx.Observable.of(`Bad Promise: ${error}`))
                                    );
// 输出: 'Bad Promise: Rejected'
const subscribe = example.subscribe(val => console.log(val));

retry

如果发生错误,以指定次数重试 observable 序列。

// 每1秒发出值
const source = Rx.Observable.interval(1000);
const example = source
  .flatMap(val => {
    // 抛出错误以进行演示
    if(val > 5){
      return Rx.Observable.throw('Error!');
    }
    return Rx.Observable.of(val);
  })
  // 出错的话可以重试2次
  .retry(2);
/*
  输出: 
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  "Error!: Retried 2 times then quit!"
*/
const subscribe = example
  .subscribe({
     next: val => console.log(val),
     error: val => console.log(`${val}: Retried 2 times then quit!`)
});

retryWhen

MUILTCASTING OPERATORS 多播

share

在多个订阅者间共享源 observable 。

@bigggge bigggge added the JS label Sep 27, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant