diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 0d922174f3..15cd87ab6d 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -162,6 +162,49 @@ class TaskCoordinator { taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); }, + RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => { + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); + + if (!taskSocket) { + logger.log("Socket for attempt not found", { + attemptFriendlyId: message.attemptFriendlyId, + }); + return { + success: false, + error: { + name: "SocketNotFoundError", + message: "Socket for attempt not found", + }, + }; + } + + //if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue + if (taskSocket.data.requiresCheckpointResumeWithMessage) { + logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", { + socketData: taskSocket.data, + }); + + return { + success: false, + error: { + name: "CheckpointMessagePresentError", + message: + "Checkpoint message is present, so we need to kill the process and resume from the queue.", + }, + }; + } + + await chaosMonkey.call(); + + // In case the task resumed faster than we could checkpoint + this.#cancelCheckpoint(message.runId); + + taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); + + return { + success: true, + }; + }, RESUME_AFTER_DURATION: async (message) => { const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); @@ -792,6 +835,18 @@ class TaskCoordinator { return; } + logger.log("WAIT_FOR_TASK checkpoint created", { + checkpoint, + socketData: socket.data, + }); + + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", { + checkpoint, + socketData: socket.data, + }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", attemptFriendlyId: message.attemptFriendlyId, @@ -804,6 +859,7 @@ class TaskCoordinator { }); if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId }); return; } @@ -862,6 +918,18 @@ class TaskCoordinator { return; } + logger.log("WAIT_FOR_BATCH checkpoint created", { + checkpoint, + socketData: socket.data, + }); + + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + logger.log("WAIT_FOR_BATCH set checkpoint", { + checkpoint, + socketData: socket.data, + }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", attemptFriendlyId: message.attemptFriendlyId, @@ -875,6 +943,7 @@ class TaskCoordinator { }); if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId }); return; } diff --git a/apps/webapp/app/hooks/useSearchParam.ts b/apps/webapp/app/hooks/useSearchParam.ts index 0ed81fb3e5..c0f939abcc 100644 --- a/apps/webapp/app/hooks/useSearchParam.ts +++ b/apps/webapp/app/hooks/useSearchParam.ts @@ -18,13 +18,13 @@ export function useSearchParams() { } if (typeof value === "string") { - search.set(param, encodeURIComponent(value)); + search.set(param, value); continue; } search.delete(param); for (const v of value) { - search.append(param, encodeURIComponent(v)); + search.append(param, v); } } }, diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 606d9ca923..61a05b6ee2 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -725,20 +725,47 @@ export class SharedQueueConsumer { } try { - logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", { - runId: resumableAttempt.taskRunId, - attemptId: resumableAttempt.id, - }); - - // The attempt should still be running so we can broadcast to all coordinators to resume immediately - socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", { - version: "v1", + const resumeMessage = { + version: "v1" as const, runId: resumableAttempt.taskRunId, attemptId: resumableAttempt.id, attemptFriendlyId: resumableAttempt.friendlyId, completions, executions, + }; + + logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message }); + + // The attempt should still be running so we can broadcast to all coordinators to resume immediately + const responses = await socketIo.coordinatorNamespace + .timeout(10_000) + .emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage); + + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", { + resumeMessage, + responses, + message, }); + + if (responses.length === 0) { + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", { + resumeMessage, + message, + }); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); + return; + } + + const failed = responses.filter((response) => !response.success); + if (failed.length > 0) { + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", { + resumeMessage, + failed, + message, + }); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); + return; + } } catch (e) { if (e instanceof Error) { this._currentSpan?.recordException(e); diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index f0a9b57f9a..b7e6d0ca61 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService { if (wasUpdated) { logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", { batchRunId: batchRun.id, - dependentTaskAttemptId: batchRun.dependentTaskAttempt.id, + dependentTaskAttempt: batchRun.dependentTaskAttempt, + checkpointEventId: batchRun.checkpointEventId, + hasCheckpointEvent: !!batchRun.checkpointEventId, }); await marqs?.replaceMessage(dependentRun.id, { type: "RESUME", diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 5ce65de9fc..746848640e 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -15,6 +15,21 @@ import { WaitReason, } from "./schemas.js"; +const ackCallbackResult = z.discriminatedUnion("success", [ + z.object({ + success: z.literal(false), + error: z.object({ + name: z.string(), + message: z.string(), + stack: z.string().optional(), + stderr: z.string().optional(), + }), + }), + z.object({ + success: z.literal(true), + }), +]); + export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [ z.object({ type: z.literal("CANCEL_ATTEMPT"), @@ -269,20 +284,7 @@ export const PlatformToProviderMessages = { projectId: z.string(), deploymentId: z.string(), }), - callback: z.discriminatedUnion("success", [ - z.object({ - success: z.literal(false), - error: z.object({ - name: z.string(), - message: z.string(), - stack: z.string().optional(), - stderr: z.string().optional(), - }), - }), - z.object({ - success: z.literal(true), - }), - ]), + callback: ackCallbackResult, }, RESTORE: { message: z.object({ @@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = { }; export const PlatformToCoordinatorMessages = { + /** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */ RESUME_AFTER_DEPENDENCY: { message: z.object({ version: z.literal("v1").default("v1"), @@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = { executions: TaskRunExecution.array(), }), }, + RESUME_AFTER_DEPENDENCY_WITH_ACK: { + message: z.object({ + version: z.literal("v1").default("v1"), + runId: z.string(), + attemptId: z.string(), + attemptFriendlyId: z.string(), + completions: TaskRunExecutionResult.array(), + executions: TaskRunExecution.array(), + }), + callback: ackCallbackResult, + }, RESUME_AFTER_DURATION: { message: z.object({ version: z.literal("v1").default("v1"), @@ -847,6 +861,7 @@ export const ProdWorkerSocketData = z.object({ podName: z.string(), deploymentId: z.string(), deploymentVersion: z.string(), + requiresCheckpointResumeWithMessage: z.string().optional(), }); export const CoordinatorSocketData = z.object({