diff --git a/src/Stream.ts b/src/Stream.ts index e9034dd..c0b376b 100644 --- a/src/Stream.ts +++ b/src/Stream.ts @@ -3,6 +3,7 @@ import {Machine} from './Machine'; import {MapMachine} from './operator/MapMachine'; import {FilterMachine} from './operator/FilterMachine'; import {TakeMachine} from './operator/TakeMachine'; +import {SkipMachine} from './operator/SkipMachine'; export class Stream implements Observer { public observers: Array>; @@ -58,4 +59,8 @@ export class Stream implements Observer { take(amount: number): Stream { return new Stream(new TakeMachine(amount, this)); } + + skip(amount: number): Stream { + return new Stream(new SkipMachine(amount, this)); + } } diff --git a/src/operator/SkipMachine.ts b/src/operator/SkipMachine.ts new file mode 100644 index 0000000..1d5e360 --- /dev/null +++ b/src/operator/SkipMachine.ts @@ -0,0 +1,28 @@ +import {Observer} from '../Observer'; +import {Machine} from '../Machine'; +import {Stream} from '../Stream'; + +export class SkipMachine implements Machine { + public proxy: Observer; + public skipped: number; + + constructor(public max: number, + public inStream: Stream) { + this.skipped = 0; + } + + start(outStream: Observer): void { + this.proxy = { + next: (t: T) => { + if (this.skipped++ >= this.max) outStream.next(t); + }, + error: outStream.error, + complete: outStream.complete, + }; + this.inStream.subscribe(this.proxy); + } + + stop(): void { + this.inStream.unsubscribe(this.proxy); + } +} diff --git a/tests/stream.ts b/tests/stream.ts index 1bbb53f..97c8acb 100644 --- a/tests/stream.ts +++ b/tests/stream.ts @@ -76,3 +76,22 @@ describe('Stream.prototype.take', () => { stream.subscribe(observer); }); }); + +describe('Stream.prototype.skip', () => { + it('should allow specifying max amount to skip from input stream', (done) => { + const stream = xs.interval(50).skip(4) + const expected = [4, 5, 6]; + let observer = { + next: (x: number) => { + assert.equal(x, expected.shift()); + if (expected.length === 0) { + stream.unsubscribe(observer); + done(); + } + }, + error: done.fail, + complete: done.fail, + }; + stream.subscribe(observer); + }); +});