Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acking to RESUME_AFTER_DEPENDENCY message to the coordinator #1313

Merged
merged 7 commits into from
Sep 18, 2024
27 changes: 27 additions & 0 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,33 @@ class TaskCoordinator {

taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
},
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

if (!taskSocket) {
logger.log("Socket for attempt not found", {
attemptFriendlyId: message.attemptFriendlyId,
});
return {
success: false,
error: {
name: "SocketNotFoundError",
message: "Socket for attempt not found",
},
};
}

await chaosMonkey.call();

// In case the task resumed faster than we could checkpoint
this.#cancelCheckpoint(message.runId);

taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);

return {
success: true,
};
},
RESUME_AFTER_DURATION: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/hooks/useSearchParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ export function useSearchParams() {
}

if (typeof value === "string") {
search.set(param, encodeURIComponent(value));
search.set(param, value);
matt-aitken marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

search.delete(param);
for (const v of value) {
search.append(param, encodeURIComponent(v));
search.append(param, v);
matt-aitken marked this conversation as resolved.
Show resolved Hide resolved
matt-aitken marked this conversation as resolved.
Show resolved Hide resolved
}
}
},
Expand Down
43 changes: 35 additions & 8 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -725,20 +725,47 @@ export class SharedQueueConsumer {
}

try {
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
});

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
const resumeMessage = {
version: "v1" as const,
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
attemptFriendlyId: resumableAttempt.friendlyId,
completions,
executions,
};

logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
const responses = await socketIo.coordinatorNamespace
.timeout(10_000)
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);

logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
resumeMessage,
responses,
message,
});

if (responses.length === 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
resumeMessage,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}

const failed = responses.filter((response) => !response.success);
if (failed.length > 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
resumeMessage,
failed,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}
} catch (e) {
if (e instanceof Error) {
this._currentSpan?.recordException(e);
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService {
if (wasUpdated) {
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
batchRunId: batchRun.id,
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
dependentTaskAttempt: batchRun.dependentTaskAttempt,
checkpointEventId: batchRun.checkpointEventId,
hasCheckpointEvent: !!batchRun.checkpointEventId,
});
await marqs?.replaceMessage(dependentRun.id, {
type: "RESUME",
Expand Down
42 changes: 28 additions & 14 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@ import {
WaitReason,
} from "./schemas.js";

const ackCallbackResult = z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]);

export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
z.object({
type: z.literal("CANCEL_ATTEMPT"),
Expand Down Expand Up @@ -269,20 +284,7 @@ export const PlatformToProviderMessages = {
projectId: z.string(),
deploymentId: z.string(),
}),
callback: z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]),
callback: ackCallbackResult,
},
RESTORE: {
message: z.object({
Expand Down Expand Up @@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = {
};

export const PlatformToCoordinatorMessages = {
/** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */
matt-aitken marked this conversation as resolved.
Show resolved Hide resolved
RESUME_AFTER_DEPENDENCY: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand All @@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = {
executions: TaskRunExecution.array(),
}),
},
RESUME_AFTER_DEPENDENCY_WITH_ACK: {
message: z.object({
version: z.literal("v1").default("v1"),
runId: z.string(),
attemptId: z.string(),
attemptFriendlyId: z.string(),
completions: TaskRunExecutionResult.array(),
executions: TaskRunExecution.array(),
}),
callback: ackCallbackResult,
},
RESUME_AFTER_DURATION: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand Down
Loading