Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically reattempt after internal errors #1424

Merged
merged 49 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
7bfaf74
refactor finalize run service
nicktrn Oct 14, 2024
e81d5bc
refactor complete attempt service
nicktrn Oct 14, 2024
55cb6a9
remove separate graceful exit handling
nicktrn Oct 14, 2024
8699d6b
refactor task status helpers
nicktrn Oct 14, 2024
0c37df7
clearly separate statuses in prisma schema
nicktrn Oct 14, 2024
f710326
all non-final statuses should be failable
nicktrn Oct 14, 2024
0b39a4c
new import payload error code
nicktrn Oct 15, 2024
f92d82f
store default retry config if none set on task
nicktrn Oct 15, 2024
9350770
failed run service now respects retries
nicktrn Oct 15, 2024
9f6696d
fix merged task retry config indexing
nicktrn Oct 15, 2024
aee555a
some errors should never be retried
nicktrn Oct 15, 2024
008de3b
finalize run service takes care of acks now
nicktrn Oct 15, 2024
fc59ce2
execution payload helper now with single object arg
nicktrn Oct 15, 2024
ccda672
internal error code enum export
nicktrn Oct 21, 2024
78ef834
unify failed and crashed run retries
nicktrn Oct 21, 2024
cba5885
Prevent uncaught socket ack exceptions (#1415)
nicktrn Oct 16, 2024
3ad51f2
New onboarding question (#1404)
samejr Oct 17, 2024
3eb2f97
Added a new Context page for the docs (#1416)
samejr Oct 17, 2024
0839824
Fix updating many environment variables at once (#1413)
yadavshubham01 Oct 17, 2024
43085ff
Move code example to the side menu
samejr Oct 17, 2024
8e4acc4
New docs example for creating a HN email summary
samejr Oct 17, 2024
45a34f9
doc: add instructions to create new reference project and run it loca…
tarunps Oct 18, 2024
1ee3860
Fix several restore and resume bugs (#1418)
nicktrn Oct 18, 2024
cb50c67
chore: Update version for release (#1410)
github-actions[bot] Oct 18, 2024
339f4ac
Release 3.0.13
nicktrn Oct 18, 2024
0921733
capture ffmpeg oom errors
nicktrn Oct 19, 2024
2e8bc83
respect maxAttempts=1 when failing before first attempt creation
nicktrn Oct 20, 2024
53551e0
request worker exit on fatal errors
nicktrn Oct 20, 2024
8111a9a
fix error code merge
nicktrn Oct 21, 2024
c5813c4
add new error code to should retry
nicktrn Oct 21, 2024
d3719b1
pretty segfault errors
nicktrn Oct 22, 2024
5df1807
pretty internal errors for attempt spans
nicktrn Oct 22, 2024
757cc21
decrease oom false positives
nicktrn Oct 22, 2024
38f4176
fix timeline event color for failed runs
nicktrn Oct 22, 2024
f3629af
auto-retry packet import and export
nicktrn Oct 22, 2024
2ae2a7c
add sdk version check and complete event while completing attempt
nicktrn Oct 22, 2024
eed71f9
all internal errors become crashes by default
nicktrn Oct 22, 2024
78a3d43
Merge remote-tracking branch 'origin/main' into feat/auto-reattempt
nicktrn Oct 22, 2024
8bb7861
use pretty error helpers exclusively
nicktrn Oct 22, 2024
a2293b6
error to debug log
nicktrn Oct 22, 2024
ab86276
zodfetch fixes
nicktrn Oct 22, 2024
56095a8
rename import payload to task input error
nicktrn Oct 22, 2024
0e1b21c
fix true non-zero exit error display
nicktrn Oct 22, 2024
d1c38de
fix retry config parsing
nicktrn Oct 22, 2024
cd8dffa
correctly mark crashes as crashed
nicktrn Oct 22, 2024
25a7578
add changeset
nicktrn Oct 23, 2024
6e1d4db
Merge remote-tracking branch 'origin/main' into feat/retry-internal-e…
nicktrn Oct 24, 2024
896edf1
remove non-zero exit comment
nicktrn Oct 24, 2024
d7b246e
pretend we don't support default default retry configs yet
nicktrn Oct 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/webapp/app/components/runs/v3/RunInspector.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import {
} from "~/utils/pathBuilder";
import { TraceSpan } from "~/utils/taskEvent";
import { SpanLink } from "~/v3/eventRepository.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline";
import { RunTag } from "./RunTag";
import { TaskRunStatusCombo } from "./TaskRunStatus";
Expand Down Expand Up @@ -479,6 +479,7 @@ function RunTimeline({ run }: { run: RawRun }) {
const updatedAt = new Date(run.updatedAt);

const isFinished = isFinalRunStatus(run.status);
const isError = isFailedRunStatus(run.status);

return (
<div className="min-w-fit max-w-80">
Expand Down Expand Up @@ -535,7 +536,7 @@ function RunTimeline({ run }: { run: RawRun }) {
<RunTimelineEvent
title="Finished"
subtitle={<DateTimeAccurate date={updatedAt} />}
state="complete"
state={isError ? "error" : "complete"}
/>
</>
) : (
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { eventRepository } from "~/v3/eventRepository.server";
import { machinePresetFromName } from "~/v3/machinePresets.server";
import { FINAL_ATTEMPT_STATUSES, isFinalRunStatus } from "~/v3/taskStatus";
import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { getMaxDuration } from "~/v3/utils/maxDuration";

Expand Down Expand Up @@ -294,6 +294,7 @@ export class SpanPresenter extends BasePresenter {
usageDurationMs: run.usageDurationMs,
isFinished,
isRunning: RUNNING_STATUSES.includes(run.status),
isError: isFailedRunStatus(run.status),
payload,
payloadType: run.payloadType,
output,
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
const service = new CreateTaskRunAttemptService();

try {
const { execution } = await service.call(runParam, authenticationResult.environment);
const { execution } = await service.call({
runId: runParam,
authenticatedEnv: authenticationResult.environment,
});

return json(execution, { status: 200 });
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ function RunTimeline({ run }: { run: SpanRun }) {
<RunTimelineEvent
title="Finished"
subtitle={<DateTimeAccurate date={run.updatedAt} />}
state="complete"
state={run.isError ? "error" : "complete"}
/>
</>
) : (
Expand Down
268 changes: 239 additions & 29 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
import {
calculateNextRetryDelay,
RetryOptions,
TaskRunExecution,
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
} from "@trigger.dev/core/v3";
import { logger } from "~/services/logger.server";
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
import { BaseService } from "./services/baseService.server";
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
import { FAILABLE_RUN_STATUSES } from "./taskStatus";
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
import type { Prisma, TaskRun } from "@trigger.dev/database";
import { CompleteAttemptService } from "./services/completeAttempt.server";
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import * as semver from "semver";

const includeAttempts = {
attempts: {
orderBy: {
createdAt: "desc",
},
take: 1,
},
lockedBy: true, // task
lockedToVersion: true, // worker
} satisfies Prisma.TaskRunInclude;

type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{
include: typeof includeAttempts;
}>;

export class FailedTaskRunService extends BaseService {
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
logger.debug("[FailedTaskRunService] Handling failed task run", { anyRunId, completion });

const isFriendlyId = anyRunId.startsWith("run_");

const taskRun = await this._prisma.taskRun.findUnique({
const taskRun = await this._prisma.taskRun.findFirst({
where: {
friendlyId: isFriendlyId ? anyRunId : undefined,
id: !isFriendlyId ? anyRunId : undefined,
Expand All @@ -25,7 +51,7 @@ export class FailedTaskRunService extends BaseService {
return;
}

if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
if (!isFailableRunStatus(taskRun.status)) {
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
taskRun,
completion,
Expand All @@ -34,33 +60,217 @@ export class FailedTaskRunService extends BaseService {
return;
}

// No more retries, we need to fail the task run
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
const retryHelper = new FailedTaskRunRetryHelper(this._prisma);
const retryResult = await retryHelper.call({
runId: taskRun.id,
completion,
});

const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRun.id,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
attemptStatus: "FAILED",
error: sanitizeError(completion.error),
logger.debug("[FailedTaskRunService] Completion result", {
runId: taskRun.id,
result: retryResult,
});
}
}

interface TaskRunWithWorker extends TaskRun {
lockedBy: { retryConfig: Prisma.JsonValue } | null;
lockedToVersion: { sdkVersion: string } | null;
}
nicktrn marked this conversation as resolved.
Show resolved Hide resolved

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(taskRun.spanId, {
endTime: new Date(),
attributes: {
isError: true,
export class FailedTaskRunRetryHelper extends BaseService {
async call({
runId,
completion,
isCrash,
}: {
runId: string;
completion: TaskRunFailedExecutionResult;
isCrash?: boolean;
}) {
const taskRun = await this._prisma.taskRun.findFirst({
where: {
id: runId,
},
events: [
{
name: "exception",
time: new Date(),
properties: {
exception: createExceptionPropertiesFromError(completion.error),
},
},
],
include: includeAttempts,
});

if (!taskRun) {
logger.error("[FailedTaskRunRetryHelper] Task run not found", {
runId,
completion,
});

return "NO_TASK_RUN";
}

const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);

if (!retriableExecution) {
return "NO_EXECUTION";
}

logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });

const executionRetry =
completion.retry ??
(await FailedTaskRunRetryHelper.getExecutionRetry({
run: taskRun,
execution: retriableExecution,
}));

const completeAttempt = new CompleteAttemptService(this._prisma);
const completeResult = await completeAttempt.call({
completion: {
...completion,
retry: executionRetry,
},
execution: retriableExecution,
isSystemFailure: !isCrash,
isCrash,
});

return completeResult;
}

async #getRetriableAttemptExecution(
run: TaskRunWithAttempts,
completion: TaskRunFailedExecutionResult
): Promise<TaskRunExecution | undefined> {
nicktrn marked this conversation as resolved.
Show resolved Hide resolved
let attempt = run.attempts[0];

// We need to create an attempt if:
// - None exists yet
// - The last attempt has a final status, e.g. we failed between attempts
if (!attempt || isFinalAttemptStatus(attempt.status)) {
logger.error("[FailedTaskRunRetryHelper] No attempts found", {
run,
completion,
});
nicktrn marked this conversation as resolved.
Show resolved Hide resolved

const createAttempt = new CreateTaskRunAttemptService(this._prisma);

try {
const { execution } = await createAttempt.call({
runId: run.id,
// This ensures we correctly respect `maxAttempts = 1` when failing before the first attempt was created
startAtZero: true,
});
return execution;
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to create attempt", {
run,
completion,
error,
});

return;
}
}

// We already have an attempt with non-final status, let's use it
try {
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
id: attempt.id,
skipStatusChecks: true,
});

return executionPayload?.execution;
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution payload", {
run,
completion,
error,
});

return;
}
}

static async getExecutionRetry({
run,
execution,
}: {
run: TaskRunWithWorker;
execution: TaskRunExecution;
}): Promise<TaskRunExecutionRetry | undefined> {
try {
const retryConfig = run.lockedBy?.retryConfig;

if (!retryConfig) {
if (!run.lockedToVersion) {
logger.error("[FailedTaskRunRetryHelper] Run not locked to version", {
run,
execution,
});

return;
}

const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0";
const isValid = semver.valid(sdkVersion);

if (!isValid) {
logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", {
run,
execution,
});

return;
}
Comment on lines +210 to +220
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential null values for run.lockedToVersion to prevent runtime errors.

In the line where sdkVersion is assigned, run.lockedToVersion might be null, which would cause a runtime error when accessing sdkVersion. Since you check for run.lockedToVersion earlier, you might consider restructuring the code to ensure run.lockedToVersion is always defined before accessing its properties.

Apply this diff to safely access sdkVersion:

- const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0";
+ const sdkVersion = run.lockedToVersion?.sdkVersion ?? "0.0.0";
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0";
const isValid = semver.valid(sdkVersion);
if (!isValid) {
logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", {
run,
execution,
});
return;
}
const sdkVersion = run.lockedToVersion?.sdkVersion ?? "0.0.0";
const isValid = semver.valid(sdkVersion);
if (!isValid) {
logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", {
run,
execution,
});
return;
}


// With older SDK versions, tasks only have a retry config stored in the DB if it's explicitly defined on the task itself
// It won't get populated with retry.default in trigger.config.ts
if (semver.lt(sdkVersion, FailedTaskRunRetryHelper.DEFAULT_RETRY_CONFIG_SINCE_VERSION)) {
logger.warn(
"[FailedTaskRunRetryHelper] SDK version not recent enough to determine retry config",
{
run,
execution,
}
);

return;
}
}
nicktrn marked this conversation as resolved.
Show resolved Hide resolved

const parsedRetryConfig = RetryOptions.safeParse(retryConfig);

if (!parsedRetryConfig.success) {
logger.error("[FailedTaskRunRetryHelper] Invalid retry config", {
run,
execution,
});

return;
}

const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);

if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});

return;
}
Comment on lines +259 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure accurate delay validation to avoid misinterpreting zero delays.

Using if (!delay) may incorrectly handle cases where calculateNextRetryDelay returns 0, which is a valid delay value representing an immediate retry. Since 0 is falsy in JavaScript, this condition would treat a zero delay as if there is no delay value, possibly skipping retries that should occur immediately.

Apply this diff to correct the condition:

- if (!delay) {
+ if (delay == null) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});
return;
}
if (delay == null) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});
return;
}


return {
timestamp: Date.now() + delay,
delay,
};
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
run,
execution,
error,
});

return;
}
}

// TODO: update this to the correct version
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.12";
nicktrn marked this conversation as resolved.
Show resolved Hide resolved
}
6 changes: 5 additions & 1 deletion apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ function createCoordinatorNamespace(io: Server) {
}

const service = new CreateTaskRunAttemptService();
const { attempt } = await service.call(message.runId, environment, false);
const { attempt } = await service.call({
runId: message.runId,
authenticatedEnv: environment,
setToExecuting: false,
});

const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
id: attempt.id,
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,10 @@ export class SharedQueueConsumer {
if (!deployment.worker.supportsLazyAttempts) {
try {
const service = new CreateTaskRunAttemptService();
await service.call(lockedTaskRun.friendlyId, undefined, false);
await service.call({
runId: lockedTaskRun.id,
setToExecuting: false,
});
} catch (error) {
logger.error("Failed to create task run attempt for outdate worker", {
error,
Expand Down
Loading