diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index e4d9f0e44f..a854abdc95 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -8,7 +8,7 @@ describe('throttle operator', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx-|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ']; @@ -24,7 +24,7 @@ describe('throttle operator', () => { it('should simply mirror the source if values are not emitted often enough', () => { const e1 = hot('-a--------b-----c----|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ']; @@ -57,7 +57,7 @@ describe('throttle operator', () => { const e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); const unsub = ' ! '; const e1subs = '^ ! '; - const e2 = cold( '------------------| '); + const e2 = cold( '------------------x '); const e2subs = ' ^ ! '; const expected = '-a------------- '; @@ -71,7 +71,7 @@ describe('throttle operator', () => { it('should not break unsubscription chains when result is unsubscribed explicitly', () => { const e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); const e1subs = '^ ! '; - const e2 = cold( '------------------| '); + const e2 = cold( '------------------x '); const e2subs = ' ^ ! '; const expected = '-a------------- '; const unsub = ' ! '; @@ -90,7 +90,7 @@ describe('throttle operator', () => { it('should handle a busy producer emitting a regular repeating sequence', () => { const e1 = hot('abcdefabcdefabcdefabcdefa|'); const e1subs = '^ !'; - const e2 = cold('-----| '); + const e2 = cold('-----x '); const e2subs = ['^ ! ', ' ^ ! ', ' ^ ! ', @@ -105,7 +105,19 @@ describe('throttle operator', () => { expectSubscriptions(e2.subscriptions).toBe(e2subs); }); - it('should mirror source if durations are always empty', () => { + it('should mirror source if durations are immediate', () => { + const e1 = hot('abcdefabcdefabcdefabcdefa|'); + const e1subs = '^ !'; + const e2 = cold('x'); + const expected = 'abcdefabcdefabcdefabcdefa|'; + + const result = e1.pipe(throttle(() => e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should mirror source if durations are empty', () => { const e1 = hot('abcdefabcdefabcdefabcdefa|'); const e1subs = '^ !'; const e2 = cold('|'); @@ -162,11 +174,11 @@ describe('throttle operator', () => { it('should throttle using durations of constying lengths', () => { const e1 = hot('abcdefabcdabcdefghabca| '); const e1subs = '^ ! '; - const e2 = [cold('-----| '), - cold( '---| '), - cold( '-------| '), - cold( '--| '), - cold( '----|')]; + const e2 = [cold('-----x '), + cold( '---x '), + cold( '-------x '), + cold( '--x '), + cold( '----x')]; const e2subs = ['^ ! ', ' ^ ! ', ' ^ ! ', @@ -187,8 +199,8 @@ describe('throttle operator', () => { it('should propagate error from duration Observable', () => { const e1 = hot('abcdefabcdabcdefghabca| '); const e1subs = '^ ! '; - const e2 = [cold('-----| '), - cold( '---| '), + const e2 = [cold('-----x '), + cold( '---x '), cold( '-------# ')]; const e2subs = ['^ ! ', ' ^ ! ', @@ -208,7 +220,7 @@ describe('throttle operator', () => { it('should propagate error thrown from durationSelector function', () => { const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|'); const s1Subs = '^ !'; - const n1 = cold( '----|'); + const n1 = cold( '----x'); const n1Subs = [' ^ ! ', ' ^ ! ', ' ^ ! ']; @@ -332,7 +344,7 @@ describe('throttle operator', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx------|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ', @@ -351,7 +363,7 @@ describe('throttle operator', () => { it('should work for individual values', () => { const s1 = hot('-^-x------------------|'); const s1Subs = '^ !'; - const n1 = cold( '------------------------|'); + const n1 = cold( '------------------------x'); const n1Subs = [' ^ !']; const exp = '--x------------------|'; @@ -364,7 +376,7 @@ describe('throttle operator', () => { it('should emit trailing value after throttle duration when source completes', () => { const e1 = hot('-a--------xy| '); const e1subs = '^ ! '; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ !']; const expected = '-a--------x---(y|)'; @@ -381,7 +393,7 @@ describe('throttle operator', () => { it('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx------|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ ! ', @@ -400,7 +412,7 @@ describe('throttle operator', () => { it('should work for individual values', () => { const s1 = hot('-^-x------------------|'); const s1Subs = '^ !'; - const n1 = cold( '------------------------|'); + const n1 = cold( '------------------------x'); const n1Subs = [' ^ !']; const exp = '--x------------------|'; @@ -410,10 +422,10 @@ describe('throttle operator', () => { expectSubscriptions(n1.subscriptions).toBe(n1Subs); }); - it('should wait for trailing throttle to complete before completing, even if source completes', () => { + it('should wait for trailing throttle before completing, even if source completes', () => { const source = hot( '-^--x--------y---------|'); const sourceSubs = '^ !'; - const duration = cold( '------------------------|'); + const duration = cold( '------------------------x'); const durationSubs = ' ^ !'; const exp = '---x-----------------------(y|)'; @@ -426,7 +438,7 @@ describe('throttle operator', () => { it('should emit trailing value after throttle duration when source completes', () => { const e1 = hot('-a--------x| '); const e1subs = '^ ! '; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ !']; @@ -442,7 +454,7 @@ describe('throttle operator', () => { it('should emit the last trailing value after throttle duration when source completes', () => { const e1 = hot('-a--------xy| '); const e1subs = '^ ! '; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ ! ', ' ^ !']; @@ -458,7 +470,7 @@ describe('throttle operator', () => { it('should complete when source completes if no value is available', () => { const e1 = hot('-a-----|'); const e1subs = '^ !'; - const e2 = cold( '----| '); + const e2 = cold( '----x '); const e2subs = [' ^ ! ', ' ^ !']; const expected = '-----a-|'; diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 9dff66248e..d7c48c848d 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -32,7 +32,7 @@ export const defaultThrottleConfig: ThrottleConfig = { * value arrives, it is forwarded to the output Observable, and then the timer * is enabled by calling the `durationSelector` function with the source value, * which returns the "duration" Observable. When the duration Observable emits a - * value or completes, the timer is disabled, and this process repeats for the + * value, the timer is disabled, and this process repeats for the * next source value. * * ## Example @@ -70,7 +70,7 @@ export function throttle( let throttled: Subscription | null = null; let isComplete = false; - const throttlingDone = () => { + const endThrottling = () => { throttled?.unsubscribe(); throttled = null; if (trailing) { @@ -79,9 +79,14 @@ export function throttle( } }; + const cleanupThrottling = () => { + throttled = null; + isComplete && subscriber.complete(); + }; + const startThrottle = (value: T) => (throttled = innerFrom(durationSelector(value)).subscribe( - new OperatorSubscriber(subscriber, throttlingDone, undefined, throttlingDone) + new OperatorSubscriber(subscriber, endThrottling, undefined, cleanupThrottling) )); const send = () => {