diff --git a/perf/filter-map-fusion.js b/perf/filter-map-fusion.js index 648c799..d51231a 100644 --- a/perf/filter-map-fusion.js +++ b/perf/filter-map-fusion.js @@ -19,7 +19,7 @@ for(var i = 0; i< a.length; ++i) { a[i] = i; } -var suite = Benchmark.Suite('filter -> map -> reduce ' + n + ' integers'); +var suite = Benchmark.Suite('filter -> map -> fusion ' + n + ' integers'); var options = { defer: true, onError: function(e) { diff --git a/src/index.ts b/src/index.ts index 00cfb85..16549a9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -884,94 +884,6 @@ class Last implements Operator { } } -class MapFlattenListener implements InternalListener { - private out: Stream; - private op: MapFlatten; - - constructor(out: Stream, op: MapFlatten) { - this.out = out; - this.op = op; - } - - _n(r: R) { - this.out._n(r); - } - - _e(err: any) { - this.out._e(err); - } - - _c() { - this.op.inner = NO as Stream; - this.op.less(); - } -} - -class MapFlatten implements Operator { - public type: string; - public ins: Stream; - public out: Stream; - public mapOp: MapOp>; - public inner: Stream; // Current inner Stream - private il: InternalListener; // Current inner InternalListener - private open: boolean; - - constructor(mapOp: MapOp>) { - this.type = `${mapOp.type}+flatten`; - this.ins = mapOp.ins; - this.out = NO as Stream; - this.mapOp = mapOp; - this.inner = NO as Stream; - this.il = NO_IL; - this.open = true; - } - - _start(out: Stream): void { - this.out = out; - this.inner = NO as Stream; - this.il = NO_IL; - this.open = true; - this.mapOp.ins._add(this); - } - - _stop(): void { - this.mapOp.ins._remove(this); - if (this.inner !== NO) this.inner._remove(this.il); - this.out = NO as Stream; - this.inner = NO as Stream; - this.il = NO_IL; - } - - less(): void { - if (!this.open && this.inner === NO) { - const u = this.out; - if (u === NO) return; - u._c(); - } - } - - _n(v: T) { - const u = this.out; - if (u === NO) return; - const {inner, il} = this; - const s = _try(this.mapOp, v, u); - if (s === NO) return; - if (inner !== NO && il !== NO_IL) inner._remove(il); - (this.inner = s as Stream)._add(this.il = new MapFlattenListener(u, this)); - } - - _e(err: any) { - const u = this.out; - if (u === NO) return; - u._e(err); - } - - _c() { - this.open = false; - this.less(); - } -} - class MapOp implements Operator { public type = 'map'; public ins: Stream; @@ -1015,25 +927,6 @@ class MapOp implements Operator { } } -class FilterMapFusion extends MapOp { - public type = 'filter+map'; - public passes: (t: T) => boolean; - - constructor(passes: (t: T) => boolean, project: (t: T) => R, ins: Stream) { - super(project, ins); - this.passes = passes; - } - - _n(t: T) { - if (!this.passes(t)) return; - const u = this.out; - if (u === NO) return; - const r = _try(this, t, u); - if (r === NO) return; - u._n(r as R); - } -} - class Remember implements InternalProducer { public type = 'remember'; public ins: Stream; @@ -1619,10 +1512,7 @@ export class Stream implements InternalListener { } as CombineSignature; protected _map(project: (t: T) => U): Stream | MemoryStream { - const p = this._prod; - const ctor = this.ctor(); - if (p instanceof Filter) return new ctor(new FilterMapFusion(p.f, project, p.ins)); - return new ctor(new MapOp(project, this)); + return new (this.ctor())(new MapOp(project, this)); } /** @@ -1665,7 +1555,7 @@ export class Stream implements InternalListener { mapTo(projectedValue: U): Stream { const s = this.map(() => projectedValue); const op: Operator = s._prod as Operator; - op.type = op.type.replace('map', 'mapTo'); + op.type = 'mapTo'; return s; } @@ -1890,11 +1780,7 @@ export class Stream implements InternalListener { */ flatten(this: Stream>): T { const p = this._prod; - return new Stream( - p instanceof MapOp && !(p instanceof FilterMapFusion) ? - new MapFlatten(p as MapOp>) : - new Flatten(this) - ) as T & Stream; + return new Stream(new Flatten(this)) as T & Stream; } /** diff --git a/tests/operator/filter.ts b/tests/operator/filter.ts index 674d9db..1f045bd 100644 --- a/tests/operator/filter.ts +++ b/tests/operator/filter.ts @@ -81,26 +81,6 @@ describe('Stream.prototype.filter', () => { }); }); - it('should should have filter+map fusion metadata', (done: any) => { - const isEven = (x: number) => x % 2 === 0; - const stream = xs.of(1, 2, 3, 4, 5, 6, 7, 8) - .filter(isEven) - .map(x => 10 * x); - - assert.strictEqual(stream['_prod']['type'], 'filter+map'); - done(); - }); - - it('should should have filter+mapTo fusion metadata', (done: any) => { - const isEven = (x: number) => x % 2 === 0; - const stream = xs.of(1, 2, 3, 4, 5, 6, 7, 8) - .filter(isEven) - .mapTo(10); - - assert.strictEqual(stream['_prod']['type'], 'filter+mapTo'); - done(); - }); - it('should call functions in correct order for filter+filter fusion', (done: any) => { const object$ = xs.of( { foo: { a: 10 } }, diff --git a/tests/operator/flatten.ts b/tests/operator/flatten.ts index 37dc2a5..08b4288 100644 --- a/tests/operator/flatten.ts +++ b/tests/operator/flatten.ts @@ -49,15 +49,6 @@ describe('Stream.prototype.flatten', () => { }); }); - it('should have an ins field as metadata', (done: any) => { - const source: Stream = xs.periodic(100).take(3); - const stream: Stream = source - .map((i: number) => xs.of(1 + i, 2 + i, 3 + i)) - .flatten(); - assert.strictEqual(stream['_prod']['ins'], source); - done(); - }); - it('should return a flat stream with correct TypeScript types', (done: any) => { const streamStrings: Stream = Stream.create({ start: (listener: Listener) => {}, @@ -285,7 +276,7 @@ describe('Stream.prototype.flatten', () => { }); }); - describe('with filter+map fusion', () => { + describe('with filter and map', () => { it('should execute the predicate, the projection, and the flattening', (done: any) => { let predicateCallCount = 0; let projectCallCount = 0; @@ -321,14 +312,6 @@ describe('Stream.prototype.flatten', () => { }); describe('with mapTo', () => { - it('should have the correct \'type\' metadata on the operator producer', (done: any) => { - const source: Stream> = xs.periodic(100).take(3) - .mapTo(xs.of(1, 2, 3)); - const stream: Stream = source.flatten(); - assert.strictEqual(stream['_prod']['type'], 'mapTo+flatten'); - done(); - }); - it('should not restart inner stream if switching to the same inner stream', (done: any) => { const outer = fromDiagram('-A---------B----------C--------|'); const nums = fromDiagram( '-a-b-c-----------------------|', {