[RayJob] Fix RayJob status reconciliation #1539
Conversation
@z103cb @kevin85421 could you please review? For the submission ID issue upon retry, pointed out in #1480, I suggest we create a separate issue so we can take it from there.
@astefanutti thank you for adding the example. This PR is LGTM. I have validated that this PR fixes the underlying issues.
Thank you for the contribution! Would you mind adding some tests once we reach consensus on the code change? Thanks!
@@ -126,15 +131,25 @@ func main() {
		LeaderElectionID: "ray-operator-leader",
	}

	selectorsByObject, err := cacheSelectors()
Would you mind explaining why we need this?
Currently the informer for batch Job resources watches all the Jobs, cluster-wide most of the time. This is not needed, as KubeRay is only interested in the Jobs it creates, and it can raise scalability concerns both for KubeRay (size of the informer cache) and for the API server / etcd (watch events).
controller-runtime provides this mechanism to narrow the scope of the informer caches.
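For illustration, here is a minimal sketch of what such a cache selector could return, assuming a controller-runtime version that exposes the SelectorsByObject cache option and assuming the operator labels the Jobs it creates (the app.kubernetes.io/created-by=kuberay-operator label key is an assumption for illustration, not necessarily what this PR uses):

import (
	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/apimachinery/pkg/labels"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// cacheSelectors restricts the informer cache for batch/v1 Jobs to the objects
// created by the operator, instead of caching every Job in the watched namespaces.
// The label below is assumed for illustration purposes only.
func cacheSelectors() (cache.SelectorsByObject, error) {
	label, err := labels.Parse("app.kubernetes.io/created-by=kuberay-operator")
	if err != nil {
		return nil, err
	}
	return cache.SelectorsByObject{
		&batchv1.Job{}: {Label: label},
	}, nil
}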
What's the difference between `cache.MultiNamespacedCacheBuilder(watchNamespaces)` and `cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)`?
As controller-runtime currently wires the cache options, using `cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)` is the only way to get the `SelectorsByObject` option set on the cache when it's instantiated by the manager. That may change once this option is exposed at the manager options level.
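Concretely, the wiring could look something like this sketch (withJobCacheSelectors is a hypothetical helper name; watchNamespaces and selectorsByObject come from the surrounding discussion, and a controller-runtime version exposing cache.SelectorsByObject and cache.MultiNamespacedCacheBuilder is assumed):

import (
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// withJobCacheSelectors wraps the multi-namespace cache builder so that the
// SelectorsByObject option is injected into the cache options the manager
// passes in when it instantiates the cache.
func withJobCacheSelectors(options ctrl.Options, watchNamespaces []string, selectorsByObject cache.SelectorsByObject) ctrl.Options {
	options.NewCache = func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
		opts.SelectorsByObject = selectorsByObject
		return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts)
	}
	return options
}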
I need to take a look at what `SelectorsByObject` is before I resolve this thread.
This was a helpful thread!
Currently the informer for batch Job resources watches all the Jobs, cluster-wide most of the time. This is not needed, as KubeRay is only interested in the Jobs it creates, and it can raise scalability concerns both for KubeRay (size of the informer cache) and for the API server / etcd (watch events).
controller-runtime provides this mechanism to narrow the scope of the informer caches.
It would be great to include a summary of this as a code comment.
Inlined as a code comment. PTAL.
Currently the informer for batch Job resources watches all the Jobs, cluster-wide most of the time.
I want to double-check this statement. Are you saying that the informer still watches K8s Jobs from all namespaces even if we specify only one namespace in `watchNamespaces`?
I meant that when the `watch-namespace` CLI option is not provided, which I assume is the most common case, then the informer watches all the Kubernetes Jobs across all namespaces.
	},
}

// Set the ownership in order to do the garbage collection by k8s.
-	if err := ctrl.SetControllerReference(rayClusterInstance, job, r.Scheme); err != nil {
+	if err := ctrl.SetControllerReference(rayJobInstance, job, r.Scheme); err != nil {
We might implement retry logic for RayJob in the future. If RayJob retries, it will spawn a new RayCluster and a new K8s Job. Therefore, the controller for the K8s Job should be the RayCluster.
I think this will require more thought, depending on the evolution of the retry mechanism and the RayCluster reuse case. Yet at the moment, as a user, I find it surprising that the Job is not cascade-deleted when I delete the RayJob.
Yet at the moment, as a user, I find it surprising that the Job is not cascade-deleted when I delete the RayJob.
Really? It should be deleted. If not, it is a bug. The RayJob is the controller owner of the RayCluster, and the RayCluster is the controller owner of the Kubernetes Job. Hence, I expect the K8s Job to be removed when the RayJob is deleted.
Not in the case where the RayJob does not own the RayCluster, that is, when it's created with a `clusterSelector` field.
Not in the case the RayJob does not own the RayCluster, that is when it's created with a clusterSelector field.
Got it. We do not encourage users to use that.
I may deprecate it in the future.
You can ignore the comment at https://github.com/ray-project/kuberay/pull/1539/files#r1365854531. I hadn't seen your reply when I wrote mine.
		Template: submitterTemplate,
		// The `ray job submit` command fails when submission is retried with the same job ID.
		// Let's disable backoff retry, until job submission retry effectively works.
		BackoffLimit: pointer.Int32(0),
We can reduce it, but there will still be cases where the RayCluster doesn't receive the request from the K8s Job.
This may depend on the evolution of the retry mechanism longer term, but pragmatically, in the short term, I guess it depends on how likely pre-submission failures are compared to (recoverable) post-submission ones. If the former are more likely, then it'd probably be useful to keep retrying. So maybe a middle ground is to reduce the number of retries from the default, six, to something like two or three. Still it feels like it should be possible to re-submit a job, leveraging the override option from the Ray job manager.
So maybe a middle ground is to reduce the number of retries from the default, six, to something like two or three.
Makes sense.
Still it feels like it should be possible to re-submit a job, leveraging the override option from the Ray job manager.
I don't quite get the point. Would you mind explaining it?
Still it feels like it should be possible to re-submit a job, leveraging the override option from the Ray job manager.
I don't quite get the point. Would you mind explaining it?
If the submitter job fails "post-submission", i.e., after the first successful `ray job submit` command attempt, any subsequent retry will fail with the `Job with submission_id [...] already exists.` error. If the command could be changed to `ray job submit --overwrite=true`, that would fix the issue.
It seems the Ray job manager has an `overwrite` option: https://github.com/ray-project/ray/blob/56affb7e4b5af8b1da7a322fdc6ecc7e2258d522/dashboard/modules/job/job_manager.py#L919, though it's not currently exposed in the API and CLI.
We can consider exposing an `overwrite` argument in Ray, but we'll need to think about the design and questions about what happens to the old job. For now I agree with disabling retries.
The default submitter pod template has `RestartPolicyNever`; do you happen to know why it was still getting retried before this PR?
I replied here: #1539 (review)
The default submitter pod template has `RestartPolicyNever`; do you happen to know why it was still getting retried before this PR?
@architkulkarni `RestartPolicy` controls the kubelet behaviour when a container exits within a Pod. With `RestartPolicyNever` the Pod is failed, while with `RestartPolicyAlways`, for example, the container is restarted in-place. `BackoffLimit`, on the other hand, controls the Job controller behaviour on Pod failure, specifically the maximum number of times a Job Pod should be recreated before failing the Job. So you can have `RestartPolicyNever` and still have the Ray job submission retried when `BackoffLimit` (which defaults to 6) is not zero.
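To illustrate that interaction, here is a minimal sketch of a submitter Job spec (the image, service address, and script below are placeholders, not the values KubeRay actually uses):

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/pointer"
)

// submitterJobSketch shows how the two settings interact: RestartPolicyNever only
// prevents the kubelet from restarting the container in-place, while BackoffLimit
// controls how many times the Job controller recreates a failed Pod. Setting
// BackoffLimit to 0 is what actually prevents the submission from running again.
func submitterJobSketch() *batchv1.Job {
	return &batchv1.Job{
		Spec: batchv1.JobSpec{
			BackoffLimit: pointer.Int32(0), // no Pod recreation by the Job controller
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever, // no in-place container restart
					Containers: []corev1.Container{{
						Name:    "ray-job-submitter",
						Image:   "rayproject/ray:2.7.0", // placeholder image
						Command: []string{"ray", "job", "submit", "--address", "http://raycluster-head-svc:8265", "--", "python", "/home/ray/samples/sample_code.py"},
					}},
				},
			},
		},
	}
}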
metadata:
  name: rayjob-cluster-selector
spec:
  clusterSelector:
We do not encourage users to use `clusterSelector`. The better option is to use `ray job submit` to submit a Ray job to a RayCluster.
OK, so I guess this sample, updated without `clusterSelector`, would not add much value compared to the existing ones, and should be removed. @z103cb are you OK with that?
I think for as long as the option exists in the operator (i.e. is not officially deprecated), we should keep this sample there. Perhaps adding a comment in the sample to the effect of "Please don't use this, we will deprecate it in the future" would be OK. @kevin85421 is that guidance documented somewhere in the operator? I am wondering if I have missed it.
Reply https://github.com/ray-project/kuberay/pull/1539/files#r1366631875:
We specify the submission ID to prevent the same job from being submitted twice in certain edge cases. Any job submission with a duplicate ID will be rejected.
The Kubernetes Job backoff limit aims to ensure that each Ray job is submitted to the RayCluster exactly once. KubeRay has very limited information about the failure: is the failure related to the accessibility of the Ray head?
- Case 1: If the failure is not related to the accessibility of the Ray head, the possible causes may be:
  - Case 1-1 (application-level): Users' Ray scripts have some bugs. In this case, we should not submit a new job with a different submission ID or overwrite it.
  - Case 1-2 (system-level): For example, a job fails due to the crash of a Ray worker Pod. Users should handle the retry from the application-level logic.
- Case 2: The Ray head is inaccessible.
  - Case 2-1: The Ray head becomes inaccessible before receiving the job submission request. This situation might arise from a network issue between the K8s Job and the Ray head. The K8s Job's `backoffLimit` is designed to handle such cases.
  - Case 2-2: The Ray head crashes after receiving the job submission. Since GCS fault tolerance only supports Ray Serve, the job's information will be lost if the Ray head crashes. Consequently, using the same submission ID will not lead to a rejection. This can be relieved by the `backoffLimit`, or we may implement a retry that deletes the RayCluster + Job and restarts a new RayCluster + Job.
  - Case 2-3: The network between the Ray head and the K8s Job is disconnected after the Ray head begins executing the job. Job submissions with the same submission ID will be rejected to ensure that only one job runs in the cluster at any given time.
Hi @astefanutti @z103cb,
Could you please:
(1) Remove the sample YAML: it encourages users to use `clusterSelector`, which we discourage and may deprecate in the future. Including it in the sample YAML could complicate a potential deprecation.
(2) Set the `BackoffLimit` to a non-zero value.
I don't want to introduce any new changes related to `clusterSelector`, such as changing the controller reference or checking the length of `ClusterSelector`, but the points from you guys also make sense. I am fine with the related code changes, but please remove the sample YAML until the community reaches a consensus on the future of RayJob.
I will start working on RayJob next week. I hope to begin once this PR is merged to avoid any potential conflicts you might have to resolve. Thanks!
Thanks for the fix! My main question is about the decision to only reconcile if the job is pending or running.
Also, could you say more about the cause of #1478 "When submitting a KubeRay job (ray job CRD) to be executed on a previously provisioned cluster, the ray job status is not set to v1alpha1.JobStatusSucceeded." and how this PR fixes it?
  clusterSelector:
    ray.io/cluster: long-running-cluster
  entrypoint: python /home/ray/samples/sample_code.py
  # entrypoint: python /home/ray/samples/fail_fast.py
# entrypoint: python /home/ray/samples/fail_fast.py
	}
	// Otherwise only reconcile the RayJob upon new events for watched resources
	// to avoid infinite reconciliation.
	return ctrl.Result{}, nil
I remember there was some previous discussion at #850 saying a controller should always unconditionally reconcile at some frequency (there it was every few minutes).
What are your thoughts on this?
This only changes the behaviour for RayJobs in a terminal state. To be on the safe side, this keeps the requeueing / polling logic for RayJobs in the "RayCluster deployment" phase, as well as for RayJobs in a running state.
Generally, from the performance / scalability standpoint, it's a desirable property to have controllers converge into a stationary state that reacts to events, rather than enter an infinite loop with a polling period as small as `RayJobDefaultRequeueDuration`, which defaults to 3s.
For terminal states, I think it's safe to assume that no events are going to be missed; otherwise it's a bug that can be fixed. And to mitigate any discrepancy, the `SyncPeriod` manager option could be changed from the default, 10 hours, to something an order of magnitude lower, even down to a few tens of minutes.
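For reference, a sketch of how that option could be lowered when configuring the manager (the 30-minute value is only an example, and a controller-runtime version where SyncPeriod is still a manager-level option is assumed):

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// newManagerOptions lowers the periodic full resync of the informer caches from
// the 10-hour default, as a safety net against any missed watch events.
func newManagerOptions() ctrl.Options {
	syncPeriod := 30 * time.Minute // example value, not a recommendation
	return ctrl.Options{
		SyncPeriod: &syncPeriod,
	}
}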
LGTM
The sample has been removed.
@kevin85421 if the `ray job submit` command exited with a specific code when the submission already exists, the submitter Job could rely on a `podFailurePolicy`, e.g.:

apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: ray-job-submitter
          command:
            - ray
            - job
            - submit
  podFailurePolicy:
    rules:
      - action: FailJob
        onExitCodes:
          containerName:
          operator: In
          values: [<already exists code>]

So the Job would not be retried at all in cases where we know it's unrecoverable. I can check what the Ray job CLI return codes are if you think that's worth exploring.
@architkulkarni the cause was that early return statement, removed with 383aaf1, that was preventing the status update from happening, so the ID in the Job status, which is relied on in the remaining logic, was never persisted.
@astefanutti Thank you for this recommendation! This feature was first introduced in K8s 1.25 as an alpha feature, so this would be a breaking change for KubeRay. We may consider revisiting this after 3 K8s major releases.
@@ -324,7 +327,14 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
			}
		}
	}
-	return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+
+	if isJobPendingOrRunning(rayJobInstance.Status.JobStatus) {
Is there a possibility that `JobStatus` is empty at this point, causing it to return `ctrl.Result{}, nil`?
The only path, other than when the RayJob is in a terminal state, is the reconciliation cycle where the Kubernetes Job is created. However, now that this Job is watched, further reconciliation is guaranteed.
That being said, I've added an extra change in f7b639b that initialises the Job status, instead of eventually having it updated once the Ray job has started. This is generally a best practice, and it'll make sure periodic reconciliation is triggered when `isJobPendingOrRunning` is true.
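As a sketch of that best practice (initializeJobStatus is a hypothetical helper, and the v1alpha1 API's JobStatusPending constant is assumed; the actual change is the one in f7b639b):

import (
	"context"

	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// initializeJobStatus sets the RayJob's JobStatus to PENDING as soon as the
// submitter Job is created, instead of leaving it empty until the Ray job
// reports its first status, so isJobPendingOrRunning already evaluates to true.
func initializeJobStatus(ctx context.Context, c client.Client, rayJob *rayv1alpha1.RayJob) error {
	if rayJob.Status.JobStatus != "" {
		return nil // already initialised
	}
	rayJob.Status.JobStatus = rayv1alpha1.JobStatusPending
	return c.Status().Update(ctx, rayJob)
}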
Great!
I agree. Right now the Ray job CLI doesn't return any special error codes (the CLI code just raises a Python exception, which I believe is return code 1), but I think we can update Ray to return a special error code here, and the change should be relatively uncontroversial.
Looks good to me! (pending Kai-Hsun's last question about JobStatus being empty)
Ideally, this should be the case, but I believe there's a significant challenge for Ray to return a meaningful exit code at this moment. For example, if the Ray C++ binary exits the process with a non-zero exit code, not all code paths in Ray's Python code catch that exit code.
That's true for propagating error codes in general, but here we're talking about just returning an error for the specific case of submitting a job_id which has already been submitted (which we already have a check for). We could have the job manager raise a special exception here instead of
Understood. I hadn't noticed that. I thought that we wanted to use exit codes to categorize all cases as mentioned in #1539 (review).
LGTM. Thanks @astefanutti for this contribution! I will merge this PR. Would you mind opening a follow-up PR to add tests for it?
LGTM
@kevin85421 Thanks. Sure, I'll work on the tests ASAP.
This PR adds e2e tests for the operator, following up on #1539 (comment). This e2e testing makes it possible to assert on KubeRay resources in a way that cannot be achieved with the existing tests (envtest). The e2e test execution output is post-processed using gotestfmt. The logs and events are gathered and uploaded as part of the workflow run.
This PR adds e2e tests following up on ray-project#1539 and leverages ray-project#1575. It also factors out the configuration shared across the RayJob e2e tests.
Why are these changes needed?
This PR fixes a number of issues related to the reconciliation of the RayJob status.
It also sets RayJob resources as owners of the Kubernetes Jobs that submit Ray jobs via the CLI.
It also addresses the issue of systematic failure upon retry of the job submission reported in #1480, by disabling submission retry altogether until a proper solution is found.
The simplest solution would be to expose the job override option to the CLI, though this requires more thought and can be addressed in a separate PR:
https://github.com/ray-project/ray/blob/56affb7e4b5af8b1da7a322fdc6ecc7e2258d522/dashboard/modules/job/job_manager.py#L919
Related issue numbers
Fixes #1478.
Fixes #1480.
Checks