-
Notifications
You must be signed in to change notification settings - Fork 909
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor AsyncQueue's "delayed scheduling" support and add cancellation. #489
Conversation
* Introduces a DelayedOperation helper class in AsyncQueue to encapsulate delayed op logic. * Adds cancellation support which I want to use in #412 * Remove delayedOperationsCount in favor of keeping delayedOperations populated correctly at all times. * Fixes a preexisting issue in AsyncQueue.schedule() where the returned promise would always be resolved with undefined, instead of the result of your op. Also remove an AnyDuringMigration usage.
@@ -251,10 +251,10 @@ export abstract class PersistentStream< | |||
markIdle(): void { | |||
this.idle = true; | |||
this.queue | |||
.schedule(() => { | |||
.scheduleWithDelay(() => { | |||
return this.handleIdleCloseTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handleIdleCloseTimer
has its own logic to prevent itself from running. Do you think you could migrate this code to use this new functionality? We already cancel the timer in Android: https://cs.corp.google.com/piper///depot/google3/java/com/google/android/gmscore/dev/thick_client/firebase-firestore/src/com/google/firebase/firestore/remote/AbstractStream.java?rcl=184463491&l=362
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I noticed that. I was hoping to get away without growing this PR, but sure. :-) Please double-check that I did it right. I basically ported the Android logic.
return this.handleIdleCloseTimer(); | ||
}, IDLE_TIMEOUT_MS) | ||
.catch((err: FirestoreError) => { | ||
.promise.catch((err: FirestoreError) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to do something along the lines of https://github.com/alkemics/CancelablePromise/blob/master/CancelablePromise.js?
We might just want to expose .catch/.then, and keep .promise to make more advanced operations possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, done.
type DelayedOperation<T> = { | ||
/** External Result of scheduling a delayed operation. */ | ||
export interface DelayedOperationResult<T> { | ||
/** A promise that will resolve once the operation has been run. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... and will get rejected when it is cancelled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this went away.
private timerHandle: any; | ||
private readonly deferred = new Deferred<T>(); | ||
/** true if the operation has not been executed or cancelled yet. */ | ||
private pending = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The separation between pending
and timerHandle
reads nicer (because of the variable names), but isn't strictly necessary. We could just rely on timerHandle and check whether it is null or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I intentionally didn't do that, but I don't feel strongly, so I dropped 'pending'.
} | ||
} | ||
|
||
private runIfNecessary(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to use runIfPending
to match the names in the rest of this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merged this method into scheduleNow (which got renamed to handleDelayElapesd)
const index = this.delayedOperations.push(delayedOp); | ||
|
||
delayedOp.promise.catch(err => {}).then(() => { | ||
this.delayedOperations.slice(index, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem right. The index might have been changed if another promise got fulfilled before this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeesh. :-( Good catch, thank you!
@@ -143,26 +210,14 @@ export class AsyncQueue { | |||
* scheduled with a delay can be rejected or queued for immediate execution. | |||
*/ | |||
drain(executeDelayedTasks: boolean): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a future cleanup, it seems like we could now get rid of this method. Each caller could invalidate its tasks when we call .shutdown().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how that works with the executeDelayedTasks codepath... which I think we'll actually want to extend to be able to test timeouts better...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes my brain hurt. I've tried to leave constructive feedback on how you might consider making fewer callbacks but I'm not sure that advice would actually work. Take it with a grain of salt.
@@ -20,30 +20,103 @@ import { AnyDuringMigration, AnyJs } from './misc'; | |||
import { Deferred } from './promise'; | |||
import { Code, FirestoreError } from './error'; | |||
|
|||
type DelayedOperation<T> = { | |||
/** External Result of scheduling a delayed operation. */ | |||
export interface DelayedOperationResult<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason to split the interface and the class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes (the interface is exposed outside AsyncQueue, while the implementation is not), but this went away in favor of Sebastian's CancelablePromise suggestion.
/** true if the operation has not been executed or cancelled yet. */ | ||
private pending = true; | ||
|
||
static createAndSchedule<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems unnecessarily circuitous:
- scheduleWithDelay calls
- createAndSchedule
- which after a while calls scheduleNow(asyncQueue)
- which calls asyncQueue.schedule
It mixes state in closures and in the classes in ways that are pretty confusing.
Relatedly you have createAndSchedule
and scheduleNow
as two vastly different operations which just from their names I would surmise as having similar behavior.
I'd suggest the following changes:
- Make the asyncQueue explicitly a member of DelayedOperation
- Rename scheduleNow to conflict less with createAndSchedule. Maybe submit or resume?
- I'd consider just splitting creation of the op from the deferred scheduling so that you can get rid of createAndSchedule altogether. New up your DelayedOperation and then call delay on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Made
asyncQueue
a member. - Renamed
scheduleNow()
to a privatehandleDelayElapsed()
, and added a newskipDelay()
method. - I'm inclined to keep
createAndSchedule()
as it prevents the DelayedOperation from being in a created-but-not-yet-scheduled state. As-is,timerHandle
is initially defined and then becomes null once the op has been executed or canceled. I'd rather not have it go null=>defined=>null with null meaning two different things. The other option is to have the constructor automatically schedule the op, but I don't really like constructors with side-effects.
By the way, part of the original naming problem is that AsyncQueue.schedule()
is a bad name. It should be AsyncQueue.enqueue()
, and then scheduleNow()
could be enqueueNow()
. But I don't want to do that in this PR.
* already been run or cancelled. | ||
*/ | ||
scheduleNow(asyncQueue: AsyncQueue): void { | ||
this.clearTimeout(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be harmful to call clearTimeout
inside runIfNecessary
?
As it stands you have three different callbacks here:
- The lambda that fires when
setTimeout
calls you back - Which calls
scheduleNow
- Which calls
schedule
withrunIfNecessary.bind(this)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. And I merged runIfNecessary into this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL.
@@ -251,10 +251,10 @@ export abstract class PersistentStream< | |||
markIdle(): void { | |||
this.idle = true; | |||
this.queue | |||
.schedule(() => { | |||
.scheduleWithDelay(() => { | |||
return this.handleIdleCloseTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I noticed that. I was hoping to get away without growing this PR, but sure. :-) Please double-check that I did it right. I basically ported the Android logic.
return this.handleIdleCloseTimer(); | ||
}, IDLE_TIMEOUT_MS) | ||
.catch((err: FirestoreError) => { | ||
.promise.catch((err: FirestoreError) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, done.
@@ -20,30 +20,103 @@ import { AnyDuringMigration, AnyJs } from './misc'; | |||
import { Deferred } from './promise'; | |||
import { Code, FirestoreError } from './error'; | |||
|
|||
type DelayedOperation<T> = { | |||
/** External Result of scheduling a delayed operation. */ | |||
export interface DelayedOperationResult<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes (the interface is exposed outside AsyncQueue, while the implementation is not), but this went away in favor of Sebastian's CancelablePromise suggestion.
type DelayedOperation<T> = { | ||
/** External Result of scheduling a delayed operation. */ | ||
export interface DelayedOperationResult<T> { | ||
/** A promise that will resolve once the operation has been run. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this went away.
private timerHandle: any; | ||
private readonly deferred = new Deferred<T>(); | ||
/** true if the operation has not been executed or cancelled yet. */ | ||
private pending = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I intentionally didn't do that, but I don't feel strongly, so I dropped 'pending'.
/** true if the operation has not been executed or cancelled yet. */ | ||
private pending = true; | ||
|
||
static createAndSchedule<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Made
asyncQueue
a member. - Renamed
scheduleNow()
to a privatehandleDelayElapsed()
, and added a newskipDelay()
method. - I'm inclined to keep
createAndSchedule()
as it prevents the DelayedOperation from being in a created-but-not-yet-scheduled state. As-is,timerHandle
is initially defined and then becomes null once the op has been executed or canceled. I'd rather not have it go null=>defined=>null with null meaning two different things. The other option is to have the constructor automatically schedule the op, but I don't really like constructors with side-effects.
By the way, part of the original naming problem is that AsyncQueue.schedule()
is a bad name. It should be AsyncQueue.enqueue()
, and then scheduleNow()
could be enqueueNow()
. But I don't want to do that in this PR.
* already been run or cancelled. | ||
*/ | ||
scheduleNow(asyncQueue: AsyncQueue): void { | ||
this.clearTimeout(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. And I merged runIfNecessary into this method.
} | ||
} | ||
|
||
private runIfNecessary(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merged this method into scheduleNow (which got renamed to handleDelayElapesd)
@@ -143,26 +210,14 @@ export class AsyncQueue { | |||
* scheduled with a delay can be rejected or queued for immediate execution. | |||
*/ | |||
drain(executeDelayedTasks: boolean): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how that works with the executeDelayedTasks codepath... which I think we'll actually want to extend to be able to test timeouts better...
const index = this.delayedOperations.push(delayedOp); | ||
|
||
delayedOp.promise.catch(err => {}).then(() => { | ||
this.delayedOperations.slice(index, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeesh. :-( Good catch, thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't seem to comment on Gil's suggestion and your response (maybe because it is now marked as outdated), but what do you think of this: https://gist.github.com/schmidt-sebastian/69aecafd8adffa87d369926dacf070f5
- It gets rid of the asyncQueue from the DelayedOperation (this is the main cleanup)
- It merges schedule and scheduleWithDelay()... but you might actually want to separate them, so please ignore if you like.
- It creates two runImmediately methods on both classes that both essentially do the same thing.
asyncQueue: AsyncQueue, | ||
op: () => Promise<T>, | ||
delayMs: number | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the return type here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, thanks. Done. And opened a bug to make tslint check this for us: b/72988931
return this.handleIdleCloseTimer(); | ||
}, IDLE_TIMEOUT_MS) | ||
.catch((err: FirestoreError) => { | ||
if (this.isOpen() && this.inactivityTimerPromise === null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well copy the remaining comment as well:
// Starts the idle time if we are in state 'Open' and are not yet already running a timer (in
// which case the previous idle timeout still applies).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woops, thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking a stab at reworking this! I borrowed your start() helper method, as I think that's cleaner.
But the core of your change breaks a guarantee I'm trying to provide which is that if you call cancel() then the operation won't be run. With your change that's no longer true because it just cancels it from getting added to the AsyncQueue, and so if the timer has already elapsed (so it's been added to the AsyncQueue) and you call cancel(), the cancel() won't do anything and the operation will still end up being run.
Along those lines, "runImmediately()" is misleading since it only queues the operation and there may be lots of prior operations ahead of it... so it may not be immediate at all.
return this.handleIdleCloseTimer(); | ||
}, IDLE_TIMEOUT_MS) | ||
.catch((err: FirestoreError) => { | ||
if (this.isOpen() && this.inactivityTimerPromise === null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woops, thanks!
asyncQueue: AsyncQueue, | ||
op: () => Promise<T>, | ||
delayMs: number | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, thanks. Done. And opened a bug to make tslint check this for us: b/72988931
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM from my end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
delayed op logic.
Add timeout to OnlineState tracking. #412
correctly at all times.
would always be resolved with undefined, instead of the result of your op.
Also remove an AnyDuringMigration usage.