Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Hotfix] Add observedGeneration to the status of CRDs #979

Merged
merged 6 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11277,6 +11277,11 @@ spec:
each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
10 changes: 10 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11772,6 +11772,11 @@ spec:
type: string
message:
type: string
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayJob.
format: int64
type: integer
rayClusterName:
type: string
rayClusterStatus:
Expand Down Expand Up @@ -11816,6 +11821,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
15 changes: 15 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11872,6 +11872,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down Expand Up @@ -11905,6 +11910,11 @@ spec:
type: object
type: array
type: object
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayService.
format: int64
type: integer
pendingServiceStatus:
description: Pending Service Status indicates a RayCluster will be
created or is being created.
Expand Down Expand Up @@ -11983,6 +11993,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ type RayClusterStatus struct {
Head HeadInfo `json:"head,omitempty"`
// Reason provides more information about current State
Reason string `json:"reason,omitempty"`
// observedGeneration is the most recent generation observed for this RayCluster. It corresponds to the
// RayCluster's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// HeadInfo gives info about head
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type RayJobStatus struct {
// Represents time when the job was ended.
EndTime *metav1.Time `json:"endTime,omitempty"`
RayClusterStatus RayClusterStatus `json:"rayClusterStatus,omitempty"`
// observedGeneration is the most recent generation observed for this RayJob. It corresponds to the
// RayJob's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ type RayServiceStatuses struct {
PendingServiceStatus RayServiceStatus `json:"pendingServiceStatus,omitempty"`
// ServiceStatus indicates the current RayService status.
ServiceStatus ServiceStatus `json:"serviceStatus,omitempty"`
// observedGeneration is the most recent generation observed for this RayService. It corresponds to the
// RayService's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

type RayServiceStatus struct {
Expand Down
5 changes: 5 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11277,6 +11277,11 @@ spec:
each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
10 changes: 10 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11772,6 +11772,11 @@ spec:
type: string
message:
type: string
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayJob.
format: int64
type: integer
rayClusterName:
type: string
rayClusterStatus:
Expand Down Expand Up @@ -11816,6 +11821,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
15 changes: 15 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11872,6 +11872,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down Expand Up @@ -11905,6 +11910,11 @@ spec:
type: object
type: array
type: object
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayService.
format: int64
type: integer
pendingServiceStatus:
description: Pending Service Status indicates a RayCluster will be
created or is being created.
Expand Down Expand Up @@ -11983,6 +11993,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down
29 changes: 8 additions & 21 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand All @@ -25,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -828,31 +827,19 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
}

func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster) error {
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
instance.Status.ObservedGeneration = instance.ObjectMeta.Generation

runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
if err := r.List(context.TODO(), &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
}

count := utils.CalculateAvailableReplicas(runtimePods)
if instance.Status.AvailableWorkerReplicas != count {
instance.Status.AvailableWorkerReplicas = count
}

count = utils.CalculateDesiredReplicas(instance)
if instance.Status.DesiredWorkerReplicas != count {
instance.Status.DesiredWorkerReplicas = count
}

count = utils.CalculateMinReplicas(instance)
if instance.Status.MinWorkerReplicas != count {
instance.Status.MinWorkerReplicas = count
}

count = utils.CalculateMaxReplicas(instance)
if instance.Status.MaxWorkerReplicas != count {
instance.Status.MaxWorkerReplicas = count
}
instance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
shrekris-anyscale marked this conversation as resolved.
Show resolved Hide resolved
instance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(instance)
instance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(instance)
instance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(instance)

// validation for the RayStartParam for the state.
isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
Expand Down
55 changes: 55 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,3 +966,58 @@ func TestGetHeadServiceIP(t *testing.T) {
})
}
}

