diff --git a/src/core.ts b/src/core.ts index 5475a54..41ea7e5 100644 --- a/src/core.ts +++ b/src/core.ts @@ -2159,8 +2159,22 @@ export class MemoryStream extends Stream { } _add(il: InternalListener): void { - if (this._has) { il._n(this._v); } - super._add(il); + const ta = this._target; + if (ta !== NO) return ta._add(il); + const a = this._ils; + a.push(il); + if (a.length > 1) { + if (this._has) il._n(this._v); + return; + } + if (this._stopID !== NO) { + if (this._has) il._n(this._v); + clearTimeout(this._stopID); + this._stopID = NO; + } else if (this._has) il._n(this._v); else { + const p = this._prod; + if (p !== NO) p._start(this); + } } _stopNow() { diff --git a/tests/memoryStream.ts b/tests/memoryStream.ts index 53b773b..e5602ca 100644 --- a/tests/memoryStream.ts +++ b/tests/memoryStream.ts @@ -161,4 +161,108 @@ describe('MemoryStream', () => { }, }); }); + + /** + * When an operator listener is about to be added to a memory stream, the + * operator will synchronously receive the remembered value, and may + * synchronously remove itself from the memory stream. This happens in the + * case of e.g. take(). In those cases, the operator should NOT be added AFTER + * it just "got the remembered value and removed itself", because that would + * create leaky executions of the stream. This was reported as bug #53 in + * GitHub. + */ + it('should not allow an operator listener to be indefinitely attached', (done) => { + let debugCalled = 0; + const debugExpected = [42, 0]; + const source$ = xs.periodic(100).startWith(42) + .debug(x => { + debugCalled += 1; + assert.strictEqual(debugExpected.length > 0, true); + assert.strictEqual(x, debugExpected.shift()); + }); + + const expectedA = [42, 0]; + const expectedB = [42]; + let completeCalled = 0; + + source$.take(2).addListener({ + next: (x) => { + assert.strictEqual(x, expectedA.shift()); + }, + error: (err) => {}, + complete: () => { + completeCalled += 1; + }, + }); + + source$.take(1).addListener({ + next: (x) => { + assert.strictEqual(x, expectedB.shift()); + }, + error: (err) => {}, + complete: () => { + completeCalled += 1; + } + }); + + setTimeout(() => { + assert.strictEqual(debugExpected.length, 0); + assert.strictEqual(expectedA.length, 0); + assert.strictEqual(expectedB.length, 0); + assert.strictEqual(completeCalled, 2); + assert.strictEqual(debugCalled, 2); + done(); + }, 500); + }); + + it('should emit remembered value also when cancelling a stop', (done) => { + const expectedA = [1]; + const expectedB = [1, 2]; + let completeCalled = 0; + let doneCalled = 0; + + const source$ = xs.createWithMemory({ + start(listener: Listener) { + listener.next(1); + setTimeout(() => { + listener.next(2); + listener.complete(); + }, 100); + }, + stop() { + doneCalled += 1; + }, + }); + + const listenerA = { + next: (x: number) => { + assert.strictEqual(x, expectedA.shift()); + source$.removeListener(listenerA); + }, + error: (err: any) => {}, + complete: () => { + completeCalled += 1; + }, + }; + + source$.addListener(listenerA); + + source$.addListener({ + next: (x) => { + assert.strictEqual(x, expectedB.shift()); + }, + error: (err) => {}, + complete: () => { + completeCalled += 1; + } + }); + + setTimeout(() => { + assert.strictEqual(expectedA.length, 0); + assert.strictEqual(expectedB.length, 0); + assert.strictEqual(completeCalled, 1); + assert.strictEqual(doneCalled, 1); + done(); + }, 300); + }); });