diff --git a/src/core.ts b/src/core.ts index 61664b0..1b786c4 100644 --- a/src/core.ts +++ b/src/core.ts @@ -30,12 +30,13 @@ export interface InternalProducer { } export interface Operator extends InternalProducer, InternalListener { + type: string; + ins: Stream; _start: (out: Stream) => void; _stop: () => void; _n: (v: T) => void; _e: (err: any) => void; _c: () => void; - type: string; } export interface Producer { @@ -708,13 +709,13 @@ export class LastOperator implements Operator { } } -class MFCIL implements InternalListener { - constructor(private out: Stream, - private op: MapFlattenConcOperator) { +class MFCIL implements InternalListener { + constructor(private out: Stream, + private op: MapFlattenConcOperator) { } - _n(t: T) { - this.out._n(t); + _n(r: R) { + this.out._n(r); } _e(err: any) { @@ -726,15 +727,17 @@ class MFCIL implements InternalListener { } } -export class MapFlattenConcOperator implements InternalProducer, InternalListener { +export class MapFlattenConcOperator implements Operator { public type = 'map+flattenConcurrently'; + public ins: Stream; private active: number = 1; // number of outers and inners that have not yet ended - private out: Stream = null; + private out: Stream = null; - constructor(public mapOp: MapOperator>) { + constructor(public mapOp: MapOperator>) { + this.ins = mapOp.ins; } - _start(out: Stream): void { + _start(out: Stream): void { this.out = out; this.mapOp.ins._add(this); } @@ -769,13 +772,13 @@ export class MapFlattenConcOperator implements InternalProducer, InternalL } } -class MFIL implements InternalListener { - constructor(private out: Stream, - private op: MapFlattenOperator) { +class MFIL implements InternalListener { + constructor(private out: Stream, + private op: MapFlattenOperator) { } - _n(t: T) { - this.out._n(t); + _n(r: R) { + this.out._n(r); } _e(err: any) { @@ -788,17 +791,19 @@ class MFIL implements InternalListener { } } -export class MapFlattenOperator implements InternalProducer, InternalListener { +export class MapFlattenOperator implements Operator { public type = 'map+flatten'; - public inner: Stream = null; // Current inner Stream - private il: InternalListener = null; // Current inner InternalListener + public ins: Stream; + public inner: Stream = null; // Current inner Stream + private il: InternalListener = null; // Current inner InternalListener private open: boolean = true; - private out: Stream = null; + private out: Stream = null; - constructor(public mapOp: MapOperator>) { + constructor(public mapOp: MapOperator>) { + this.ins = mapOp.ins; } - _start(out: Stream): void { + _start(out: Stream): void { this.out = out; this.mapOp.ins._add(this); } @@ -1636,7 +1641,7 @@ export class Stream implements InternalListener { const p = this._prod; return new Stream( p instanceof MapOperator || p instanceof FilterMapOperator ? - new MapFlattenOperator(>> p) : + new MapFlattenOperator(>> p) : new FlattenOperator(>> this) ); } @@ -1670,7 +1675,7 @@ export class Stream implements InternalListener { const p = this._prod; return new Stream( p instanceof MapOperator || p instanceof FilterMapOperator ? - new MapFlattenConcOperator(>> p) : + new MapFlattenConcOperator(>> p) : new FlattenConcOperator(>> this) ); } diff --git a/tests/operator/flatten.ts b/tests/operator/flatten.ts index 62ca110..faf5207 100644 --- a/tests/operator/flatten.ts +++ b/tests/operator/flatten.ts @@ -21,6 +21,15 @@ describe('Stream.prototype.flatten', () => { }); }); + it('should have an ins field as metadata', (done) => { + const source: Stream = xs.periodic(100).take(3) + const stream: Stream = source + .map((i: number) => xs.of(1 + i, 2 + i, 3 + i)) + .flatten(); + assert.strictEqual(stream['_prod']['ins'], source); + done(); + }); + it('should return a flat stream with correct TypeScript types', (done) => { const streamStrings: Stream = Stream.create({ start: (listener: Listener) => {}, diff --git a/tests/operator/flattenConcurrently.ts b/tests/operator/flattenConcurrently.ts index 4990738..9127960 100644 --- a/tests/operator/flattenConcurrently.ts +++ b/tests/operator/flattenConcurrently.ts @@ -21,6 +21,15 @@ describe('Stream.prototype.flattenConcurrently', () => { }); }); + it('should have an ins field as metadata', (done) => { + const source: Stream = xs.periodic(100).take(3) + const stream: Stream = source + .map((i: number) => xs.of(1 + i, 2 + i, 3 + i)) + .flattenConcurrently(); + assert.strictEqual(stream['_prod']['ins'], source); + done(); + }); + it('should return a flat stream with correct TypeScript types', (done) => { const streamStrings: Stream = Stream.create({ start: (listener: Listener) => {},