diff --git a/spec/schedulers/VirtualTimeScheduler-spec.js b/spec/schedulers/VirtualTimeScheduler-spec.js new file mode 100644 index 0000000000..3c9be40873 --- /dev/null +++ b/spec/schedulers/VirtualTimeScheduler-spec.js @@ -0,0 +1,46 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var VirtualTimeScheduler = Rx.VirtualTimeScheduler; + +describe('VirtualTimeScheduler', function() { + it('should exist', function () { + expect(typeof VirtualTimeScheduler).toBe('function'); + }); + + it('should schedule things in order when flushed if each this is scheduled synchrously', function () { + var v = new VirtualTimeScheduler(); + var invoked = []; + var invoke = function (state) { + invoked.push(state); + }; + v.schedule(invoke, 0, 1); + v.schedule(invoke, 0, 2); + v.schedule(invoke, 0, 3); + v.schedule(invoke, 0, 4); + v.schedule(invoke, 0, 5); + + v.flush(); + + expect(invoked).toEqual([1, 2, 3, 4, 5]); + }); + + + + it('should schedule things in order when flushed if each this is scheduled at random', function () { + var v = new VirtualTimeScheduler(); + var invoked = []; + var invoke = function (state) { + invoked.push(state); + }; + v.schedule(invoke, 0, 1); + v.schedule(invoke, 100, 2); + v.schedule(invoke, 0, 3); + v.schedule(invoke, 500, 4); + v.schedule(invoke, 0, 5); + v.schedule(invoke, 100, 6); + + v.flush(); + + expect(invoked).toEqual([1, 3, 5, 2, 6, 4]); + }); +}); \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index 57d32b4d78..8d57ce98fa 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -1,6 +1,7 @@ import Subject from './Subject'; import ImmediateScheduler from './schedulers/ImmediateScheduler'; import NextTickScheduler from './schedulers/NextTickScheduler'; +import VirtualTimeScheduler from './schedulers/VirtualTimeScheduler'; import immediate from './schedulers/immediate'; import nextTick from './schedulers/nextTick'; import Observable from './Observable'; @@ -227,5 +228,6 @@ export { ReplaySubject, BehaviorSubject, ConnectableObservable, - Notification  + Notification, + VirtualTimeScheduler }; diff --git a/src/Scheduler.ts b/src/Scheduler.ts index d0a9f3dbe2..b6ff795663 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -1,9 +1,18 @@ import Subscription from './Subscription'; +import Action from './schedulers/Action'; interface Scheduler { now(): number; schedule(work: (state?: any) => Subscription|void, delay?: number, state?: any): Subscription; + + flush(): void; + + actions: Action[]; + + scheduled: boolean; + + active: boolean; } export default Scheduler; \ No newline at end of file diff --git a/src/operators/bufferTime.ts b/src/operators/bufferTime.ts index a8c3a199d6..08fea0f9c3 100644 --- a/src/operators/bufferTime.ts +++ b/src/operators/bufferTime.ts @@ -88,7 +88,7 @@ function dispatchBufferTimeSpanOnly(state) { function dispatchBufferCreation(state) { let { bufferTimeSpan, subscriber, scheduler } = state; let buffer = subscriber.openBuffer(); - var action = >this; + var action = this; action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer })); action.schedule(state); } diff --git a/src/operators/windowTime.ts b/src/operators/windowTime.ts index adf71492e8..3e8c30a67a 100644 --- a/src/operators/windowTime.ts +++ b/src/operators/windowTime.ts @@ -94,7 +94,7 @@ function dispatchWindowTimeSpanOnly(state) { function dispatchWindowCreation(state) { let { windowTimeSpan, subscriber, scheduler } = state; let window = subscriber.openWindow(); - let action = >this; + let action = this; let context = { action, subscription: null }; action.add(context.subscription = scheduler.schedule(dispatchWindowClose, windowTimeSpan, { subscriber, window, context })); action.schedule(state); diff --git a/src/schedulers/Action.ts b/src/schedulers/Action.ts index 761bfe085a..f7b505f7e2 100644 --- a/src/schedulers/Action.ts +++ b/src/schedulers/Action.ts @@ -1,48 +1,11 @@ import Subscription from '../Subscription'; -import ImmediateScheduler from './ImmediateScheduler'; - -export default class Action extends Subscription { +interface Action extends Subscription { + work: (state?: any) => void|Subscription state: any; + delay?: number; + schedule(state: any); + execute(): void; +} - constructor(public scheduler: ImmediateScheduler, - public work: (x?: any) => Subscription | void) { - super(); - } - - schedule(state?: any): Action { - if (this.isUnsubscribed) { - return this; - } - - this.state = state; - const scheduler = this.scheduler; - scheduler.actions.push(this); - scheduler.flush(); - return this; - } - - execute() { - if (this.isUnsubscribed) { - throw new Error("How did did we execute a canceled Action?"); - } - this.work(this.state); - } - - unsubscribe() { - - const scheduler = this.scheduler; - const actions = scheduler.actions; - const index = actions.indexOf(this); - - this.work = void 0; - this.state = void 0; - this.scheduler = void 0; - - if (index !== -1) { - actions.splice(index, 1); - } - - super.unsubscribe(); - } -} \ No newline at end of file +export default Action; \ No newline at end of file diff --git a/src/schedulers/FutureAction.ts b/src/schedulers/FutureAction.ts index 0dcca4e0ba..e8aa974c2e 100644 --- a/src/schedulers/FutureAction.ts +++ b/src/schedulers/FutureAction.ts @@ -1,10 +1,11 @@ import Subscription from '../Subscription'; import ImmediateScheduler from './ImmediateScheduler'; import Action from './Action'; +import ImmediateAction from './ImmediateAction'; -export default class FutureAction extends Action { +export default class FutureAction extends ImmediateAction { - id: number; + id: any; constructor(public scheduler: ImmediateScheduler, public work: (x?: any) => Subscription | void, @@ -12,7 +13,7 @@ export default class FutureAction extends Action { super(scheduler, work); } - schedule(state?:any): Action { + schedule(state?:any): Action { if (this.isUnsubscribed) { return this; } diff --git a/src/schedulers/ImmediateAction.ts b/src/schedulers/ImmediateAction.ts new file mode 100644 index 0000000000..0b5e39c4d4 --- /dev/null +++ b/src/schedulers/ImmediateAction.ts @@ -0,0 +1,49 @@ +import Subscription from '../Subscription'; +import Scheduler from '../Scheduler'; +import Action from './Action'; + +export default class ImmediateAction extends Subscription implements Action { + + state: any; + + constructor(public scheduler: Scheduler, + public work: (x?: any) => Subscription | void) { + super(); + } + + schedule(state?: any): Action { + if (this.isUnsubscribed) { + return this; + } + + this.state = state; + const scheduler = this.scheduler; + scheduler.actions.push(this); + scheduler.flush(); + return this; + } + + execute() { + if (this.isUnsubscribed) { + throw new Error("How did did we execute a canceled Action?"); + } + this.work(this.state); + } + + unsubscribe() { + + const scheduler = this.scheduler; + const actions = scheduler.actions; + const index = actions.indexOf(this); + + this.work = void 0; + this.state = void 0; + this.scheduler = void 0; + + if (index !== -1) { + actions.splice(index, 1); + } + + super.unsubscribe(); + } +} \ No newline at end of file diff --git a/src/schedulers/ImmediateScheduler.ts b/src/schedulers/ImmediateScheduler.ts index b613d8e9b6..b2319b7e1f 100644 --- a/src/schedulers/ImmediateScheduler.ts +++ b/src/schedulers/ImmediateScheduler.ts @@ -1,11 +1,12 @@ import { Immediate } from '../util/Immediate'; import Scheduler from '../Scheduler'; -import Action from './Action'; +import ImmediateAction from './ImmediateAction'; import Subscription from '../Subscription'; import FutureAction from './FutureAction'; +import Action from './Action'; export default class ImmediateScheduler implements Scheduler { - actions: Action[] = []; + actions: ImmediateAction[] = []; active: boolean = false; scheduled: boolean = false; @@ -31,11 +32,11 @@ export default class ImmediateScheduler implements Scheduler { this.scheduleLater(work, delay, state); } - scheduleNow(work: (x?: any) => Subscription | void, state?: any): Action { - return new Action(this, work).schedule(state); + scheduleNow(work: (x?: any) => Subscription | void, state?: any): Action { + return new ImmediateAction(this, work).schedule(state); } - scheduleLater(work: (x?: any) => Subscription | void, delay: number, state?: any): Action { + scheduleLater(work: (x?: any) => Subscription | void, delay: number, state?: any): Action { return new FutureAction(this, work, delay).schedule(state); } } \ No newline at end of file diff --git a/src/schedulers/NextTickAction.ts b/src/schedulers/NextTickAction.ts index b8dac9c21d..0509a5e1aa 100644 --- a/src/schedulers/NextTickAction.ts +++ b/src/schedulers/NextTickAction.ts @@ -1,12 +1,13 @@ import {Immediate} from '../util/Immediate'; import Subscription from '../Subscription'; +import ImmediateAction from './ImmediateAction'; import Action from './Action'; -export default class NextTickAction extends Action { +export default class NextTickAction extends ImmediateAction { id: number; - schedule(state?: any): Action { + schedule(state?: any): Action { if (this.isUnsubscribed) { return this; } diff --git a/src/schedulers/NextTickScheduler.ts b/src/schedulers/NextTickScheduler.ts index 02a38cb6c8..04b4ff9dd7 100644 --- a/src/schedulers/NextTickScheduler.ts +++ b/src/schedulers/NextTickScheduler.ts @@ -3,11 +3,12 @@ import ImmediateScheduler from './ImmediateScheduler'; import Subscription from '../Subscription'; import Action from './Action'; import NextTickAction from './NextTickAction'; +import ImmediateAction from './ImmediateAction'; export default class NextTickScheduler extends ImmediateScheduler { - scheduleNow(work: (x?: any) => Subscription, state?: any): Action { + scheduleNow(work: (x?: any) => Subscription, state?: any): Action { return (this.scheduled ? - new Action(this, work) : + new ImmediateAction(this, work) : new NextTickAction(this, work)).schedule(state); } } \ No newline at end of file diff --git a/src/schedulers/VirtualTimeScheduler.ts b/src/schedulers/VirtualTimeScheduler.ts new file mode 100644 index 0000000000..3b3aeceaef --- /dev/null +++ b/src/schedulers/VirtualTimeScheduler.ts @@ -0,0 +1,82 @@ +import Scheduler from '../Scheduler'; +import Subscription from '../Subscription'; +import Action from './Action'; + +export default class VirtualTimeScheduler implements Scheduler { + actions: Action[] = []; + active: boolean = false; + scheduled: boolean = false; + index: number = 0; + sorted: boolean = false; + + now() { + return 0; + } + + sortActions() { + if (!this.sorted) { + ([]>this.actions).sort((a, b) => { + return a.delay === b.delay ? (a.index > b.index ? 1 : -1) : (a.delay > b.delay ? 1 : -1); + }); + this.sorted = true; + } + } + + flush() { + this.sortActions(); + this.actions.forEach(action => { + action.execute(); + }); + this.actions.length = 0; + } + + schedule(work: (x?: any) => Subscription | void, delay: number = 0, state?: any): Subscription { + this.sorted = false; + return new VirtualAction(this, work, delay, this.index++).schedule(state); + } +} + +class VirtualAction extends Subscription implements Action { + state: any; + + constructor(public scheduler: VirtualTimeScheduler, + public work: (x?: any) => Subscription | void, + public delay: number, + public index: number) { + super(); + } + + schedule(state?: any): VirtualAction { + if (this.isUnsubscribed) { + return this; + } + + this.state = state; + const scheduler = this.scheduler; + scheduler.actions.push(this); + return this; + } + + execute() { + if (this.isUnsubscribed) { + throw new Error("How did did we execute a canceled Action?"); + } + this.work(this.state); + } + + unsubscribe() { + const scheduler = this.scheduler; + const actions = scheduler.actions; + const index = actions.indexOf(this); + + this.work = void 0; + this.state = void 0; + this.scheduler = void 0; + + if (index !== -1) { + actions.splice(index, 1); + } + + super.unsubscribe(); + } +} \ No newline at end of file