Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor AsyncQueue's "delayed scheduling" support and add cancellation. #489

Merged
merged 3 commits into from
Feb 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { Connection, Stream } from './connection';
import { JsonProtoSerializer } from './serializer';
import { WatchChange } from './watch_change';
import { isNullOrUndefined } from '../util/types';
import { CancelablePromise } from '../util/promise';

const LOG_TAG = 'PersistentStream';

Expand Down Expand Up @@ -154,7 +155,7 @@ export abstract class PersistentStream<
ListenerType extends PersistentStreamListener
> {
private state: PersistentStreamState;
private idle = false;
private inactivityTimerPromise: CancelablePromise<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
Expand Down Expand Up @@ -245,16 +246,25 @@ export abstract class PersistentStream<
}

/**
* Initializes the idle timer. If no write takes place within one minute, the
* WebChannel stream will be closed.
* Marks this stream as idle. If no further actions are performed on the
* stream for one minute, the stream will automatically close itself and
* notify the stream's onClose() handler with Status.OK. The stream will then
* be in a !isStarted() state, requiring the caller to start the stream again
* before further use.
*
* Only streams that are in state 'Open' can be marked idle, as all other
* states imply pending network operations.
*/
markIdle(): void {
this.idle = true;
this.queue
.schedule(() => {
return this.handleIdleCloseTimer();
}, IDLE_TIMEOUT_MS)
.catch((err: FirestoreError) => {
// Starts the idle time if we are in state 'Open' and are not yet already
// running a timer (in which case the previous idle timeout still applies).
if (this.isOpen() && this.inactivityTimerPromise === null) {
this.inactivityTimerPromise = this.queue.scheduleWithDelay(
() => this.handleIdleCloseTimer(),
IDLE_TIMEOUT_MS
);

this.inactivityTimerPromise.catch((err: FirestoreError) => {
// When the AsyncQueue gets drained during testing, pending Promises
// (including these idle checks) will get rejected. We special-case
// these cancelled idle checks to make sure that these specific Promise
Expand All @@ -266,6 +276,7 @@ export abstract class PersistentStream<
}`
);
});
}
}

/** Sends a message to the underlying stream. */
Expand All @@ -276,7 +287,7 @@ export abstract class PersistentStream<

/** Called by the idle timer when the stream should close due to inactivity. */
private handleIdleCloseTimer(): Promise<void> {
if (this.isOpen() && this.idle) {
if (this.isOpen()) {
// When timing out an idle stream there's no reason to force the stream into backoff when
// it restarts so set the stream state to Initial instead of Error.
return this.close(PersistentStreamState.Initial);
Expand All @@ -286,7 +297,10 @@ export abstract class PersistentStream<

/** Marks the stream as active again. */
private cancelIdleCheck() {
this.idle = false;
if (this.inactivityTimerPromise) {
this.inactivityTimerPromise.cancel();
this.inactivityTimerPromise = null;
}
}

/**
Expand Down
221 changes: 146 additions & 75 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,120 @@
import { assert, fail } from './assert';
import * as log from './log';
import { AnyDuringMigration, AnyJs } from './misc';
import { Deferred } from './promise';
import { Deferred, CancelablePromise } from './promise';
import { Code, FirestoreError } from './error';

type DelayedOperation<T> = {
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
handle: any;
op: () => Promise<T>;
deferred: Deferred<T>;
};
// tslint:disable-next-line:no-any Accept any return type from setTimeout().
type TimerHandle = any;

/**
* Represents an operation scheduled to be run in the future on an AsyncQueue.
*
* It is created via DelayedOperation.createAndSchedule().
*
* Supports cancellation (via cancel()) and early execution (via skipDelay()).
*/
class DelayedOperation<T> implements CancelablePromise<T> {
// handle for use with clearTimeout(), or null if the operation has been
// executed or canceled already.
private timerHandle: TimerHandle | null;

private readonly deferred = new Deferred<T>();

private constructor(
private asyncQueue: AsyncQueue,
private op: () => Promise<T>
) {}

/**
* Creates and returns a DelayedOperation that has been scheduled to be
* executed on the provided asyncQueue after the provided delayMs.
*/
static createAndSchedule<T>(
asyncQueue: AsyncQueue,
op: () => Promise<T>,
delayMs: number
): DelayedOperation<T> {
const delayedOp = new DelayedOperation(asyncQueue, op);
delayedOp.start(delayMs);
return delayedOp;
}

/**
* Starts the timer. This is called immediately after construction by
* createAndSchedule().
*/
private start(delayMs: number): void {
this.timerHandle = setTimeout(() => this.handleDelayElapsed(), delayMs);
}

/**
* Queues the operation to run immediately (if it hasn't already been run or
* canceled).
*/
skipDelay(): void {
return this.handleDelayElapsed();
}

/**
* Cancels the operation if it hasn't already been executed or canceled. The
* promise will be rejected.
*
* As long as the operation has not yet been run, calling cancel() provides a
* guarantee that the operation will not be run.
*/
cancel(reason?: string): void {
if (this.timerHandle !== null) {
this.clearTimeout();
this.deferred.reject(
new FirestoreError(
Code.CANCELLED,
'Operation cancelled' + (reason ? ': ' + reason : '')
)
);
}
}

// Promise implementation.
readonly [Symbol.toStringTag]: 'Promise';
then = this.deferred.promise.then.bind(this.deferred.promise);
catch = this.deferred.promise.catch.bind(this.deferred.promise);

private handleDelayElapsed(): void {
this.asyncQueue.schedule(() => {
if (this.timerHandle !== null) {
this.clearTimeout();
return this.op().then(result => {
return this.deferred.resolve(result);
});
} else {
return Promise.resolve();
}
});
}

private clearTimeout() {
if (this.timerHandle) {
clearTimeout(this.timerHandle);
this.timerHandle = null;
}
}
}

export class AsyncQueue {
// The last promise in the queue.
private tail: Promise<AnyJs | void> = Promise.resolve();

// A list with timeout handles and their respective deferred promises.
// Contains an entry for each operation that is queued to run in the future
// (i.e. it has a delay that has not yet elapsed). Prior to cleanup, this list
// may also contain entries that have already been run (in which case `handle` is
// null).
// (i.e. it has a delay that has not yet elapsed).
private delayedOperations: Array<DelayedOperation<AnyJs>> = [];

// The number of operations that are queued to be run in the future (i.e. they
// have a delay that has not yet elapsed). Unlike `delayedOperations`, this
// is guaranteed to only contain operations that have not yet been run.
//
// Visible for testing.
delayedOperationsCount = 0;
// have a delay that has not yet elapsed). Used for testing.
get delayedOperationsCount() {
return this.delayedOperations.length;
}

// visible for testing
failure: Error;
Expand All @@ -55,47 +142,10 @@ export class AsyncQueue {
/**
* Adds a new operation to the queue. Returns a promise that will be resolved
* when the promise returned by the new operation is (with its value).
*
* Can optionally specify a delay (in milliseconds) to wait before queuing the
* operation.
*/
schedule<T>(op: () => Promise<T>, delay?: number): Promise<T> {
if (this.failure) {
fail(
'AsyncQueue is already failed: ' +
(this.failure.stack || this.failure.message)
);
}

if ((delay || 0) > 0) {
this.delayedOperationsCount++;
const delayedOp: DelayedOperation<T> = {
handle: null,
op,
deferred: new Deferred<T>()
};
delayedOp.handle = setTimeout(() => {
this.scheduleInternal(() => {
return delayedOp.op().then(result => {
delayedOp.deferred.resolve(result);
});
});
delayedOp.handle = null;

this.delayedOperationsCount--;
if (this.delayedOperationsCount === 0) {
this.delayedOperations = [];
}
}, delay);
this.delayedOperations.push(delayedOp);
return delayedOp.deferred.promise;
} else {
return this.scheduleInternal(op);
}
}

private scheduleInternal<T>(op: () => Promise<T>): Promise<T> {
this.tail = this.tail.then(() => {
schedule<T>(op: () => Promise<T>): Promise<T> {
this.verifyNotFailed();
const newTail = this.tail.then(() => {
this.operationInProgress = true;
return op()
.catch(error => {
Expand All @@ -118,11 +168,45 @@ export class AsyncQueue {
// and return the rejected Promise.
throw error;
})
.then(() => {
.then(result => {
this.operationInProgress = false;
return result;
});
});
return this.tail as AnyDuringMigration;
this.tail = newTail;
return newTail;
}

/**
* Schedules an operation to be run on the AsyncQueue once the specified
* `delayMs` has elapsed. The returned DelayedOperationResult can be
* used to cancel the operation prior to its running.
*/
scheduleWithDelay<T>(
op: () => Promise<T>,
delayMs: number
): CancelablePromise<T> {
this.verifyNotFailed();

const delayedOp = DelayedOperation.createAndSchedule(this, op, delayMs);
this.delayedOperations.push(delayedOp);

delayedOp.catch(err => {}).then(() => {
// NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small.
const index = this.delayedOperations.indexOf(delayedOp);
assert(index >= 0, 'Delayed operation not found.');
this.delayedOperations.slice(index, 1);
});
return delayedOp;
}

private verifyNotFailed(): void {
if (this.failure) {
fail(
'AsyncQueue is already failed: ' +
(this.failure.stack || this.failure.message)
);
}
}

/**
Expand All @@ -143,26 +227,13 @@ export class AsyncQueue {
* scheduled with a delay can be rejected or queued for immediate execution.
*/
drain(executeDelayedTasks: boolean): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a future cleanup, it seems like we could now get rid of this method. Each caller could invalidate its tasks when we call .shutdown().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how that works with the executeDelayedTasks codepath... which I think we'll actually want to extend to be able to test timeouts better...

this.delayedOperations.forEach(entry => {
if (entry.handle) {
clearTimeout(entry.handle);
if (executeDelayedTasks) {
this.scheduleInternal(entry.op).then(
entry.deferred.resolve,
entry.deferred.reject
);
} else {
entry.deferred.reject(
new FirestoreError(
Code.CANCELLED,
'Operation cancelled by shutdown'
)
);
}
this.delayedOperations.forEach(delayedOp => {
if (executeDelayedTasks) {
delayedOp.skipDelay();
} else {
delayedOp.cancel('shutdown');
}
});
this.delayedOperations = [];
this.delayedOperationsCount = 0;
return this.schedule(() => Promise.resolve());
}
}
4 changes: 4 additions & 0 deletions packages/firestore/src/util/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export interface Rejecter {
(reason?: Error): void;
}

export interface CancelablePromise<T> extends Promise<T> {
cancel(): void;
}

export class Deferred<R> {
promise: Promise<R>;
resolve: Resolver<R>;
Expand Down