// TestUpdateStatusObservedGeneration checks that, after updateStatus runs, the
// RayCluster read back through the fake client has Status.ObservedGeneration
// equal to its ObjectMeta.Generation.
func TestUpdateStatusObservedGeneration(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	ctx := context.Background()

	// Build a dedicated scheme that knows about the Ray CRDs and the core types
	// (Pod, Service) stored in the fake client.
	testScheme := runtime.NewScheme()
	_ = rayiov1alpha1.AddToScheme(testScheme)
	_ = corev1.AddToScheme(testScheme)

	// updateStatus writes the RayCluster status via `r.Status().Update()`, so the
	// fake client must be seeded with the objects it expects. `ClusterIP` and
	// `TargetPort` are typically filled in by the cluster's control plane in
	// KubeRay, so they are set by hand here.
	headSvc, err := common.BuildServiceForHeadPod(*testRayCluster, nil, nil)
	assert.Nil(t, err, "Failed to build head service.")
	headSvc.Spec.ClusterIP = headNodeIP
	svcPorts := headSvc.Spec.Ports
	for idx := range svcPorts {
		svcPorts[idx].TargetPort = intstr.IntOrString{IntVal: svcPorts[idx].Port}
	}
	seedObjects := append(testPods, headSvc, testRayCluster)

	// Start from an impossible ObservedGeneration so that a successful update is
	// distinguishable from the initial state. ObjectMeta's `Generation` and
	// `ResourceVersion` don't behave properly in the fake client.
	// [Ref] https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/client/fake
	testRayCluster.Status.ObservedGeneration = -1

	// Fake client backed by the dedicated scheme and the seeded objects.
	fakeClient := clientFake.NewClientBuilder().
		WithScheme(testScheme).
		WithRuntimeObjects(seedObjects...).
		Build()

	// Sanity-check the starting values of `Generation` and `ObservedGeneration`.
	clusterKey := types.NamespacedName{
		Namespace: namespaceStr,
		Name:      instanceName,
	}
	var fetched rayiov1alpha1.RayCluster
	err = fakeClient.Get(ctx, clusterKey, &fetched)
	assert.Nil(t, err, "Fail to get RayCluster")
	assert.Equal(t, int64(-1), fetched.Status.ObservedGeneration)
	assert.Equal(t, int64(0), fetched.ObjectMeta.Generation)

	// Reconciler under test, wired to the fake client.
	reconciler := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// After updateStatus, the stored object must report matching `Generation`
	// and `ObservedGeneration`.
	err = reconciler.updateStatus(testRayCluster)
	assert.Nil(t, err)
	err = fakeClient.Get(ctx, clusterKey, &fetched)
	assert.Nil(t, err)
	assert.Equal(t, fetched.ObjectMeta.Generation, fetched.Status.ObservedGeneration)
}
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1alpha1.
rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
}

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
rayJob.Status.ObservedGeneration = rayJob.ObjectMeta.Generation

if errStatus := r.Status().Update(ctx, rayJob); errStatus != nil {
return fmtErrors.Errorf("combined error: %v %v", err, errStatus)
}
Expand Down
29 changes: 12 additions & 17 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (

var _ = Context("Inside the default namespace", func() {
ctx := context.TODO()
var workerPods corev1.PodList

myRayJob := &rayiov1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test",
Expand Down Expand Up @@ -176,19 +174,6 @@ var _ = Context("Inside the default namespace", func() {
},
}

myRayCluster := &rayiov1alpha1.RayCluster{}

myRayJobWithClusterSelector := &rayiov1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test-2",
Namespace: "default",
},
Spec: rayiov1alpha1.RayJobSpec{
Entrypoint: "sleep 999",
ClusterSelector: map[string]string{},
},
}

Describe("When creating a rayjob", func() {
It("should create a rayjob object", func() {
err := k8sClient.Create(ctx, myRayJob)
Expand All @@ -205,14 +190,15 @@ var _ = Context("Inside the default namespace", func() {
Eventually(
getRayClusterNameForRayJob(ctx, myRayJob),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)

myRayCluster := &rayiov1alpha1.RayCluster{}
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
})

It("should create more than 1 worker", func() {
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayJob.Status.RayClusterName, common.RayNodeGroupLabelKey: "small-group"}
workerPods := corev1.PodList{}
Eventually(
listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
Expand All @@ -231,7 +217,16 @@ var _ = Context("Inside the default namespace", func() {
Eventually(
getRayClusterNameForRayJob(ctx, myRayJob),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)

myRayJobWithClusterSelector := &rayiov1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test-2",
Namespace: "default",
},
Spec: rayiov1alpha1.RayJobSpec{
Entrypoint: "sleep 999",
ClusterSelector: map[string]string{},
},
}
myRayJobWithClusterSelector.Spec.ClusterSelector[RayJobDefaultClusterSelectorKey] = myRayJob.Status.RayClusterName

err := k8sClient.Create(ctx, myRayJobWithClusterSelector)
Expand Down
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
}
r.cleanUpServeConfigCache(rayServiceInstance)

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation

logger.Info("Reconciling the cluster component.")
// Find active and pending ray cluster objects given current service name.
var activeRayClusterInstance *rayv1alpha1.RayCluster
Expand Down
6 changes: 2 additions & 4 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ var _ = Context("Inside the default namespace", func() {
ctx := context.TODO()
var workerPods corev1.PodList

var numReplicas int32
var numCpus float64
numReplicas = 1
numCpus = 0.1
var numReplicas int32 = 1
var numCpus float64 = 0.1

myRayService := &rayiov1alpha1.RayService{
ObjectMeta: metav1.ObjectMeta{
Expand Down