diff --git a/src/operator/MapToOperator.ts b/src/operator/MapToOperator.ts new file mode 100644 index 0000000..04c9692 --- /dev/null +++ b/src/operator/MapToOperator.ts @@ -0,0 +1,38 @@ +import {InternalListener} from '../InternalListener'; +import {Operator} from '../Operator'; +import {Stream} from '../Stream'; +import {emptyListener} from '../utils/emptyListener'; + +export class Proxy implements InternalListener { + constructor(public out: Stream, + public op: MapToOperator) { + } + + _n(t: T) { + this.out._n(this.op.projectedValue); + } + + _e(err: any) { + this.out._e(err); + } + + _c() { + this.out._c(); + } +} + +export class MapToOperator implements Operator { + public proxy: InternalListener = emptyListener; + + constructor(public projectedValue: R, + public ins: Stream) { + } + + _start(out: Stream): void { + this.ins._add(this.proxy = new Proxy(out, this)); + } + + _stop(): void { + this.ins._remove(this.proxy); + } +}