Skip to content

Commit

Permalink
fix controller: use Service's TargetPort (#383)
Browse files Browse the repository at this point in the history
if NodePort is "0" for updating RayCluster's `status.endpoints`.

follow up to #341
  • Loading branch information
davidxia authored Jul 25, 2022
1 parent dd0b0a3 commit 8dcc8a7
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 11 deletions.
42 changes: 31 additions & 11 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,34 +782,54 @@ func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster)
}
}

if err := r.updateEndpoints(instance); err != nil {
return err
}

timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
return err
}

return nil
}

func (r *RayClusterReconciler) updateEndpoints(instance *rayiov1alpha1.RayCluster) error {
// TODO: (@scarlet25151) There may be several K8s Services for a RayCluster.
// We assume we can find the right one by filtering Services with appropriate label selectors
// and picking the first one. We may need to select by name in the future if the Service naming is stable.
rayHeadSvc := corev1.ServiceList{}
filterLabels = client.MatchingLabels{
filterLabels := client.MatchingLabels{
common.RayClusterLabelKey: instance.Name,
common.RayNodeTypeLabelKey: "head",
}
// TODO: (@scarlet25151) For now there may be several Kubernetes Services related to
// one RayCluster; we use labels to select the head Service and pick the first one.
// We may need to use the Get method to select by name instead.
if err := r.List(context.TODO(), &rayHeadSvc, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
}

if len(rayHeadSvc.Items) != 0 {
svc := rayHeadSvc.Items[0]
if instance.Status.Endpoints == nil {
instance.Status.Endpoints = map[string]string{}
}
for _, port := range svc.Spec.Ports {
if len(port.Name) == 0 {
r.Log.Info("updateStatus", "service port name is empty", port)
r.Log.Info("updateStatus", "service port's name is empty. Not adding it to RayCluster status.endpoints", port)
continue
}
instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.NodePort)
if port.NodePort != 0 {
instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.NodePort)
} else if port.TargetPort.IntVal != 0 {
instance.Status.Endpoints[port.Name] = fmt.Sprintf("%d", port.TargetPort.IntVal)
} else if port.TargetPort.StrVal != "" {
instance.Status.Endpoints[port.Name] = port.TargetPort.StrVal
} else {
r.Log.Info("updateStatus", "service port's targetPort is empty. Not adding it to RayCluster status.endpoints", port)
}
}
}
timeNow := metav1.Now()
instance.Status.LastUpdateTime = &timeNow
if err := r.Status().Update(context.Background(), instance); err != nil {
return err
} else {
r.Log.Info("updateEndpoints", "unable to find a Service for this RayCluster. Not adding RayCluster status.endpoints", instance.Name, "Service selectors", filterLabels)
}

return nil
Expand Down
49 changes: 49 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -56,6 +57,7 @@ var (
testPods []runtime.Object
testRayCluster *rayiov1alpha1.RayCluster
headSelector labels.Selector
testServices []runtime.Object
workerSelector labels.Selector
workersToDelete []string
)
Expand Down Expand Up @@ -298,6 +300,26 @@ func setupTest(t *testing.T) {
},
}

headService, err := common.BuildServiceForHeadPod(*testRayCluster)
if err != nil {
t.Errorf("failed to build head service: %v", err)
}
// K8s automatically sets TargetPort to the same value as Port. So we mimic that behavior here.
for i, port := range headService.Spec.Ports {
headService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port}
}
dashboardService, err := common.BuildDashboardService(*testRayCluster)
if err != nil {
t.Errorf("failed to build dashboard service: %v", err)
}
// Mimic K8s defaulting for the dashboard Service as well: each port's
// TargetPort is set to its own Port. The write must go to dashboardService;
// indexing headService with the dashboard loop index would overwrite the head
// Service's TargetPorts and can index past the end of its Ports slice.
for i, port := range dashboardService.Spec.Ports {
	dashboardService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port}
}
testServices = []runtime.Object{
headService,
dashboardService,
}

instanceReqValue := []string{instanceName}
instanceReq, err := labels.NewRequirement(
common.RayClusterLabelKey,
Expand Down Expand Up @@ -730,3 +752,30 @@ func TestReconcile_AutoscalerRoleBinding(t *testing.T) {

assert.Nil(t, err, "Fail to get autoscaler RoleBinding after reconciliation")
}

// TestUpdateEndpoints runs updateEndpoints against a fake client seeded with
// the Services built in setupTest and checks that the RayCluster's
// Status.Endpoints ends up mapping each named port to its port number string.
func TestUpdateEndpoints(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	// Only the test Services are registered; no other objects are loaded into the fake client.
	k8sClient := clientFake.NewClientBuilder().WithRuntimeObjects(testServices...).Build()
	logger := ctrl.Log.WithName("controllers").WithName("RayCluster")

	reconciler := &RayClusterReconciler{
		Client:   k8sClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      logger,
	}

	err := reconciler.updateEndpoints(testRayCluster)
	if err != nil {
		t.Errorf("updateEndpoints failed: %v", err)
	}

	// updateEndpoints mutates the RayCluster passed to it, so the result is
	// read back from testRayCluster itself.
	want := map[string]string{
		"client":    "10001",
		"dashboard": "8265",
		"metrics":   "8080",
		"redis":     "6379",
		"serve":     "8000",
	}
	assert.Equal(t, want, testRayCluster.Status.Endpoints, "RayCluster status endpoints not updated")
}

0 comments on commit 8dcc8a7

Please sign in to comment.