From f81b5665f556c668a39ee470520681ccb10107fe Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 11 Nov 2015 13:59:14 +0200 Subject: [PATCH 1/2] test(throttleTime): add tests for throttleTime operator Partially resolves issue #666. --- spec/operators/throttle-spec.js | 25 ------ spec/operators/throttleTime-spec.js | 114 ++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 25 deletions(-) delete mode 100644 spec/operators/throttle-spec.js create mode 100644 spec/operators/throttleTime-spec.js diff --git a/spec/operators/throttle-spec.js b/spec/operators/throttle-spec.js deleted file mode 100644 index d7e1a79b32..0000000000 --- a/spec/operators/throttle-spec.js +++ /dev/null @@ -1,25 +0,0 @@ -/* globals describe, it, expect */ -var Rx = require('../../dist/cjs/Rx'); -var Observable = Rx.Observable; -var Scheduler = Rx.Scheduler; - -describe('Observable.prototype.throttle()', function () { - it('should throttle events', function (done) { - Observable.of(1, 2, 3).throttle(50) - .subscribe(function (x) { - expect(x).toBe(1); - }, null, done); - }); - - it('should throttle events multiple times', function (done) { - var expected = ['1-0', '2-0']; - Observable.concat( - Observable.timer(0, 10).take(3).map(function (x) { return '1-' + x; }), - Observable.timer(80, 10).take(5).map(function (x) { return '2-' + x; }) - ) - .throttle(50) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); - }); -}); \ No newline at end of file diff --git a/spec/operators/throttleTime-spec.js b/spec/operators/throttleTime-spec.js new file mode 100644 index 0000000000..564453c84f --- /dev/null +++ b/spec/operators/throttleTime-spec.js @@ -0,0 +1,114 @@ +/* globals describe, it, expect, expectObservable, expectSubscription, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Scheduler = Rx.Scheduler; + +describe('Observable.prototype.throttleTime()', function () { + it('should throttle events by 50 time units', function (done) { + Observable.of(1, 2, 3).throttleTime(50) + .subscribe(function (x) { + expect(x).toBe(1); + }, null, done); + }); + + it('should throttle events multiple times', function (done) { + var expected = ['1-0', '2-0']; + Observable.concat( + Observable.timer(0, 10).take(3).map(function (x) { return '1-' + x; }), + Observable.timer(80, 10).take(5).map(function (x) { return '2-' + x; }) + ) + .throttleTime(50) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); + + it('should simply mirror the source if values are not emitted often enough', function () { + var e1 = hot('-a--------b-----c----|'); + var subs = '^ !'; + var expected = '-a--------b-----c----|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should immediately emit the first value in each time window', function () { + var e1 = hot('-a-x-y----b---x-cx---|'); + var subs = '^ !'; + var expected = '-a--------b-----c----|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var subs = '^ !'; + var expected = 'a-----a-----a-----a-----a|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var subs = '^ !'; + var expected = '-----|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var expected = '-----#'; + + expectObservable(e1.throttleTime(10, rxTestScheduler)).toBe(expected); + }); + + it('should handle an empty source', function () { + var e1 = cold('|'); + var subs = '(^!)'; + var expected = '|'; + + expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a never source', function () { + var e1 = cold('-'); + var subs = '^'; + var expected = '-'; + + expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a throw source', function () { + var e1 = cold('#'); + var subs = '(^!)'; + var expected = '#'; + + expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should throttle and does not complete when source does not completes', function () { + var e1 = hot('-a--(bc)-------d----------------'); + var unsub = ' !'; + var subs = '^ !'; + var expected = '-a-------------d----------------'; + + expectObservable(e1.throttleTime(50, rxTestScheduler), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should throttle values until source raises error', function () { + var e1 = hot('-a--(bc)-------d---------------#'); + var subs = '^ !'; + var expected = '-a-------------d---------------#'; + + expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); +}); \ No newline at end of file From 5bcd14cd812c82c7c515d41196d2a709c3461a1b Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 11 Nov 2015 14:18:11 +0200 Subject: [PATCH 2/2] fix(throttleTime): fix and rename throttleTime operator Rename throttle to throttleTime. Fix this operator to correctly take in the first 'next' value but ignore subsequent 'next' values for a specified time period. Related to issue #666. --- src/CoreOperators.ts | 2 +- src/Observable.ts | 2 +- src/Rx.KitchenSink.ts | 4 ++-- src/Rx.ts | 4 ++-- .../{throttle.ts => throttleTime.ts} | 23 ++++++++----------- 5 files changed, 16 insertions(+), 19 deletions(-) rename src/operators/{throttle.ts => throttleTime.ts} (59%) diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index f262c579b2..70ead563c1 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -75,7 +75,7 @@ export interface CoreOperators { switchMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take?: (count: number) => Observable; takeUntil?: (notifier: Observable) => Observable; - throttle?: (delay: number, scheduler?: Scheduler) => Observable; + throttleTime?: (delay: number, scheduler?: Scheduler) => Observable; timeout?: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith?: (due: number | Date, withObservable: Observable, scheduler?: Scheduler) => Observable | Observable; toArray?: () => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index d8602e51ec..ffe050aac5 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -228,7 +228,7 @@ export class Observable implements CoreOperators { switchMapTo: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take: (count: number) => Observable; takeUntil: (notifier: Observable) => Observable; - throttle: (delay: number, scheduler?: Scheduler) => Observable; + throttleTime: (delay: number, scheduler?: Scheduler) => Observable; timeout: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith: (due: number | Date, withObservable: Observable, scheduler?: Scheduler) => Observable | Observable; toArray: () => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 064480827b..395037279e 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -285,8 +285,8 @@ observableProto.take = take; import {takeUntil} from './operators/takeUntil'; observableProto.takeUntil = takeUntil; -import {throttle} from './operators/throttle'; -observableProto.throttle = throttle; +import {throttleTime} from './operators/throttleTime'; +observableProto.throttleTime = throttleTime; import {timeInterval} from './operators/extended/timeInterval'; observableProto.timeInterval = timeInterval; diff --git a/src/Rx.ts b/src/Rx.ts index 6485b998eb..fe7c04983b 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -239,8 +239,8 @@ observableProto.take = take; import {takeUntil} from './operators/takeUntil'; observableProto.takeUntil = takeUntil; -import {throttle} from './operators/throttle'; -observableProto.throttle = throttle; +import {throttleTime} from './operators/throttleTime'; +observableProto.throttleTime = throttleTime; import {timeout} from './operators/timeout'; observableProto.timeout = timeout; diff --git a/src/operators/throttle.ts b/src/operators/throttleTime.ts similarity index 59% rename from src/operators/throttle.ts rename to src/operators/throttleTime.ts index b473fc901e..0c4f53e48f 100644 --- a/src/operators/throttle.ts +++ b/src/operators/throttleTime.ts @@ -4,20 +4,20 @@ import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; import {nextTick} from '../schedulers/nextTick'; -export function throttle(delay: number, scheduler: Scheduler = nextTick) { - return this.lift(new ThrottleOperator(delay, scheduler)); +export function throttleTime(delay: number, scheduler: Scheduler = nextTick) { + return this.lift(new ThrottleTimeOperator(delay, scheduler)); } -class ThrottleOperator implements Operator { +class ThrottleTimeOperator implements Operator { constructor(private delay: number, private scheduler: Scheduler) { } call(subscriber: Subscriber): Subscriber { - return new ThrottleSubscriber(subscriber, this.delay, this.scheduler); + return new ThrottleTimeSubscriber(subscriber, this.delay, this.scheduler); } } -class ThrottleSubscriber extends Subscriber { +class ThrottleTimeSubscriber extends Subscriber { private throttled: Subscription; constructor(destination: Subscriber, @@ -28,24 +28,21 @@ class ThrottleSubscriber extends Subscriber { _next(value: T) { if (!this.throttled) { - this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { value, subscriber: this })); + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, { subscriber: this })); + this.destination.next(value); } } - throttledNext(value: T) { - this.clearThrottle(); - this.destination.next(value); - } - clearThrottle() { const throttled = this.throttled; if (throttled) { throttled.unsubscribe(); this.remove(throttled); + this.throttled = null; } } } -function dispatchNext({ value, subscriber }) { - subscriber.throttledNext(value); +function dispatchNext({ subscriber }) { + subscriber.clearThrottle(); } \ No newline at end of file