From 4663fe10eeed10e0f132d1b46cfac8d74e8cc8a6 Mon Sep 17 00:00:00 2001 From: Christian Johns Date: Tue, 24 Jul 2018 12:13:57 -0700 Subject: [PATCH] feat(flattenConcurrentlyAtMost): add new extra Add flattenConcurrentlyAtMost(n) extra. flattenConcurrentlyAtMost is designed to provide consumer-configurable concurrency to flattening operations. Two flattening extras exist which allow consumers to flatten a meta stream with maximum concurrency, or with no concurrency. This new operator supports a concurrency limit, representing the maximum amount of _additional_ streams to connect to during flattening. Resolve staltz/xstream#161. --- src/extra/flattenConcurrentlyAtMost.ts | 132 ++++++++ tests/extra/flattenConcurrentlyAtMost.ts | 409 +++++++++++++++++++++++ 2 files changed, 541 insertions(+) create mode 100644 src/extra/flattenConcurrentlyAtMost.ts create mode 100644 tests/extra/flattenConcurrentlyAtMost.ts diff --git a/src/extra/flattenConcurrentlyAtMost.ts b/src/extra/flattenConcurrentlyAtMost.ts new file mode 100644 index 0000000..eb34103 --- /dev/null +++ b/src/extra/flattenConcurrentlyAtMost.ts @@ -0,0 +1,132 @@ +import {Operator, Stream, OutSender, InternalListener} from '../index'; + +class FCAMIL implements InternalListener, OutSender { + constructor(public out: Stream, + private op: FlattenConcAMOperator) { + } + + _n(t: T) { + this.out._n(t); + } + + _e(err: any) { + this.out._e(err); + } + + _c() { + this.op.less(); + } +} + +export class FlattenConcAMOperator implements Operator, T> { + public type = 'flattenConcurrentlyAtMost'; + public out: Stream = null as any; + private _l: number = 0; + private _d: boolean = false; + private _seq: Array> = []; + + constructor(public n: number, public ins: Stream>) { + } + + _start(out: Stream): void { + this.out = out; + this.ins._add(this); + } + + _stop(): void { + this.ins._remove(this); + this._l = 0; + this.out = null as any; + this._seq = []; + } + + less(): void { + const seq = this._seq; + if (--this._l === 0 && seq.length === 0 && this._d) { + const u = this.out; + if (!u) return; + u._c(); + } + if (this._l < this.n && seq.length > 0) { + this._n(seq.shift() as Stream); + } + } + + _n(s: Stream) { + const u = this.out; + if (!u) return; + if (this._l < this.n) { + this._l++; + s._add(new FCAMIL(u, this)); + } else { + this._seq.push(s); + } + } + + _e(err: any) { + const u = this.out; + if (!u) return; + u._e(err); + } + + _c() { + const seq = this._seq; + this._d = true; + if (this._l === 0 && seq.length === 0) { + const u = this.out; + if (!u) return; + u._c(); + } + } +} + +/** + * Flattens a "stream of streams", handling multiple concurrent nested streams + * simultaneously, up to some limit `n`. + * + * If the input stream is a stream that emits streams, then this operator will + * return an output stream which is a flat stream: emits regular events. The + * flattening happens concurrently, up to the configured limit. It works like + * this: when the input stream emits a nested stream, + * *flattenConcurrentlyAtMost* will start imitating that nested one. When the + * next nested stream is emitted on the input stream, + * *flattenConcurrentlyAtMost* will check to see how many streams it is connected + * to. If it is connected to a number of streams less than the limit, it will also + * imitate that new one, but will continue to imitate the previous nested streams + * as well. + * + * If the limit has already been reached, *flattenConcurrentlyAtMost* will put the + * stream in a queue. When any of the streams it is listening to completes, a stream + * is taken out of the queue and `flattenConcurrentlyAtMost` will connect to it. + * + * This process continues until the metastream completes and there are no more + * connected streams or streams in the queue. + * + * Marble diagrams: + * + * ```text + * --+--------+--------------- + * \ \ + * \ ----1----2---3--| + * --a--b----c----| + * flattenConcurrentlyAtMost(1) + * -----a--b----c-1----2---3--| + * ``` + * + * ```text + * --+---+---+-| + * \ \ \ + * \ \ ---fgh----i-----jh--| + * \ -----1----2----3--| + * ---a--b-----c--| + * flattenConcurrentlyAtMost(2) + * ---------a--b-1---c2--i-3------fgh----i-----jh--| + * ``` + * + * @return {Stream} + */ +export default function flattenConcurrentlyAtMost(n: number): (ins: Stream>) => Stream { + return function flattenConcAMOperator(ins: Stream>) { + return new Stream(new FlattenConcAMOperator(n, ins)); + }; +} diff --git a/tests/extra/flattenConcurrentlyAtMost.ts b/tests/extra/flattenConcurrentlyAtMost.ts new file mode 100644 index 0000000..7ea4569 --- /dev/null +++ b/tests/extra/flattenConcurrentlyAtMost.ts @@ -0,0 +1,409 @@ +/// +/// +import xs, {Stream, Listener} from '../../src/index'; +import flattenConcurrentlyAtMost from '../../src/extra/flattenConcurrentlyAtMost'; +import * as assert from 'assert'; + +describe('flattenConcurrentlyAtMost (extra)', () => { + + describe('with n less than Infinity', () => { + it('should process only n streams at a time', (done: any) => { + const stream = xs.periodic(15) + .map(_ => xs.periodic(20)) + .compose(flattenConcurrentlyAtMost(2)) + .take(10); + const expected = [0, 0, 1, 1, 2, 2, 3, 3, 4, 4 ]; + + stream.addListener({ + next: (x: number) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + + it('should error if any stream errors', (done: any) => { + const stream = xs.periodic(15) + .map( (x: any) => xs.of('helloWorld').map((x: any) => x.sss())) + .compose(flattenConcurrentlyAtMost(2)); + + stream.addListener({ + error: () => done(), + }); + }); + + it('should lazily subscribe to streams in the buffer', (done: any) => { + const stream = xs.of(0, 1, 2) + .map(i => xs.periodic(100 * (i + 1) + 10 * i).take(2).map(x => `${i}${x}`)) + .compose(flattenConcurrentlyAtMost(2)); + // ---x---x---x---x---x---x + // ---00--01 + // --------10------11 + // -----------20----------21 + const expected = ['00', '01', '10', '11', '20', '21']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + }); + + describe('with n === Infinity', () => { + describe('with map', () => { + it('should expand each periodic event with 3 sync events', (done: any) => { + const stream = xs.periodic(100).take(3) + .map(i => xs.of(1 + i, 2 + i, 3 + i)) + .compose(flattenConcurrentlyAtMost(Infinity)); + const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5]; + + stream.addListener({ + next: (x: number) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + + it('should return a flat stream with correct TypeScript types', (done: any) => { + const streamStrings: Stream = Stream.create({ + start: (listener: Listener) => {}, + stop: () => {} + }); + + const streamBooleans: Stream = Stream.create({ + start: (listener: Listener) => {}, + stop: () => {} + }); + + // Type checked by the compiler. Without Stream it does not compile. + const flat: Stream = streamStrings.map(x => streamBooleans) + .compose(flattenConcurrentlyAtMost(Infinity)); + done(); + }); + + it('should expand 3 sync events as a periodic each', (done: any) => { + const stream = xs.of(0, 1, 2) + .map(i => xs.periodic(100 * (i + 1) + 10 * i).take(2).map(x => `${i}${x}`)) + .compose(flattenConcurrentlyAtMost(Infinity)); + // ---x---x---x---x---x---x + // ---00--01 + // --------10------11 + // -----------20----------21 + const expected = ['00', '01', '10', '20', '11', '21']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + + it('should expand 3 async events as a periodic each', (done: any) => { + const stream = xs.periodic(140).take(3) + .map(i => + xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`) + ) + .compose(flattenConcurrentlyAtMost(Infinity)); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ----10--11--12 + // ------------20-----------21----------22 + const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + + it('should expand 3 async events as a periodic each, no optimization', (done: any) => { + const stream = xs.periodic(140).take(3) + .map(i => + xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`) + ) + .filter(() => true) // breaks the optimization map+flattenConcurrently + .compose(flattenConcurrentlyAtMost(Infinity)); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ----10--11--12 + // ------------20-----------21----------22 + + const expected = ['00', '01', '10', '02', '11', '12', '20', '21', '22']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + } + }); + }); + + it('should propagate user mistakes in project as errors', (done: any) => { + const source = xs.periodic(30).take(1); + const stream = source.map( + x => { + const y = ( x).toLowerCase(); + return xs.of(y); + } + ).compose(flattenConcurrentlyAtMost(Infinity)); + + stream.addListener({ + next: () => done('next should not be called'), + error: (err) => { + assert.notStrictEqual(err.message.match(/is not a function$/), null); + done(); + }, + complete: () => { + done('complete should not be called'); + }, + }); + }); + }); + + describe('with filter+map fusion', () => { + it('should execute the predicate, the projection, and the flattening', (done: any) => { + let predicateCallCount = 0; + let projectCallCount = 0; + + const stream = xs.periodic(140).take(3) + .filter(i => { + predicateCallCount += 1; + return i % 2 === 0; + }) + .map(i => { + projectCallCount += 1; + return xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`); + }) + .compose(flattenConcurrentlyAtMost(Infinity)); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ------------20-----------21----------22 + const expected = ['00', '01', '02', '20', '21', '22']; + + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + assert.equal(predicateCallCount, 3); + assert.equal(projectCallCount, 2); + done(); + } + }); + }); + }); + }); + + describe('with n === 1', () => { + describe('with map', () => { + it('should expand each periodic event with 3 sync events', (done: any) => { + const stream = xs.periodic(100).take(3) + .map(i => xs.of(1 + i, 2 + i, 3 + i)) + .compose(flattenConcurrentlyAtMost(1)); + const expected = [1, 2, 3, 2, 3, 4, 3, 4, 5]; + const listener = { + next: (x: number) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }; + stream.addListener(listener); + }); + + it('should expand each sync event as a periodic stream and concatenate', (done: any) => { + const stream = xs.of(1, 2, 3) + .map(i => xs.periodic(100).take(3).map(x => `${i}${x}`)) + .compose(flattenConcurrentlyAtMost(1)); + const expected = ['10', '11', '12', '20', '21', '22', '30', '31', '32']; + const listener = { + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }; + stream.addListener(listener); + }); + + it('should expand 3 sync events as a periodic each', (done: any) => { + const stream = xs.of(1, 2, 3) + .map(i => xs.periodic(100 * i).take(2).map(x => `${i}${x}`)) + .compose(flattenConcurrentlyAtMost(1)); + // ---x---x---x---x---x---x + // ---10--11 + // -------20------21 + // -----------30----------31 + const expected = ['10', '11', '20', '21', '30', '31']; + const listener = { + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }; + stream.addListener(listener); + }); + + it('should expand 3 async events as a periodic each', (done: any) => { + const stream = xs.periodic(140).take(3) + .map(i => + xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`) + ) + .compose(flattenConcurrentlyAtMost(1)); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ----10--11--12 + // ------------20-----------21----------22 + const expected = ['00', '01', '02', '10', '11', '12', '20', '21', '22']; + stream.addListener({ + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + }, + }); + }); + + it('should expand 3 async events as a periodic each, no optimization', (done: any) => { + const stream = xs.periodic(140).take(3) + .map(i => + xs.periodic(100 * (i < 2 ? 1 : i)).take(3).map(x => `${i}${x}`) + ) + .filter(() => true) // breaks an optimization map+flattenConcurrentlyAtMost + .compose(flattenConcurrentlyAtMost(1)); + // ---x---x---x---x---x---x---x---x---x---x---x---x + // ---00--01--02 + // ----10--11--12 + // ------------20-----------21----------22 + + const expected = ['00', '01', '02', '10', '11', '12', '20', '21', '22']; + const listener = { + next: (x: string) => { + assert.equal(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.equal(expected.length, 0); + done(); + } + }; + stream.addListener(listener); + }); + + it('should propagate user mistakes in project as errors', (done: any) => { + const source = xs.periodic(30).take(1); + const stream = source.map( + x => { + const y = ( x).toLowerCase(); + return xs.of(y); + } + ).compose(flattenConcurrentlyAtMost(1)); + + stream.addListener({ + next: () => done('next should not be called'), + error: (err) => { + assert.notStrictEqual(err.message.match(/is not a function$/), null); + done(); + }, + complete: () => { + done('complete should not be called'); + }, + }); + }); + + it('should emit data from inner streams after synchronous outer completes', (done: any) => { + const outer = xs.of(42); + const stream = outer.map(i => xs.periodic(50).take(2).mapTo(i)) + .compose(flattenConcurrentlyAtMost(1)); + const expected = [42, 42]; + + stream.addListener({ + next: (x: number) => { + assert.strictEqual(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => { + assert.strictEqual(expected.length, 0); + done(); + }, + }); + }); + + it('should stop inner emissions if result stops', (done: any) => { + const expectedInner = [0, 1]; + + const stream = xs.of(1) + .map(i => + xs.periodic(150).take(3) // 150ms, 300ms, 450ms, 600ms + .debug(x => assert.strictEqual(x, expectedInner.shift())) + ) + .compose(flattenConcurrentlyAtMost(1)); + + const expected = [0, 1]; + const listener = { + next: (x: number) => { + assert.strictEqual(x, expected.shift()); + }, + error: (err: any) => done(err), + complete: () => done('should not call complete'), + }; + + stream.addListener(listener); + setTimeout(() => { + stream.removeListener(listener); + }, 390); + + setTimeout(() => { + assert.strictEqual(expectedInner.length, 0); + assert.strictEqual(expected.length, 0); + done(); + }, 500); + }); + }); + }); +});