Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用可观察对象的数据架构 #3

Open
chenbj2333 opened this issue May 27, 2017 · 0 comments
Open

使用可观察对象的数据架构 #3

chenbj2333 opened this issue May 27, 2017 · 0 comments

Comments

@chenbj2333
Copy link
Owner

chenbj2333 commented May 27, 2017

十分努力的啃一遍官网(缓慢学习中)

RxJS API 中文版
构建流式应用—RxJS详解
RxJS 中文文档
RxMarbles

rxjs是一个使用observable sequences(可观察序列)来构建异步和基于事件的程序的库。它提供了一个核心类型Observable,以及围绕着核心类型的类型Observer,Schedulers , Subjects,还有一些衍生自数组的operators,像map,filter,reduce,every等等。这些让我们可以以集合的方式处理异步事件。
简单的说,rxjs:reactive extensions for javascript,就是把随时间不断变化的数据,状态,事件等转化成可被观察的序列。

rxjs中解决异步事件管理的一些基本概念:

  • Observable(可观察对象):代表一个可调用的未来的值或者事件的集合。
  • Observer(观察者):回调集合,监听Observab发送的值。
  • Subscription:代表Observable的执行,主要用来取消执行。
  • Operators:纯函数,处理集合。
  • Subject:EventEmitter,将一个值或者事件多播到多个观察者的唯一途径。
  • Schedulers:集中调度控制并发。

first examples

通常注册事件监听的方式:

let button = document.querySelector('button');
button.addEventListener('click', () => console.log('clicked!'));

使用rxjs创建方式:

let button = document.querySelector('button');
Observable.fromEvent(button, 'click').subscribe(
  () => console.log('clicked!')
);

Purity 纯函数

什么叫纯函数?
给出同样的参数,该函数总是得出同样结果。不会调用任何会对外界产生影响的函数。比如没有数据库调用,没有http请求,也不会改变外部的数据结构。
使用rxjs来隔离状态

let button = document.querySelector('button');
Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));

scan操作符把每个中间过程中计算出的结果值发送出去,scan(count => count + 1, 0)中,0是初始值,每次点击时,会返回count => count + 1的结果,并把这个结果作为下一次函数运行时的初始值。

Flow 流

关于流的一些概念

  1. 在应用中,流扮演着和承诺(promise)一样的角色。promise发出单个值,而流发出多个值。可以在流上持续响应数据的变化。
  2. 在响应式编程中,代码订阅了数据变化时接收通知,流会把数据推送给这些订阅者。
  3. RxJS是函数式的。
  4. 流是可以组合的。可以把流想像成一个贯穿数据的操作管道。可以订阅流中的任何部分,也可以组合他们创建新的流。
    rxjs有一系列的operators,他们能帮助你去控制事件如何流入可观察对象observables。
    例如
let button = document.querySelector('button');
Observable.fromEvent(button, 'click').throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));

Values 值

可以通过可观察对象来转化值
例如,下面的程序是在每次点击鼠标时获取x坐标的位置

let button = document.querySelector('button');
Observable.fromEvent(button, 'click')
.throttleTime(1000)
.map(event => event.clientX)
.scan((count, clientX) => count + clientX, 0)
.subscribe(count => console.log(count));

Observable 可观察对象

pull and push

data Producer 和data Consumer之间的交流是通过pull(拉)和push(推)这两种不同的方式。
什么是pull?
data Consumer决定何时从data Producer那里获取数据,但是data Producer并不会意识到什么时候数据将会被发送给data Consumer。
1498634084 1
什么是push?
data Producer决定何时发送数据给data Consumer,data Consumer不会在接收数据之前意识到它将要接收这个数据。
promise是现在js中最常见的push体系。一个promise发送一个resolved value来注册一个回调,不同于函数的地方是,promise决定何时数据才被推送到这个回调函数中。
rxjs引入了observables,一个observable对象是一个i额产生多值的生产者,并推送给observer。

Observables as generalizations of functions

可观察对象像一个零参的函数,但是允许返回多个值。
订阅一个可观察对象类似于调用一个函数。

Anatomy of an Observable 分析可观察对象

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observable
    创建(create)一个可观察对象,观察者订阅(subscribe)它,执行(execute)它然后给观察者发送next/error/complete通知。他们的执行可能会被处理(disposed)。

Observer 观察者

什么是观察者?
观察者是可观察对象所发送数据的消费者。观察者简单的说是一组回调函数,分别对应一种被可观察对象发送的通知的类型:next,error和complete。
使用观察者需要订阅可观察对象。

Subscription 订阅

什么是订阅?
订阅通常是一个可观察对象的执行。订阅对象有一个重要的方法:unsubscribe,该方法不需要参数,仅仅用来废弃可观察对象所持有的资源。
订阅对象还有两个方法。一个是add,用于把一个订阅加进另一个订阅里。还有一个是remove,用于解除被add添加的子订阅。

let subscription = Observable.subscribe(x => console.log('first:' + x));
let childSubscription = Observable.subscribe(x => console.log('second:' + x));
subscription.add(childSubscription);

