Add acking to RESUME_AFTER_DEPENDENCY message to the coordinator #1313
Walkthrough
The pull request introduces a new event handler, …
Sequence Diagram(s)
sequenceDiagram
participant A as TaskCoordinator
participant B as TaskSocket
A->>B: Receive RESUME_AFTER_DEPENDENCY_WITH_ACK
B->>A: Retrieve task socket
A->>A: Check requiresCheckpointResumeWithMessage
A->>A: Cancel checkpoint if needed
A->>B: Emit RESUME_AFTER_DEPENDENCY
A->>B: Send success/error response
Actionable comments posted: 3
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- apps/coordinator/src/index.ts (1 hunks)
- apps/webapp/app/hooks/useSearchParam.ts (1 hunks)
- apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1 hunks)
- apps/webapp/app/v3/services/resumeBatchRun.server.ts (1 hunks)
- packages/core/src/v3/schemas/messages.ts (4 hunks)
Additional comments not posted (6)
apps/webapp/app/v3/services/resumeBatchRun.server.ts (1)
135-137: Improved logging for better traceability.
The addition of dependentTaskAttempt, checkpointEventId, and hasCheckpointEvent to the logging statement enhances the traceability of the service's operations. This extra context can be valuable for debugging and monitoring the behavior of the service during the resumption of dependent runs.
The boolean flag hasCheckpointEvent provides a clear indication of the checkpoint event's existence, making the logs more readable and informative.
packages/core/src/v3/schemas/messages.ts (3)
18-31: LGTM!
The ackCallbackResult discriminated union is a good choice for representing the callback result. It allows for type narrowing based on the success property and has a well-defined error object structure.
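To illustrate the narrowing mentioned above, here is a minimal sketch in plain TypeScript. It is not the schema from messages.ts: the type name AckCallbackResult, the fields inside error, and the describeAck helper are all assumptions made for the example.

```typescript
// Hypothetical plain-TypeScript mirror of an ack result.
// The fields inside `error` are assumptions, not copied from messages.ts.
type AckCallbackResult =
  | { success: true }
  | { success: false; error: { name: string; message: string; stack?: string } };

// Hypothetical helper: turns an ack result into a log-friendly string.
function describeAck(result: AckCallbackResult): string {
  if (result.success) {
    // Narrowed to the success variant: there is no `error` property here.
    return "acknowledged";
  }
  // Narrowed to the failure variant: `error` is guaranteed to exist.
  return `failed: ${result.error.name}: ${result.error.message}`;
}
```

Because success is a literal in both variants, the compiler rejects any access to result.error before the success check, which is the property the review comment is praising.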
287-287: Good refactor!
Using the ackCallbackResult type for the callback field improves code reusability and maintainability. It ensures consistency in the callback structure across different message types.
520-530: Looks good!
The new RESUME_AFTER_DEPENDENCY_WITH_ACK message type enhances reliability by adding acknowledgment to the existing RESUME_AFTER_DEPENDENCY message. The message structure is consistent with other message types.
apps/coordinator/src/index.ts (1)
165-191: Approve the addition of the RESUME_AFTER_DEPENDENCY_WITH_ACK event handler.
The new event handler enhances the task coordination logic by providing a robust mechanism for acknowledging the resumption of tasks after dependencies are resolved. Key aspects include:
- Retrieving the task socket using attemptFriendlyId to ensure the task can be resumed.
- Comprehensive error handling and logging to provide visibility into any issues during the resumption process.
- Invoking chaosMonkey.call() to introduce controlled disruption or testing, helping identify potential issues or edge cases.
- Canceling any ongoing checkpoint associated with the task to ensure a clean resumption without interference from previous state.
- Emitting a RESUME_AFTER_DEPENDENCY event through the task socket to signal the task to resume execution.
- Returning a success response to indicate the successful initiation of the resumption process.
Overall, these changes enhance the reliability and observability of the task resumption process.
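The flow listed above can be sketched roughly as follows. This is an in-memory stand-in, not the code in apps/coordinator/src/index.ts: CoordinatorSketch, TaskSocket, registerSocket, startCheckpoint, and the error fields are hypothetical names, and the chaosMonkey call is left out.

```typescript
// Assumed shape of an ack result; the error fields are hypothetical.
type AckCallbackResult =
  | { success: true }
  | { success: false; error: { name: string; message: string } };

// Simplified message; only attemptFriendlyId is used for the socket lookup.
interface ResumeMessage {
  attemptFriendlyId: string;
  completions: unknown[];
  executions: unknown[];
}

// Minimal stand-in for a task socket (hypothetical interface).
interface TaskSocket {
  emit(event: "RESUME_AFTER_DEPENDENCY", payload: ResumeMessage): void;
}

class CoordinatorSketch {
  private sockets = new Map<string, TaskSocket>();
  private checkpointsInProgress = new Set<string>();
  canceledCheckpoints: string[] = [];

  registerSocket(attemptFriendlyId: string, socket: TaskSocket): void {
    this.sockets.set(attemptFriendlyId, socket);
  }

  startCheckpoint(attemptFriendlyId: string): void {
    this.checkpointsInProgress.add(attemptFriendlyId);
  }

  handleResumeWithAck(message: ResumeMessage, callback: (result: AckCallbackResult) => void): void {
    // 1. Look up the task socket by attemptFriendlyId.
    const socket = this.sockets.get(message.attemptFriendlyId);
    if (!socket) {
      callback({
        success: false,
        error: { name: "SocketNotFoundError", message: `No socket for ${message.attemptFriendlyId}` },
      });
      return;
    }

    let result: AckCallbackResult;
    try {
      // 2. Cancel any checkpoint still in progress for this attempt.
      if (this.checkpointsInProgress.delete(message.attemptFriendlyId)) {
        this.canceledCheckpoints.push(message.attemptFriendlyId);
      }
      // 3. Forward the plain resume event to the task.
      socket.emit("RESUME_AFTER_DEPENDENCY", message);
      result = { success: true };
    } catch (err) {
      const e = err instanceof Error ? err : new Error(String(err));
      result = { success: false, error: { name: e.name, message: e.message } };
    }
    // 4. Acknowledge exactly once, with either success or the captured error.
    callback(result);
  }
}
```

Calling the callback outside the try block keeps the ack from being sent twice if the callback itself throws.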
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (1)
728-768: Introduce robust task resumption mechanism with coordinator acknowledgment
The code changes introduce a reliable mechanism for resuming tasks after dependency resolution. Here's how it works:
- The resumeMessage object is constructed with relevant information such as runId, attemptId, attemptFriendlyId, completions, and executions.
- The RESUME_AFTER_DEPENDENCY_WITH_ACK event is broadcast to the coordinator namespace using emitWithAck, passing the resumeMessage. This notifies the coordinators about the dependency resolution and waits for their acknowledgment.
- The responses from the coordinators are logged for monitoring and debugging purposes.
- If no responses are received within the 10-second timeout or any response indicates failure, an error is logged, and the message is nacked with a delay of 5 seconds using #nackAndDoMoreWork. This allows for retrying the operation after a short interval.
The error handling and retry mechanism ensure that the consumer can handle scenarios where the coordinators are unresponsive or encounter failures.
Consider adding more detailed logging statements to capture the specific details of the failures or timeouts for easier debugging. For example, you could log the error messages or status codes received from the coordinators.
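As a rough sketch of the decision described in this comment, the function below takes the collected ack responses and decides whether to continue or nack, carrying the failure details so they can be logged. It is not the code in sharedQueueConsumer.server.ts: decideAfterAcks, Decision, NACK_DELAY_MS, and the error fields are hypothetical, and the 10-second wait is assumed to have already happened, with an empty array standing for "no responses".

```typescript
// Assumed shape of an ack result; the error fields are hypothetical.
type AckCallbackResult =
  | { success: true }
  | { success: false; error: { name: string; message: string } };

type Decision =
  | { action: "continue" }
  | { action: "nack"; delayMs: number; reason: string };

// The 5-second retry delay comes from the PR description.
const NACK_DELAY_MS = 5000;

// Hypothetical helper: decide what to do once the ack wait has finished.
function decideAfterAcks(responses: AckCallbackResult[]): Decision {
  // No coordinator answered before the timeout.
  if (responses.length === 0) {
    return { action: "nack", delayMs: NACK_DELAY_MS, reason: "no coordinator responded" };
  }

  // Collect every failure so the log line can say what went wrong.
  const details: string[] = [];
  for (const response of responses) {
    if (!response.success) {
      details.push(`${response.error.name}: ${response.error.message}`);
    }
  }
  if (details.length > 0) {
    return {
      action: "nack",
      delayMs: NACK_DELAY_MS,
      reason: `coordinator reported failure: ${details.join("; ")}`,
    };
  }

  return { action: "continue" };
}
```

Returning the reason alongside the nack decision gives the caller one place to log both timeouts and coordinator-reported errors.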
…run with RESUME_AFTER_DEPENDENCY_WITH_ACK
…tResumeWithMessage to undefined
…intResumeWithMessage
Previously we just broadcast RESUME_AFTER_DEPENDENCY to the coordinator, with no handling for the case where the coordinator crashed at the wrong time.
I've added a new message called RESUME_AFTER_DEPENDENCY_WITH_ACK which requires an ack to be sent back. If it's not sent back then we nack the message in the queue with a 5 second delay.
I've verified locally that this continues runs both in normal situations and when the coordinator is down.