diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 7a34a3fa93..310fef81f3 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -782,17 +782,32 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster) } } + if err := r.updateEndpoints(instance); err != nil { + return err + } + + timeNow := metav1.Now() + instance.Status.LastUpdateTime = &timeNow + if err := r.Status().Update(context.Background(), instance); err != nil { + return err + } + + return nil +} + +func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluster) error { + // TODO: (@scarlet25151) There may be several K8s Services for a RayCluster. + // We assume we can find the right one by filtering Services with appropriate label selectors + // and picking the first one. We may need to select by name in the future if the Service naming is stable. rayHeadSvc := corev1.ServiceList{} - filterLabels = client.MatchingLabels{ + filterLabels := client.MatchingLabels{ common.RayClusterLabelKey: instance.Name, common.RayNodeTypeLabelKey: "head", } - // TODO: (@scarlet25151) for now there would be several kubernetes serivces related to - // one raycluster, we have used the label to select the headservice and pick the first one. - // we may need use Get method to select by name. if err := r.List(context.TODO(), &rayHeadSvc, client.InNamespace(instance.Namespace), filterLabels); err != nil { return err } + if len(rayHeadSvc.Items) != 0 { svc := rayHeadSvc.Items[0] if instance.Status.Endpoints == nil { @@ -800,16 +815,21 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster) } for _, port := range svc.Spec.Ports { if len(port.Name) == 0 { - r.Log.Info("updateStatus", "service port name is empty", port) + r.Log.Info("updateStatus", "service port's name is empty. Not adding it to RayCluster status.endpoints", port) continue } - instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.NodePort) + if port.NodePort != 0 { + instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.NodePort) + } else if port.TargetPort.IntVal != 0 { + instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.TargetPort.IntVal) + } else if port.TargetPort.StrVal != "" { + instance.Status.Endpoints[port.Name] = port.TargetPort.StrVal + } else { + r.Log.Info("updateStatus", "service port's targetPort is empty. Not adding it to RayCluster status.endpoints", port) + } } - } - timeNow := metav1.Now() - instance.Status.LastUpdateTime = &timeNow - if err := r.Status().Update(context.Background(), instance); err != nil { - return err + } else { + r.Log.Info("updateEndpoints", "unable to find a Service for this RayCluster. Not adding RayCluster status.endpoints", instance.Name, "Service selectors", filterLabels) } return nil diff --git a/ray-operator/controllers/ray/raycluster_controller_fake_test.go b/ray-operator/controllers/ray/raycluster_controller_fake_test.go index 37bf792ce4..a749e3dd57 100644 --- a/ray-operator/controllers/ray/raycluster_controller_fake_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_fake_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -56,6 +57,7 @@ var ( testPods []runtime.Object testRayCluster *rayiov1alpha1.RayCluster headSelector labels.Selector + testServices []runtime.Object workerSelector labels.Selector workersToDelete []string ) @@ -298,6 +300,26 @@ func setupTest(t *testing.T) { }, } + headService, err := common.BuildServiceForHeadPod(*testRayCluster) + if err != nil { + t.Errorf("failed to build head service: %v", err) + } + // K8s automatically sets TargetPort to the same value as Port. So we mimic that behavior here. + for i, port := range headService.Spec.Ports { + headService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port} + } + dashboardService, err := common.BuildDashboardService(*testRayCluster) + if err != nil { + t.Errorf("failed to build dashboard service: %v", err) + } + for i, port := range dashboardService.Spec.Ports { + headService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port} + } + testServices = []runtime.Object{ + headService, + dashboardService, + } + instanceReqValue := []string{instanceName} instanceReq, err := labels.NewRequirement( common.RayClusterLabelKey, @@ -730,3 +752,30 @@ func TestReconcile_AutoscalerRoleBinding(t *testing.T) { assert.Nil(t, err, "Fail to get autoscaler RoleBinding after reconciliation") } + +func TestUpdateEndpoints(t *testing.T) { + setupTest(t) + defer tearDown(t) + + fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects(testServices...).Build() + + testRayClusterReconciler := &RayClusterReconciler{ + Client: fakeClient, + Recorder: &record.FakeRecorder{}, + Scheme: scheme.Scheme, + Log: ctrl.Log.WithName("controllers").WithName("RayCluster"), + } + + if err := testRayClusterReconciler.updateEndpoints(testRayCluster); err != nil { + t.Errorf("updateEndpoints failed: %v", err) + } + + expected := map[string]string{ + "client": "10001", + "dashboard": "8265", + "metrics": "8080", + "redis": "6379", + "serve": "8000", + } + assert.Equal(t, expected, testRayCluster.Status.Endpoints, "RayCluster status endpoints not updated") +}