Skip to content

Commit

Permalink
Add acking to RESUME_AFTER_DEPENDENCY message to the coordinator (#1313)
Browse files Browse the repository at this point in the history
* Fix for run filtering not working with some special characters (double encoded)

* Add the full dependentTaskAttempt to a ResumeBatchRunService log

* Added RESUME_AFTER_DEPENDENCY_WITH_ACK

* Set the delay to 5s

* If a checkpoint has been created, the coordinator won’t continue the run with RESUME_AFTER_DEPENDENCY_WITH_ACK

* If we’re keeping the run alive then set socket.data.requiresCheckpointResumeWithMessage to undefined

* Log out the data before and after setting socket.data.requiresCheckpointResumeWithMessage
  • Loading branch information
matt-aitken authored Sep 18, 2024
1 parent 74c2076 commit 56a5b58
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 25 deletions.
69 changes: 69 additions & 0 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,49 @@ class TaskCoordinator {

taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
},
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

if (!taskSocket) {
logger.log("Socket for attempt not found", {
attemptFriendlyId: message.attemptFriendlyId,
});
return {
success: false,
error: {
name: "SocketNotFoundError",
message: "Socket for attempt not found",
},
};
}

//if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
if (taskSocket.data.requiresCheckpointResumeWithMessage) {
logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", {
socketData: taskSocket.data,
});

return {
success: false,
error: {
name: "CheckpointMessagePresentError",
message:
"Checkpoint message is present, so we need to kill the process and resume from the queue.",
},
};
}

await chaosMonkey.call();

// In case the task resumed faster than we could checkpoint
this.#cancelCheckpoint(message.runId);

taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);

return {
success: true,
};
},
RESUME_AFTER_DURATION: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

Expand Down Expand Up @@ -792,6 +835,18 @@ class TaskCoordinator {
return;
}

logger.log("WAIT_FOR_TASK checkpoint created", {
checkpoint,
socketData: socket.data,
});

//setting this means we can only resume from a checkpoint
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", {
checkpoint,
socketData: socket.data,
});

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
Expand All @@ -804,6 +859,7 @@ class TaskCoordinator {
});

if (ack?.keepRunAlive) {
socket.data.requiresCheckpointResumeWithMessage = undefined;
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
return;
}
Expand Down Expand Up @@ -862,6 +918,18 @@ class TaskCoordinator {
return;
}

logger.log("WAIT_FOR_BATCH checkpoint created", {
checkpoint,
socketData: socket.data,
});

//setting this means we can only resume from a checkpoint
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
logger.log("WAIT_FOR_BATCH set checkpoint", {
checkpoint,
socketData: socket.data,
});

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
Expand All @@ -875,6 +943,7 @@ class TaskCoordinator {
});

if (ack?.keepRunAlive) {
socket.data.requiresCheckpointResumeWithMessage = undefined;
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
return;
}
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/hooks/useSearchParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ export function useSearchParams() {
}

if (typeof value === "string") {
search.set(param, encodeURIComponent(value));
search.set(param, value);
continue;
}

search.delete(param);
for (const v of value) {
search.append(param, encodeURIComponent(v));
search.append(param, v);
}
}
},
Expand Down
43 changes: 35 additions & 8 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -725,20 +725,47 @@ export class SharedQueueConsumer {
}

try {
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
});

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
const resumeMessage = {
version: "v1" as const,
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
attemptFriendlyId: resumableAttempt.friendlyId,
completions,
executions,
};

logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
const responses = await socketIo.coordinatorNamespace
.timeout(10_000)
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);

logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
resumeMessage,
responses,
message,
});

if (responses.length === 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
resumeMessage,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}

const failed = responses.filter((response) => !response.success);
if (failed.length > 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
resumeMessage,
failed,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}
} catch (e) {
if (e instanceof Error) {
this._currentSpan?.recordException(e);
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService {
if (wasUpdated) {
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
batchRunId: batchRun.id,
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
dependentTaskAttempt: batchRun.dependentTaskAttempt,
checkpointEventId: batchRun.checkpointEventId,
hasCheckpointEvent: !!batchRun.checkpointEventId,
});
await marqs?.replaceMessage(dependentRun.id, {
type: "RESUME",
Expand Down
43 changes: 29 additions & 14 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@ import {
WaitReason,
} from "./schemas.js";

const ackCallbackResult = z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]);

export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
z.object({
type: z.literal("CANCEL_ATTEMPT"),
Expand Down Expand Up @@ -269,20 +284,7 @@ export const PlatformToProviderMessages = {
projectId: z.string(),
deploymentId: z.string(),
}),
callback: z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]),
callback: ackCallbackResult,
},
RESTORE: {
message: z.object({
Expand Down Expand Up @@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = {
};

export const PlatformToCoordinatorMessages = {
/** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */
RESUME_AFTER_DEPENDENCY: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand All @@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = {
executions: TaskRunExecution.array(),
}),
},
RESUME_AFTER_DEPENDENCY_WITH_ACK: {
message: z.object({
version: z.literal("v1").default("v1"),
runId: z.string(),
attemptId: z.string(),
attemptFriendlyId: z.string(),
completions: TaskRunExecutionResult.array(),
executions: TaskRunExecution.array(),
}),
callback: ackCallbackResult,
},
RESUME_AFTER_DURATION: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand Down Expand Up @@ -847,6 +861,7 @@ export const ProdWorkerSocketData = z.object({
podName: z.string(),
deploymentId: z.string(),
deploymentVersion: z.string(),
requiresCheckpointResumeWithMessage: z.string().optional(),
});

export const CoordinatorSocketData = z.object({
Expand Down

0 comments on commit 56a5b58

Please sign in to comment.