Skip to content

Commit

Permalink
Shuffle tasks in the process state when executing, and sort them when…
Browse files Browse the repository at this point in the history
… serializing
  • Loading branch information
aliok committed Dec 30, 2023
1 parent f07e3aa commit 43f7434
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
16 changes: 15 additions & 1 deletion src/subcommand/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {ProcessFileHelper} from "../processFileHelper.js";
import {ErroredTask, ResolvedTask, TaskQueue} from "../queue/taskqueue.js";

import {SubCommand} from "../subcommand.js";
import {formatTimeStamp, now as getNow, nowTimestamp} from "../utils.js";
import {formatTimeStamp, now as getNow, nowTimestamp, shuffleDictionary, sortByKey} from "../utils.js";

type BaseResultType = any;
type BaseTaskSpec = TaskSpec;
Expand Down Expand Up @@ -173,6 +173,12 @@ export async function start(argv:Args) {
logger.info(`Checking if the errored tasks should be retried, according to retry count (${argv.retryCount}).`)
addErroredToUnresolved(logger, taskStore.errored, taskStore.unresolved, argv.retryCount);

// shuffle the unresolved tasks, so that we don't hit the same rate limit for each run
// otherwise, the queue will start with the same tasks each time.
// and if the queue is aborted, and restarted, it will start with the same tasks again.
// this will result in hitting the same rate limit again and again.
taskStore.unresolved = processState.unresolved = shuffleDictionary(taskStore.unresolved);

// now add the unresolved tasks to the queue
initializeQueue(taskQueue, taskStore.unresolved, context, command);

Expand Down Expand Up @@ -432,6 +438,7 @@ function saveProcessRunOutput(logger:winston.Logger, processFileHelper:ProcessFi
const processStateFilePath = processFileHelper.getProcessStateFilePath(processStateDir);

logger.info(`Writing process state to file: ${processStateFilePath}`);
sortProcessState(processState);
writeFileSync(processStateFilePath, JSON.stringify(processState, null, 2));

const now = nowFn();
Expand All @@ -457,6 +464,13 @@ function saveProcessRunOutput(logger:winston.Logger, processFileHelper:ProcessFi
outputStream.end();
}

export function sortProcessState(processState:ProcessState) {
processState.unresolved = sortByKey(processState.unresolved);
processState.resolved = sortByKey(processState.resolved);
processState.errored = sortByKey(processState.errored);
processState.archived = sortByKey(processState.archived);
}

const REQUIRED_OPTIONS_GROUP = "Required options";
type Args = GetBuiltOptionsType<typeof doAddArguments>;

Expand Down
3 changes: 2 additions & 1 deletion src/subcommand/requeueTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {GetBuiltOptionsType} from "../arguments.js";
import * as log from "../log.js";
import {ProcessFileHelper} from "../processFileHelper.js";
import {SubCommand} from "../subcommand.js";
import {ProcessState} from "./execute.js";
import {ProcessState, sortProcessState} from "./execute.js";

export const CommandDefinition:SubCommand = {
commandName: "requeue-tasks",
Expand Down Expand Up @@ -49,6 +49,7 @@ export async function start(argv:Args) {

processState.completionDate = null;
processState.completionError = null;
sortProcessState(processState);
writeFileSync(processStateFilePath, JSON.stringify(processState, null, 2));
}

Expand Down
18 changes: 18 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "date-fns";

import {format as doFormatDateTz} from "date-fns-tz";
import lodash from "lodash";

export function formatDate(d:Date):string {
return doFormatDate(d, "yyyy-MM-dd");
Expand Down Expand Up @@ -181,3 +182,20 @@ export function getRandomInt(min:number, max:number, randGenerator?:() => number
max = Math.floor(max);
return Math.floor(rand * (max - min + 1)) + min;
}

export function sortByKey<T>(dict:{ [key:string]:T }) {
const sorted:{ [key:string]:T } = {};
Object.keys(dict).sort().forEach(function (key) {
sorted[key] = dict[key];
});
return sorted;
}

export function shuffleDictionary<T>(dict:{ [key:string]:T }) {
const keys = lodash.shuffle(Object.keys(dict));
const shuffled:{ [key:string]:T } = {};
for (const key of keys) {
shuffled[key] = dict[key];
}
return shuffled;
}

0 comments on commit 43f7434

Please sign in to comment.