Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove GroupBySubscriber #6806

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/internal/operators/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
* this handler are sent to the `destination` error handler.
* @param onFinalize Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. This is called after all other teardown logic is executed.
* @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe.
* NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription
* to the resulting observable does not actually disconnect from the source if there are active subscriptions
* to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!)
*/
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onComplete?: () => void,
onError?: (err: any) => void,
private onFinalize?: () => void
private onFinalize?: () => void,
private shouldUnsubscribe?: () => boolean
) {
// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -75,9 +80,11 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
}

unsubscribe() {
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
}
}
47 changes: 17 additions & 30 deletions src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,20 @@ export function groupBy<T, K, R>(
// next call from the source.
const handleError = (err: any) => notify((consumer) => consumer.error(err));

// The number of actively subscribed groups
let activeGroups = 0;

// Whether or not teardown was attempted on this subscription.
let teardownAttempted = false;

// Capturing a reference to this, because we need a handle to it
// in `createGroupedObservable` below. This is what we use to
// subscribe to our source observable. This sometimes needs to be unsubscribed
// out-of-band with our `subscriber` which is the downstream subscriber, or destination,
// in cases where a user unsubscribes from the main resulting subscription, but
// still has groups from this subscription subscribed and would expect values from it
// Consider: `source.pipe(groupBy(fn), take(2))`.
const groupBySourceSubscriber = new GroupBySubscriber(
const groupBySourceSubscriber = new OperatorSubscriber(
subscriber,
(value: T) => {
// Because we have to notify all groups of any errors that occur in here,
Expand Down Expand Up @@ -234,7 +240,14 @@ export function groupBy<T, K, R>(
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
// don't have to.
() => groups.clear()
() => groups.clear(),
() => {
teardownAttempted = true;
// We only kill our subscription to the source if we have
// no active groups. As stated above, consider this scenario:
// source$.pipe(groupBy(fn), take(2)).
return activeGroups === 0;
}
);

// Subscribe to the source
Expand All @@ -247,16 +260,14 @@ export function groupBy<T, K, R>(
*/
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
const result: any = new Observable<T>((groupSubscriber) => {
groupBySourceSubscriber.activeGroups++;
activeGroups++;
const innerSub = groupSubject.subscribe(groupSubscriber);
return () => {
innerSub.unsubscribe();
// We can kill the subscription to our source if we now have no more
// active groups subscribed, and a teardown was already attempted on
// the source.
--groupBySourceSubscriber.activeGroups === 0 &&
groupBySourceSubscriber.teardownAttempted &&
groupBySourceSubscriber.unsubscribe();
--activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
};
});
result.key = key;
Expand All @@ -265,30 +276,6 @@ export function groupBy<T, K, R>(
});
}

/**
* This was created because groupBy is a bit unique, in that emitted groups that have
* subscriptions have to keep the subscription to the source alive until they
* are torn down.
*/
class GroupBySubscriber<T> extends OperatorSubscriber<T> {
/**
* The number of actively subscribed groups
*/
activeGroups = 0;
/**
* Whether or not teardown was attempted on this subscription.
*/
teardownAttempted = false;

unsubscribe() {
this.teardownAttempted = true;
// We only kill our subscription to the source if we have
// no active groups. As stated above, consider this scenario:
// source$.pipe(groupBy(fn), take(2)).
this.activeGroups === 0 && super.unsubscribe();
}
}

/**
* An observable of values that is the emitted by the result of a {@link groupBy} operator,
* contains a `key` property for the grouping.
Expand Down