Skip to content

Commit

Permalink
Minor refactoring and more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Dec 7, 2024
1 parent 4149e90 commit 0c71374
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions src/localQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ export class LocalQueue {
* Initialized to `true` so on error we don't enable refetch delay.
*/
let refetchDelayThresholdSurpassed = true;
/** How many jobs did we fetch? (Initialize to zero in case of error.) */
let jobCount = 0;
const refetchDelayOptions =
this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay;
Expand Down Expand Up @@ -592,7 +593,7 @@ export class LocalQueue {
this.withPgClient,
this.tasks,
this.workerPool.id,
null,
null, // `flagsToSkip` is not set, see `LocalQueue.getJob`
this.getJobBatchSize,
);

Expand All @@ -609,7 +610,7 @@ export class LocalQueue {
// If refetch delay is disabled, we've met the requirement
!refetchDelayOptions ||
// If we fetched more than (**not** equal to) `threshold` jobs, we've met the requirement
jobCount > Math.floor(refetchDelayOptions.threshold ?? 0);
jobCount > (refetchDelayOptions.threshold ?? 0);

// NOTE: we don't need to handle `this.mode === RELEASED` here because
// being in that mode guarantees the workerQueue is empty.
Expand Down Expand Up @@ -696,7 +697,11 @@ export class LocalQueue {
this.refetchDelayTimer = null;
}
this.refetchDelayActive = false;

if (aborted) {
// Force refetch because we've been notified of so many jobs!
this.refetchDelayFetchOnComplete = true;

this.compiledSharedOptions.events.emit("localQueue:refetchDelay:abort", {
localQueue: this,
count: this.refetchDelayCounter,
Expand All @@ -713,7 +718,7 @@ export class LocalQueue {

if (this.mode === POLLING && this.refetchDelayFetchOnComplete) {
// Cancel poll, do now
if (this.fetchTimer) {
if (this.fetchTimer != null) {
clearTimeout(this.fetchTimer);
this.fetchTimer = null;
}
Expand All @@ -730,7 +735,6 @@ export class LocalQueue {
return false;
}
if (this.refetchDelayCounter >= this.refetchDelayAbortThreshold) {
this.refetchDelayFetchOnComplete = true;
this.refetchDelayCompleteOrAbort(true);
}
return true;
Expand All @@ -754,14 +758,16 @@ export class LocalQueue {
}
}

// If you refactor this to be a method rather than a property, make sure that you `.bind(this)` to it.
// If you refactor this to be a method rather than a property, make sure that
// you `.bind(this)` to it.
public getJob: GetJobFunction = (workerId, flagsToSkip) => {
if (this.mode === RELEASED) {
return undefined;
}

// Cannot batch if there's flags
if (flagsToSkip !== null) {
// PERF: we could actually batch for similar flags, I guess.
const jobsPromise = batchGetJobs(
this.compiledSharedOptions,
this.withPgClient,
Expand Down Expand Up @@ -829,7 +835,7 @@ export class LocalQueue {
break;
}
case WAITING: {
if (this.ttlExpiredTimer) {
if (this.ttlExpiredTimer != null) {
clearTimeout(this.ttlExpiredTimer);
this.ttlExpiredTimer = null;
}
Expand All @@ -839,7 +845,7 @@ export class LocalQueue {
break;
}
case TTL_EXPIRED: {
// No action necessary, jobs are already returned
// No action necessary, jobs are already returned, no jobs, no pending workers
break;
}
case STARTING: {
Expand Down

0 comments on commit 0c71374

Please sign in to comment.