From 889f84a49d2664eddb24ee5c4ae7f20fb7b9248e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 2 Apr 2018 09:09:35 -0700 Subject: [PATCH] fix(skipUntil): properly manages notifier subscription - No longer waits until notifier is complete to complete resulting observable - Unsubs from notifier after first notification - Updates tests to be correct - Corrects some grammar in test descriptions fixes #1886 --- src/internal/operators/skipUntil.ts | 30 ++++++++++------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index ced640f9fe..24f40ee350 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -4,7 +4,8 @@ import { Observable } from '../Observable'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; -import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types'; +import { Subscription } from '../Subscription'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. @@ -26,8 +27,8 @@ class SkipUntilOperator implements Operator { constructor(private notifier: Observable) { } - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier)); + call(destination: Subscriber, source: any): TeardownLogic { + return source.subscribe(new SkipUntilSubscriber(destination, this.notifier)); } } @@ -39,12 +40,11 @@ class SkipUntilOperator implements Operator { class SkipUntilSubscriber extends OuterSubscriber { private hasValue: boolean = false; - private isInnerStopped: boolean = false; + private innerSubscription: Subscription; - constructor(destination: Subscriber, - notifier: Observable) { + constructor(destination: Subscriber, notifier: ObservableInput) { super(destination); - this.add(subscribeToResult(this, notifier)); + this.add(this.innerSubscription = subscribeToResult(this, notifier)); } protected _next(value: T) { @@ -53,24 +53,14 @@ class SkipUntilSubscriber extends OuterSubscriber { } } - protected _complete() { - if (this.isInnerStopped) { - super._complete(); - } else { - this.unsubscribe(); - } - } - notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { this.hasValue = true; + this.innerSubscription.unsubscribe(); } - notifyComplete(): void { - this.isInnerStopped = true; - if (this.isStopped) { - super._complete(); - } + notifyComplete() { + /* do nothing */ } }