Skip to content

Commit

Permalink
Prioritize finishing waited runs (#1375)
Browse files Browse the repository at this point in the history
* If a tree node is missing, estimate the size as zero

* Task to test prioritizing finishing existing runs after triggerAndWaits

* When requeuing a run with a checkpoint, put it in the queue with the parent run time so it’s correctly prioritized

* The same change but if there’s no checkpoint
  • Loading branch information
matt-aitken authored Oct 2, 2024
1 parent ceabfba commit 0bf500f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 13 deletions.
6 changes: 4 additions & 2 deletions apps/webapp/app/components/primitives/TreeView/TreeView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,11 @@ export function useTree<TData, TFilterValue>({
getItemKey: (index) => state.visibleNodeIds[index],
getScrollElement: () => parentRef.current,
estimateSize: (index: number) => {
const treeItem = tree[index];
if (!treeItem) return 0;
return estimatedRowHeight({
node: tree[index],
state: state.nodes[tree[index].id],
node: treeItem,
state: state.nodes[treeItem.id],
index,
});
},
Expand Down
27 changes: 16 additions & 11 deletions apps/webapp/app/v3/services/resumeTaskDependency.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ export class ResumeTaskDependencyService extends BaseService {
environmentId: dependency.taskRun.runtimeEnvironment.id,
environmentType: dependency.taskRun.runtimeEnvironment.type,
},
dependentRun.concurrencyKey ?? undefined
dependentRun.concurrencyKey ?? undefined,
dependentRun.createdAt.getTime()
);
} else {
logger.debug("Task dependency resume: Attempt is not paused or there's no checkpoint event", {
Expand All @@ -84,16 +85,20 @@ export class ResumeTaskDependencyService extends BaseService {
return;
}

await marqs?.replaceMessage(dependentRun.id, {
type: "RESUME",
completedAttemptIds: [sourceTaskAttemptId],
resumableAttemptId: dependency.dependentAttempt.id,
checkpointEventId: dependency.checkpointEventId ?? undefined,
taskIdentifier: dependency.taskRun.taskIdentifier,
projectId: dependency.taskRun.runtimeEnvironment.projectId,
environmentId: dependency.taskRun.runtimeEnvironment.id,
environmentType: dependency.taskRun.runtimeEnvironment.type,
});
await marqs?.replaceMessage(
dependentRun.id,
{
type: "RESUME",
completedAttemptIds: [sourceTaskAttemptId],
resumableAttemptId: dependency.dependentAttempt.id,
checkpointEventId: dependency.checkpointEventId ?? undefined,
taskIdentifier: dependency.taskRun.taskIdentifier,
projectId: dependency.taskRun.runtimeEnvironment.projectId,
environmentId: dependency.taskRun.runtimeEnvironment.id,
environmentType: dependency.taskRun.runtimeEnvironment.type,
},
dependentRun.createdAt.getTime()
);
}
}

Expand Down
27 changes: 27 additions & 0 deletions references/hello-world/src/trigger/prioritize-continuing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { logger, task, wait } from "@trigger.dev/sdk/v3";

export const prioritizeContinuing = task({
id: "prioritize-continuing",
run: async ({ count }: { count: number }) => {
await prioritizeContinuingChild.batchTrigger(
Array.from({ length: count }, (_, i) => ({ payload: {} as any }))
);
},
});

export const prioritizeContinuingChild = task({
id: "prioritize-continuing-child",
queue: {
concurrencyLimit: 1,
},
run: async () => {
await fixedLengthTask.triggerAndWait({ waitSeconds: 1 });
},
});

export const fixedLengthTask = task({
id: "fixedLengthTask",
run: async ({ waitSeconds }: { waitSeconds: number }) => {
await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000));
},
});

0 comments on commit 0bf500f

Please sign in to comment.