From 7bfaf74d6326cd308171f4f5e27d5a3c079f7b79 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 14 Oct 2024 09:43:15 +0100 Subject: [PATCH 01/47] refactor finalize run service --- .../app/v3/services/finalizeTaskRun.server.ts | 157 +++++++++--------- 1 file changed, 82 insertions(+), 75 deletions(-) diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index 797e802382..5ed5821f15 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -111,83 +111,90 @@ export class FinalizeTaskRunService extends BaseService { error?: TaskRunError; run: TaskRun; }) { - if (attemptStatus || error) { - const latestAttempt = await this._prisma.taskRunAttempt.findFirst({ - where: { taskRunId: run.id }, - orderBy: { id: "desc" }, - take: 1, + if (!attemptStatus && !error) { + logger.error("FinalizeTaskRunService: No attemptStatus or error provided", { runId: run.id }); + return; + } + + const latestAttempt = await this._prisma.taskRunAttempt.findFirst({ + where: { taskRunId: run.id }, + orderBy: { id: "desc" }, + take: 1, + }); + + if (latestAttempt) { + logger.debug("Finalizing run attempt", { + id: latestAttempt.id, + status: attemptStatus, + error, + }); + + await this._prisma.taskRunAttempt.update({ + where: { id: latestAttempt.id }, + data: { status: attemptStatus, error: error ? sanitizeError(error) : undefined }, }); - if (latestAttempt) { - logger.debug("Finalizing run attempt", { - id: latestAttempt.id, - status: attemptStatus, - error, - }); - - await this._prisma.taskRunAttempt.update({ - where: { id: latestAttempt.id }, - data: { status: attemptStatus, error: error ? sanitizeError(error) : undefined }, - }); - } else { - logger.debug("Finalizing run no attempt found", { - runId: run.id, - attemptStatus, - error, - }); - - if (!run.lockedById) { - logger.error( - "FinalizeTaskRunService: No lockedById, so can't get the BackgroundWorkerTask. Not creating an attempt.", - { runId: run.id } - ); - return; - } - - const workerTask = await this._prisma.backgroundWorkerTask.findFirst({ - select: { - id: true, - workerId: true, - runtimeEnvironmentId: true, - }, - where: { - id: run.lockedById, - }, - }); - - if (!workerTask) { - logger.error("FinalizeTaskRunService: No worker task found", { runId: run.id }); - return; - } - - const queue = await this._prisma.taskQueue.findUnique({ - where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: workerTask.runtimeEnvironmentId, - name: sanitizeQueueName(run.queue), - }, - }, - }); - - if (!queue) { - logger.error("FinalizeTaskRunService: No queue found", { runId: run.id }); - return; - } - - await this._prisma.taskRunAttempt.create({ - data: { - number: 1, - friendlyId: generateFriendlyId("attempt"), - taskRunId: run.id, - backgroundWorkerId: workerTask?.workerId, - backgroundWorkerTaskId: workerTask?.id, - queueId: queue.id, - runtimeEnvironmentId: workerTask.runtimeEnvironmentId, - status: attemptStatus, - error: error ? sanitizeError(error) : undefined, - }, - }); - } + return; } + + // There's no attempt, so create one + + logger.debug("Finalizing run no attempt found", { + runId: run.id, + attemptStatus, + error, + }); + + if (!run.lockedById) { + logger.error( + "FinalizeTaskRunService: No lockedById, so can't get the BackgroundWorkerTask. Not creating an attempt.", + { runId: run.id } + ); + return; + } + + const workerTask = await this._prisma.backgroundWorkerTask.findFirst({ + select: { + id: true, + workerId: true, + runtimeEnvironmentId: true, + }, + where: { + id: run.lockedById, + }, + }); + + if (!workerTask) { + logger.error("FinalizeTaskRunService: No worker task found", { runId: run.id }); + return; + } + + const queue = await this._prisma.taskQueue.findUnique({ + where: { + runtimeEnvironmentId_name: { + runtimeEnvironmentId: workerTask.runtimeEnvironmentId, + name: sanitizeQueueName(run.queue), + }, + }, + }); + + if (!queue) { + logger.error("FinalizeTaskRunService: No queue found", { runId: run.id }); + return; + } + + await this._prisma.taskRunAttempt.create({ + data: { + number: 1, + friendlyId: generateFriendlyId("attempt"), + taskRunId: run.id, + backgroundWorkerId: workerTask?.workerId, + backgroundWorkerTaskId: workerTask?.id, + queueId: queue.id, + runtimeEnvironmentId: workerTask.runtimeEnvironmentId, + status: attemptStatus, + error: error ? sanitizeError(error) : undefined, + }, + }); } } From e81d5bc8a72128f27d78ff3423efe2e30fff0d4a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 14 Oct 2024 09:44:26 +0100 Subject: [PATCH 02/47] refactor complete attempt service --- .../app/v3/services/completeAttempt.server.ts | 371 ++++++++++-------- 1 file changed, 215 insertions(+), 156 deletions(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 3df51c5cf9..696c3a38f3 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -1,5 +1,6 @@ import { Attributes } from "@opentelemetry/api"; import { + type ExceptionEventProperties, TaskRunContext, TaskRunExecution, TaskRunExecutionResult, @@ -194,8 +195,6 @@ export class CompleteAttemptService extends BaseService { env ); - // The cancel service handles ACK - return "COMPLETED"; } @@ -214,182 +213,107 @@ export class CompleteAttemptService extends BaseService { const environment = env ?? (await this.#getEnvironment(execution.environment.id)); if (completion.retry !== undefined && taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS) { - const retryAt = new Date(completion.retry.timestamp); - - // Retry the task run - await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, { - taskSlug: taskRunAttempt.taskRun.taskIdentifier, + return await this.#retryAttempt({ + execution, + executionRetry: completion.retry, + taskRunAttempt, environment, - attributes: { - metadata: this.#generateMetadataAttributesForNextAttempt(execution), - properties: { - retryAt: retryAt.toISOString(), - }, - runId: taskRunAttempt.taskRun.friendlyId, - style: { - icon: "schedule-attempt", - }, - queueId: taskRunAttempt.queueId, - queueName: taskRunAttempt.taskRun.queue, - }, - context: taskRunAttempt.taskRun.traceContext as Record, - spanIdSeed: `retry-${taskRunAttempt.number + 1}`, - endTime: retryAt, + checkpoint, + supportsRetryCheckpoints, }); + } - logger.debug("Retrying", { - taskRun: taskRunAttempt.taskRun.friendlyId, - retry: completion.retry, - }); + // The attempt has failed and we won't retry - await this._prisma.taskRun.update({ - where: { - id: taskRunAttempt.taskRunId, - }, - data: { - status: "RETRYING_AFTER_FAILURE", + // Now we need to "complete" the task run event/span + await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, { + endTime: new Date(), + attributes: { + isError: true, + }, + events: [ + { + name: "exception", + time: new Date(), + properties: { + exception: createExceptionPropertiesFromError(sanitizedError), + }, }, - }); - - if (environment.type === "DEVELOPMENT") { - // This is already an EXECUTE message so we can just NACK - await marqs?.nackMessage(taskRunAttempt.taskRunId, completion.retry.timestamp); - return "RETRIED"; - } - - if (!checkpoint) { - await this.#retryAttempt({ - run: taskRunAttempt.taskRun, - retry: completion.retry, - supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, - supportsRetryCheckpoints, - }); - - return "RETRIED"; - } + ], + }); - const createCheckpoint = new CreateCheckpointService(this._prisma); - const checkpointCreateResult = await createCheckpoint.call({ - attemptFriendlyId: execution.attempt.id, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "RETRYING_AFTER_FAILURE", - attemptNumber: execution.attempt.number, - }, + // If the error is a graceful exit timeout, we need to fail the task run and all incomplete spans + if ( + sanitizedError.type === "INTERNAL_ERROR" && + sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT" + ) { + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRunAttempt.taskRunId, + status: "SYSTEM_FAILURE", + completedAt: new Date(), }); - if (!checkpointCreateResult.success) { - logger.error("Failed to create checkpoint", { checkpoint, execution: execution.run.id }); - - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status: "SYSTEM_FAILURE", - completedAt: new Date(), - }); - - return "COMPLETED"; - } - - await this.#retryAttempt({ - run: taskRunAttempt.taskRun, - retry: completion.retry, - checkpointEventId: checkpointCreateResult.event.id, - supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, - supportsRetryCheckpoints, + // We need to fail all incomplete spans + const inProgressEvents = await eventRepository.queryIncompleteEvents({ + attemptId: execution.attempt.id, }); - return "RETRIED"; - } else { - // Now we need to "complete" the task run event/span - await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, { - endTime: new Date(), - attributes: { - isError: true, - }, - events: [ - { - name: "exception", - time: new Date(), - properties: { - exception: createExceptionPropertiesFromError(sanitizedError), - }, - }, - ], + logger.debug("Failing in-progress events", { + inProgressEvents: inProgressEvents.map((event) => event.id), }); - if ( - sanitizedError.type === "INTERNAL_ERROR" && - sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT" - ) { - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status: "SYSTEM_FAILURE", - completedAt: new Date(), - }); - - // We need to fail all incomplete spans - const inProgressEvents = await eventRepository.queryIncompleteEvents({ - attemptId: execution.attempt.id, - }); + const exception = { + type: "Graceful exit timeout", + message: sanitizedError.message, + } satisfies ExceptionEventProperties; + + await Promise.all( + inProgressEvents.map((event) => { + return eventRepository.crashEvent({ + event: event, + crashedAt: new Date(), + exception, + }); + }) + ); - logger.debug("Failing in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - }); + return "COMPLETED"; + } - const exception = { - type: "Graceful exit timeout", - message: sanitizedError.message, - }; - - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.crashEvent({ - event: event, - crashedAt: new Date(), - exception, - }); - }) - ); - } else { - await this._prisma.taskRun.update({ - where: { - id: taskRunAttempt.taskRunId, - }, - data: { - error: sanitizedError, - }, - }); + await this._prisma.taskRun.update({ + where: { + id: taskRunAttempt.taskRunId, + }, + data: { + error: sanitizedError, + }, + }); - const status = - sanitizedError.type === "INTERNAL_ERROR" && - sanitizedError.code === "MAX_DURATION_EXCEEDED" - ? "TIMED_OUT" - : "COMPLETED_WITH_ERRORS"; + const status = + sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED" + ? "TIMED_OUT" + : "COMPLETED_WITH_ERRORS"; - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status, - completedAt: new Date(), - }); - } + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRunAttempt.taskRunId, + status, + completedAt: new Date(), + }); - return "COMPLETED"; - } + return "COMPLETED"; } - async #retryAttempt({ + async #enqueueReattempt({ run, - retry, + executionRetry, checkpointEventId, supportsLazyAttempts, supportsRetryCheckpoints, }: { run: TaskRun; - retry: TaskRunExecutionRetry; + executionRetry: TaskRunExecutionRetry; checkpointEventId?: string; supportsLazyAttempts: boolean; supportsRetryCheckpoints?: boolean; @@ -404,12 +328,12 @@ export class CompleteAttemptService extends BaseService { checkpointEventId: supportsRetryCheckpoints ? checkpointEventId : undefined, retryCheckpointsDisabled: !supportsRetryCheckpoints, }, - retry.timestamp + executionRetry.timestamp ); }; const retryDirectly = () => { - return RetryAttemptService.enqueue(run.id, this._prisma, new Date(retry.timestamp)); + return RetryAttemptService.enqueue(run.id, this._prisma, new Date(executionRetry.timestamp)); }; // There's a checkpoint, so we need to go through the queue @@ -432,7 +356,7 @@ export class CompleteAttemptService extends BaseService { } // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold - if (!supportsRetryCheckpoints && retry.delay >= env.CHECKPOINT_THRESHOLD_IN_MS) { + if (!supportsRetryCheckpoints && executionRetry.delay >= env.CHECKPOINT_THRESHOLD_IN_MS) { await retryViaQueue(); return; } @@ -441,6 +365,141 @@ export class CompleteAttemptService extends BaseService { await retryDirectly(); } + async #retryAttempt({ + execution, + executionRetry, + taskRunAttempt, + environment, + checkpoint, + supportsRetryCheckpoints, + }: { + execution: TaskRunExecution; + executionRetry: TaskRunExecutionRetry; + taskRunAttempt: NonNullable; + environment: AuthenticatedEnvironment; + checkpoint?: CheckpointData; + supportsRetryCheckpoints?: boolean; + }) { + const retryAt = new Date(executionRetry.timestamp); + + // Retry the task run + await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, { + taskSlug: taskRunAttempt.taskRun.taskIdentifier, + environment, + attributes: { + metadata: this.#generateMetadataAttributesForNextAttempt(execution), + properties: { + retryAt: retryAt.toISOString(), + }, + runId: taskRunAttempt.taskRun.friendlyId, + style: { + icon: "schedule-attempt", + }, + queueId: taskRunAttempt.queueId, + queueName: taskRunAttempt.taskRun.queue, + }, + context: taskRunAttempt.taskRun.traceContext as Record, + spanIdSeed: `retry-${taskRunAttempt.number + 1}`, + endTime: retryAt, + }); + + logger.debug("Retrying", { + taskRun: taskRunAttempt.taskRun.friendlyId, + retry: executionRetry, + }); + + await this._prisma.taskRun.update({ + where: { + id: taskRunAttempt.taskRunId, + }, + data: { + status: "RETRYING_AFTER_FAILURE", + }, + }); + + if (environment.type === "DEVELOPMENT") { + // This is already an EXECUTE message so we can just NACK + await marqs?.nackMessage(taskRunAttempt.taskRunId, executionRetry.timestamp); + return "RETRIED"; + } + + if (checkpoint) { + // This is only here for backwards compat - we don't checkpoint between attempts anymore + return await this.#retryAttemptWithCheckpoint({ + execution, + taskRunAttempt, + executionRetry, + checkpoint, + supportsRetryCheckpoints, + }); + } + + await this.#enqueueReattempt({ + run: taskRunAttempt.taskRun, + executionRetry, + supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, + supportsRetryCheckpoints, + }); + + return "RETRIED"; + } + + async #retryAttemptWithCheckpoint({ + execution, + taskRunAttempt, + executionRetry, + checkpoint, + supportsRetryCheckpoints, + }: { + execution: TaskRunExecution; + taskRunAttempt: NonNullable; + executionRetry: TaskRunExecutionRetry; + checkpoint: CheckpointData; + supportsRetryCheckpoints?: boolean; + }) { + const createCheckpoint = new CreateCheckpointService(this._prisma); + const checkpointCreateResult = await createCheckpoint.call({ + attemptFriendlyId: execution.attempt.id, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "RETRYING_AFTER_FAILURE", + attemptNumber: execution.attempt.number, + }, + }); + + if (!checkpointCreateResult.success) { + logger.error("Failed to create reattempt checkpoint", { + checkpoint, + runId: execution.run.id, + attemptId: execution.attempt.id, + }); + + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRunAttempt.taskRunId, + status: "SYSTEM_FAILURE", + completedAt: new Date(), + error: { + type: "STRING_ERROR", + raw: "Failed to create reattempt checkpoint", + }, + }); + + return "COMPLETED" as const; + } + + await this.#enqueueReattempt({ + run: taskRunAttempt.taskRun, + executionRetry, + checkpointEventId: checkpointCreateResult.event.id, + supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, + supportsRetryCheckpoints, + }); + + return "RETRIED" as const; + } + #generateMetadataAttributesForNextAttempt(execution: TaskRunExecution) { const context = TaskRunContext.parse(execution); From 55cb6a90fa8c7e7d8884df16a041c65ab5883074 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 14 Oct 2024 09:48:09 +0100 Subject: [PATCH 03/47] remove separate graceful exit handling --- .../app/v3/services/completeAttempt.server.ts | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 696c3a38f3..80e6f982fb 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -1,6 +1,5 @@ import { Attributes } from "@opentelemetry/api"; import { - type ExceptionEventProperties, TaskRunContext, TaskRunExecution, TaskRunExecutionResult, @@ -242,45 +241,6 @@ export class CompleteAttemptService extends BaseService { ], }); - // If the error is a graceful exit timeout, we need to fail the task run and all incomplete spans - if ( - sanitizedError.type === "INTERNAL_ERROR" && - sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT" - ) { - const finalizeService = new FinalizeTaskRunService(); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status: "SYSTEM_FAILURE", - completedAt: new Date(), - }); - - // We need to fail all incomplete spans - const inProgressEvents = await eventRepository.queryIncompleteEvents({ - attemptId: execution.attempt.id, - }); - - logger.debug("Failing in-progress events", { - inProgressEvents: inProgressEvents.map((event) => event.id), - }); - - const exception = { - type: "Graceful exit timeout", - message: sanitizedError.message, - } satisfies ExceptionEventProperties; - - await Promise.all( - inProgressEvents.map((event) => { - return eventRepository.crashEvent({ - event: event, - crashedAt: new Date(), - exception, - }); - }) - ); - - return "COMPLETED"; - } - await this._prisma.taskRun.update({ where: { id: taskRunAttempt.taskRunId, From 8699d6bbc92467f282f3021082527ecc4bf9793c Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 14 Oct 2024 23:09:50 +0100 Subject: [PATCH 04/47] refactor task status helpers --- apps/webapp/app/v3/taskStatus.ts | 125 ++++++++++++++++--------------- 1 file changed, 64 insertions(+), 61 deletions(-) diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index 498df8a267..00270b2371 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -1,72 +1,72 @@ import type { TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database"; -export const CANCELLABLE_RUN_STATUSES: TaskRunStatus[] = [ - "DELAYED", - "PENDING", - "WAITING_FOR_DEPLOY", - "EXECUTING", - "PAUSED", - "WAITING_TO_RESUME", - "PAUSED", - "RETRYING_AFTER_FAILURE", -]; -export const CANCELLABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = [ - "EXECUTING", - "PAUSED", - "PENDING", -]; - -export function isCancellableRunStatus(status: TaskRunStatus): boolean { - return CANCELLABLE_RUN_STATUSES.includes(status); -} -export function isCancellableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return CANCELLABLE_ATTEMPT_STATUSES.includes(status); -} - -export const CRASHABLE_RUN_STATUSES: TaskRunStatus[] = CANCELLABLE_RUN_STATUSES; -export const CRASHABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = CANCELLABLE_ATTEMPT_STATUSES; - -export function isCrashableRunStatus(status: TaskRunStatus): boolean { - return CRASHABLE_RUN_STATUSES.includes(status); -} -export function isCrashableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return CRASHABLE_ATTEMPT_STATUSES.includes(status); -} - export const FINAL_RUN_STATUSES = [ "CANCELED", + "INTERRUPTED", "COMPLETED_SUCCESSFULLY", "COMPLETED_WITH_ERRORS", - "INTERRUPTED", "SYSTEM_FAILURE", - "EXPIRED", "CRASHED", + "EXPIRED", "TIMED_OUT", ] satisfies TaskRunStatus[]; export type FINAL_RUN_STATUSES = (typeof FINAL_RUN_STATUSES)[number]; +export const NON_FINAL_RUN_STATUSES = [ + "DELAYED", + "PENDING", + "WAITING_FOR_DEPLOY", + "EXECUTING", + "WAITING_TO_RESUME", + "RETRYING_AFTER_FAILURE", + "PAUSED", +] satisfies TaskRunStatus[]; + +export type NON_FINAL_RUN_STATUSES = (typeof NON_FINAL_RUN_STATUSES)[number]; + export const FINAL_ATTEMPT_STATUSES = [ + "FAILED", "CANCELED", "COMPLETED", - "FAILED", ] satisfies TaskRunAttemptStatus[]; export type FINAL_ATTEMPT_STATUSES = (typeof FINAL_ATTEMPT_STATUSES)[number]; -export const FAILED_ATTEMPT_STATUSES = ["FAILED", "CANCELED"] satisfies TaskRunAttemptStatus[]; +export const NON_FINAL_ATTEMPT_STATUSES = [ + "PENDING", + "EXECUTING", + "PAUSED", +] satisfies TaskRunAttemptStatus[]; + +export type NON_FINAL_ATTEMPT_STATUSES = (typeof NON_FINAL_ATTEMPT_STATUSES)[number]; -export type FAILED_ATTEMPT_STATUSES = (typeof FAILED_ATTEMPT_STATUSES)[number]; +export const FAILED_RUN_STATUSES = [ + "INTERRUPTED", + "COMPLETED_WITH_ERRORS", + "SYSTEM_FAILURE", + "CRASHED", + "TIMED_OUT", +] satisfies TaskRunStatus[]; + +export const CANCELLABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; +export const CANCELLABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; + +export const CRASHABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; +export const CRASHABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; + +export const FAILABLE_RUN_STATUSES = [ + "EXECUTING", + "PENDING", + "WAITING_FOR_DEPLOY", + "RETRYING_AFTER_FAILURE", +] satisfies TaskRunStatus[]; export const FREEZABLE_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "RETRYING_AFTER_FAILURE"]; export const FREEZABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["EXECUTING", "FAILED"]; -export function isFreezableRunStatus(status: TaskRunStatus): boolean { - return FREEZABLE_RUN_STATUSES.includes(status); -} -export function isFreezableAttemptStatus(status: TaskRunAttemptStatus): boolean { - return FREEZABLE_ATTEMPT_STATUSES.includes(status); -} +export const RESTORABLE_RUN_STATUSES: TaskRunStatus[] = ["WAITING_TO_RESUME"]; +export const RESTORABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["PAUSED"]; export function isFinalRunStatus(status: TaskRunStatus): boolean { return FINAL_RUN_STATUSES.includes(status); @@ -75,8 +75,26 @@ export function isFinalAttemptStatus(status: TaskRunAttemptStatus): boolean { return FINAL_ATTEMPT_STATUSES.includes(status); } -export const RESTORABLE_RUN_STATUSES: TaskRunStatus[] = ["WAITING_TO_RESUME"]; -export const RESTORABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["PAUSED"]; +export function isCancellableRunStatus(status: TaskRunStatus): boolean { + return CANCELLABLE_RUN_STATUSES.includes(status); +} +export function isCancellableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return CANCELLABLE_ATTEMPT_STATUSES.includes(status); +} + +export function isCrashableRunStatus(status: TaskRunStatus): boolean { + return CRASHABLE_RUN_STATUSES.includes(status); +} +export function isCrashableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return CRASHABLE_ATTEMPT_STATUSES.includes(status); +} + +export function isFreezableRunStatus(status: TaskRunStatus): boolean { + return FREEZABLE_RUN_STATUSES.includes(status); +} +export function isFreezableAttemptStatus(status: TaskRunAttemptStatus): boolean { + return FREEZABLE_ATTEMPT_STATUSES.includes(status); +} export function isRestorableRunStatus(status: TaskRunStatus): boolean { return RESTORABLE_RUN_STATUSES.includes(status); @@ -85,21 +103,6 @@ export function isRestorableAttemptStatus(status: TaskRunAttemptStatus): boolean return RESTORABLE_ATTEMPT_STATUSES.includes(status); } -export const FAILABLE_RUN_STATUSES = [ - "EXECUTING", - "PENDING", - "WAITING_FOR_DEPLOY", - "RETRYING_AFTER_FAILURE", -] satisfies TaskRunStatus[]; - -export const FAILED_RUN_STATUSES = [ - "INTERRUPTED", - "COMPLETED_WITH_ERRORS", - "SYSTEM_FAILURE", - "CRASHED", - "TIMED_OUT", -] satisfies TaskRunStatus[]; - export function isFailedRunStatus(status: TaskRunStatus): boolean { return FAILED_RUN_STATUSES.includes(status); } From 0c37df74502670c8fded12eb989af7bf82e8ced7 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 14 Oct 2024 23:10:51 +0100 Subject: [PATCH 05/47] clearly separate statuses in prisma schema --- apps/webapp/app/v3/failedTaskRun.server.ts | 3 +++ apps/webapp/app/v3/services/completeAttempt.server.ts | 8 ++++++++ internal-packages/database/prisma/schema.prisma | 10 ++++++++++ 3 files changed, 21 insertions(+) diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index c322241df7..4024946454 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -5,6 +5,9 @@ import { BaseService } from "./services/baseService.server"; import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server"; import { FAILABLE_RUN_STATUSES } from "./taskStatus"; +/** + * + */ export class FailedTaskRunService extends BaseService { public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) { const isFriendlyId = anyRunId.startsWith("run_"); diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 80e6f982fb..4208cbe077 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -199,6 +199,14 @@ export class CompleteAttemptService extends BaseService { const sanitizedError = sanitizeError(completion.error); + // TODO: make this handle the case where the current attempt is unknown, with only a run id + + // 1. Get the task run + + // 2. Get the most recent attempt + + // 3. Get the retry config + await this._prisma.taskRunAttempt.update({ where: { id: taskRunAttempt.id }, data: { diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index d346d4584c..14a90cc526 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1795,6 +1795,10 @@ model TaskRun { } enum TaskRunStatus { + /// + /// NON-FINAL STATUSES + /// + /// Task has been scheduled to run in the future DELAYED /// Task is waiting to be executed by a worker @@ -1815,6 +1819,10 @@ enum TaskRunStatus { /// Task has been paused by the user, and can be resumed by the user PAUSED + /// + /// FINAL STATUSES + /// + /// Task has been canceled by the user CANCELED @@ -1949,9 +1957,11 @@ model TaskRunAttempt { } enum TaskRunAttemptStatus { + /// NON-FINAL PENDING EXECUTING PAUSED + /// FINAL FAILED CANCELED COMPLETED From f710326c5dae4d14e7b53fda4f0be920cc90742b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 14 Oct 2024 23:17:30 +0100 Subject: [PATCH 06/47] all non-final statuses should be failable --- apps/webapp/app/v3/taskStatus.ts | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index 00270b2371..bc9e1e9aff 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -55,12 +55,7 @@ export const CANCELLABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; export const CRASHABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; export const CRASHABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; -export const FAILABLE_RUN_STATUSES = [ - "EXECUTING", - "PENDING", - "WAITING_FOR_DEPLOY", - "RETRYING_AFTER_FAILURE", -] satisfies TaskRunStatus[]; +export const FAILABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; export const FREEZABLE_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "RETRYING_AFTER_FAILURE"]; export const FREEZABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["EXECUTING", "FAILED"]; @@ -75,6 +70,10 @@ export function isFinalAttemptStatus(status: TaskRunAttemptStatus): boolean { return FINAL_ATTEMPT_STATUSES.includes(status); } +export function isFailedRunStatus(status: TaskRunStatus): boolean { + return FAILED_RUN_STATUSES.includes(status); +} + export function isCancellableRunStatus(status: TaskRunStatus): boolean { return CANCELLABLE_RUN_STATUSES.includes(status); } @@ -102,7 +101,3 @@ export function isRestorableRunStatus(status: TaskRunStatus): boolean { export function isRestorableAttemptStatus(status: TaskRunAttemptStatus): boolean { return RESTORABLE_ATTEMPT_STATUSES.includes(status); } - -export function isFailedRunStatus(status: TaskRunStatus): boolean { - return FAILED_RUN_STATUSES.includes(status); -} From 0b39a4cf00009e6214296b6f8c9d6360f7442815 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:51:53 +0100 Subject: [PATCH 07/47] new import payload error code --- packages/core/src/v3/schemas/common.ts | 2 ++ packages/core/src/v3/workers/taskExecutor.ts | 21 +++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index c7e2bd8f77..5efa968a4d 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -98,6 +98,7 @@ export const TaskRunErrorCodes = { DISK_SPACE_EXCEEDED: "DISK_SPACE_EXCEEDED", POD_EVICTED: "POD_EVICTED", POD_UNKNOWN_ERROR: "POD_UNKNOWN_ERROR", + IMPORT_PAYLOAD_ERROR: "IMPORT_PAYLOAD_ERROR", } as const; export const TaskRunInternalError = z.object({ @@ -124,6 +125,7 @@ export const TaskRunInternalError = z.object({ "DISK_SPACE_EXCEEDED", "POD_EVICTED", "POD_UNKNOWN_ERROR", + "IMPORT_PAYLOAD_ERROR", ]), message: z.string().optional(), stackTrace: z.string().optional(), diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 7139e35225..77be5a06fd 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -92,9 +92,28 @@ export class TaskExecutor { try { const payloadPacket = await conditionallyImportPacket(originalPacket, this._tracer); - parsedPayload = await parsePacket(payloadPacket); + } catch (packetError) { + recordSpanException(span, packetError); + + return { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.IMPORT_PAYLOAD_ERROR, + message: + packetError instanceof Error + ? `${packetError.name}: ${packetError.message}` + : typeof packetError === "string" + ? packetError + : undefined, + stackTrace: packetError instanceof Error ? packetError.stack : undefined, + }, + } satisfies TaskRunExecutionResult; + } + try { if (execution.attempt.number === 1) { await this.#callOnStartFunctions(parsedPayload, ctx, signal); } From f92d82fedf4ada996c5cf3e523738777f2ec206a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:53:15 +0100 Subject: [PATCH 08/47] store default retry config if none set on task --- .../cli-v3/src/entryPoints/deploy-index-worker.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts index dfaacb5d73..d1d2a612b6 100644 --- a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts @@ -99,6 +99,20 @@ const { buildManifest, importErrors, config } = await bootstrap(); let tasks = taskCatalog.listTaskManifests(); +// If the config has retry defaults, we need to apply them to all tasks that don't have any retry settings +if (config.retries?.default) { + tasks = tasks.map((task) => { + if (!task.retry) { + return { + ...task, + retries: config.retries?.default, + }; + } + + return task; + }); +} + // If the config has a machine preset, we need to apply it to all tasks that don't have a machine preset if (typeof config.machine === "string") { tasks = tasks.map((task) => { From 9350770fbfac1e9f18aebacf8eb0dbd405386bec Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 13:41:21 +0100 Subject: [PATCH 09/47] failed run service now respects retries --- apps/webapp/app/v3/failedTaskRun.server.ts | 143 +++++++++++++++++- .../v3/marqs/sharedQueueConsumer.server.ts | 43 +++--- .../app/v3/services/completeAttempt.server.ts | 22 ++- apps/webapp/app/v3/taskStatus.ts | 4 + 4 files changed, 173 insertions(+), 39 deletions(-) diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index 4024946454..5cc34388c3 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -1,15 +1,39 @@ -import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3"; +import { + calculateNextRetryDelay, + RetryOptions, + sanitizeError, + TaskRunExecution, + TaskRunExecutionRetry, + TaskRunFailedExecutionResult, +} from "@trigger.dev/core/v3"; import { logger } from "~/services/logger.server"; import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server"; import { BaseService } from "./services/baseService.server"; import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server"; -import { FAILABLE_RUN_STATUSES } from "./taskStatus"; +import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus"; +import { Prisma } from "@trigger.dev/database"; +import { CompleteAttemptService } from "./services/completeAttempt.server"; +import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server"; +import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server"; + +const includeAttempts = { + attempts: { + orderBy: { + createdAt: "desc", + }, + take: 1, + }, + lockedBy: true, +} satisfies Prisma.TaskRunInclude; + +type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{ + include: typeof includeAttempts; +}>; -/** - * - */ export class FailedTaskRunService extends BaseService { public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) { + logger.debug("[FailedTaskRunService] Handling failed task run", { anyRunId, completion }); + const isFriendlyId = anyRunId.startsWith("run_"); const taskRun = await this._prisma.taskRun.findUnique({ @@ -17,6 +41,7 @@ export class FailedTaskRunService extends BaseService { friendlyId: isFriendlyId ? anyRunId : undefined, id: !isFriendlyId ? anyRunId : undefined, }, + include: includeAttempts, }); if (!taskRun) { @@ -28,7 +53,7 @@ export class FailedTaskRunService extends BaseService { return; } - if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) { + if (!isFailableRunStatus(taskRun.status)) { logger.error("[FailedTaskRunService] Task run is not in a failable state", { taskRun, completion, @@ -37,7 +62,28 @@ export class FailedTaskRunService extends BaseService { return; } - // No more retries, we need to fail the task run + const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion); + + if (retriableExecution) { + logger.debug("[FailedTaskRunService] Completing attempt", { taskRun, completion }); + + const executionRetry = + completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution)); + + const completeAttempt = new CompleteAttemptService(this._prisma); + await completeAttempt.call({ + completion: { + ...completion, + retry: executionRetry, + }, + execution: retriableExecution, + isSystemFailure: true, + }); + + return; + } + + // No retriable execution, so we need to fail the task run logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion }); const finalizeService = new FinalizeTaskRunService(); @@ -66,4 +112,87 @@ export class FailedTaskRunService extends BaseService { ], }); } + + async #getRetriableAttemptExecution( + run: TaskRunWithAttempts, + completion: TaskRunFailedExecutionResult + ): Promise { + let attempt = run.attempts[0]; + + // We need to create an attempt if: + // - None exists yet + // - The last attempt has a final status, e.g. we failed between attempts + if (!attempt || isFinalAttemptStatus(attempt.status)) { + logger.error("[FailedTaskRunService] No attempts found", { + run, + completion, + }); + + const createAttempt = new CreateTaskRunAttemptService(this._prisma); + + try { + const { execution } = await createAttempt.call(run.id); + return execution; + } catch (error) { + logger.error("[FailedTaskRunService] Failed to create attempt", { + run, + completion, + error, + }); + + return; + } + } + + // We already have an attempt with non-final status, let's use it + try { + const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt( + attempt.id, + undefined, + undefined, + true + ); + return executionPayload?.execution; + } catch (error) { + logger.error("[FailedTaskRunService] Failed to get execution payload", { + run, + completion, + error, + }); + + return; + } + } + + async #getExecutionRetry( + run: TaskRunWithAttempts, + execution: TaskRunExecution + ): Promise { + const parsedRetryConfig = RetryOptions.safeParse(run.lockedBy?.retryConfig); + + if (!parsedRetryConfig.success) { + logger.error("[FailedTaskRunService] Invalid retry config", { + run, + execution, + }); + + return; + } + + const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number); + + if (!delay) { + logger.debug("[FailedTaskRunService] No more retries", { + run, + execution, + }); + + return; + } + + return { + timestamp: Date.now() + delay, + delay, + }; + } } diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index a663011e4a..4b454ccc9e 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -945,7 +945,8 @@ class SharedQueueTasks { async getExecutionPayloadFromAttempt( id: string, setToExecuting?: boolean, - isRetrying?: boolean + isRetrying?: boolean, + skipStatusChecks?: boolean ): Promise { const attempt = await prisma.taskRunAttempt.findUnique({ where: { @@ -979,27 +980,29 @@ class SharedQueueTasks { return; } - switch (attempt.status) { - case "CANCELED": - case "EXECUTING": { - logger.error("Invalid attempt status for execution payload retrieval", { - attemptId: id, - status: attempt.status, - }); - return; + if (!skipStatusChecks) { + switch (attempt.status) { + case "CANCELED": + case "EXECUTING": { + logger.error("Invalid attempt status for execution payload retrieval", { + attemptId: id, + status: attempt.status, + }); + return; + } } - } - switch (attempt.taskRun.status) { - case "CANCELED": - case "EXECUTING": - case "INTERRUPTED": { - logger.error("Invalid run status for execution payload retrieval", { - attemptId: id, - runId: attempt.taskRunId, - status: attempt.taskRun.status, - }); - return; + switch (attempt.taskRun.status) { + case "CANCELED": + case "EXECUTING": + case "INTERRUPTED": { + logger.error("Invalid run status for execution payload retrieval", { + attemptId: id, + runId: attempt.taskRunId, + status: attempt.taskRun.status, + }); + return; + } } } diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 4208cbe077..7dac5905d0 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -39,12 +39,14 @@ export class CompleteAttemptService extends BaseService { env, checkpoint, supportsRetryCheckpoints, + isSystemFailure, }: { completion: TaskRunExecutionResult; execution: TaskRunExecution; env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; supportsRetryCheckpoints?: boolean; + isSystemFailure?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { const taskRunAttempt = await findAttempt(this._prisma, execution.attempt.id); @@ -110,6 +112,7 @@ export class CompleteAttemptService extends BaseService { env, checkpoint, supportsRetryCheckpoints, + isSystemFailure, }); } } @@ -170,6 +173,7 @@ export class CompleteAttemptService extends BaseService { env, checkpoint, supportsRetryCheckpoints, + isSystemFailure, }: { completion: TaskRunFailedExecutionResult; execution: TaskRunExecution; @@ -177,6 +181,7 @@ export class CompleteAttemptService extends BaseService { env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; supportsRetryCheckpoints?: boolean; + isSystemFailure?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { if ( completion.error.type === "INTERNAL_ERROR" && @@ -199,14 +204,6 @@ export class CompleteAttemptService extends BaseService { const sanitizedError = sanitizeError(completion.error); - // TODO: make this handle the case where the current attempt is unknown, with only a run id - - // 1. Get the task run - - // 2. Get the most recent attempt - - // 3. Get the retry config - await this._prisma.taskRunAttempt.update({ where: { id: taskRunAttempt.id }, data: { @@ -258,10 +255,11 @@ export class CompleteAttemptService extends BaseService { }, }); - const status = - sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED" - ? "TIMED_OUT" - : "COMPLETED_WITH_ERRORS"; + const status = isSystemFailure + ? "SYSTEM_FAILURE" + : sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED" + ? "TIMED_OUT" + : "COMPLETED_WITH_ERRORS"; const finalizeService = new FinalizeTaskRunService(); await finalizeService.call({ diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index bc9e1e9aff..1930c5d438 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -88,6 +88,10 @@ export function isCrashableAttemptStatus(status: TaskRunAttemptStatus): boolean return CRASHABLE_ATTEMPT_STATUSES.includes(status); } +export function isFailableRunStatus(status: TaskRunStatus): boolean { + return FAILABLE_RUN_STATUSES.includes(status); +} + export function isFreezableRunStatus(status: TaskRunStatus): boolean { return FREEZABLE_RUN_STATUSES.includes(status); } From 9f6696d25d31c078fea7fc991749714aad28f6b1 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:13:59 +0100 Subject: [PATCH 10/47] fix merged task retry config indexing --- .../src/entryPoints/deploy-index-worker.ts | 9 +++++---- .../cli-v3/src/entryPoints/dev-index-worker.ts | 17 ++++++++++++++++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts index d1d2a612b6..73de86535d 100644 --- a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts @@ -3,6 +3,7 @@ import { type HandleErrorFunction, indexerToWorkerMessages, taskCatalog, + type TaskManifest, TriggerConfig, } from "@trigger.dev/core/v3"; import { @@ -105,8 +106,8 @@ if (config.retries?.default) { if (!task.retry) { return { ...task, - retries: config.retries?.default, - }; + retry: config.retries?.default, + } satisfies TaskManifest; } return task; @@ -122,7 +123,7 @@ if (typeof config.machine === "string") { machine: { preset: config.machine, }, - }; + } satisfies TaskManifest; } return task; @@ -136,7 +137,7 @@ if (typeof config.maxDuration === "number") { return { ...task, maxDuration: config.maxDuration, - }; + } satisfies TaskManifest; } return task; diff --git a/packages/cli-v3/src/entryPoints/dev-index-worker.ts b/packages/cli-v3/src/entryPoints/dev-index-worker.ts index 0420665907..9e6e8e05e9 100644 --- a/packages/cli-v3/src/entryPoints/dev-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-index-worker.ts @@ -3,6 +3,7 @@ import { type HandleErrorFunction, indexerToWorkerMessages, taskCatalog, + type TaskManifest, TriggerConfig, } from "@trigger.dev/core/v3"; import { @@ -99,6 +100,20 @@ const { buildManifest, importErrors, config } = await bootstrap(); let tasks = taskCatalog.listTaskManifests(); +// If the config has retry defaults, we need to apply them to all tasks that don't have any retry settings +if (config.retries?.default) { + tasks = tasks.map((task) => { + if (!task.retry) { + return { + ...task, + retry: config.retries?.default, + } satisfies TaskManifest; + } + + return task; + }); +} + // If the config has a maxDuration, we need to apply it to all tasks that don't have a maxDuration if (typeof config.maxDuration === "number") { tasks = tasks.map((task) => { @@ -106,7 +121,7 @@ if (typeof config.maxDuration === "number") { return { ...task, maxDuration: config.maxDuration, - }; + } satisfies TaskManifest; } return task; From aee555a465404695943257f83d6346df75391a55 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:56:58 +0100 Subject: [PATCH 11/47] some errors should never be retried --- .../app/v3/services/completeAttempt.server.ts | 7 ++- packages/core/src/v3/errors.ts | 50 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 7dac5905d0..94bc47a67e 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -8,6 +8,7 @@ import { TaskRunSuccessfulExecutionResult, flattenAttributes, sanitizeError, + shouldRetryError, } from "@trigger.dev/core/v3"; import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -216,7 +217,11 @@ export class CompleteAttemptService extends BaseService { const environment = env ?? (await this.#getEnvironment(execution.environment.id)); - if (completion.retry !== undefined && taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS) { + if ( + shouldRetryError(completion.error) && + completion.retry !== undefined && + taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS + ) { return await this.#retryAttempt({ execution, executionRetry: completion.retry, diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index f63f48b99f..acf830ed1b 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -10,6 +10,7 @@ import { import { TaskMetadataFailedToParseData } from "./schemas/messages.js"; import { links } from "./links.js"; import { ExceptionEventProperties } from "./schemas/openTelemetry.js"; +import { assertExhaustive } from "../utils.js"; export class AbortTaskRunError extends Error { constructor(message: string) { @@ -137,6 +138,55 @@ export function sanitizeError(error: TaskRunError): TaskRunError { } } +export function shouldRetryError(error: TaskRunError): boolean { + switch (error.type) { + case "INTERNAL_ERROR": { + switch (error.code) { + case "COULD_NOT_FIND_EXECUTOR": + case "COULD_NOT_FIND_TASK": + case "COULD_NOT_IMPORT_TASK": + case "CONFIGURED_INCORRECTLY": + case "TASK_ALREADY_RUNNING": + case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE": + case "TASK_PROCESS_SIGKILL_TIMEOUT": + case "TASK_PROCESS_OOM_KILLED": + case "TASK_PROCESS_MAYBE_OOM_KILLED": + case "TASK_RUN_CANCELLED": + case "TASK_OUTPUT_ERROR": + case "MAX_DURATION_EXCEEDED": + case "DISK_SPACE_EXCEEDED": + return false; + + case "GRACEFUL_EXIT_TIMEOUT": + case "HANDLE_ERROR_ERROR": + case "IMPORT_PAYLOAD_ERROR": + case "POD_EVICTED": + case "POD_UNKNOWN_ERROR": + case "TASK_EXECUTION_ABORTED": + case "TASK_EXECUTION_FAILED": + case "TASK_RUN_CRASHED": + case "TASK_RUN_HEARTBEAT_TIMEOUT": + return true; + + default: + assertExhaustive(error.code); + } + } + case "STRING_ERROR": { + return true; + } + case "BUILT_IN_ERROR": { + return true; + } + case "CUSTOM_ERROR": { + return true; + } + default: { + assertExhaustive(error); + } + } +} + export function correctErrorStackTrace( stackTrace: string, projectDir?: string, From 008de3bf591fb0b0113ddf8d706308061cfed330 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 16:12:51 +0100 Subject: [PATCH 12/47] finalize run service takes care of acks now --- apps/webapp/app/v3/services/crashTaskRun.server.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/webapp/app/v3/services/crashTaskRun.server.ts b/apps/webapp/app/v3/services/crashTaskRun.server.ts index 6a337c000e..5ac2664936 100644 --- a/apps/webapp/app/v3/services/crashTaskRun.server.ts +++ b/apps/webapp/app/v3/services/crashTaskRun.server.ts @@ -1,6 +1,5 @@ import { TaskRun, TaskRunAttempt } from "@trigger.dev/database"; import { eventRepository } from "../eventRepository.server"; -import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -140,8 +139,6 @@ export class CrashTaskRunService extends BaseService { span.setAttribute("taskRunId", run.id); span.setAttribute("attemptId", attempt.id); - await marqs?.acknowledgeMessage(run.id); - await this._prisma.taskRunAttempt.update({ where: { id: attempt.id, From fc59ce23f24c6acb83612bd2ade9a6133c872f74 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 15 Oct 2024 16:26:58 +0100 Subject: [PATCH 13/47] execution payload helper now with single object arg --- apps/webapp/app/v3/failedTaskRun.server.ts | 11 ++++--- apps/webapp/app/v3/handleSocketIo.server.ts | 5 +++- .../v3/marqs/sharedQueueConsumer.server.ts | 29 ++++++++++++------- .../app/v3/services/resumeAttempt.server.ts | 6 ++-- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index 5cc34388c3..d9b90b4bc4 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -146,12 +146,11 @@ export class FailedTaskRunService extends BaseService { // We already have an attempt with non-final status, let's use it try { - const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt( - attempt.id, - undefined, - undefined, - true - ); + const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ + id: attempt.id, + skipStatusChecks: true, + }); + return executionPayload?.execution; } catch (error) { logger.error("[FailedTaskRunService] Failed to get execution payload", { diff --git a/apps/webapp/app/v3/handleSocketIo.server.ts b/apps/webapp/app/v3/handleSocketIo.server.ts index 065d2bc168..dd12d69bb4 100644 --- a/apps/webapp/app/v3/handleSocketIo.server.ts +++ b/apps/webapp/app/v3/handleSocketIo.server.ts @@ -195,7 +195,10 @@ function createCoordinatorNamespace(io: Server) { const service = new CreateTaskRunAttemptService(); const { attempt } = await service.call(message.runId, environment, false); - const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt(attempt.id, true); + const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ + id: attempt.id, + setToExecuting: true, + }); if (!payload) { logger.error("Failed to retrieve payload after attempt creation", message); diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 4b454ccc9e..f91dbd689a 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -718,9 +718,9 @@ export class SharedQueueConsumer { completions.push(completion); - const executionPayload = await this._tasks.getExecutionPayloadFromAttempt( - completedAttempt.id - ); + const executionPayload = await this._tasks.getExecutionPayloadFromAttempt({ + id: completedAttempt.id, + }); if (!executionPayload) { await this.#ackAndDoMoreWork(message.messageId); @@ -942,12 +942,17 @@ class SharedQueueTasks { } } - async getExecutionPayloadFromAttempt( - id: string, - setToExecuting?: boolean, - isRetrying?: boolean, - skipStatusChecks?: boolean - ): Promise { + async getExecutionPayloadFromAttempt({ + id, + setToExecuting, + isRetrying, + skipStatusChecks, + }: { + id: string; + setToExecuting?: boolean; + isRetrying?: boolean; + skipStatusChecks?: boolean; + }): Promise { const attempt = await prisma.taskRunAttempt.findUnique({ where: { id, @@ -1153,7 +1158,11 @@ class SharedQueueTasks { return; } - return this.getExecutionPayloadFromAttempt(latestAttempt.id, setToExecuting, isRetrying); + return this.getExecutionPayloadFromAttempt({ + id: latestAttempt.id, + setToExecuting, + isRetrying, + }); } async getLazyAttemptPayload( diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index f02d9e4ccc..d620a7005e 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -207,9 +207,9 @@ export class ResumeAttemptService extends BaseService { completions.push(completion); - const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt( - completedAttempt.id - ); + const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ + id: completedAttempt.id, + }); if (!executionPayload) { logger.error("Failed to get execution payload", { From ccda672953a34ba331c66b9f1d804d7ab48ca6f1 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 21 Oct 2024 14:59:04 +0100 Subject: [PATCH 14/47] internal error code enum export --- packages/core/src/v3/schemas/common.ts | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 5efa968a4d..b7068bd223 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -77,30 +77,6 @@ export const TaskRunStringError = z.object({ export type TaskRunStringError = z.infer; -export const TaskRunErrorCodes = { - COULD_NOT_FIND_EXECUTOR: "COULD_NOT_FIND_EXECUTOR", - COULD_NOT_FIND_TASK: "COULD_NOT_FIND_TASK", - COULD_NOT_IMPORT_TASK: "COULD_NOT_IMPORT_TASK", - CONFIGURED_INCORRECTLY: "CONFIGURED_INCORRECTLY", - TASK_ALREADY_RUNNING: "TASK_ALREADY_RUNNING", - TASK_EXECUTION_FAILED: "TASK_EXECUTION_FAILED", - TASK_EXECUTION_ABORTED: "TASK_EXECUTION_ABORTED", - TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE", - TASK_PROCESS_SIGKILL_TIMEOUT: "TASK_PROCESS_SIGKILL_TIMEOUT", - TASK_PROCESS_OOM_KILLED: "TASK_PROCESS_OOM_KILLED", - TASK_PROCESS_MAYBE_OOM_KILLED: "TASK_PROCESS_MAYBE_OOM_KILLED", - TASK_RUN_CANCELLED: "TASK_RUN_CANCELLED", - TASK_OUTPUT_ERROR: "TASK_OUTPUT_ERROR", - HANDLE_ERROR_ERROR: "HANDLE_ERROR_ERROR", - GRACEFUL_EXIT_TIMEOUT: "GRACEFUL_EXIT_TIMEOUT", - TASK_RUN_CRASHED: "TASK_RUN_CRASHED", - MAX_DURATION_EXCEEDED: "MAX_DURATION_EXCEEDED", - DISK_SPACE_EXCEEDED: "DISK_SPACE_EXCEEDED", - POD_EVICTED: "POD_EVICTED", - POD_UNKNOWN_ERROR: "POD_UNKNOWN_ERROR", - IMPORT_PAYLOAD_ERROR: "IMPORT_PAYLOAD_ERROR", -} as const; - export const TaskRunInternalError = z.object({ type: z.literal("INTERNAL_ERROR"), code: z.enum([ @@ -131,6 +107,8 @@ export const TaskRunInternalError = z.object({ stackTrace: z.string().optional(), }); +export const TaskRunErrorCodes = TaskRunInternalError.shape.code.enum; + export type TaskRunInternalError = z.infer; export const TaskRunError = z.discriminatedUnion("type", [ From 78ef8342602d2e517aecd21ef65e9fec315187e3 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:05:44 +0100 Subject: [PATCH 15/47] unify failed and crashed run retries --- apps/webapp/app/v3/failedTaskRun.server.ts | 88 ++++++++++++++----- .../app/v3/services/completeAttempt.server.ts | 26 ++++-- .../app/v3/services/crashTaskRun.server.ts | 76 +++++++++++----- apps/webapp/app/v3/taskStatus.ts | 2 + 4 files changed, 140 insertions(+), 52 deletions(-) diff --git a/apps/webapp/app/v3/failedTaskRun.server.ts b/apps/webapp/app/v3/failedTaskRun.server.ts index d9b90b4bc4..9ba54138e6 100644 --- a/apps/webapp/app/v3/failedTaskRun.server.ts +++ b/apps/webapp/app/v3/failedTaskRun.server.ts @@ -36,12 +36,11 @@ export class FailedTaskRunService extends BaseService { const isFriendlyId = anyRunId.startsWith("run_"); - const taskRun = await this._prisma.taskRun.findUnique({ + const taskRun = await this._prisma.taskRun.findFirst({ where: { friendlyId: isFriendlyId ? anyRunId : undefined, id: !isFriendlyId ? anyRunId : undefined, }, - include: includeAttempts, }); if (!taskRun) { @@ -62,24 +61,13 @@ export class FailedTaskRunService extends BaseService { return; } - const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion); - - if (retriableExecution) { - logger.debug("[FailedTaskRunService] Completing attempt", { taskRun, completion }); - - const executionRetry = - completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution)); - - const completeAttempt = new CompleteAttemptService(this._prisma); - await completeAttempt.call({ - completion: { - ...completion, - retry: executionRetry, - }, - execution: retriableExecution, - isSystemFailure: true, - }); + const retryHelper = new FailedTaskRunRetryHelper(this._prisma); + const retryResult = await retryHelper.call({ + runId: taskRun.id, + completion, + }); + if (retryResult !== undefined) { return; } @@ -112,6 +100,58 @@ export class FailedTaskRunService extends BaseService { ], }); } +} + +export class FailedTaskRunRetryHelper extends BaseService { + async call({ + runId, + completion, + isCrash, + }: { + runId: string; + completion: TaskRunFailedExecutionResult; + isCrash?: boolean; + }) { + const taskRun = await this._prisma.taskRun.findFirst({ + where: { + id: runId, + }, + include: includeAttempts, + }); + + if (!taskRun) { + logger.error("[FailedTaskRunRetryHelper] Task run not found", { + runId, + completion, + }); + + return; + } + + const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion); + + if (!retriableExecution) { + return; + } + + logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion }); + + const executionRetry = + completion.retry ?? (await this.#getExecutionRetry(taskRun, retriableExecution)); + + const completeAttempt = new CompleteAttemptService(this._prisma); + const completeResult = await completeAttempt.call({ + completion: { + ...completion, + retry: executionRetry, + }, + execution: retriableExecution, + isSystemFailure: !isCrash, + isCrash, + }); + + return completeResult; + } async #getRetriableAttemptExecution( run: TaskRunWithAttempts, @@ -123,7 +163,7 @@ export class FailedTaskRunService extends BaseService { // - None exists yet // - The last attempt has a final status, e.g. we failed between attempts if (!attempt || isFinalAttemptStatus(attempt.status)) { - logger.error("[FailedTaskRunService] No attempts found", { + logger.error("[FailedTaskRunRetryHelper] No attempts found", { run, completion, }); @@ -134,7 +174,7 @@ export class FailedTaskRunService extends BaseService { const { execution } = await createAttempt.call(run.id); return execution; } catch (error) { - logger.error("[FailedTaskRunService] Failed to create attempt", { + logger.error("[FailedTaskRunRetryHelper] Failed to create attempt", { run, completion, error, @@ -153,7 +193,7 @@ export class FailedTaskRunService extends BaseService { return executionPayload?.execution; } catch (error) { - logger.error("[FailedTaskRunService] Failed to get execution payload", { + logger.error("[FailedTaskRunRetryHelper] Failed to get execution payload", { run, completion, error, @@ -170,7 +210,7 @@ export class FailedTaskRunService extends BaseService { const parsedRetryConfig = RetryOptions.safeParse(run.lockedBy?.retryConfig); if (!parsedRetryConfig.success) { - logger.error("[FailedTaskRunService] Invalid retry config", { + logger.error("[FailedTaskRunRetryHelper] Invalid retry config", { run, execution, }); @@ -181,7 +221,7 @@ export class FailedTaskRunService extends BaseService { const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number); if (!delay) { - logger.debug("[FailedTaskRunService] No more retries", { + logger.debug("[FailedTaskRunRetryHelper] No more retries", { run, execution, }); diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 94bc47a67e..ef91535b4c 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -22,7 +22,7 @@ import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; import { CreateCheckpointService } from "./createCheckpoint.server"; import { TaskRun } from "@trigger.dev/database"; import { RetryAttemptService } from "./retryAttempt.server"; -import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; +import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { env } from "~/env.server"; @@ -41,6 +41,7 @@ export class CompleteAttemptService extends BaseService { checkpoint, supportsRetryCheckpoints, isSystemFailure, + isCrash, }: { completion: TaskRunExecutionResult; execution: TaskRunExecution; @@ -48,6 +49,7 @@ export class CompleteAttemptService extends BaseService { checkpoint?: CheckpointData; supportsRetryCheckpoints?: boolean; isSystemFailure?: boolean; + isCrash?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { const taskRunAttempt = await findAttempt(this._prisma, execution.attempt.id); @@ -114,6 +116,7 @@ export class CompleteAttemptService extends BaseService { checkpoint, supportsRetryCheckpoints, isSystemFailure, + isCrash, }); } } @@ -175,6 +178,7 @@ export class CompleteAttemptService extends BaseService { checkpoint, supportsRetryCheckpoints, isSystemFailure, + isCrash, }: { completion: TaskRunFailedExecutionResult; execution: TaskRunExecution; @@ -183,6 +187,7 @@ export class CompleteAttemptService extends BaseService { checkpoint?: CheckpointData; supportsRetryCheckpoints?: boolean; isSystemFailure?: boolean; + isCrash?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { if ( completion.error.type === "INTERNAL_ERROR" && @@ -260,11 +265,20 @@ export class CompleteAttemptService extends BaseService { }, }); - const status = isSystemFailure - ? "SYSTEM_FAILURE" - : sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED" - ? "TIMED_OUT" - : "COMPLETED_WITH_ERRORS"; + let status: FAILED_RUN_STATUSES; + + if (isSystemFailure) { + status = "SYSTEM_FAILURE"; + } else if (isCrash) { + status = "CRASHED"; + } else if ( + sanitizedError.type === "INTERNAL_ERROR" && + sanitizedError.code === "MAX_DURATION_EXCEEDED" + ) { + status = "TIMED_OUT"; + } else { + status = "COMPLETED_WITH_ERRORS"; + } const finalizeService = new FinalizeTaskRunService(); await finalizeService.call({ diff --git a/apps/webapp/app/v3/services/crashTaskRun.server.ts b/apps/webapp/app/v3/services/crashTaskRun.server.ts index 5ac2664936..3dbc736a74 100644 --- a/apps/webapp/app/v3/services/crashTaskRun.server.ts +++ b/apps/webapp/app/v3/services/crashTaskRun.server.ts @@ -6,6 +6,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus"; import { sanitizeError, TaskRunInternalError } from "@trigger.dev/core/v3"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; export type CrashTaskRunServiceOptions = { reason?: string; @@ -35,16 +36,43 @@ export class CrashTaskRunService extends BaseService { }); if (!taskRun) { - logger.error("Task run not found", { runId }); + logger.error("[CrashTaskRunService] Task run not found", { runId }); return; } // Make sure the task run is in a crashable state if (!opts.overrideCompletion && !isCrashableRunStatus(taskRun.status)) { - logger.error("Task run is not in a crashable state", { runId, status: taskRun.status }); + logger.error("[CrashTaskRunService] Task run is not in a crashable state", { + runId, + status: taskRun.status, + }); + return; + } + + logger.debug("[CrashTaskRunService] Completing attempt", { runId, options }); + + const retryHelper = new FailedTaskRunRetryHelper(this._prisma); + const retryResult = await retryHelper.call({ + runId, + completion: { + ok: false, + id: runId, + error: { + type: "INTERNAL_ERROR", + code: opts.errorCode ?? "TASK_RUN_CRASHED", + message: opts.reason, + stackTrace: opts.logs, + }, + }, + }); + + if (retryResult === "RETRIED") { + logger.debug("[CrashTaskRunService] Retried task run", { runId }); return; } + logger.debug("[CrashTaskRunService] Overriding completion", { runId, options }); + const finalizeService = new FinalizeTaskRunService(); const crashedTaskRun = await finalizeService.call({ id: taskRun.id, @@ -86,7 +114,7 @@ export class CrashTaskRunService extends BaseService { options?.overrideCompletion ); - logger.debug("Crashing in-progress events", { + logger.debug("[CrashTaskRunService] Crashing in-progress events", { inProgressEvents: inProgressEvents.map((event) => event.id), }); @@ -135,25 +163,29 @@ export class CrashTaskRunService extends BaseService { code?: TaskRunInternalError["code"]; } ) { - return await this.traceWithEnv("failAttempt()", environment, async (span) => { - span.setAttribute("taskRunId", run.id); - span.setAttribute("attemptId", attempt.id); + return await this.traceWithEnv( + "[CrashTaskRunService] failAttempt()", + environment, + async (span) => { + span.setAttribute("taskRunId", run.id); + span.setAttribute("attemptId", attempt.id); - await this._prisma.taskRunAttempt.update({ - where: { - id: attempt.id, - }, - data: { - status: "FAILED", - completedAt: failedAt, - error: sanitizeError({ - type: "INTERNAL_ERROR", - code: error.code ?? "TASK_RUN_CRASHED", - message: error.reason, - stackTrace: error.logs, - }), - }, - }); - }); + await this._prisma.taskRunAttempt.update({ + where: { + id: attempt.id, + }, + data: { + status: "FAILED", + completedAt: failedAt, + error: sanitizeError({ + type: "INTERNAL_ERROR", + code: error.code ?? "TASK_RUN_CRASHED", + message: error.reason, + stackTrace: error.logs, + }), + }, + }); + } + ); } } diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index 1930c5d438..c831fa29e4 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -49,6 +49,8 @@ export const FAILED_RUN_STATUSES = [ "TIMED_OUT", ] satisfies TaskRunStatus[]; +export type FAILED_RUN_STATUSES = (typeof FAILED_RUN_STATUSES)[number]; + export const CANCELLABLE_RUN_STATUSES = NON_FINAL_RUN_STATUSES; export const CANCELLABLE_ATTEMPT_STATUSES = NON_FINAL_ATTEMPT_STATUSES; From cba5885b42c5585f09e2504bb0eaebd83a6b365a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 16 Oct 2024 22:42:27 +0100 Subject: [PATCH 16/47] Prevent uncaught socket ack exceptions (#1415) * catch all the remaining socket acks that could possibly throw * wrap the remaining handlers in try catch --- apps/coordinator/src/index.ts | 701 ++++++++++++++++++++-------------- 1 file changed, 407 insertions(+), 294 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 02e59483b6..e2a7ba3259 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -536,7 +536,11 @@ class TaskCoordinator { socket.on("TEST", (message, callback) => { logger.log("Handling TEST", { eventName: "TEST", ...getSocketMetadata(), ...message }); - callback(); + try { + callback(); + } catch (error) { + logger.error("TEST error", { error }); + } }); // Deprecated: Only workers without support for lazy attempts use this @@ -669,13 +673,25 @@ class TaskCoordinator { log.log("Handling READY_FOR_RESUME"); - updateAttemptFriendlyId(message.attemptFriendlyId); + try { + updateAttemptFriendlyId(message.attemptFriendlyId); - if (message.version === "v2") { - updateAttemptNumber(message.attemptNumber); - } + if (message.version === "v2") { + updateAttemptNumber(message.attemptNumber); + } + + this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" }); + } catch (error) { + log.error("READY_FOR_RESUME error", { error }); - this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" }); + await crashRun({ + name: "ReadyForResumeError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + + return; + } }); // MARK: RUN COMPLETED @@ -692,100 +708,112 @@ class TaskCoordinator { log.log("Handling TASK_RUN_COMPLETED"); - const { completion, execution } = message; + try { + const { completion, execution } = message; - // Cancel all in-progress checkpoints (if any) - this.#cancelCheckpoint(socket.data.runId); + // Cancel all in-progress checkpoints (if any) + this.#cancelCheckpoint(socket.data.runId); - await chaosMonkey.call({ throwErrors: false }); + await chaosMonkey.call({ throwErrors: false }); - const completeWithoutCheckpoint = (shouldExit: boolean) => { - const supportsRetryCheckpoints = message.version === "v1"; + const completeWithoutCheckpoint = (shouldExit: boolean) => { + const supportsRetryCheckpoints = message.version === "v1"; - this.#platformSocket?.send("TASK_RUN_COMPLETED", { - version: supportsRetryCheckpoints ? "v1" : "v2", - execution, - completion, - }); - callback({ willCheckpointAndRestore: false, shouldExit }); - }; + this.#platformSocket?.send("TASK_RUN_COMPLETED", { + version: supportsRetryCheckpoints ? "v1" : "v2", + execution, + completion, + }); + callback({ willCheckpointAndRestore: false, shouldExit }); + }; - if (completion.ok) { - completeWithoutCheckpoint(true); - return; - } + if (completion.ok) { + completeWithoutCheckpoint(true); + return; + } - if ( - completion.error.type === "INTERNAL_ERROR" && - completion.error.code === "TASK_RUN_CANCELLED" - ) { - completeWithoutCheckpoint(true); - return; - } + if ( + completion.error.type === "INTERNAL_ERROR" && + completion.error.code === "TASK_RUN_CANCELLED" + ) { + completeWithoutCheckpoint(true); + return; + } - if (completion.retry === undefined) { - completeWithoutCheckpoint(true); - return; - } + if (completion.retry === undefined) { + completeWithoutCheckpoint(true); + return; + } - if (completion.retry.delay < this.#delayThresholdInMs) { - completeWithoutCheckpoint(false); + if (completion.retry.delay < this.#delayThresholdInMs) { + completeWithoutCheckpoint(false); - // Prevents runs that fail fast from never sending a heartbeat - this.#sendRunHeartbeat(socket.data.runId); + // Prevents runs that fail fast from never sending a heartbeat + this.#sendRunHeartbeat(socket.data.runId); - return; - } + return; + } - if (message.version === "v2") { - completeWithoutCheckpoint(true); - return; - } + if (message.version === "v2") { + completeWithoutCheckpoint(true); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - if (!willCheckpointAndRestore) { - completeWithoutCheckpoint(false); - return; - } + if (!willCheckpointAndRestore) { + completeWithoutCheckpoint(false); + return; + } - // The worker will then put itself in a checkpointable state - callback({ willCheckpointAndRestore: true, shouldExit: false }); + // The worker will then put itself in a checkpointable state + callback({ willCheckpointAndRestore: true, shouldExit: false }); - const ready = await readyToCheckpoint("RETRY"); + const ready = await readyToCheckpoint("RETRY"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); - return; - } + return; + } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - shouldHeartbeat: true, - }); + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + shouldHeartbeat: true, + }); - if (!checkpoint) { - log.error("Failed to checkpoint"); - completeWithoutCheckpoint(false); - return; - } + if (!checkpoint) { + log.error("Failed to checkpoint"); + completeWithoutCheckpoint(false); + return; + } - log.addFields({ checkpoint }); + log.addFields({ checkpoint }); - this.#platformSocket?.send("TASK_RUN_COMPLETED", { - version: "v1", - execution, - completion, - checkpoint, - }); + this.#platformSocket?.send("TASK_RUN_COMPLETED", { + version: "v1", + execution, + completion, + checkpoint, + }); - if (!checkpoint.docker || !willSimulate) { - exitRun(); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("TASK_RUN_COMPLETED error", { error }); + + await crashRun({ + name: "TaskRunCompletedError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + + return; } }); @@ -802,15 +830,21 @@ class TaskCoordinator { log.log("Handling TASK_RUN_FAILED_TO_RUN"); - // Cancel all in-progress checkpoints (if any) - this.#cancelCheckpoint(socket.data.runId); + try { + // Cancel all in-progress checkpoints (if any) + this.#cancelCheckpoint(socket.data.runId); + + this.#platformSocket?.send("TASK_RUN_FAILED_TO_RUN", { + version: "v1", + completion, + }); - this.#platformSocket?.send("TASK_RUN_FAILED_TO_RUN", { - version: "v1", - completion, - }); + exitRun(); + } catch (error) { + log.error("TASK_RUN_FAILED_TO_RUN error", { error }); - exitRun(); + return; + } }); // MARK: CHECKPOINT @@ -823,14 +857,20 @@ class TaskCoordinator { log.log("Handling READY_FOR_CHECKPOINT"); - const checkpointable = this.#checkpointableTasks.get(socket.data.runId); + try { + const checkpointable = this.#checkpointableTasks.get(socket.data.runId); + + if (!checkpointable) { + log.error("No checkpoint scheduled"); + return; + } + + checkpointable.resolve(); + } catch (error) { + log.error("READY_FOR_CHECKPOINT error", { error }); - if (!checkpointable) { - log.error("No checkpoint scheduled"); return; } - - checkpointable.resolve(); }); // MARK: CXX CHECKPOINT @@ -843,15 +883,19 @@ class TaskCoordinator { log.log("Handling CANCEL_CHECKPOINT"); - if (message.version === "v1") { - this.#cancelCheckpoint(socket.data.runId); - // v1 has no callback - return; - } + try { + if (message.version === "v1") { + this.#cancelCheckpoint(socket.data.runId); + // v1 has no callback + return; + } - const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId); + const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId); - callback({ version: "v2", checkpointCanceled }); + callback({ version: "v2", checkpointCanceled }); + } catch (error) { + log.error("CANCEL_CHECKPOINT error", { error }); + } }); // MARK: DURATION WAIT @@ -864,66 +908,78 @@ class TaskCoordinator { log.log("Handling WAIT_FOR_DURATION"); - await chaosMonkey.call({ throwErrors: false }); + try { + await chaosMonkey.call({ throwErrors: false }); - if (checkpointInProgress()) { - log.error("Checkpoint already in progress"); - callback({ willCheckpointAndRestore: false }); - return; - } + if (checkpointInProgress()) { + log.error("Checkpoint already in progress"); + callback({ willCheckpointAndRestore: false }); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - callback({ willCheckpointAndRestore }); + callback({ willCheckpointAndRestore }); - if (!willCheckpointAndRestore) { - return; - } + if (!willCheckpointAndRestore) { + return; + } - const ready = await readyToCheckpoint("WAIT_FOR_DURATION"); + const ready = await readyToCheckpoint("WAIT_FOR_DURATION"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); - return; - } + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); + return; + } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }); - if (!checkpoint) { - // The task container will keep running until the wait duration has elapsed - log.error("Failed to checkpoint"); - return; - } + if (!checkpoint) { + // The task container will keep running until the wait duration has elapsed + log.error("Failed to checkpoint"); + return; + } - log.addFields({ checkpoint }); + log.addFields({ checkpoint }); - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_DURATION", - ms: message.ms, - now: message.now, - }, - }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_DURATION", + ms: message.ms, + now: message.now, + }, + }); - if (ack?.keepRunAlive) { - log.log("keeping run alive after duration checkpoint"); - return; - } + if (ack?.keepRunAlive) { + log.log("keeping run alive after duration checkpoint"); + return; + } - if (!checkpoint.docker || !willSimulate) { - exitRun(); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("WAIT_FOR_DURATION error", { error }); + + await crashRun({ + name: "WaitForDurationError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + + return; } }); @@ -937,74 +993,86 @@ class TaskCoordinator { log.log("Handling WAIT_FOR_TASK"); - await chaosMonkey.call({ throwErrors: false }); + try { + await chaosMonkey.call({ throwErrors: false }); - if (checkpointInProgress()) { - log.error("Checkpoint already in progress"); - callback({ willCheckpointAndRestore: false }); - return; - } + if (checkpointInProgress()) { + log.error("Checkpoint already in progress"); + callback({ willCheckpointAndRestore: false }); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - callback({ willCheckpointAndRestore }); + callback({ willCheckpointAndRestore }); - if (!willCheckpointAndRestore) { - return; - } + if (!willCheckpointAndRestore) { + return; + } - // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits - if (message.version === "v2") { - const ready = await readyToCheckpoint("WAIT_FOR_TASK"); + // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits + if (message.version === "v2") { + const ready = await readyToCheckpoint("WAIT_FOR_TASK"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); + return; + } + } + + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }); + + if (!checkpoint) { + log.error("Failed to checkpoint"); return; } - } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + log.addFields({ checkpoint }); - if (!checkpoint) { - log.error("Failed to checkpoint"); - return; - } + log.log("WAIT_FOR_TASK checkpoint created"); - log.addFields({ checkpoint }); + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage"); - log.log("WAIT_FOR_TASK checkpoint created"); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_TASK", + friendlyId: message.friendlyId, + }, + }); - //setting this means we can only resume from a checkpoint - socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage"); + if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; + log.log("keeping run alive after task checkpoint"); + return; + } - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_TASK", - friendlyId: message.friendlyId, - }, - }); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("WAIT_FOR_TASK error", { error }); - if (ack?.keepRunAlive) { - socket.data.requiresCheckpointResumeWithMessage = undefined; - log.log("keeping run alive after task checkpoint"); - return; - } + await crashRun({ + name: "WaitForTaskError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); - if (!checkpoint.docker || !willSimulate) { - exitRun(); + return; } }); @@ -1018,75 +1086,87 @@ class TaskCoordinator { log.log("Handling WAIT_FOR_BATCH", message); - await chaosMonkey.call({ throwErrors: false }); + try { + await chaosMonkey.call({ throwErrors: false }); - if (checkpointInProgress()) { - log.error("Checkpoint already in progress"); - callback({ willCheckpointAndRestore: false }); - return; - } + if (checkpointInProgress()) { + log.error("Checkpoint already in progress"); + callback({ willCheckpointAndRestore: false }); + return; + } - const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); + const { canCheckpoint, willSimulate } = await this.#checkpointer.init(); - const willCheckpointAndRestore = canCheckpoint || willSimulate; + const willCheckpointAndRestore = canCheckpoint || willSimulate; - callback({ willCheckpointAndRestore }); + callback({ willCheckpointAndRestore }); - if (!willCheckpointAndRestore) { - return; - } + if (!willCheckpointAndRestore) { + return; + } - // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits - if (message.version === "v2") { - const ready = await readyToCheckpoint("WAIT_FOR_BATCH"); + // Workers with v1 schemas don't signal when they're ready to checkpoint for dependency waits + if (message.version === "v2") { + const ready = await readyToCheckpoint("WAIT_FOR_BATCH"); - if (!ready.success) { - log.error("Failed to become checkpointable", { reason: ready.reason }); + if (!ready.success) { + log.error("Failed to become checkpointable", { reason: ready.reason }); + return; + } + } + + const checkpoint = await this.#checkpointer.checkpointAndPush({ + runId: socket.data.runId, + projectRef: socket.data.projectRef, + deploymentVersion: socket.data.deploymentVersion, + attemptNumber: getAttemptNumber(), + }); + + if (!checkpoint) { + log.error("Failed to checkpoint"); return; } - } - const checkpoint = await this.#checkpointer.checkpointAndPush({ - runId: socket.data.runId, - projectRef: socket.data.projectRef, - deploymentVersion: socket.data.deploymentVersion, - attemptNumber: getAttemptNumber(), - }); + log.addFields({ checkpoint }); - if (!checkpoint) { - log.error("Failed to checkpoint"); - return; - } + log.log("WAIT_FOR_BATCH checkpoint created"); - log.addFields({ checkpoint }); + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + log.log("WAIT_FOR_BATCH set checkpoint"); - log.log("WAIT_FOR_BATCH checkpoint created"); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { + version: "v1", + runId: socket.data.runId, + attemptFriendlyId: message.attemptFriendlyId, + docker: checkpoint.docker, + location: checkpoint.location, + reason: { + type: "WAIT_FOR_BATCH", + batchFriendlyId: message.batchFriendlyId, + runFriendlyIds: message.runFriendlyIds, + }, + }); - //setting this means we can only resume from a checkpoint - socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - log.log("WAIT_FOR_BATCH set checkpoint"); + if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; + log.log("keeping run alive after batch checkpoint"); + return; + } - const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { - version: "v1", - runId: socket.data.runId, - attemptFriendlyId: message.attemptFriendlyId, - docker: checkpoint.docker, - location: checkpoint.location, - reason: { - type: "WAIT_FOR_BATCH", - batchFriendlyId: message.batchFriendlyId, - runFriendlyIds: message.runFriendlyIds, - }, - }); + if (!checkpoint.docker || !willSimulate) { + exitRun(); + } + } catch (error) { + log.error("WAIT_FOR_BATCH error", { error }); - if (ack?.keepRunAlive) { - socket.data.requiresCheckpointResumeWithMessage = undefined; - log.log("keeping run alive after batch checkpoint"); - return; - } + await crashRun({ + name: "WaitForBatchError", + message: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); - if (!checkpoint.docker || !willSimulate) { - exitRun(); + return; } }); @@ -1100,24 +1180,29 @@ class TaskCoordinator { log.log("Handling INDEX_TASKS"); - const workerAck = await this.#platformSocket?.sendWithAck("CREATE_WORKER", { - version: "v2", - projectRef: socket.data.projectRef, - envId: socket.data.envId, - deploymentId: message.deploymentId, - metadata: { - contentHash: socket.data.contentHash, - packageVersion: message.packageVersion, - tasks: message.tasks, - }, - supportsLazyAttempts: message.version !== "v1" && message.supportsLazyAttempts, - }); + try { + const workerAck = await this.#platformSocket?.sendWithAck("CREATE_WORKER", { + version: "v2", + projectRef: socket.data.projectRef, + envId: socket.data.envId, + deploymentId: message.deploymentId, + metadata: { + contentHash: socket.data.contentHash, + packageVersion: message.packageVersion, + tasks: message.tasks, + }, + supportsLazyAttempts: message.version !== "v1" && message.supportsLazyAttempts, + }); - if (!workerAck) { - log.debug("no worker ack while indexing"); - } + if (!workerAck) { + log.debug("no worker ack while indexing"); + } - callback({ success: !!workerAck?.success }); + callback({ success: !!workerAck?.success }); + } catch (error) { + log.error("INDEX_TASKS error", { error }); + callback({ success: false }); + } }); // MARK: INDEX FAILED @@ -1130,11 +1215,15 @@ class TaskCoordinator { log.log("Handling INDEXING_FAILED"); - this.#platformSocket?.send("INDEXING_FAILED", { - version: "v1", - deploymentId: message.deploymentId, - error: message.error, - }); + try { + this.#platformSocket?.send("INDEXING_FAILED", { + version: "v1", + deploymentId: message.deploymentId, + error: message.error, + }); + } catch (error) { + log.error("INDEXING_FAILED error", { error }); + } }); // MARK: CREATE ATTEMPT @@ -1147,26 +1236,38 @@ class TaskCoordinator { log.log("Handling CREATE_TASK_RUN_ATTEMPT"); - await chaosMonkey.call({ throwErrors: false }); - - const createAttempt = await this.#platformSocket?.sendWithAck("CREATE_TASK_RUN_ATTEMPT", { - runId: message.runId, - envId: socket.data.envId, - }); + try { + await chaosMonkey.call({ throwErrors: false }); + + const createAttempt = await this.#platformSocket?.sendWithAck( + "CREATE_TASK_RUN_ATTEMPT", + { + runId: message.runId, + envId: socket.data.envId, + } + ); - if (!createAttempt?.success) { - log.debug("no ack while creating attempt", { reason: createAttempt?.reason }); - callback({ success: false, reason: createAttempt?.reason }); - return; - } + if (!createAttempt?.success) { + log.debug("no ack while creating attempt", { reason: createAttempt?.reason }); + callback({ success: false, reason: createAttempt?.reason }); + return; + } - updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id); - updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number); + updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id); + updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number); - callback({ - success: true, - executionPayload: createAttempt.executionPayload, - }); + callback({ + success: true, + executionPayload: createAttempt.executionPayload, + }); + } catch (error) { + log.error("CREATE_TASK_RUN_ATTEMPT error", { error }); + callback({ + success: false, + reason: + error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error", + }); + } }); socket.on("UNRECOVERABLE_ERROR", async (message) => { @@ -1178,7 +1279,11 @@ class TaskCoordinator { log.log("Handling UNRECOVERABLE_ERROR"); - await crashRun(message.error); + try { + await crashRun(message.error); + } catch (error) { + log.error("UNRECOVERABLE_ERROR error", { error }); + } }); socket.on("SET_STATE", async (message) => { @@ -1190,20 +1295,28 @@ class TaskCoordinator { log.log("Handling SET_STATE"); - if (message.attemptFriendlyId) { - updateAttemptFriendlyId(message.attemptFriendlyId); - } + try { + if (message.attemptFriendlyId) { + updateAttemptFriendlyId(message.attemptFriendlyId); + } - if (message.attemptNumber) { - updateAttemptNumber(message.attemptNumber); + if (message.attemptNumber) { + updateAttemptNumber(message.attemptNumber); + } + } catch (error) { + log.error("SET_STATE error", { error }); } }); }, onDisconnect: async (socket, handler, sender, logger) => { - this.#platformSocket?.send("LOG", { - metadata: socket.data, - text: "disconnect", - }); + try { + this.#platformSocket?.send("LOG", { + metadata: socket.data, + text: "disconnect", + }); + } catch (error) { + logger.error("onDisconnect error", { error }); + } }, handlers: { TASK_HEARTBEAT: async (message) => { From 3ad51f2c3ade479e8976e250b4b5e62311814f13 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Thu, 17 Oct 2024 10:54:38 +0100 Subject: [PATCH 17/47] New onboarding question (#1404) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Updated “Twitter” to be “X (Twitter)” * added Textarea to storybook * Updated textarea styling to match input field * WIP adding new text field to org creation page * Added description to field * Submit feedback to Plain when an org signs up * Formatting improvement * type improvement * removed userId * Moved submitting to Plain into its own file * Change orgName with name * use sendToPlain function for the help & feedback email form * use name not orgName * import cleanup * Downgrading plan form uses sendToPlain * Get the userId from requireUser only * Added whitespace-pre-wrap to the message property on the run page * use requireUserId * Removed old Plain submit code --- apps/webapp/app/components/Feedback.tsx | 6 +- .../app/components/primitives/TextArea.tsx | 2 +- .../webapp/app/routes/_app.orgs.new/route.tsx | 120 ++++++++++++------ .../app/routes/confirm-basic-details.tsx | 2 +- apps/webapp/app/routes/resources.feedback.ts | 81 ++---------- .../route.tsx | 2 +- ...ces.orgs.$organizationSlug.select-plan.tsx | 68 +--------- .../app/routes/storybook.textarea/route.tsx | 17 +++ apps/webapp/app/routes/storybook/route.tsx | 12 +- apps/webapp/app/utils/plain.server.ts | 59 +++++++++ 10 files changed, 188 insertions(+), 181 deletions(-) create mode 100644 apps/webapp/app/routes/storybook.textarea/route.tsx create mode 100644 apps/webapp/app/utils/plain.server.ts diff --git a/apps/webapp/app/components/Feedback.tsx b/apps/webapp/app/components/Feedback.tsx index 3c9b389750..5a5472a976 100644 --- a/apps/webapp/app/components/Feedback.tsx +++ b/apps/webapp/app/components/Feedback.tsx @@ -1,8 +1,9 @@ import { conform, useForm } from "@conform-to/react"; import { parse } from "@conform-to/zod"; -import { EnvelopeIcon, LightBulbIcon } from "@heroicons/react/24/solid"; +import { InformationCircleIcon } from "@heroicons/react/20/solid"; +import { EnvelopeIcon } from "@heroicons/react/24/solid"; import { Form, useActionData, useLocation, useNavigation } from "@remix-run/react"; -import { type ReactNode, useState, useEffect } from "react"; +import { type ReactNode, useEffect, useState } from "react"; import { type FeedbackType, feedbackTypeLabel, schema } from "~/routes/resources.feedback"; import { Button } from "./primitives/Buttons"; import { Dialog, DialogContent, DialogHeader, DialogTrigger } from "./primitives/Dialog"; @@ -16,7 +17,6 @@ import { Label } from "./primitives/Label"; import { Paragraph } from "./primitives/Paragraph"; import { Select, SelectItem } from "./primitives/Select"; import { TextArea } from "./primitives/TextArea"; -import { InformationCircleIcon } from "@heroicons/react/20/solid"; import { TextLink } from "./primitives/TextLink"; type FeedbackProps = { diff --git a/apps/webapp/app/components/primitives/TextArea.tsx b/apps/webapp/app/components/primitives/TextArea.tsx index 7d543e2ec5..f5350a510b 100644 --- a/apps/webapp/app/components/primitives/TextArea.tsx +++ b/apps/webapp/app/components/primitives/TextArea.tsx @@ -8,7 +8,7 @@ export function TextArea({ className, rows, ...props }: TextAreaProps) { {...props} rows={rows ?? 6} className={cn( - "placeholder:text-muted-foreground w-full rounded-md border border-tertiary bg-tertiary px-3 text-sm text-text-bright transition focus-custom file:border-0 file:bg-transparent file:text-base file:font-medium hover:border-charcoal-600 focus:border-transparent focus:ring-0 disabled:cursor-not-allowed disabled:opacity-50", + "placeholder:text-muted-foreground w-full rounded border border-charcoal-800 bg-charcoal-750 px-3 text-sm text-text-bright transition focus-custom focus-custom file:border-0 file:bg-transparent file:text-base file:font-medium hover:border-charcoal-600 hover:bg-charcoal-650 disabled:cursor-not-allowed disabled:opacity-50", className )} /> diff --git a/apps/webapp/app/routes/_app.orgs.new/route.tsx b/apps/webapp/app/routes/_app.orgs.new/route.tsx index d99286dd05..571c6163be 100644 --- a/apps/webapp/app/routes/_app.orgs.new/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.new/route.tsx @@ -4,6 +4,7 @@ import { RadioGroup } from "@radix-ui/react-radio-group"; import type { ActionFunction, LoaderFunctionArgs } from "@remix-run/node"; import { json, redirect } from "@remix-run/node"; import { Form, useActionData, useNavigation } from "@remix-run/react"; +import { uiComponent } from "@team-plain/typescript-sdk"; import { typedjson, useTypedLoaderData } from "remix-typedjson"; import { z } from "zod"; import { MainCenteredContainer } from "~/components/layout/AppLayout"; @@ -17,21 +18,25 @@ import { Input } from "~/components/primitives/Input"; import { InputGroup } from "~/components/primitives/InputGroup"; import { Label } from "~/components/primitives/Label"; import { RadioGroupItem } from "~/components/primitives/RadioButton"; +import { TextArea } from "~/components/primitives/TextArea"; import { useFeatures } from "~/hooks/useFeatures"; import { createOrganization } from "~/models/organization.server"; import { NewOrganizationPresenter } from "~/presenters/NewOrganizationPresenter.server"; -import { requireUserId } from "~/services/session.server"; +import { logger } from "~/services/logger.server"; +import { requireUser, requireUserId } from "~/services/session.server"; import { organizationPath, rootPath } from "~/utils/pathBuilder"; +import { sendToPlain } from "~/utils/plain.server"; const schema = z.object({ orgName: z.string().min(3).max(50), companySize: z.string().optional(), + whyUseUs: z.string().optional(), }); export const loader = async ({ request }: LoaderFunctionArgs) => { const userId = await requireUserId(request); const presenter = new NewOrganizationPresenter(); - const { hasOrganizations } = await presenter.call({ userId }); + const { hasOrganizations } = await presenter.call({ userId: userId }); return typedjson({ hasOrganizations, @@ -39,8 +44,7 @@ export const loader = async ({ request }: LoaderFunctionArgs) => { }; export const action: ActionFunction = async ({ request }) => { - const userId = await requireUserId(request); - + const user = await requireUser(request); const formData = await request.formData(); const submission = parse(formData, { schema }); @@ -51,10 +55,41 @@ export const action: ActionFunction = async ({ request }) => { try { const organization = await createOrganization({ title: submission.value.orgName, - userId, + userId: user.id, companySize: submission.value.companySize ?? null, }); + const whyUseUs = formData.get("whyUseUs"); + + if (whyUseUs) { + try { + await sendToPlain({ + userId: user.id, + email: user.email, + name: user.name ?? user.displayName ?? user.email, + title: "New org feedback", + components: [ + uiComponent.text({ + text: `${submission.value.orgName} just created a new organization.`, + }), + uiComponent.divider({ spacingSize: "M" }), + uiComponent.text({ + size: "L", + color: "NORMAL", + text: "What problem are you trying to solve?", + }), + uiComponent.text({ + size: "L", + color: "NORMAL", + text: whyUseUs.toString(), + }), + ], + }); + } catch (error) { + logger.error("Error sending data to Plain when creating an org:", { error }); + } + } + return redirect(organizationPath(organization)); } catch (error: any) { return json({ errors: { body: error.message } }, { status: 400 }); @@ -97,39 +132,48 @@ export default function NewOrganizationPage() { {orgName.error} {isManagedCloud && ( - - - - - - - - - + <> + + + + + + + + + + + +