Skip to content

Commit

Permalink
fix(delay): unsubscribe scheduled action
Browse files Browse the repository at this point in the history
  • Loading branch information
ubnt-michals committed Apr 26, 2018
1 parent 7d8863f commit 51995e0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
17 changes: 17 additions & 0 deletions spec/operators/delay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { expect } from 'chai';
import * as Rx from '../../dist/package/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

Expand Down Expand Up @@ -147,4 +148,20 @@ describe('Observable.prototype.delay', () => {

expectObservable(result).toBe(expected);
});

it('should unsubscribe scheduled action when result is unsubscribed explicitly', () => {
const e1 = cold('---ab---|');
const t = time( '---|');
const expected = '------';
const unsub = '-----!';
const subs = '^ !';

const result = e1.delay(t, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
rxTestScheduler.maxFrames = 50;
rxTestScheduler.flush();
expect(rxTestScheduler.actions).to.have.length(0);
});
});
19 changes: 13 additions & 6 deletions src/operators/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Action } from '../scheduler/Action';
import { Notification } from '../Notification';
import { Observable } from '../Observable';
import { PartialObserver } from '../Observer';
import { TeardownLogic } from '../Subscription';
import { TeardownLogic, Subscription } from '../Subscription';
import { MonoTypeOperatorFunction } from '../interfaces';

/**
Expand Down Expand Up @@ -79,7 +79,7 @@ interface DelayState<T> {
*/
class DelaySubscriber<T> extends Subscriber<T> {
private queue: Array<DelayMessage<T>> = [];
private active: boolean = false;
private nextAction: Subscription = null;
private errored: boolean = false;

private static dispatch<T>(this: Action<DelayState<T>>, state: DelayState<T>): void {
Expand All @@ -97,7 +97,7 @@ class DelaySubscriber<T> extends Subscriber<T> {
this.schedule(state, delay);
} else {
this.unsubscribe();
source.active = false;
source.nextAction = null;
}
}

Expand All @@ -108,8 +108,7 @@ class DelaySubscriber<T> extends Subscriber<T> {
}

private _schedule(scheduler: IScheduler): void {
this.active = true;
scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
this.nextAction = scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
source: this, destination: this.destination, scheduler: scheduler
});
}
Expand All @@ -123,7 +122,7 @@ class DelaySubscriber<T> extends Subscriber<T> {
const message = new DelayMessage(scheduler.now() + this.delay, notification);
this.queue.push(message);

if (this.active === false) {
if (this.nextAction === null) {
this._schedule(scheduler);
}
}
Expand All @@ -141,6 +140,14 @@ class DelaySubscriber<T> extends Subscriber<T> {
protected _complete() {
this.scheduleNotification(Notification.createComplete());
}

unsubscribe(): void {
if (this.nextAction !== null) {
this.nextAction.unsubscribe();
this.nextAction = null;
}
super.unsubscribe();
}
}

class DelayMessage<T> {
Expand Down
10 changes: 9 additions & 1 deletion src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ export class VirtualTimeScheduler extends AsyncScheduler {
const {actions, maxFrames} = this;
let error: any, action: AsyncAction<any>;

while ((action = actions.shift()) && (this.frame = action.delay) <= maxFrames) {
while (actions.length > 0) {
action = actions[0];
if (action.delay > maxFrames) {
this.frame = maxFrames;
break;
}

actions.shift();
this.frame = action.delay;
if (error = action.execute(action.state, action.delay)) {
break;
}
Expand Down

0 comments on commit 51995e0

Please sign in to comment.