From 76df500fc5a9bacc297dec313afe1daa12bddcbe Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Wed, 24 Feb 2016 22:41:10 +0200 Subject: [PATCH] feat(operator): implement map operator with MapMachine --- package.json | 2 +- src/Machine.ts | 6 +++ src/Observer.ts | 5 +++ src/Stream.ts | 51 ++++++++++++++++++++++++ src/factory/interval.ts | 28 +++++++++++++ src/index.ts | 82 ++------------------------------------ src/operator/MapMachine.ts | 24 +++++++++++ tests/stream.js | 22 ---------- tests/stream.ts | 26 ++++++++++-- 9 files changed, 141 insertions(+), 105 deletions(-) create mode 100644 src/Machine.ts create mode 100644 src/Observer.ts create mode 100644 src/Stream.ts create mode 100644 src/factory/interval.ts create mode 100644 src/operator/MapMachine.ts delete mode 100644 tests/stream.js diff --git a/package.json b/package.json index bc469b2..2f8171c 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "lint": "tslint -c tslint.json src/*.ts", "premocha": "npm run lib", "mocha": "mocha tests/**/*.ts --require ts-node/register", - "postmocha": "rm -f tests/**/*.js", + "postmocha": "rm -rf tests/**/*.js", "test": "npm run lint && npm run mocha", "prelib": "rm -rf lib/ && mkdirp lib/", "lib": "tsc", diff --git a/src/Machine.ts b/src/Machine.ts new file mode 100644 index 0000000..ad641b5 --- /dev/null +++ b/src/Machine.ts @@ -0,0 +1,6 @@ +import {Observer} from './Observer'; + +export interface Machine { + start: (observer: Observer) => void; + stop: () => void; +} diff --git a/src/Observer.ts b/src/Observer.ts new file mode 100644 index 0000000..87fc5d7 --- /dev/null +++ b/src/Observer.ts @@ -0,0 +1,5 @@ +export interface Observer { + next: (x: T) => void; + error: (err: any) => void; + complete: () => void; +} diff --git a/src/Stream.ts b/src/Stream.ts new file mode 100644 index 0000000..cb53507 --- /dev/null +++ b/src/Stream.ts @@ -0,0 +1,51 @@ +import {Observer} from './Observer'; +import {Machine} from './Machine'; +import {MapMachine} from './operator/MapMachine'; + +export class Stream implements Observer { + public observers: Array>; + public num: number; // Number of non-operator subscribers + + constructor(public machine: Machine) { + this.observers = []; + this.num = 0; + } + + next(x: T): void { + const len = this.observers.length; + for (let i = len - 1; i >= 0; i--) { + this.observers[i].next(x); + } + } + + error(err: any): void { + const len = this.observers.length; + for (let i = len - 1; i >= 0; i--) { + this.observers[i].error(err); + } + } + + complete(): void { + const len = this.observers.length; + for (let i = len - 1; i >= 0; i--) { + this.observers[i].complete(); + } + } + + subscribe(observer: Observer): void { + this.observers.push(observer); + if (++this.num === 1) this.machine.start(this); + } + + unsubscribe(observer: Observer): void { + const i = this.observers.indexOf(observer); + if (i > -1) { + this.observers.splice(i, 1); + if (--this.num <= 0) this.machine.stop(); + } + } + + map(projection: (t: T) => U): Stream { + return new Stream(new MapMachine(projection, this)); + } +} diff --git a/src/factory/interval.ts b/src/factory/interval.ts new file mode 100644 index 0000000..da5c8f3 --- /dev/null +++ b/src/factory/interval.ts @@ -0,0 +1,28 @@ +import {Machine} from '../Machine'; +import {Observer} from '../Observer'; +import {Stream} from '../Stream'; + +class IntervalMachine implements Machine { + on: boolean; + intervalID: any; + i: number; + + constructor(public period: number) { + this.intervalID = -1; + this.i = 0; + } + + start(stream: Observer): void { + this.intervalID = setInterval(() => stream.next(this.i++), this.period); + } + + stop(): void { + this.i = 0; + if (this.intervalID !== -1) clearInterval(this.intervalID); + } +} + +export default function interval(period: number) { + const intervalMachine = new IntervalMachine(period); + return new Stream(intervalMachine); +} diff --git a/src/index.ts b/src/index.ts index ba3cce6..a878520 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,81 +1,7 @@ -export interface Observer { - next: (x: T) => void; - error: (err: any) => void; - complete: () => void; -} - -export interface Machine { - start: (observer: Observer) => void; - stop: () => void; -} - -export class Stream implements Observer { - public observers: Array>; - - constructor(public machine: Machine) { - this.observers = []; - } - - next(x: T): void { - const len = this.observers.length; - for (let i = len - 1; i >= 0; i--) { - this.observers[i].next(x); - } - } - - error(err: any): void { - const len = this.observers.length; - for (let i = len - 1; i >= 0; i--) { - this.observers[i].error(err); - } - } - - complete(): void { - const len = this.observers.length; - for (let i = len - 1; i >= 0; i--) { - this.observers[i].complete(); - } - } - - subscribe(observer: Observer): void { - this.observers.push(observer); - if (this.observers.length === 1) this.machine.start(this); - } - - unsubscribe(observer: Observer): void { - const i = this.observers.indexOf(observer); - if (i > -1) { - this.observers.splice(i, 1); - if (!this.observers.length) this.machine.stop(); - } - } -} - -class IntervalMachine implements Machine { - on: boolean; - intervalID: any; - i: number; - - constructor(public period: number) { - this.intervalID = -1; - this.i = 0; - } - - start(observer: Observer): void { - this.intervalID = setInterval(() => observer.next(this.i++), this.period); - } - - stop(): void { - this.i = 0; - if (this.intervalID !== -1) clearInterval(this.intervalID); - } -} - -export function interval(period: number) { - const intervalMachine = new IntervalMachine(period); - return new Stream(intervalMachine); -} +import interval from './factory/interval'; +import {Stream} from './Stream'; export default { - interval + Stream, + interval, }; diff --git a/src/operator/MapMachine.ts b/src/operator/MapMachine.ts new file mode 100644 index 0000000..792de27 --- /dev/null +++ b/src/operator/MapMachine.ts @@ -0,0 +1,24 @@ +import {Observer} from '../Observer'; +import {Machine} from '../Machine'; +import {Stream} from '../Stream'; + +export class MapMachine implements Machine { + public proxy: Observer; + + constructor(public projection: (t: T) => U, + public inStream: Stream) { + } + + start(outStream: Observer): void { + this.proxy = { + next: (t: T) => outStream.next(this.projection(t)), + error: outStream.error, + complete: outStream.complete, + }; + this.inStream.subscribe(this.proxy); + } + + stop(): void { + this.inStream.unsubscribe(this.proxy); + } +} diff --git a/tests/stream.js b/tests/stream.js deleted file mode 100644 index 8e6253d..0000000 --- a/tests/stream.js +++ /dev/null @@ -1,22 +0,0 @@ -"use strict"; -var index_1 = require('../src/index'); -var assert = require('assert'); -describe('Stream', function () { - it('can be subscribed and unsubscribed with one observer', function (done) { - var stream = index_1.default.interval(100); - var i = 0; - var observer = { - next: function (x) { - assert.equal(x, i); - i += 1; - if (i === 2) { - stream.unsubscribe(observer); - done(); - } - }, - error: done.fail, - complete: done.fail, - }; - stream.subscribe(observer); - }); -}); diff --git a/tests/stream.ts b/tests/stream.ts index 474fbac..00bfe9e 100644 --- a/tests/stream.ts +++ b/tests/stream.ts @@ -4,12 +4,30 @@ import * as assert from 'assert'; describe('Stream', () => { it('can be subscribed and unsubscribed with one observer', (done) => { const stream = xs.interval(100); - let i = 0; + const expected = [0, 1, 2]; let observer = { next: (x: number) => { - assert.equal(x, i); - i += 1; - if (i === 2) { + assert.equal(x, expected.shift()); + if (expected.length === 0) { + stream.unsubscribe(observer); + done(); + } + }, + error: done.fail, + complete: done.fail, + }; + stream.subscribe(observer); + }); +}); + +describe('Stream.prototype.map', () => { + it('should transform values from input stream to output stream', (done) => { + const stream = xs.interval(100).map(i => 10 * i); + const expected = [0, 10, 20]; + let observer = { + next: (x: number) => { + assert.equal(x, expected.shift()); + if (expected.length === 0) { stream.unsubscribe(observer); done(); }