Skip to content

Commit

Permalink
fix(MemoryStream): fix teardown of MemoryStream to forget past execut…
Browse files Browse the repository at this point in the history
…ions

Fix MemoryStream to teardown its own internal memory after its execution ends, particularly, when
the last listener is removed and async stop is executed, which is not necessarily the _x() teardown.

Fixes issue #71.
  • Loading branch information
staltz committed Jul 8, 2016
1 parent 5f0db83 commit 6bdf596
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
17 changes: 14 additions & 3 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,13 @@ export class Stream<T> implements InternalListener<T> {
this._ils = [];
}

_lateStop() {
// this._prod is not null, because this _lateStop is called from _remove
// where we already checked that this._prod is truthy
this._prod._stop();
this._err = null;
}

/**
* Adds a Listener to the Stream.
*
Expand Down Expand Up @@ -1027,10 +1034,9 @@ export class Stream<T> implements InternalListener<T> {
const i = a.indexOf(il);
if (i > -1) {
a.splice(i, 1);
const p = this._prod;
if (p && a.length <= 0) {
if (this._prod && a.length <= 0) {
this._err = null;
this._stopID = setTimeout(() => p._stop());
this._stopID = setTimeout(() => this._lateStop());
} else if (a.length === 1) {
this._pruneCycles();
}
Expand Down Expand Up @@ -1793,6 +1799,11 @@ export class MemoryStream<T> extends Stream<T> {
super._add(il);
}

_lateStop() {
this._has = false;
super._lateStop();
}

_x(): void {
this._has = false;
super._x();
Expand Down
27 changes: 27 additions & 0 deletions tests/memoryStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,31 @@ describe('MemoryStream', () => {
},
});
});

it('should teardown upstream MemoryStream memory on late async stop', (done) => {
const stream = xs.periodic(500).mapTo('world').startWith('hello').take(2);
const expected1 = ['hello', 'world'];

function addSecondListener() {
const expected2 = ['hello', 'world'];
stream.addListener({
next: (x: string) => {
assert.strictEqual(x, expected2.shift());
},
error: (err: any) => done(err),
complete: () => done(),
});
}

stream.addListener({
next: (x: string) => {
assert.equal(x, expected1.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(expected1.length, 0);
setTimeout(addSecondListener, 200);
},
});
});
});

0 comments on commit 6bdf596

Please sign in to comment.