From 558731af55c4e05f10de618225a9efa77f8a2dab Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 17 Sep 2024 11:13:07 +0100 Subject: [PATCH 1/7] Fix for run filtering not working with some special characters (double encoded) --- apps/webapp/app/hooks/useSearchParam.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/hooks/useSearchParam.ts b/apps/webapp/app/hooks/useSearchParam.ts index 0ed81fb3e5..c0f939abcc 100644 --- a/apps/webapp/app/hooks/useSearchParam.ts +++ b/apps/webapp/app/hooks/useSearchParam.ts @@ -18,13 +18,13 @@ export function useSearchParams() { } if (typeof value === "string") { - search.set(param, encodeURIComponent(value)); + search.set(param, value); continue; } search.delete(param); for (const v of value) { - search.append(param, encodeURIComponent(v)); + search.append(param, v); } } }, From 7d9d6538f72b563879571d00159fc62679fcdc6b Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 17 Sep 2024 12:36:28 +0100 Subject: [PATCH 2/7] Add the full dependentTaskAttempt to a ResumeBatchRunService log --- apps/webapp/app/v3/services/resumeBatchRun.server.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index f0a9b57f9a..b7e6d0ca61 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService { if (wasUpdated) { logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", { batchRunId: batchRun.id, - dependentTaskAttemptId: batchRun.dependentTaskAttempt.id, + dependentTaskAttempt: batchRun.dependentTaskAttempt, + checkpointEventId: batchRun.checkpointEventId, + hasCheckpointEvent: !!batchRun.checkpointEventId, }); await marqs?.replaceMessage(dependentRun.id, { type: "RESUME", From ed83fbe6f866230d58982833bc03343875756409 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 17 Sep 2024 15:56:54 +0100 Subject: [PATCH 3/7] Added RESUME_AFTER_DEPENDENCY_WITH_ACK --- apps/coordinator/src/index.ts | 27 ++++++++++++ .../v3/marqs/sharedQueueConsumer.server.ts | 43 +++++++++++++++---- packages/core/src/v3/schemas/messages.ts | 42 ++++++++++++------ 3 files changed, 90 insertions(+), 22 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index 0d922174f3..c1932af2c7 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -162,6 +162,33 @@ class TaskCoordinator { taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); }, + RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => { + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); + + if (!taskSocket) { + logger.log("Socket for attempt not found", { + attemptFriendlyId: message.attemptFriendlyId, + }); + return { + success: false, + error: { + name: "SocketNotFoundError", + message: "Socket for attempt not found", + }, + }; + } + + await chaosMonkey.call(); + + // In case the task resumed faster than we could checkpoint + this.#cancelCheckpoint(message.runId); + + taskSocket.emit("RESUME_AFTER_DEPENDENCY", message); + + return { + success: true, + }; + }, RESUME_AFTER_DURATION: async (message) => { const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 606d9ca923..7437b04087 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -725,20 +725,47 @@ export class SharedQueueConsumer { } try { - logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", { - runId: resumableAttempt.taskRunId, - attemptId: resumableAttempt.id, - }); - - // The attempt should still be running so we can broadcast to all coordinators to resume immediately - socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", { - version: "v1", + const resumeMessage = { + version: "v1" as const, runId: resumableAttempt.taskRunId, attemptId: resumableAttempt.id, attemptFriendlyId: resumableAttempt.friendlyId, completions, executions, + }; + + logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message }); + + // The attempt should still be running so we can broadcast to all coordinators to resume immediately + const responses = await socketIo.coordinatorNamespace + .timeout(10_000) + .emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage); + + logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", { + resumeMessage, + responses, + message, }); + + if (responses.length === 0) { + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", { + resumeMessage, + message, + }); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 1_000); + return; + } + + const failed = responses.filter((response) => !response.success); + if (failed.length > 0) { + logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", { + resumeMessage, + failed, + message, + }); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 1_000); + return; + } } catch (e) { if (e instanceof Error) { this._currentSpan?.recordException(e); diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 5ce65de9fc..be57331f68 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -15,6 +15,21 @@ import { WaitReason, } from "./schemas.js"; +const ackCallbackResult = z.discriminatedUnion("success", [ + z.object({ + success: z.literal(false), + error: z.object({ + name: z.string(), + message: z.string(), + stack: z.string().optional(), + stderr: z.string().optional(), + }), + }), + z.object({ + success: z.literal(true), + }), +]); + export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [ z.object({ type: z.literal("CANCEL_ATTEMPT"), @@ -269,20 +284,7 @@ export const PlatformToProviderMessages = { projectId: z.string(), deploymentId: z.string(), }), - callback: z.discriminatedUnion("success", [ - z.object({ - success: z.literal(false), - error: z.object({ - name: z.string(), - message: z.string(), - stack: z.string().optional(), - stderr: z.string().optional(), - }), - }), - z.object({ - success: z.literal(true), - }), - ]), + callback: ackCallbackResult, }, RESTORE: { message: z.object({ @@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = { }; export const PlatformToCoordinatorMessages = { + /** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */ RESUME_AFTER_DEPENDENCY: { message: z.object({ version: z.literal("v1").default("v1"), @@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = { executions: TaskRunExecution.array(), }), }, + RESUME_AFTER_DEPENDENCY_WITH_ACK: { + message: z.object({ + version: z.literal("v1").default("v1"), + runId: z.string(), + attemptId: z.string(), + attemptFriendlyId: z.string(), + completions: TaskRunExecutionResult.array(), + executions: TaskRunExecution.array(), + }), + callback: ackCallbackResult, + }, RESUME_AFTER_DURATION: { message: z.object({ version: z.literal("v1").default("v1"), From 72562447357d1cdc1e651378155e9dfc7f10fc44 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 17 Sep 2024 17:00:12 +0100 Subject: [PATCH 4/7] Set the delay to 5s --- apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 7437b04087..61a05b6ee2 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -752,7 +752,7 @@ export class SharedQueueConsumer { resumeMessage, message, }); - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 1_000); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); return; } @@ -763,7 +763,7 @@ export class SharedQueueConsumer { failed, message, }); - await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 1_000); + await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000); return; } } catch (e) { From 2dd5d4675f3984befda259b591757a868195a508 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 17 Sep 2024 22:21:02 +0100 Subject: [PATCH 5/7] =?UTF-8?q?If=20a=20checkpoint=20has=20been=20created,?= =?UTF-8?q?=20the=20coordinator=20won=E2=80=99t=20continue=20the=20run=20w?= =?UTF-8?q?ith=20RESUME=5FAFTER=5FDEPENDENCY=5FWITH=5FACK?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/coordinator/src/index.ts | 28 ++++++++++++++++++++++++ packages/core/src/v3/schemas/messages.ts | 1 + 2 files changed, 29 insertions(+) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index c1932af2c7..cfcb24ba34 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -178,6 +178,22 @@ class TaskCoordinator { }; } + //if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue + if (taskSocket.data.requiresCheckpointResumeWithMessage) { + logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", { + socketData: taskSocket.data, + }); + + return { + success: false, + error: { + name: "CheckpointMessagePresentError", + message: + "Checkpoint message is present, so we need to kill the process and resume from the queue.", + }, + }; + } + await chaosMonkey.call(); // In case the task resumed faster than we could checkpoint @@ -819,6 +835,12 @@ class TaskCoordinator { return; } + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", { + requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage, + }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", attemptFriendlyId: message.attemptFriendlyId, @@ -889,6 +911,12 @@ class TaskCoordinator { return; } + //setting this means we can only resume from a checkpoint + socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; + logger.log("WAIT_FOR_BATCH set requiresCheckpointResumeWithMessage", { + requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage, + }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", attemptFriendlyId: message.attemptFriendlyId, diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index be57331f68..746848640e 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -861,6 +861,7 @@ export const ProdWorkerSocketData = z.object({ podName: z.string(), deploymentId: z.string(), deploymentVersion: z.string(), + requiresCheckpointResumeWithMessage: z.string().optional(), }); export const CoordinatorSocketData = z.object({ From 0efed1427cbcba539473dd2e96b8149c24527fa3 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 18 Sep 2024 09:27:11 +0100 Subject: [PATCH 6/7] =?UTF-8?q?If=20we=E2=80=99re=20keeping=20the=20run=20?= =?UTF-8?q?alive=20then=20set=20socket.data.requiresCheckpointResumeWithMe?= =?UTF-8?q?ssage=20to=20undefined?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/coordinator/src/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index cfcb24ba34..d6efc95765 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -853,6 +853,7 @@ class TaskCoordinator { }); if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId }); return; } @@ -930,6 +931,7 @@ class TaskCoordinator { }); if (ack?.keepRunAlive) { + socket.data.requiresCheckpointResumeWithMessage = undefined; logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId }); return; } From 9dadee235ae38c05bb72cf2588d88cc5e7f6115c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 18 Sep 2024 10:23:56 +0100 Subject: [PATCH 7/7] Log out the data before and after setting socket.data.requiresCheckpointResumeWithMessage --- apps/coordinator/src/index.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts index d6efc95765..15cd87ab6d 100644 --- a/apps/coordinator/src/index.ts +++ b/apps/coordinator/src/index.ts @@ -835,10 +835,16 @@ class TaskCoordinator { return; } + logger.log("WAIT_FOR_TASK checkpoint created", { + checkpoint, + socketData: socket.data, + }); + //setting this means we can only resume from a checkpoint socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", { - requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage, + checkpoint, + socketData: socket.data, }); const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { @@ -912,10 +918,16 @@ class TaskCoordinator { return; } + logger.log("WAIT_FOR_BATCH checkpoint created", { + checkpoint, + socketData: socket.data, + }); + //setting this means we can only resume from a checkpoint socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - logger.log("WAIT_FOR_BATCH set requiresCheckpointResumeWithMessage", { - requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage, + logger.log("WAIT_FOR_BATCH set checkpoint", { + checkpoint, + socketData: socket.data, }); const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {