diff --git a/spec/operators/throttle-spec.js b/spec/operators/throttle-spec.js index d7e1a79b32..99f3838c27 100644 --- a/spec/operators/throttle-spec.js +++ b/spec/operators/throttle-spec.js @@ -1,25 +1,314 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, expectSubscription, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var Scheduler = Rx.Scheduler; describe('Observable.prototype.throttle()', function () { - it('should throttle events', function (done) { - Observable.of(1, 2, 3).throttle(50) - .subscribe(function (x) { - expect(x).toBe(1); - }, null, done); - }); - - it('should throttle events multiple times', function (done) { - var expected = ['1-0', '2-0']; - Observable.concat( - Observable.timer(0, 10).take(3).map(function (x) { return '1-' + x; }), - Observable.timer(80, 10).take(5).map(function (x) { return '2-' + x; }) - ) - .throttle(50) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); - }); -}); \ No newline at end of file + it('should simply mirror the source if values are not emitted often enough', function () { + var e1 = hot('-a--------b-----c----|'); + var e1subs = '^ !'; + var e2 = cold( '----| '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-a--------b-----c----|'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should immediately emit the first value in each time window', function () { + var e1 = hot('-a-xy-----b--x--cxxx-|'); + var e1subs = '^ !'; + var e2 = cold( '----| '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-a--------b-----c----|'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should throttle with duration Observable using next to close the duration', function () { + var e1 = hot('-a-xy-----b--x--cxxx-|'); + var e1subs = '^ !'; + var e2 = cold( '----x-y-z '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-a--------b-----c----|'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should interrupt source and duration when result is unsubscribed early', function () { + var e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); + var unsub = ' ! '; + var e1subs = '^ ! '; + var e2 = cold( '------------------| '); + var e2subs = ' ^ ! '; + var expected = '-a------------- '; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ !'; + var e2 = cold('-----| '); + var e2subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^!']; + var expected = 'a-----a-----a-----a-----a|'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should mirror source if durations are always empty', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ !'; + var e2 = cold('|'); + var expected = 'abcdefabcdefabcdefabcdefa|'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should take only the first value emitted if duration is a never', function () { + var e1 = hot('----abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ !'; + var e2 = cold('-'); + var e2subs = ' ^ !'; + var expected = '----a------------------------|'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should unsubscribe duration Observable when source raise error', function () { + var e1 = hot('----abcdefabcdefabcdefabcdefa#'); + var e1subs = '^ !'; + var e2 = cold('-'); + var e2subs = ' ^ !'; + var expected = '----a------------------------#'; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should raise error as soon as just-throw duration is used', function () { + var e1 = hot('----abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ ! '; + var e2 = cold('#'); + var e2subs = ' (^!) '; + var expected = '----(a#) '; + + var result = e1.throttle(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should throttle using durations of varying lengths', function () { + var e1 = hot('abcdefabcdabcdefghabca| '); + var e1subs = '^ ! '; + var e2 = [cold('-----| '), + cold( '---| '), + cold( '-------| '), + cold( '--| '), + cold( '----|')]; + var e2subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^! ']; + var expected = 'a-----a---a-------a--a| '; + + var i = 0; + var result = e1.throttle(function () { return e2[i++]; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + for (var j = 0; j < e2.length; j++) { + expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); + } + }); + + it('should propagate error from duration Observable', function () { + var e1 = hot('abcdefabcdabcdefghabca| '); + var e1subs = '^ ! '; + var e2 = [cold('-----| '), + cold( '---| '), + cold( '-------# ')]; + var e2subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = 'a-----a---a------# '; + + var i = 0; + var result = e1.throttle(function () { return e2[i++]; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + for (var j = 0; j < e2.length; j++) { + expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); + } + }); + + it('should propagate error thrown from durationSelector function', function () { + var e1 = hot('abcdefabcdabcdefghabca| '); + var e1subs = '^ ! '; + var e2 = [cold('-----| '), + cold( '---| '), + cold( '-------| ')]; + var e2subs = ['^ ! ', + ' ^ ! ']; + var expected = 'a-----a---# '; + + var i = 0; + var result = e1.throttle(function () { + if (i === 2) { + throw 'error'; + } + return e2[i++]; + }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + for (var j = 0; j < e2subs.length; j++) { + expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); + } + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var subs = '^ !'; + var expected = '-----|'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.throttle(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var subs = '^ !'; + var expected = '-----#'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.throttle(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle an empty source', function () { + var e1 = cold('|'); + var subs = '(^!)'; + var expected = '|'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.throttle(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a never source', function () { + var e1 = cold('-'); + var subs = '^'; + var expected = '-'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.throttle(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a throw source', function () { + var e1 = cold('#'); + var subs = '(^!)'; + var expected = '#'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.throttle(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should throttle by promise resolves', function (done) { + var e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(50).mapTo(4) + ); + var expected = [1,2,3,4]; + + e1.throttle(function () { + return new Promise(function (resolve) { resolve(42); }); + }).subscribe( + function (x) { + expect(x).toEqual(expected.shift()); }, + function () { + throw 'should not be called'; + }, + function () { + expect(expected.length).toBe(0); + done(); + } + ); + }); + + it('should raise error when promise rejects', function (done) { + var e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(50).mapTo(4) + ); + var expected = [1,2,3]; + var error = new Error('error'); + + e1.throttle(function (x) { + if (x === 3) { + return new Promise(function (resolve, reject) {reject(error);}); + } else { + return new Promise(function (resolve) {resolve(42);}); + } + }).subscribe( + function (x) { + expect(x).toEqual(expected.shift()); }, + function (err) { + expect(err).toBe(error); + expect(expected.length).toBe(0); + done(); + }, + function () { + throw 'should not be called'; + } + ); + }); +}); diff --git a/spec/operators/throttleTime-spec.js b/spec/operators/throttleTime-spec.js new file mode 100644 index 0000000000..564453c84f --- /dev/null +++ b/spec/operators/throttleTime-spec.js @@ -0,0 +1,114 @@ +/* globals describe, it, expect, expectObservable, expectSubscription, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Scheduler = Rx.Scheduler; + +describe('Observable.prototype.throttleTime()', function () { + it('should throttle events by 50 time units', function (done) { + Observable.of(1, 2, 3).throttleTime(50) + .subscribe(function (x) { + expect(x).toBe(1); + }, null, done); + }); + + it('should throttle events multiple times', function (done) { + var expected = ['1-0', '2-0']; + Observable.concat( + Observable.timer(0, 10).take(3).map(function (x) { return '1-' + x; }), + Observable.timer(80, 10).take(5).map(function (x) { return '2-' + x; }) + ) + .throttleTime(50) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); + + it('should simply mirror the source if values are not emitted often enough', function () { + var e1 = hot('-a--------b-----c----|'); + var subs = '^ !'; + var expected = '-a--------b-----c----|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should immediately emit the first value in each time window', function () { + var e1 = hot('-a-x-y----b---x-cx---|'); + var subs = '^ !'; + var expected = '-a--------b-----c----|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var subs = '^ !'; + var expected = 'a-----a-----a-----a-----a|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var subs = '^ !'; + var expected = '-----|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var expected = '-----#'; + + expectObservable(e1.throttleTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should handle an empty source', function () { + var e1 = cold('|'); + var subs = '(^!)'; + var expected = '|'; + + expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a never source', function () { + var e1 = cold('-'); + var subs = '^'; + var expected = '-'; + + expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a throw source', function () { + var e1 = cold('#'); + var subs = '(^!)'; + var expected = '#'; + + expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should throttle and does not complete when source does not completes', function () { + var e1 = hot('-a--(bc)-------d----------------'); + var unsub = ' !'; + var subs = '^ !'; + var expected = '-a-------------d----------------'; + + expectObservable(e1.throttleTime(50, rxTestScheduler), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should throttle values until source raises error', function () { + var e1 = hot('-a--(bc)-------d---------------#'); + var subs = '^ !'; + var expected = '-a-------------d---------------#'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index f262c579b2..75322fbd7d 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -75,7 +75,8 @@ export interface CoreOperators { switchMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take?: (count: number) => Observable; takeUntil?: (notifier: Observable) => Observable; - throttle?: (delay: number, scheduler?: Scheduler) => Observable; + throttle?: (durationSelector: (value: T) => Observable | Promise) => Observable; + throttleTime?: (delay: number, scheduler?: Scheduler) => Observable; timeout?: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith?: (due: number | Date, withObservable: Observable, scheduler?: Scheduler) => Observable | Observable; toArray?: () => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index d8602e51ec..f861100ddb 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -228,7 +228,8 @@ export class Observable implements CoreOperators { switchMapTo: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take: (count: number) => Observable; takeUntil: (notifier: Observable) => Observable; - throttle: (delay: number, scheduler?: Scheduler) => Observable; + throttle: (durationSelector: (value: T) => Observable | Promise) => Observable; + throttleTime: (delay: number, scheduler?: Scheduler) => Observable; timeout: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith: (due: number | Date, withObservable: Observable, scheduler?: Scheduler) => Observable | Observable; toArray: () => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 064480827b..238862f58c 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -288,6 +288,9 @@ observableProto.takeUntil = takeUntil; import {throttle} from './operators/throttle'; observableProto.throttle = throttle; +import {throttleTime} from './operators/throttleTime'; +observableProto.throttleTime = throttleTime; + import {timeInterval} from './operators/extended/timeInterval'; observableProto.timeInterval = timeInterval; diff --git a/src/Rx.ts b/src/Rx.ts index 6485b998eb..88ac4b7de7 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -242,6 +242,9 @@ observableProto.takeUntil = takeUntil; import {throttle} from './operators/throttle'; observableProto.throttle = throttle; +import {throttleTime} from './operators/throttleTime'; +observableProto.throttleTime = throttleTime; + import {timeout} from './operators/timeout'; observableProto.timeout = timeout; diff --git a/src/operators/throttle.ts b/src/operators/throttle.ts index b473fc901e..36b2babe72 100644 --- a/src/operators/throttle.ts +++ b/src/operators/throttle.ts @@ -1,19 +1,23 @@ import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {PromiseObservable} from '../observables/PromiseObservable'; import {Subscriber} from '../Subscriber'; -import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {nextTick} from '../schedulers/nextTick'; -export function throttle(delay: number, scheduler: Scheduler = nextTick) { - return this.lift(new ThrottleOperator(delay, scheduler)); +import {tryCatch} from '../util/tryCatch'; +import {isPromise} from '../util/isPromise'; +import {errorObject} from '../util/errorObject'; + +export function throttle(durationSelector: (value: T) => Observable | Promise): Observable { + return this.lift(new ThrottleOperator(durationSelector)); } class ThrottleOperator implements Operator { - constructor(private delay: number, private scheduler: Scheduler) { + constructor(private durationSelector: (value: T) => Observable | Promise) { } call(subscriber: Subscriber): Subscriber { - return new ThrottleSubscriber(subscriber, this.delay, this.scheduler); + return new ThrottleSubscriber(subscriber, this.durationSelector); } } @@ -21,31 +25,60 @@ class ThrottleSubscriber extends Subscriber { private throttled: Subscription; constructor(destination: Subscriber, - private delay: number, - private scheduler: Scheduler) { + private durationSelector: (value: T) => Observable | Promise) { super(destination); } - _next(value: T) { + _next(value: T): void { if (!this.throttled) { - this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { value, subscriber: this })); + const destination = this.destination; + let duration = tryCatch(this.durationSelector)(value); + if (duration === errorObject) { + destination.error(errorObject.e); + return; + } + if (isPromise(duration)) { + duration = PromiseObservable.create(duration); + } + this.add(this.throttled = duration._subscribe(new ThrottleDurationSelectorSubscriber(this))); + destination.next(value); } } - throttledNext(value: T) { + _error(err: any): void { + this.clearThrottle(); + super._error(err); + } + + _complete(): void { this.clearThrottle(); - this.destination.next(value); + super._complete(); } - clearThrottle() { + clearThrottle(): void { const throttled = this.throttled; if (throttled) { throttled.unsubscribe(); this.remove(throttled); + this.throttled = null; } } } -function dispatchNext({ value, subscriber }) { - subscriber.throttledNext(value); -} \ No newline at end of file +class ThrottleDurationSelectorSubscriber extends Subscriber { + constructor(private parent: ThrottleSubscriber) { + super(null); + } + + _next(unused: T): void { + this.parent.clearThrottle(); + } + + _error(err): void { + this.parent.error(err); + } + + _complete(): void { + this.parent.clearThrottle(); + } +} diff --git a/src/operators/throttleTime.ts b/src/operators/throttleTime.ts new file mode 100644 index 0000000000..0c4f53e48f --- /dev/null +++ b/src/operators/throttleTime.ts @@ -0,0 +1,48 @@ +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {Scheduler} from '../Scheduler'; +import {Subscription} from '../Subscription'; +import {nextTick} from '../schedulers/nextTick'; + +export function throttleTime(delay: number, scheduler: Scheduler = nextTick) { + return this.lift(new ThrottleTimeOperator(delay, scheduler)); +} + +class ThrottleTimeOperator implements Operator { + constructor(private delay: number, private scheduler: Scheduler) { + } + + call(subscriber: Subscriber): Subscriber { + return new ThrottleTimeSubscriber(subscriber, this.delay, this.scheduler); + } +} + +class ThrottleTimeSubscriber extends Subscriber { + private throttled: Subscription; + + constructor(destination: Subscriber, + private delay: number, + private scheduler: Scheduler) { + super(destination); + } + + _next(value: T) { + if (!this.throttled) { + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { subscriber: this })); + this.destination.next(value); + } + } + + clearThrottle() { + const throttled = this.throttled; + if (throttled) { + throttled.unsubscribe(); + this.remove(throttled); + this.throttled = null; + } + } +} + +function dispatchNext({ subscriber }) { + subscriber.clearThrottle(); +} \ No newline at end of file