Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Teardown first #7448

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 6 additions & 24 deletions packages/observable/src/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,6 @@ export interface SubscriberOverrides<T> {
* will be handled and passed to the destination's `error` method.
*/
complete?: () => void;
/**
* If provided, this function will be called after all teardown has occurred
* for this {@link Subscriber}. This is generally used for cleanup purposes
* during operator development.
*/
finalize?: () => void;
}

/**
Expand All @@ -248,8 +242,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
protected readonly _errorOverride: ((err: any) => void) | null = null;
/** @internal */
protected readonly _completeOverride: (() => void) | null = null;
/** @internal */
protected readonly _onFinalize: (() => void) | null = null;

/**
* @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead.
Expand Down Expand Up @@ -283,7 +275,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
this._nextOverride = overrides?.next ?? null;
this._errorOverride = overrides?.error ?? null;
this._completeOverride = overrides?.complete ?? null;
this._onFinalize = overrides?.finalize ?? null;

// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -355,7 +346,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
if (!this.closed) {
this.isStopped = true;
super.unsubscribe();
this._onFinalize?.();
}
}

Expand All @@ -364,19 +354,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}

protected _error(err: any): void {
try {
this.destination.error(err);
} finally {
this.unsubscribe();
}
this.unsubscribe();
this.destination.error(err);
}

protected _complete(): void {
try {
this.destination.complete();
} finally {
this.unsubscribe();
}
this.unsubscribe();
this.destination.complete();
}
}

Expand Down Expand Up @@ -428,22 +412,20 @@ function overrideNext<T>(this: Subscriber<T>, value: T): void {
}

function overrideError(this: Subscriber<unknown>, err: any): void {
this.unsubscribe();
try {
this._errorOverride!(err);
} catch (error) {
this.destination.error(error);
} finally {
this.unsubscribe();
}
}

function overrideComplete(this: Subscriber<unknown>): void {
this.unsubscribe();
try {
this._completeOverride!();
} catch (error) {
this.destination.error(error);
} finally {
this.unsubscribe();
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/rxjs/spec/observables/dom/webSocket-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ describe('webSocket', () => {
});
socket.triggerClose({ wasClean: true });

expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']);
expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete']);
});

it('should not close the socket until all subscriptions complete', () => {
Expand Down
15 changes: 13 additions & 2 deletions packages/rxjs/src/internal/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Subscriber, Subscription} from '@rxjs/observable';
import type { Subscriber, Subscription } from '@rxjs/observable';
import { Observable, operate } from '@rxjs/observable';
import type { NextObserver } from '../../types.js';

Expand Down Expand Up @@ -219,8 +219,11 @@ export class WebSocketSubject<In, Out = In> extends Observable<Out> {
multiplex(subMsg: () => In, unsubMsg: () => In, messageFilter: (value: Out) => boolean) {
return new Observable<Out>((destination) => {
this.next(subMsg());
let isUnsub = true;
destination.add(() => {
this.next(unsubMsg());
if (isUnsub) {
this.next(unsubMsg());
}
});
this.subscribe(
operate({
Expand All @@ -230,6 +233,14 @@ export class WebSocketSubject<In, Out = In> extends Observable<Out> {
destination.next(x);
}
},
error: (err) => {
isUnsub = false;
destination.error(err);
},
complete: () => {
isUnsub = false;
destination.complete();
},
})
);
});
Expand Down
6 changes: 3 additions & 3 deletions packages/rxjs/src/internal/observable/forkJoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ export function forkJoin(...args: any[]): Observable<any> {
}
values[sourceIndex] = value;
},
complete: () => remainingCompletions--,
finalize: () => {
if (!remainingCompletions || !hasValue) {
complete: () => {
remainingCompletions--;
if (remainingCompletions <= 0 || !hasValue) {
if (remainingEmissions === 0) {
destination.next(keys ? createObject(keys, values) : values);
destination.complete();
Expand Down
9 changes: 5 additions & 4 deletions packages/rxjs/src/internal/operators/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
let buffers: T[][] = [];
let count = 0;

destination.add(() => {
// Clean up our memory when we finalize
buffers = null!;
});

source.subscribe(
operate({
destination,
Expand Down Expand Up @@ -108,10 +113,6 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
}
destination.complete();
},
finalize: () => {
// Clean up our memory when we finalize
buffers = null!;
},
})
);
});
Expand Down
63 changes: 32 additions & 31 deletions packages/rxjs/src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
// this is only really used for when *just* the buffer time span is passed.
let restartOnEmit = false;

destination.add(() => {
bufferRecords = null;
});

/**
* Does the work of emitting the buffer from the record, ensuring that the
* record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
Expand Down Expand Up @@ -127,36 +131,33 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper

startBuffer();

const bufferTimeSubscriber = operate({
destination,
next: (value: T) => {
// Copy the records, so if we need to remove one we
// don't mutate the array. It's hard, but not impossible to
// set up a buffer time that could mutate the array and
// cause issues here.
const recordsCopy = bufferRecords!.slice();
for (const record of recordsCopy) {
// Loop over all buffers and
const { buffer } = record;
buffer.push(value);
// If the buffer is over the max size, we need to emit it.
maxBufferSize <= buffer.length && emit(record);
}
},
complete: () => {
// The source completed, emit all of the active
// buffers we have before we complete.
while (bufferRecords?.length) {
destination.next(bufferRecords.shift()!.buffer);
}
bufferTimeSubscriber?.unsubscribe();
destination.complete();
destination.unsubscribe();
},
// Clean up
finalize: () => (bufferRecords = null),
});

source.subscribe(bufferTimeSubscriber);
source.subscribe(
operate({
destination,
next: (value: T) => {
// Copy the records, so if we need to remove one we
// don't mutate the array. It's hard, but not impossible to
// set up a buffer time that could mutate the array and
// cause issues here.
const recordsCopy = bufferRecords!.slice();
for (const record of recordsCopy) {
// Loop over all buffers and
const { buffer } = record;
buffer.push(value);
// If the buffer is over the max size, we need to emit it.
maxBufferSize <= buffer.length && emit(record);
}
},
complete: () => {
// The source completed, emit all of the active
// buffers we have before we complete.
while (bufferRecords?.length) {
destination.next(bufferRecords.shift()!.buffer);
}
destination.complete();
destination.unsubscribe();
},
})
);
});
}
20 changes: 11 additions & 9 deletions packages/rxjs/src/internal/operators/bufferWhen.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Subscriber} from '@rxjs/observable';
import type { Subscriber } from '@rxjs/observable';
import { operate, Observable, from } from '@rxjs/observable';
import type { ObservableInput, OperatorFunction } from '../types.js';
import { noop } from '../util/noop.js';
Expand Down Expand Up @@ -43,14 +43,18 @@ import { noop } from '../util/noop.js';
*/
export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
return (source) =>
new Observable((subscriber) => {
new Observable((destination) => {
// The buffer we keep and emit.
let buffer: T[] | null = null;
// A reference to the subscriber used to subscribe to
// the closing notifier. We need to hold this so we can
// end the subscription after the first notification.
let closingSubscriber: Subscriber<T> | null = null;

destination.add(() => {
buffer = closingSubscriber = null!;
});

// Ends the previous closing notifier subscription, so it
// terminates after the first emission, then emits
// the current buffer if there is one, starts a new buffer, and starts a
Expand All @@ -62,12 +66,12 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
// emit the buffer if we have one, and start a new buffer.
const b = buffer;
buffer = [];
b && subscriber.next(b);
b && destination.next(b);

// Get a new closing notifier and subscribe to it.
from(closingSelector()).subscribe(
(closingSubscriber = operate({
destination: subscriber,
destination,
next: openBuffer,
complete: noop,
}))
Expand All @@ -80,17 +84,15 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
// Subscribe to our source.
source.subscribe(
operate({
destination: subscriber,
destination,
// Add every new value to the current buffer.
next: (value) => buffer?.push(value),
// When we complete, emit the buffer if we have one,
// then complete the result.
complete: () => {
buffer && subscriber.next(buffer);
subscriber.complete();
buffer && destination.next(buffer);
destination.complete();
},
// Release memory on finalization
finalize: () => (buffer = closingSubscriber = null!),
})
);
});
Expand Down
10 changes: 5 additions & 5 deletions packages/rxjs/src/internal/operators/debounce.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Subscriber} from '@rxjs/observable';
import type { Subscriber } from '@rxjs/observable';
import { operate, Observable, from } from '@rxjs/observable';
import type { MonoTypeOperatorFunction, ObservableInput } from '../types.js';
import { noop } from '../util/noop.js';
Expand Down Expand Up @@ -69,6 +69,10 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
// The subscriber/subscription for the current debounce, if there is one.
let durationSubscriber: Subscriber<any> | null = null;

destination.add(() => {
lastValue = durationSubscriber = null;
});

const emit = () => {
// Unsubscribe any current debounce subscription we have,
// we only cared about the first notification from it, and we
Expand Down Expand Up @@ -106,10 +110,6 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
emit();
destination.complete();
},
finalize: () => {
// Finalization.
lastValue = durationSubscriber = null;
},
})
);
});
Expand Down
8 changes: 4 additions & 4 deletions packages/rxjs/src/internal/operators/debounceTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
let lastValue: T;
let activeTask: Subscription | void;

