diff --git a/src/Stream.ts b/src/Stream.ts index 8b19be7..773de79 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -8,6 +8,7 @@ import {DebugOperator} from './operator/DebugOperator'; import {FoldOperator} from './operator/FoldOperator'; import {LastOperator} from './operator/LastOperator'; import {StartWithOperator} from './operator/StartWithOperator'; +import {FlattenOperator} from './operator/FlattenOperator'; import {FlattenConcurrentlyOperator} from './operator/FlattenConcurrentlyOperator'; import { CombineProducer, @@ -166,6 +167,10 @@ export class Stream implements Listener { return new Stream(new StartWithOperator(this, x)); } + flatten>(): T { + return new Stream(new FlattenOperator(>> ( this))); + } + flattenConcurrently>(): T { return new Stream(new FlattenConcurrentlyOperator(>> ( this))); } diff --git a/src/operator/FlattenOperator.ts b/src/operator/FlattenOperator.ts new file mode 100644 index 0000000..9134c79 --- /dev/null +++ b/src/operator/FlattenOperator.ts @@ -0,0 +1,107 @@ +import {Observer} from '../Observer'; +import {Operator} from '../Operator'; +import {Stream} from '../Stream'; +import {emptyObserver} from '../utils/emptyObserver'; +import {MapOperator} from './MapOperator'; + +export class Inner implements Observer { + constructor(public out: Stream, + public op: FlattenOperator) { + } + + next(t: T) { + this.out.next(t); + } + + error(err: any) { + this.out.error(err); + } + + end() { + this.op.curr = null; + this.op.less(); + } +} + +export class Outer implements Observer> { + constructor(public out: Stream, + public op: FlattenOperator) { + } + + next(s: Stream) { + this.op.cut(); + (this.op.curr = s).subscribe(this.op.inner = new Inner(this.out, this.op)); + } + + error(err: any) { + this.out.error(err); + } + + end() { + this.op.open = false; + this.op.less(); + } +} + +export class MapOuter implements Observer { + constructor(public out: Stream, + public pr: (t: T) => Stream, // pr = project + public op: FlattenOperator) { + } + + next(v: T) { + this.op.cut(); + (this.op.curr = this.pr(v)).subscribe(this.op.inner = new Inner(this.out, this.op)); + } + + error(err: any) { + this.out.error(err); + } + + end() { + this.op.open = false; + this.op.less(); + } +} + +export class FlattenOperator implements Operator, T> { + public proxy: Observer> = emptyObserver; + public mapOp: MapOperator>; + public curr: Stream; // Current inner Stream + public inner: Observer; // Current inner Observer + public open: boolean = true; + public out: Stream; + + constructor(public ins: Stream>) { + if (ins._prod instanceof MapOperator) { + this.mapOp = >> ins._prod; + } + } + + start(out: Stream): void { + this.out = out; + const mapOp = this.mapOp; + if (mapOp) { + mapOp.ins.subscribe(this.proxy = new MapOuter(out, mapOp.project, this)); + } else { + this.ins.subscribe(this.proxy = new Outer(out, this)); + } + } + + stop(): void { + this.ins.unsubscribe(this.proxy); + } + + cut(): void { + const {curr, inner} = this; + if (curr && inner) { + curr.unsubscribe(inner); + } + } + + less(): void { + if (!this.open && !this.curr) { + this.out.end(); + } + } +} diff --git a/tests/operator/flatten.ts b/tests/operator/flatten.ts new file mode 100644 index 0000000..af4e4a1 --- /dev/null +++ b/tests/operator/flatten.ts @@ -0,0 +1,73 @@ +import xs from '../../src/index'; +import * as assert from 'assert'; + +describe('Stream.prototype.flatten', () => { + describe('with map', () => { + it('should expand each interval event with 3 sync events', (done) => { + const stream = xs.interval(100).take(3) + .map(i => xs.of(1 + i, 2 + i, 3 + i)) + .flatten(); + const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5]; + const observer = { + next: (x: number) => { + assert.equal(x, expected.shift()); + if (expected.length === 0) { + stream.unsubscribe(observer); + done(); + } + }, + error: done.fail, + end: done.fail, + }; + stream.subscribe(observer); + }); + + it('should expand 3 sync events as an interval, only last one passes', (done) => { + const stream = xs.from([0, 1, 2]) + .map(i => xs.interval(100 * i).take(2).map(x => `${i}${x}`)) + .flatten(); + // ---x---x---x---x---x---x + // ---00--01 + // -------10------11 + // -----------20----------21 + const expected = ['20', '21']; + const observer = { + next: (x: number) => { + assert.equal(x, expected.shift()); + if (expected.length === 0) { + stream.unsubscribe(observer); + done(); + } + }, + error: (err: any) => done(err), + end: () => done(new Error('No end() should be called')), + }; + stream.subscribe(observer); + }); + + it('should expand 3 async events as an interval each', (done) => { + const stream = xs.interval(140).take(3) + .map(i => + xs.interval(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`) + ) + .flatten(); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ----10--11--12 + // ------------20-----------21----------22 + const expected = ['00', '10', '20', '21', '22']; + const observer = { + next: (x: number) => { + assert.equal(x, expected.shift()); + if (expected.length === 0) { + stream.unsubscribe(observer); + done(); + } + }, + error: (err: any) => done(err), + end: () => done(new Error('No end() should be called')), + }; + stream.subscribe(observer); + }); + }); +}); diff --git a/tests/operator/flattenConcurrently.ts b/tests/operator/flattenConcurrently.ts index 463461b..5070d5a 100644 --- a/tests/operator/flattenConcurrently.ts +++ b/tests/operator/flattenConcurrently.ts @@ -44,5 +44,30 @@ describe('Stream.prototype.flattenConcurrently', () => { }; stream.addListener(listener); }); + + it('should expand 3 async events as an interval each', (done) => { + const stream = xs.interval(140).take(3) + .map(i => + xs.interval(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`) + ) + .flattenConcurrently(); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ----10--11--12 + // ------------20-----------21----------22 + const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22']; + const observer = { + next: (x: number) => { + assert.equal(x, expected.shift()); + if (expected.length === 0) { + stream.unsubscribe(observer); + done(); + } + }, + error: (err: any) => done(err), + end: () => done(new Error('No end() should be called')), + }; + stream.subscribe(observer); + }); }); });