以下转自雨异博客

实例方法Operators

1 创建

  • 发射完数据更新自动关闭: from,fromPromise, of,range
  • 不发射直接关闭:empty
  • 抛出异常后关闭:throw
  • 不发射数据也不关闭:never
  • 保持发射数据且不自动关闭:timer,interval,fromEvent
  • 需要手动发射数据且不自动关闭: create,(包括subject的create)

2 转换

  • 1 : 1效果:map , mapTo , flatMap , scan , expand , pluck
  • 1 : N效果:concat , concatAll , concatMap , concatMapTo , merge, mergeAll, mergeMap, mergeMapTo, switchMap, switchMapTo
  • N : 1效果:buffer, bufferCount, bufferTime, bufferWhen
  • 1 : source效果:groupBy, window, windowCount, windowTime, windowWhen
  • 1 : sources效果:partition

3 过滤

  • 防抖动(一段时间内只取最新数据作为一次发射数据,其他数据取消发射):debounce, debounceTime, throttle(和debounce唯一区别是debounce取一段时间内最新的,而throttle忽略这段时间后,发现新值才发送), throttleTime
  • 去重(重叠的发射数据只去第一数据作为发射数据,其他相同数据取消发射):distinct, distinctUntilChanged
  • 定位(根据条件值去一个或部分几个数据作为对应发射数据,其他取消发射):elementAt, first, last, filter, take, takeLatst, takeUntil, takeWhile
  • 跳过(根据条件去除符合条件的,取剩下的值作为每次发射数据):skip, skipUntil, skipWhile, ignoreElements(忽略所有的,等同于empty)
  • 样本:sample, source=source1.sample(source2), 以source2发射数据时来发现最新一次source1发射的数据,作为source的发射数据

4 组合
做个source组合成新的souce

  • concat, concatAll和merge, mergeAll,在转换分类讲过了
  • combineLastest,source = source1.combineLastest(source2, func),source1和source2一旦发射数据,func会触发,拿到source1和source2最新的发射数据,返回新的数据,作为source的发射数据。
  • combineAll,同combineLastest,,source = sources.combineAll()
  • forkJoin,source = Rx.Observable.forkJoin(sources), 所有的sources都关闭后,获取各自最新的发射数组组合为数组,作为source的发射数据
  • zip和forkJoin的区别是,zip是sources都有发送数据时,组合为一个数组作为source的发送数据,而sources任一source关闭了,则取source最后发射的数值。
  • zipAll,同concat对concatAll
  • startWith,source = source1.startWith(value), 表示在source1的最前面注入第一次发射数据
  • withLastestFrom, soruce = source1.withLastestFrom(source2, func), 表示source1每次发射数据时,获取source2最新发射的数据,如果存在则func处理得到新的数组作为source的发射数据

5 判断

  • find和findIndex分别是指定发射数据和发射数据的下标(第几次发送的),应该放到过滤分类才合理
  • isEmpty, every, include等,判断是否为真,判断的结果当做是source的发射数据

6 错误处理

  • catch,source在Operators调用过程中出现的异常,都可以在catch捕获到,同时可以返回新的source,因为出现异常的当前source会自动销毁掉。
  • retry,source = source.retry(times), source的所有发射,重复来几遍。
  • retryWhen,根据条件来决定来几遍,只有当条件为false时才跳出循环。

7 工具

  • do,在每次响应订阅前,可以通过source.do(func),做一些提前处理等任何动作,比如打印一下发射的数据等。
  • delay, delayWhen,每次发送数据时,都延迟一定时间间隔后再发送。
  • observeOn, 设置scheduler,即发射数据的响应方式,Schedulers详细查看地址, 这里不讲解了,项目中应用得不多。
  • subcribeOn, timeInterval设置sheduler
  • toPromise, source转成promise,可以通过promise.then达到source.subscribe的效果
  • toArray,把source所有发射的数据,组成数组输出。

8 计算
把source的所有发射数据进行指定计算后,得出的数据作为新source的发射数据,计算方法分别有:max, min, count, reduce, average等

9 其他

  • cache, source = source1.cache(1);共享source1的订阅结果,即不管source订阅几回,响应方法接收到的发射数据都是同一份。
  • 共享source订阅结果很重要,因为组合等方法组合多个source时,其中包含sourceA,同时sourceA还需要单独订阅其结果,在不用cache情况下,sourceA会产生2个subscription,即2个订阅实例,但是我们更希望是能达到sourceA发生变化时,都能通知到所有的组合sourceA的source。
  • publish,publishSource = source.publish(),让source的订阅的工作延后,即source不会发射数据,而是等到publishSource.connect()调用后才开发发射数据。效果和delay很相似,不同的是可以控制合适发射。
  • share,当source订阅多次,那么每次响应时do都会调用多次,通过share合并响应,则source发射一次数据更新,多次响应当当一次响应处理,do也调用一次。
@chenbj2333 chenbj2333 changed the title 使用可观察对象的数据架构:服务部分 使用可观察对象的数据架构 May 31, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant