From f16e6a9479f6f4103dc34691b037c3d030cee52e Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 3 Apr 2017 16:24:27 +0300 Subject: [PATCH] fix(map): remove operator fusion to avoid bugs Operator fusion such as filter+map or map+flatten has been causing bugs to users, such as #165 and #178. Since this library's focus is not "as high performance as possible", we need to prioritize correctness and library size. Removing operator fusion should solve the mentioned issues besides others not reported. #165 and #178 --- perf/filter-map-fusion.js | 2 +- src/index.ts | 120 +------------------------------------- tests/operator/filter.ts | 20 ------- tests/operator/flatten.ts | 19 +----- 4 files changed, 5 insertions(+), 156 deletions(-) diff --git a/perf/filter-map-fusion.js b/perf/filter-map-fusion.js index 648c799..d51231a 100644 --- a/perf/filter-map-fusion.js +++ b/perf/filter-map-fusion.js @@ -19,7 +19,7 @@ for(var i = 0; i< a.length; ++i) { a[i] = i; } -var suite = Benchmark.Suite('filter -> map -> reduce ' + n + ' integers'); +var suite = Benchmark.Suite('filter -> map -> fusion ' + n + ' integers'); var options = { defer: true, onError: function(e) { diff --git a/src/index.ts b/src/index.ts index 00cfb85..16549a9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -884,94 +884,6 @@ class Last implements Operator { } } -class MapFlattenListener implements InternalListener { - private out: Stream; - private op: MapFlatten; - - constructor(out: Stream, op: MapFlatten) { - this.out = out; - this.op = op; - } - - _n(r: R) { - this.out._n(r); - } - - _e(err: any) { - this.out._e(err); - } - - _c() { - this.op.inner = NO as Stream; - this.op.less(); - } -} - -class MapFlatten implements Operator { - public type: string; - public ins: Stream; - public out: Stream; - public mapOp: MapOp>; - public inner: Stream; // Current inner Stream - private il: InternalListener; // Current inner InternalListener - private open: boolean; - - constructor(mapOp: MapOp>) { - this.type = `${mapOp.type}+flatten`; - this.ins = mapOp.ins; - this.out = NO as Stream; - this.mapOp = mapOp; - this.inner = NO as Stream; - this.il = NO_IL; - this.open = true; - } - - _start(out: Stream): void { - this.out = out; - this.inner = NO as Stream; - this.il = NO_IL; - this.open = true; - this.mapOp.ins._add(this); - } - - _stop(): void { - this.mapOp.ins._remove(this); - if (this.inner !== NO) this.inner._remove(this.il); - this.out = NO as Stream; - this.inner = NO as Stream; - this.il = NO_IL; - } - - less(): void { - if (!this.open && this.inner === NO) { - const u = this.out; - if (u === NO) return; - u._c(); - } - } - - _n(v: T) { - const u = this.out; - if (u === NO) return; - const {inner, il} = this; - const s = _try(this.mapOp, v, u); - if (s === NO) return; - if (inner !== NO && il !== NO_IL) inner._remove(il); - (this.inner = s as Stream)._add(this.il = new MapFlattenListener(u, this)); - } - - _e(err: any) { - const u = this.out; - if (u === NO) return; - u._e(err); - } - - _c() { - this.open = false; - this.less(); - } -} - class MapOp implements Operator { public type = 'map'; public ins: Stream; @@ -1015,25 +927,6 @@ class MapOp implements Operator { } } -class FilterMapFusion extends MapOp { - public type = 'filter+map'; - public passes: (t: T) => boolean; - - constructor(passes: (t: T) => boolean, project: (t: T) => R, ins: Stream) { - super(project, ins); - this.passes = passes; - } - - _n(t: T) { - if (!this.passes(t)) return; - const u = this.out; - if (u === NO) return; - const r = _try(this, t, u); - if (r === NO) return; - u._n(r as R); - } -} - class Remember implements InternalProducer { public type = 'remember'; public ins: Stream; @@ -1619,10 +1512,7 @@ export class Stream implements InternalListener { } as CombineSignature; protected _map(project: (t: T) => U): Stream | MemoryStream { - const p = this._prod; - const ctor = this.ctor(); - if (p instanceof Filter) return new ctor(new FilterMapFusion(p.f, project, p.ins)); - return new ctor(new MapOp(project, this)); + return new (this.ctor())(new MapOp(project, this)); } /** @@ -1665,7 +1555,7 @@ export class Stream implements InternalListener { mapTo(projectedValue: U): Stream { const s = this.map(() => projectedValue); const op: Operator = s._prod as Operator; - op.type = op.type.replace('map', 'mapTo'); + op.type = 'mapTo'; return s; } @@ -1890,11 +1780,7 @@ export class Stream implements InternalListener { */ flatten(this: Stream>): T { const p = this._prod; - return new Stream( - p instanceof MapOp && !(p instanceof FilterMapFusion) ? - new MapFlatten(p as MapOp>) : - new Flatten(this) - ) as T & Stream; + return new Stream(new Flatten(this)) as T & Stream; } /** diff --git a/tests/operator/filter.ts b/tests/operator/filter.ts index 674d9db..1f045bd 100644 --- a/tests/operator/filter.ts +++ b/tests/operator/filter.ts @@ -81,26 +81,6 @@ describe('Stream.prototype.filter', () => { }); }); - it('should should have filter+map fusion metadata', (done: any) => { - const isEven = (x: number) => x % 2 === 0; - const stream = xs.of(1, 2, 3, 4, 5, 6, 7, 8) - .filter(isEven) - .map(x => 10 * x); - - assert.strictEqual(stream['_prod']['type'], 'filter+map'); - done(); - }); - - it('should should have filter+mapTo fusion metadata', (done: any) => { - const isEven = (x: number) => x % 2 === 0; - const stream = xs.of(1, 2, 3, 4, 5, 6, 7, 8) - .filter(isEven) - .mapTo(10); - - assert.strictEqual(stream['_prod']['type'], 'filter+mapTo'); - done(); - }); - it('should call functions in correct order for filter+filter fusion', (done: any) => { const object$ = xs.of( { foo: { a: 10 } }, diff --git a/tests/operator/flatten.ts b/tests/operator/flatten.ts index 37dc2a5..08b4288 100644 --- a/tests/operator/flatten.ts +++ b/tests/operator/flatten.ts @@ -49,15 +49,6 @@ describe('Stream.prototype.flatten', () => { }); }); - it('should have an ins field as metadata', (done: any) => { - const source: Stream = xs.periodic(100).take(3); - const stream: Stream = source - .map((i: number) => xs.of(1 + i, 2 + i, 3 + i)) - .flatten(); - assert.strictEqual(stream['_prod']['ins'], source); - done(); - }); - it('should return a flat stream with correct TypeScript types', (done: any) => { const streamStrings: Stream = Stream.create({ start: (listener: Listener) => {}, @@ -285,7 +276,7 @@ describe('Stream.prototype.flatten', () => { }); }); - describe('with filter+map fusion', () => { + describe('with filter and map', () => { it('should execute the predicate, the projection, and the flattening', (done: any) => { let predicateCallCount = 0; let projectCallCount = 0; @@ -321,14 +312,6 @@ describe('Stream.prototype.flatten', () => { }); describe('with mapTo', () => { - it('should have the correct \'type\' metadata on the operator producer', (done: any) => { - const source: Stream> = xs.periodic(100).take(3) - .mapTo(xs.of(1, 2, 3)); - const stream: Stream = source.flatten(); - assert.strictEqual(stream['_prod']['type'], 'mapTo+flatten'); - done(); - }); - it('should not restart inner stream if switching to the same inner stream', (done: any) => { const outer = fromDiagram('-A---------B----------C--------|'); const nums = fromDiagram( '-a-b-c-----------------------|', {