Skip to content

Commit

Permalink
Refactor AsyncQueue's "delayed scheduling" support and add cancellati…
Browse files Browse the repository at this point in the history
…on. (#489)

* Introduces a DelayedOperation helper class in AsyncQueue to encapsulate
  delayed op logic.
* Adds cancellation support which I want to use in
  #412
* Updates the idle timer in persistent_stream.ts to use new cancellation support.
* Remove delayedOperationsCount in favor of keeping delayedOperations populated
  correctly at all times.
* Fixes a preexisting issue in AsyncQueue.schedule() where the returned promise
  would always be resolved with undefined, instead of the result of your op.
  Also remove an AnyDuringMigration usage.
  • Loading branch information
mikelehen authored Feb 6, 2018
1 parent 2db58a0 commit a1e346f
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 86 deletions.
36 changes: 25 additions & 11 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { Connection, Stream } from './connection';
import { JsonProtoSerializer } from './serializer';
import { WatchChange } from './watch_change';
import { isNullOrUndefined } from '../util/types';
import { CancelablePromise } from '../util/promise';

const LOG_TAG = 'PersistentStream';

Expand Down Expand Up @@ -154,7 +155,7 @@ export abstract class PersistentStream<
ListenerType extends PersistentStreamListener
> {
private state: PersistentStreamState;
private idle = false;
private inactivityTimerPromise: CancelablePromise<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
Expand Down Expand Up @@ -245,16 +246,25 @@ export abstract class PersistentStream<
}

/**
* Initializes the idle timer. If no write takes place within one minute, the
* WebChannel stream will be closed.
* Marks this stream as idle. If no further actions are performed on the
* stream for one minute, the stream will automatically close itself and
* notify the stream's onClose() handler with Status.OK. The stream will then
* be in a !isStarted() state, requiring the caller to start the stream again
* before further use.
*
* Only streams that are in state 'Open' can be marked idle, as all other
* states imply pending network operations.
*/
markIdle(): void {
this.idle = true;
this.queue
.schedule(() => {
return this.handleIdleCloseTimer();
}, IDLE_TIMEOUT_MS)
.catch((err: FirestoreError) => {
// Starts the idle time if we are in state 'Open' and are not yet already
// running a timer (in which case the previous idle timeout still applies).
if (this.isOpen() && this.inactivityTimerPromise === null) {
this.inactivityTimerPromise = this.queue.scheduleWithDelay(
() => this.handleIdleCloseTimer(),
IDLE_TIMEOUT_MS
);

this.inactivityTimerPromise.catch((err: FirestoreError) => {
// When the AsyncQueue gets drained during testing, pending Promises
// (including these idle checks) will get rejected. We special-case
// these cancelled idle checks to make sure that these specific Promise
Expand All @@ -266,6 +276,7 @@ export abstract class PersistentStream<
}`
);
});
}
}

/** Sends a message to the underlying stream. */
Expand All @@ -276,7 +287,7 @@ export abstract class PersistentStream<

/** Called by the idle timer when the stream should close due to inactivity. */
private handleIdleCloseTimer(): Promise<void> {
if (this.isOpen() && this.idle) {
if (this.isOpen()) {
// When timing out an idle stream there's no reason to force the stream into backoff when
// it restarts so set the stream state to Initial instead of Error.
return this.close(PersistentStreamState.Initial);
Expand All @@ -286,7 +297,10 @@ export abstract class PersistentStream<

/** Marks the stream as active again. */
private cancelIdleCheck() {
this.idle = false;
if (this.inactivityTimerPromise) {
this.inactivityTimerPromise.cancel();
this.inactivityTimerPromise = null;
}
}

/**
Expand Down
221 changes: 146 additions & 75 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,120 @@
import { assert, fail } from './assert';
import * as log from './log';
import { AnyDuringMigration, AnyJs } from './misc';
import { Deferred } from './promise';
import { Deferred, CancelablePromise } from './promise';
import { Code, FirestoreError } from './error';

type DelayedOperation<T> = {
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
handle: any;
op: () => Promise<T>;
deferred: Deferred<T>;
};
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
type TimerHandle = any;

/**
* Represents an operation scheduled to be run in the future on an AsyncQueue.
*
* It is created via DelayedOperation.createAndSchedule().
*
* Supports cancellation (via cancel()) and early execution (via skipDelay()).
*/
class DelayedOperation<T> implements CancelablePromise<T> {
// handle for use with clearTimeout(), or null if the operation has been
// executed or canceled already.
private timerHandle: TimerHandle | null;

private readonly deferred = new Deferred<T>();

private constructor(
private asyncQueue: AsyncQueue,
private op: () => Promise<T>
) {}

/**
* Creates and returns a DelayedOperation that has been scheduled to be
* executed on the provided asyncQueue after the provided delayMs.
*/
static createAndSchedule<T>(
asyncQueue: AsyncQueue,
op: () => Promise<T>,
delayMs: number
): DelayedOperation<T> {
const delayedOp = new DelayedOperation(asyncQueue, op);
delayedOp.start(delayMs);
return delayedOp;
}

/**
* Starts the timer. This is called immediately after construction by
* createAndSchedule().
*/
private start(delayMs: number): void {
this.timerHandle = setTimeout(() => this.handleDelayElapsed(), delayMs);
}

/**
* Queues the operation to run immediately (if it hasn't already been run or
* canceled).
*/
skipDelay(): void {
return this.handleDelayElapsed();
}

/**
* Cancels the operation if it hasn't already been executed or canceled. The
* promise will be rejected.
*
* As long as the operation has not yet been run, calling cancel() provides a
* guarantee that the operation will not be run.
*/
cancel(reason?: string): void {
if (this.timerHandle !== null) {
this.clearTimeout();
this.deferred.reject(
new FirestoreError(
Code.CANCELLED,
'Operation cancelled' + (reason ? ': ' + reason : '')
)
);
}
}

// Promise implementation.
readonly [Symbol.toStringTag]: 'Promise';
then = this.deferred.promise.then.bind(this.deferred.promise);
catch = this.deferred.promise.catch.bind(this.deferred.promise);

private handleDelayElapsed(): void {
this.asyncQueue.schedule(() => {
if (this.timerHandle !== null) {
this.clearTimeout();
return this.op().then(result => {
return this.deferred.resolve(result);
});
} else {
return Promise.resolve();
}
});
}

private clearTimeout() {
if (this.timerHandle) {
clearTimeout(this.timerHandle);
this.timerHandle = null;
}
}
}

export class AsyncQueue {
// The last promise in the queue.
private tail: Promise<AnyJs | void> = Promise.resolve();

// A list with timeout handles and their respective deferred promises.
// Contains an entry for each operation that is queued to run in the future
// (i.e. it has a delay that has not yet elapsed). Prior to cleanup, this list
// may also contain entries that have already been run (in which case `handle` is
// null).
// (i.e. it has a delay that has not yet elapsed).
private delayedOperations: Array<DelayedOperation<AnyJs>> = [];

// The number of operations that are queued to be run in the future (i.e. they
// have a delay that has not yet elapsed). Unlike `delayedOperations`, this
// is guaranteed to only contain operations that have not yet been run.
//
// Visible for testing.
delayedOperationsCount = 0;
// have a delay that has not yet elapsed). Used for testing.
get delayedOperationsCount() {
return this.delayedOperations.length;
}

// visible for testing
failure: Error;
Expand All @@ -55,47 +142,10 @@ export class AsyncQueue {
/**
* Adds a new operation to the queue. Returns a promise that will be resolved
* when the promise returned by the new operation is (with its value).
*
* Can optionally specify a delay (in milliseconds) to wait before queuing the
* operation.
*/
schedule<T>(op: () => Promise<T>, delay?: number): Promise<T> {
if (this.failure) {
fail(
'AsyncQueue is already failed: ' +
(this.failure.stack || this.failure.message)
);
}

if ((delay || 0) > 0) {
this.delayedOperationsCount++;
const delayedOp: DelayedOperation<T> = {
handle: null,
op,
deferred: new Deferred<T>()
};
delayedOp.handle = setTimeout(() => {
this.scheduleInternal(() => {
return delayedOp.op().then(result => {
delayedOp.deferred.resolve(result);
});
});
delayedOp.handle = null;

this.delayedOperationsCount--;
if (this.delayedOperationsCount === 0) {
this.delayedOperations = [];
}
}, delay);
this.delayedOperations.push(delayedOp);
return delayedOp.deferred.promise;
} else {
return this.scheduleInternal(op);
}
}

private scheduleInternal<T>(op: () => Promise<T>): Promise<T> {
this.tail = this.tail.then(() => {
schedule<T>(op: () => Promise<T>): Promise<T> {
this.verifyNotFailed();
const newTail = this.tail.then(() => {
this.operationInProgress = true;
return op()
.catch(error => {
Expand All @@ -118,11 +168,45 @@ export class AsyncQueue {
// and return the rejected Promise.
throw error;
})
.then(() => {
.then(result => {
this.operationInProgress = false;
return result;
});
});
return this.tail as AnyDuringMigration;
this.tail = newTail;
return newTail;
}

/**
* Schedules an operation to be run on the AsyncQueue once the specified
* `delayMs` has elapsed. The returned DelayedOperationResult can be
* used to cancel the operation prior to its running.
*/
scheduleWithDelay<T>(
op: () => Promise<T>,
delayMs: number
): CancelablePromise<T> {
this.verifyNotFailed();

const delayedOp = DelayedOperation.createAndSchedule(this, op, delayMs);
this.delayedOperations.push(delayedOp);

delayedOp.catch(err => {}).then(() => {
// NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small.
const index = this.delayedOperations.indexOf(delayedOp);
assert(index >= 0, 'Delayed operation not found.');
this.delayedOperations.slice(index, 1);
});
return delayedOp;
}

private verifyNotFailed(): void {
if (this.failure) {
fail(
'AsyncQueue is already failed: ' +
(this.failure.stack || this.failure.message)
);
}
}

/**
Expand All @@ -143,26 +227,13 @@ export class AsyncQueue {
* scheduled with a delay can be rejected or queued for immediate execution.
*/
drain(executeDelayedTasks: boolean): Promise<void> {
this.delayedOperations.forEach(entry => {
if (entry.handle) {
clearTimeout(entry.handle);
if (executeDelayedTasks) {
this.scheduleInternal(entry.op).then(
entry.deferred.resolve,
entry.deferred.reject
);
} else {
entry.deferred.reject(
new FirestoreError(
Code.CANCELLED,
'Operation cancelled by shutdown'
)
);
}
this.delayedOperations.forEach(delayedOp => {
if (executeDelayedTasks) {
delayedOp.skipDelay();
} else {
delayedOp.cancel('shutdown');
}
});
this.delayedOperations = [];
this.delayedOperationsCount = 0;
return this.schedule(() => Promise.resolve());
}
}
4 changes: 4 additions & 0 deletions packages/firestore/src/util/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export interface Rejecter {
(reason?: Error): void;
}

export interface CancelablePromise<T> extends Promise<T> {
cancel(): void;
}

export class Deferred<R> {
promise: Promise<R>;
resolve: Resolver<R>;
Expand Down

0 comments on commit a1e346f

Please sign in to comment.