From 47e67ff8ca02bdd0e4f34c6069a0290f79979361 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 24 Oct 2016 17:53:17 +0300 Subject: [PATCH] fix(MemoryStream): fix a leaking execution bug Instead of MemoryStream._add using super._add, we reimplement MemoryStream._add with custom logic. Now it's guaranteed that it will add the listener to the listeners array before emitting the remembered value. This way, if the listener is an operator like take() which can synchronously stop and remove itself from the source, we guarantee that effective _add happens before effective _remove. Previously, we had effective _add happen after effective _remove, causing a stream execution to remain even though the listener would never be removed. BREAKING CHANGE: This is generally safe to update, but note that the behavior around MemoryStream, startWith, take, imitate etc may have slightly changed, so it is recommended to run tests on your application and see if it is working, in case your application code was relying on buggy behavior. Closes #53 --- src/core.ts | 18 +++++++- tests/memoryStream.ts | 104 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/src/core.ts b/src/core.ts index 5475a54..41ea7e5 100644 --- a/src/core.ts +++ b/src/core.ts @@ -2159,8 +2159,22 @@ export class MemoryStream extends Stream { } _add(il: InternalListener): void { - if (this._has) { il._n(this._v); } - super._add(il); + const ta = this._target; + if (ta !== NO) return ta._add(il); + const a = this._ils; + a.push(il); + if (a.length > 1) { + if (this._has) il._n(this._v); + return; + } + if (this._stopID !== NO) { + if (this._has) il._n(this._v); + clearTimeout(this._stopID); + this._stopID = NO; + } else if (this._has) il._n(this._v); else { + const p = this._prod; + if (p !== NO) p._start(this); + } } _stopNow() { diff --git a/tests/memoryStream.ts b/tests/memoryStream.ts index 53b773b..e5602ca 100644 --- a/tests/memoryStream.ts +++ b/tests/memoryStream.ts @@ -161,4 +161,108 @@ describe('MemoryStream', () => { }, }); }); + + /** + * When an operator listener is about to be added to a memory stream, the + * operator will synchronously receive the remembered value, and may + * synchronously remove itself from the memory stream. This happens in the + * case of e.g. take(). In those cases, the operator should NOT be added AFTER + * it just "got the remembered value and removed itself", because that would + * create leaky executions of the stream. This was reported as bug #53 in + * GitHub. + */ + it('should not allow an operator listener to be indefinitely attached', (done) => { + let debugCalled = 0; + const debugExpected = [42, 0]; + const source$ = xs.periodic(100).startWith(42) + .debug(x => { + debugCalled += 1; + assert.strictEqual(debugExpected.length > 0, true); + assert.strictEqual(x, debugExpected.shift()); + }); + + const expectedA = [42, 0]; + const expectedB = [42]; + let completeCalled = 0; + + source$.take(2).addListener({ + next: (x) => { + assert.strictEqual(x, expectedA.shift()); + }, + error: (err) => {}, + complete: () => { + completeCalled += 1; + }, + }); + + source$.take(1).addListener({ + next: (x) => { + assert.strictEqual(x, expectedB.shift()); + }, + error: (err) => {}, + complete: () => { + completeCalled += 1; + } + }); + + setTimeout(() => { + assert.strictEqual(debugExpected.length, 0); + assert.strictEqual(expectedA.length, 0); + assert.strictEqual(expectedB.length, 0); + assert.strictEqual(completeCalled, 2); + assert.strictEqual(debugCalled, 2); + done(); + }, 500); + }); + + it('should emit remembered value also when cancelling a stop', (done) => { + const expectedA = [1]; + const expectedB = [1, 2]; + let completeCalled = 0; + let doneCalled = 0; + + const source$ = xs.createWithMemory({ + start(listener: Listener) { + listener.next(1); + setTimeout(() => { + listener.next(2); + listener.complete(); + }, 100); + }, + stop() { + doneCalled += 1; + }, + }); + + const listenerA = { + next: (x: number) => { + assert.strictEqual(x, expectedA.shift()); + source$.removeListener(listenerA); + }, + error: (err: any) => {}, + complete: () => { + completeCalled += 1; + }, + }; + + source$.addListener(listenerA); + + source$.addListener({ + next: (x) => { + assert.strictEqual(x, expectedB.shift()); + }, + error: (err) => {}, + complete: () => { + completeCalled += 1; + } + }); + + setTimeout(() => { + assert.strictEqual(expectedA.length, 0); + assert.strictEqual(expectedB.length, 0); + assert.strictEqual(completeCalled, 1); + assert.strictEqual(doneCalled, 1); + done(); + }, 300); + }); });