destination.add(() => {
lastValue = activeTask = null!;
});

source.subscribe(
operate({
destination,
Expand Down Expand Up @@ -94,10 +98,6 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
}
destination.complete();
},
finalize: () => {
// Finalization.
lastValue = activeTask = null!;
},
})
);
});
Expand Down
21 changes: 15 additions & 6 deletions packages/rxjs/src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ export function groupBy<T, K, R>(
// A lookup for the groups that we have so far.
const groups = new Map<K, SubjectLike<any>>();

destination.add(() => {
// Free up memory.
groups.clear();
});

// Used for notifying all groups and the subscriber in the same way.
const notify = (cb: (group: Observer<any>) => void) => {
groups.forEach(cb);
Expand Down Expand Up @@ -202,9 +207,18 @@ export function groupBy<T, K, R>(
// Our duration notified! We can complete the group.
// The group will be removed from the map in the finalization phase.
group!.complete();
groups.delete(key);
durationSubscriber?.unsubscribe();
},
error: (err) => {
group!.error(err);
groups.delete(key);
durationSubscriber?.unsubscribe();
},
complete: () => {
groups.delete(key);
durationSubscriber?.unsubscribe();
},
finalize: () => groups.delete(key),
});

// Start our duration notifier.
Expand All @@ -222,11 +236,6 @@ export function groupBy<T, K, R>(
error: handleError,
// Source completes.
complete: () => notify((consumer) => consumer.complete()),
// Free up memory.
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
// don't have to.
finalize: () => groups.clear(),
});

// Subscribe to the source
Expand Down
Loading
Loading