Skip to content

Commit

Permalink
feat(operator): implement map operator with MapMachine
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 24, 2016
1 parent 63502a9 commit 76df500
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 105 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"lint": "tslint -c tslint.json src/*.ts",
"premocha": "npm run lib",
"mocha": "mocha tests/**/*.ts --require ts-node/register",
"postmocha": "rm -f tests/**/*.js",
"postmocha": "rm -rf tests/**/*.js",
"test": "npm run lint && npm run mocha",
"prelib": "rm -rf lib/ && mkdirp lib/",
"lib": "tsc",
Expand Down
6 changes: 6 additions & 0 deletions src/Machine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Observer} from './Observer';

export interface Machine<T> {
start: (observer: Observer<T>) => void;
stop: () => void;
}
5 changes: 5 additions & 0 deletions src/Observer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export interface Observer<T> {
next: (x: T) => void;
error: (err: any) => void;
complete: () => void;
}
51 changes: 51 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import {Observer} from './Observer';
import {Machine} from './Machine';
import {MapMachine} from './operator/MapMachine';

export class Stream<T> implements Observer<T> {
public observers: Array<Observer<T>>;
public num: number; // Number of non-operator subscribers

constructor(public machine: Machine<T>) {
this.observers = [];
this.num = 0;
}

next(x: T): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].next(x);
}
}

error(err: any): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].error(err);
}
}

complete(): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].complete();
}
}

subscribe(observer: Observer<T>): void {
this.observers.push(observer);
if (++this.num === 1) this.machine.start(this);
}

unsubscribe(observer: Observer<T>): void {
const i = this.observers.indexOf(observer);
if (i > -1) {
this.observers.splice(i, 1);
if (--this.num <= 0) this.machine.stop();
}
}

map<U>(projection: (t: T) => U): Stream<U> {
return new Stream<U>(new MapMachine(projection, this));
}
}
28 changes: 28 additions & 0 deletions src/factory/interval.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {Machine} from '../Machine';
import {Observer} from '../Observer';
import {Stream} from '../Stream';

class IntervalMachine implements Machine<number> {
on: boolean;
intervalID: any;
i: number;

constructor(public period: number) {
this.intervalID = -1;
this.i = 0;
}

start(stream: Observer<number>): void {
this.intervalID = setInterval(() => stream.next(this.i++), this.period);
}

stop(): void {
this.i = 0;
if (this.intervalID !== -1) clearInterval(this.intervalID);
}
}

export default function interval(period: number) {
const intervalMachine = new IntervalMachine(period);
return new Stream<number>(intervalMachine);
}
82 changes: 4 additions & 78 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,81 +1,7 @@
export interface Observer<T> {
next: (x: T) => void;
error: (err: any) => void;
complete: () => void;
}

export interface Machine<T> {
start: (observer: Observer<T>) => void;
stop: () => void;
}

export class Stream<T> implements Observer<T> {
public observers: Array<Observer<T>>;

constructor(public machine: Machine<T>) {
this.observers = [];
}

next(x: T): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].next(x);
}
}

error(err: any): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].error(err);
}
}

complete(): void {
const len = this.observers.length;
for (let i = len - 1; i >= 0; i--) {
this.observers[i].complete();
}
}

subscribe(observer: Observer<T>): void {
this.observers.push(observer);
if (this.observers.length === 1) this.machine.start(this);
}

unsubscribe(observer: Observer<T>): void {
const i = this.observers.indexOf(observer);
if (i > -1) {
this.observers.splice(i, 1);
if (!this.observers.length) this.machine.stop();
}
}
}

class IntervalMachine implements Machine<number> {
on: boolean;
intervalID: any;
i: number;

constructor(public period: number) {
this.intervalID = -1;
this.i = 0;
}

start(observer: Observer<number>): void {
this.intervalID = setInterval(() => observer.next(this.i++), this.period);
}

stop(): void {
this.i = 0;
if (this.intervalID !== -1) clearInterval(this.intervalID);
}
}

export function interval(period: number) {
const intervalMachine = new IntervalMachine(period);
return new Stream<number>(intervalMachine);
}
import interval from './factory/interval';
import {Stream} from './Stream';

export default {
interval
Stream,
interval,
};
24 changes: 24 additions & 0 deletions src/operator/MapMachine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Stream} from '../Stream';

export class MapMachine<T, U> implements Machine<U> {
public proxy: Observer<T>;

constructor(public projection: (t: T) => U,
public inStream: Stream<T>) {
}

start(outStream: Observer<U>): void {
this.proxy = {
next: (t: T) => outStream.next(this.projection(t)),
error: outStream.error,
complete: outStream.complete,
};
this.inStream.subscribe(this.proxy);
}

stop(): void {
this.inStream.unsubscribe(this.proxy);
}
}
22 changes: 0 additions & 22 deletions tests/stream.js

This file was deleted.

26 changes: 22 additions & 4 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,30 @@ import * as assert from 'assert';
describe('Stream', () => {
it('can be subscribed and unsubscribed with one observer', (done) => {
const stream = xs.interval(100);
let i = 0;
const expected = [0, 1, 2];
let observer = {
next: (x: number) => {
assert.equal(x, i);
i += 1;
if (i === 2) {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
complete: done.fail,
};
stream.subscribe(observer);
});
});

describe('Stream.prototype.map', () => {
it('should transform values from input stream to output stream', (done) => {
const stream = xs.interval(100).map(i => 10 * i);
const expected = [0, 10, 20];
let observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
Expand Down

0 comments on commit 76df500

Please sign in to comment.