diff --git a/src/internal/operators/mergeInternals.ts b/src/internal/operators/mergeInternals.ts new file mode 100644 index 0000000000..d78c2e4100 --- /dev/null +++ b/src/internal/operators/mergeInternals.ts @@ -0,0 +1,103 @@ +/** @prettier */ +import { Observable } from '../Observable'; +import { from } from '../observable/from'; +import { Subscriber } from '../Subscriber'; +import { ObservableInput } from '../types'; +import { OperatorSubscriber } from './OperatorSubscriber'; + +/** + * A process embodying the general "merge" strategy. This is used in + * `mergeMap` and `mergeScan` because the logic is otherwise nearly identical. + * @param source The original source observable + * @param subscriber The consumer subscriber + * @param project The projection function to get our inner sources + * @param concurrent The number of concurrent inner subscriptions + * @param onBeforeNext Additional logic to apply before nexting to our consumer + * @param onBeforeComplete Additional logic to apply before telling the consumer we're complete. + */ +export function mergeInternals( + source: Observable, + subscriber: Subscriber, + project: (value: T, index: number) => ObservableInput, + concurrent: number, + onBeforeNext?: (innerValue: R) => void, + onBeforeComplete?: () => void +) { + // Buffered values, in the event of going over our concurrency limit + let buffer: T[] = []; + // The number of active inner subscriptions. + let active = 0; + // An index to pass to our accumulator function + let index = 0; + // Whether or not the outer source has completed. + let isComplete = false; + + /** + * Checks to see if we can complete our result or not. + */ + const checkComplete = () => { + // If the outer has completed, and nothing is left in the buffer, + // and we don't have any active inner subscriptions, then we can + // Emit the state and complete. + if (isComplete && !buffer.length && !active) { + // In the case of `mergeScan`, we need additional handling here. + onBeforeComplete?.(); + subscriber.complete(); + } + }; + + const doInnerSub = (value: T) => { + active++; + from(project(value, index++)).subscribe( + new OperatorSubscriber( + subscriber, + (innerValue) => { + // `mergeScan` has additional handling here. For example + // taking the inner value and updating state. + onBeforeNext?.(innerValue); + subscriber.next(innerValue); + }, + // Errors are passed to the destination. + undefined, + () => { + // INNER SOURCE COMPLETE + // Decrement the active count to ensure that the next time + // we try to call `doInnerSub`, the number is accurate. + active--; + // If we have more values in the buffer, try to process those + // Note that this call will increment `active` ahead of the + // next conditional, if there were any more inner subscriptions + // to start. + while (buffer.length && active < concurrent) { + doInnerSub(buffer.shift()!); + } + // Check to see if we can complete, and complete if so. + checkComplete(); + } + ) + ); + }; + + // Subscribe to our source observable. + source.subscribe( + new OperatorSubscriber( + subscriber, + // If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait. + (value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)), + // Errors are passed through + undefined, + () => { + // Outer completed, make a note of it, and check to see if we can complete everything. + isComplete = true; + checkComplete(); + } + ) + ); + + // Additional teardown (for when the destination is torn down). + // Other teardown is added implicitly via subscription above. + return () => { + // Ensure buffered values are released. + buffer = null!; + }; +} diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 79cb1e5a95..3766bbfde2 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -1,11 +1,10 @@ /** @prettier */ import { Observable } from '../Observable'; -import { Subscription } from '../Subscription'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; import { operate } from '../util/lift'; -import { OperatorSubscriber } from './OperatorSubscriber'; +import { mergeInternals } from './mergeInternals'; /* tslint:disable:max-line-length */ export function mergeMap>( @@ -92,100 +91,8 @@ export function mergeMap>( } else if (typeof resultSelector === 'number') { concurrent = resultSelector; } - return operate((source, subscriber) => { - // Whether or not the outer subscription is complete - let isComplete = false; - // The number of active inner subscriptions - let active = 0; - // The index of the value from source (used for projection) - let index = 0; - // The buffered values from the source (used for concurrency) - let buffer: T[] = []; - /** - * Called to check to see if we can complete, and completes the result if - * nothing is active. - */ - const checkComplete = () => isComplete && !active && subscriber.complete(); - - /** - * Attempts to start an inner subscription from a buffered value, - * so long as we don't have more active inner subscriptions than - * the concurrency limit allows. - */ - const tryInnerSub = () => { - while (active < concurrent && buffer.length > 0) { - doInnerSub(buffer.shift()!); - } - }; - - /** - * Creates an inner observable and subscribes to it with the - * given outer value. - * @param value the value to process - */ - const doInnerSub = (value: T) => { - // Subscribe to the inner source - active++; - subscriber.add( - from(project(value, index++)).subscribe( - new OperatorSubscriber( - subscriber, - // INNER SOURCE NEXT - // We got a value from the inner source, emit it from the result. - (innerValue) => subscriber.next(innerValue), - // Errors are sent to the consumer. - undefined, - () => { - // INNER SOURCE COMPLETE - // Decrement the active count to ensure that the next time - // we try to call `doInnerSub`, the number is accurate. - active--; - // If we have more values in the buffer, try to process those - // Note that this call will increment `active` ahead of the - // next conditional, if there were any more inner subscriptions - // to start. - buffer.length && tryInnerSub(); - // Check to see if we can complete, and complete if so. - checkComplete(); - } - ) - ) - ); - }; - - let outerSubs: Subscription; - outerSubs = source.subscribe( - new OperatorSubscriber( - subscriber, - // OUTER SOURCE NEXT - // If we are under our concurrency limit, start the inner subscription with the value - // right away. Otherwise, push it onto the buffer and wait. - (value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)), - // Let errors pass through. - undefined, - () => { - // OUTER SOURCE COMPLETE - // We don't necessarily stop here. If have any pending inner subscriptions - // we need to wait for those to be done first. That includes buffered inners - // that we haven't even subscribed to yet. - isComplete = true; - // If nothing is active, and nothing in the buffer, with no hope of getting any more - // we can complete the result - checkComplete(); - // Be sure to teardown the outer subscription ASAP, in any case. - outerSubs?.unsubscribe(); - } - ) - ); - - // Additional teardown. Called when the destination is torn down. - // Other teardown is registered implicitly above during subscription. - return () => { - // Release buffered values - buffer = null!; - }; - }); + return operate((source, subscriber) => mergeInternals(source, subscriber, project, concurrent)); } /** diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index c8b080113b..881f0849c1 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -1,8 +1,7 @@ /** @prettier */ import { ObservableInput, OperatorFunction } from '../types'; import { operate } from '../util/lift'; -import { OperatorSubscriber } from './OperatorSubscriber'; -import { from } from '../observable/from'; +import { mergeInternals } from './mergeInternals'; /** * Applies an accumulator function over the source Observable where the @@ -48,94 +47,22 @@ export function mergeScan( concurrent = Infinity ): OperatorFunction { return operate((source, subscriber) => { - // Buffered values, in the event of going over our concurrency limit - let buffer: T[] = []; - // The number of active inner subscriptions. - let active = 0; // Whether or not we have gotten any accumulated state. This is used to // decide whether or not to emit in the event of an empty result. let hasState = false; // The accumulated state. let state = seed; - // An index to pass to our accumulator function - let index = 0; - // Whether or not the outer source has completed. - let isComplete = false; - /** - * Checks to see if we can complete our result or not. - */ - const checkComplete = () => { - // If the outer has completed, and nothing is left in the buffer, - // and we don't have any active inner subscriptions, then we can - // Emit the state and complete. - if (isComplete && !buffer.length && !active) { - // TODO: This seems like it might result in a double emission, perhaps bad behavior? - // maybe we should change this in an upcoming major? - !hasState && subscriber.next(state); - subscriber.complete(); - } - }; - - const doInnerSub = (value: T) => { - active++; - from(accumulator(state!, value, index++)).subscribe( - new OperatorSubscriber( - subscriber, - (innerValue) => { - hasState = true; - // Intentially terse. Set the state, then emit it. - subscriber.next((state = innerValue)); - }, - // Errors are passed to the destination. - undefined, - - // TODO: Much of this code is duplicated from mergeMap. Perhaps - // look into a way to unify this. - - () => { - // INNER SOURCE COMPLETE - // Decrement the active count to ensure that the next time - // we try to call `doInnerSub`, the number is accurate. - active--; - // If we have more values in the buffer, try to process those - // Note that this call will increment `active` ahead of the - // next conditional, if there were any more inner subscriptions - // to start. - buffer.length && tryInnerSub(); - // Check to see if we can complete, and complete if so. - checkComplete(); - } - ) - ); - }; - - const tryInnerSub = () => { - while (buffer.length && active < concurrent) { - doInnerSub(buffer.shift()!); - } - }; - - source.subscribe( - new OperatorSubscriber( - subscriber, - // If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait. - (value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)), - // Errors are passed through - undefined, - () => { - // Outer completed, make a note of it, and check to see if we can complete everything. - isComplete = true; - checkComplete(); - } - ) + return mergeInternals( + source, + subscriber, + (value, index) => accumulator(state, value, index), + concurrent, + (value) => { + hasState = true; + state = value; + }, + () => !hasState && subscriber.next(state) ); - - // Additional teardown (for when the destination is torn down). - // Other teardown is added implicitly via subscription above. - return () => { - // Ensure buffered values are released. - buffer = null!; - }; }); }