Skip to content

Commit

Permalink
feat(extra): implement new extra operator: split
Browse files Browse the repository at this point in the history
  • Loading branch information
staltz committed May 11, 2016
1 parent e06d502 commit 84742e8
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ export class Stream<T> implements InternalListener<T> {
private _stopID: any = empty;
private _prod: InternalProducer<T>;

constructor(producer: InternalProducer<T>) {
constructor(producer?: InternalProducer<T>) {
this._prod = producer;
this._ils = [];
}
Expand Down
70 changes: 70 additions & 0 deletions src/extra/split.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {Operator, InternalListener, Stream, emptyListener} from '../core';

class SeparatorIL<T> implements InternalListener<any> {
constructor(private out: Stream<Stream<T>>,
private op: SplitOperator<T>) {
}

_n(t: any) {
this.op.up();
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.op.curr._c();
this.out._c();
}
}

export class SplitOperator<T> implements Operator<T, Stream<T>> {
public type = 'split';
public curr: Stream<T> = new Stream<T>();
private out: Stream<Stream<T>> = null;
private sil: InternalListener<any> = emptyListener; // sil = separator InternalListener

constructor(public s: Stream<any>, // s = separator
public ins: Stream<T>) {
}

_start(out: Stream<Stream<T>>): void {
this.out = out;
this.s._add(this.sil = new SeparatorIL<T>(out, this));
this.ins._add(this);
out._n(this.curr);
}

_stop(): void {
this.ins._remove(this);
this.s._remove(this.sil);
this.curr = new Stream<T>();
this.out = null;
this.sil = emptyListener;
}

up(): void {
this.curr._c();
this.out._n(this.curr = new Stream<T>());
}

_n(t: T) {
this.curr._n(t);
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.curr._c();
this.out._c();
}
}

export default function split<T>(separator: Stream<any>): (ins: Stream<T>) => Stream<Stream<T>> {
return function splitOperator(ins: Stream<T>): Stream<Stream<T>> {
return new Stream<Stream<T>>(new SplitOperator(separator, ins));
};
}
73 changes: 73 additions & 0 deletions tests/extra/split.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import xs, {Stream} from '../../src/index';
import split from '../../src/extra/split';
import concat from '../../src/extra/concat';
import * as assert from 'assert';

describe('split (extra)', () => {
it('should split a stream using a separator stream', (done) => {
const source = xs.periodic(50).take(10);
const separator = concat(xs.periodic(167).take(2), xs.never());
const stream = source.compose(split(separator));
const outerExpected = [
[0, 1, 2],
[3, 4, 5],
[6, 7, 8, 9]
];

stream.addListener({
next: (inner: Stream<number>) => {
const innerExpected = outerExpected.shift();
inner.addListener({
next: (x: number) => {
assert.equal(x, innerExpected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(innerExpected.length, 0);
}
});
},
error: (err: any) => done(err),
complete: () => {
assert.equal(outerExpected.length, 0);
done();
},
});
});

it('should be canceled out if flattened immediately after', (done) => {
const source = xs.periodic(50).take(10);
const separator = concat(xs.periodic(167).take(2), xs.never());
const stream = source.compose(split(separator)).flatten();
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
});
});

it('should complete when the separator completes', (done) => {
const source = xs.periodic(50).take(10);
const separator = xs.periodic(167).take(2);
const stream = source.compose(split(separator)).flatten();
const expected = [0, 1, 2, 3, 4, 5];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
}
});
});
});
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"src/extra/flattenSequentially.ts",
"src/extra/fromEvent.ts",
"src/extra/pairwise.ts",
"src/extra/split.ts",
"src/index.ts"
],
"filesGlob": [
Expand Down

0 comments on commit 84742e8

Please sign in to comment.