Operator support for OpenShift #1371

Merged · 1 commit · Aug 30, 2023
12 changes: 12 additions & 0 deletions helm-chart/kuberay-operator/templates/role.yaml
@@ -225,6 +225,18 @@ rules:
- list
- update
- watch
- apiGroups:
- route.openshift.io
resources:
- routes
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
{{- if .Values.batchScheduler.enabled }}
- apiGroups:
- scheduling.volcano.sh
4 changes: 4 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
@@ -89,6 +89,10 @@ env:
# Otherwise, kuberay will use your custom domain
# - name: CLUSTER_DOMAIN
# value: ""
# If not set or set to false, kuberay will create an OpenShift Route when running on OpenShift with Ingress creation enabled
# Otherwise, kuberay will create an Ingress whenever Ingress creation is enabled, regardless of the cluster type
# - name: USE_INGRESS_ON_OPENSHIFT
# value: "true"
# Unconditionally requeue after the number of seconds specified in the
# environment variable RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV. If the
# environment variable is not set, requeue after the default value (300).
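
For illustration, enabling this toggle is simply a matter of uncommenting the entry above in a values override; a minimal sketch, assuming the same env list shape this chart already uses:

env:
  - name: USE_INGRESS_ON_OPENSHIFT
    value: "true"

With this set, the operator treats an OpenShift cluster like vanilla Kubernetes and creates an Ingress rather than a Route.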
2 changes: 1 addition & 1 deletion ray-operator/Makefile
@@ -114,7 +114,7 @@ controller-gen: ## Download controller-gen locally if necessary.

KUSTOMIZE = $(shell pwd)/bin/kustomize
kustomize: ## Download kustomize locally if necessary.
test -s $(KUSTOMIZE) || GOBIN=$(KUSTOMIZE)/.. go install sigs.k8s.io/kustomize/kustomize/[email protected]
test -s $(KUSTOMIZE) || GOBIN=$(KUSTOMIZE)/.. go install sigs.k8s.io/kustomize/kustomize/v5@latest

GOFUMPT = $(shell pwd)/bin/gofumpt
gofumpt: ## Download gofumpt locally if necessary.
12 changes: 12 additions & 0 deletions ray-operator/config/rbac/role.yaml
@@ -224,3 +224,15 @@ rules:
- list
- update
- watch
- apiGroups:
- route.openshift.io
resources:
- routes
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
84 changes: 84 additions & 0 deletions ray-operator/controllers/ray/common/route.go
@@ -0,0 +1,84 @@
package common

import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

routev1 "github.com/openshift/api/route/v1"
rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// BuildRouteForHeadService builds the OpenShift Route for the head service dashboard.
// This is used to expose the dashboard and remote job submission API to external traffic.
func BuildRouteForHeadService(cluster rayv1alpha1.RayCluster) (*routev1.Route, error) {
labels := map[string]string{
RayClusterLabelKey: cluster.Name,
RayIDLabelKey: utils.GenerateIdentifier(cluster.Name, rayv1alpha1.HeadNode),
KubernetesApplicationNameLabelKey: ApplicationName,
KubernetesCreatedByLabelKey: ComponentName,
}

// Copy other configurations from cluster annotations to provide a generic way
// for user to customize their route settings.
annotation := map[string]string{}
for key, value := range cluster.Annotations {
annotation[key] = value
}

servicePorts := getServicePorts(cluster)
dashboardPort := DefaultDashboardPort
if port, ok := servicePorts["dashboard"]; ok {
dashboardPort = int(port)
}

weight := int32(100)

serviceName, err := utils.GenerateHeadServiceName("RayCluster", cluster.Spec, cluster.Name)
if err != nil {
return nil, err
}

route := &routev1.Route{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateRouteName(cluster.Name),
Namespace: cluster.Namespace,
Labels: labels,
Annotations: annotation,
},
Spec: routev1.RouteSpec{
To: routev1.RouteTargetReference{
Kind: "Service",
Name: serviceName,
Weight: &weight,
},
Port: &routev1.RoutePort{
TargetPort: intstr.FromInt(dashboardPort),
},
WildcardPolicy: "None",
},
}

return route, nil
}

// BuildRouteForRayService builds the Route to the head service dashboard for a RayService.
// This is used to expose the dashboard to external traffic.
// The RayService controller updates the Route whenever a new RayCluster serves the traffic.
func BuildRouteForRayService(service rayv1alpha1.RayService, cluster rayv1alpha1.RayCluster) (*routev1.Route, error) {
route, err := BuildRouteForHeadService(cluster)
if err != nil {
return nil, err
}

serviceName, err := utils.GenerateHeadServiceName("RayService", cluster.Spec, cluster.Name)
if err != nil {
return nil, err
}
route.ObjectMeta.Name = serviceName
route.ObjectMeta.Namespace = service.Namespace
route.ObjectMeta.Labels[RayServiceLabelKey] = service.Name
route.ObjectMeta.Labels[RayIDLabelKey] = utils.CheckLabel(utils.GenerateIdentifier(service.Name, rayv1alpha1.HeadNode))

return route, nil
}
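
For reference, a rough sketch of the Route this builder produces for the raycluster-sample cluster used in the test below, assuming the usual KubeRay defaults (dashboard port 8265, a -head-svc head service suffix, and the ray.io/cluster label key; other labels and copied annotations omitted):

apiVersion: route.openshift.io/v1
kind: Route
metadata:
  name: raycluster-sample-head-route
  namespace: default
  labels:
    ray.io/cluster: raycluster-sample
spec:
  to:
    kind: Service
    name: raycluster-sample-head-svc
    weight: 100
  port:
    targetPort: 8265
  wildcardPolicy: None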
73 changes: 73 additions & 0 deletions ray-operator/controllers/ray/common/route_test.go
@@ -0,0 +1,73 @@
package common

import (
"strings"
"testing"

rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
)

var instanceWithRouteEnabled = &rayv1alpha1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "raycluster-sample",
Namespace: "default",
Annotations: map[string]string{
IngressClassAnnotationKey: "nginx",
},
},
Spec: rayv1alpha1.RayClusterSpec{
RayVersion: "1.0",
HeadGroupSpec: rayv1alpha1.HeadGroupSpec{
Replicas: pointer.Int32Ptr(1),
EnableIngress: pointer.BoolPtr(true),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
Image: "rayproject/autoscaler",
Command: []string{"python"},
Args: []string{"/opt/code.py"},
},
},
},
},
},
},
}

func TestBuildRouteForHeadService(t *testing.T) {
route, err := BuildRouteForHeadService(*instanceWithRouteEnabled)
assert.Nil(t, err)

// Test name
var builder strings.Builder
builder.WriteString(instanceWithRouteEnabled.ObjectMeta.Name)
builder.WriteString("-head-route")
if builder.String() != route.Name {
t.Fatalf("Error generating Route name. Expected `%v` but got `%v`", builder.String(), route.Name)
}
// Test To subject
expectedKind := "Service"
if expectedKind != route.Spec.To.Kind {
t.Fatalf("Error generating Route kind. Expected `%v` but got `%v`", expectedKind, route.Spec.To.Kind)
}
// Test Service name
builder.Reset()
builder.WriteString(instanceWithRouteEnabled.ObjectMeta.Name)
builder.WriteString("-head-svc")
if builder.String() != route.Spec.To.Name {
t.Fatalf("Error generating service name. Expected `%v` but got `%v`", builder.String(), route.Spec.To.Name)
}

// Test Service port
expectedPort := intstr.FromInt(8265)
if route.Spec.Port.TargetPort != expectedPort {
t.Fatalf("Error generating service port. Expected `%v` but got `%v`", expectedPort, route.Spec.Port.TargetPort)
}
}
115 changes: 114 additions & 1 deletion ray-operator/controllers/ray/raycluster_controller.go
@@ -21,7 +21,10 @@ import (
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
routev1 "github.com/openshift/api/route/v1"
_ "k8s.io/api/apps/v1beta1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
@@ -47,6 +50,47 @@ var (
podUIDIndexField = "metadata.uid"
)

// getDiscoveryClient returns a discovery client for the current reconciler
func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) {
return discovery.NewDiscoveryClientForConfig(config)
}

// getClusterType checks where we are running, distinguishing between a
// vanilla Kubernetes cluster and OpenShift.
func getClusterType(logger logr.Logger) bool {
if os.Getenv("USE_INGRESS_ON_OPENSHIFT") == "true" {
// Environment is set to treat OpenShift cluster as Vanilla Kubernetes
return false
}

// The discovery package is used to discover APIs supported by a Kubernetes API server.
config, err := ctrl.GetConfig()
if err == nil && config != nil {
dclient, err := getDiscoveryClient(config)
if err == nil && dclient != nil {
apiGroupList, err := dclient.ServerGroups()
if err != nil {
logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes")
return false
} else {
for i := 0; i < len(apiGroupList.Groups); i++ {
if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") {
logger.Info("We detected being on OpenShift!")
return true
}
}
return false
}
} else {
logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes")
return false
}
} else {
logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes")
return false
}
}

// NewReconciler returns a new reconcile.Reconciler
func NewReconciler(mgr manager.Manager) *RayClusterReconciler {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, podUIDIndexField, func(rawObj client.Object) []string {
@@ -55,13 +99,17 @@ func NewReconciler(mgr manager.Manager) *RayClusterReconciler {
}); err != nil {
panic(err)
}
log := ctrl.Log.WithName("controllers").WithName("RayCluster")
log.Info("Starting Reconciler")
isOpenShift := getClusterType(log)

return &RayClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("RayCluster"),
Log: log,
Recorder: mgr.GetEventRecorderFor("raycluster-controller"),
BatchSchedulerMgr: batchscheduler.NewSchedulerManager(mgr.GetConfig()),
IsOpenShift: isOpenShift,
}
}

@@ -74,6 +122,7 @@ type RayClusterReconciler struct {
Scheme *runtime.Scheme
Recorder record.EventRecorder
BatchSchedulerMgr *batchscheduler.SchedulerManager
IsOpenShift bool
}

// Reconcile reads the state of the cluster for a RayCluster object and makes changes based on it
@@ -90,12 +139,14 @@ type RayClusterReconciler struct {
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingressclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;delete;patch
// +kubebuilder:rbac:groups=route.openshift.io,resources=routes,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=extensions,resources=ingresses,verbs=get;list;watch;create;update;delete;patch
// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;delete
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles,verbs=get;list;watch;create;delete;update
// +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=rolebindings,verbs=get;list;watch;create;delete

// [WARNING]: There MUST be a newline after kubebuilder markers.

// Reconcile used to bridge the desired state with the current state
func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
@@ -230,10 +281,55 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(oldStatus rayv1alpha
}

func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
r.Log.Info("Reconciling Ingress")
if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
return nil
}

if r.IsOpenShift {
// This is OpenShift - create a Route
return r.reconcileRouteOpenShift(ctx, instance)
} else {
// Plain vanilla Kubernetes - create an Ingress
return r.reconcileIngressKubernetes(ctx, instance)
}
}

func (r *RayClusterReconciler) reconcileRouteOpenShift(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
headRoutes := routev1.RouteList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
if err := r.List(ctx, &headRoutes, client.InNamespace(instance.Namespace), filterLabels); err != nil {
r.Log.Error(err, "Route Listing error!", "Route.Error", err)
return err
}

if headRoutes.Items != nil && len(headRoutes.Items) == 1 {
r.Log.Info("reconcileIngresses", "head service route found", headRoutes.Items[0].Name)
return nil
}

if headRoutes.Items == nil || len(headRoutes.Items) == 0 {
route, err := common.BuildRouteForHeadService(*instance)
if err != nil {
r.Log.Error(err, "Failed building route!", "Route.Error", err)
return err
}

if err := ctrl.SetControllerReference(instance, route, r.Scheme); err != nil {
return err
}

err = r.createHeadRoute(ctx, route, instance)
if err != nil {
r.Log.Error(err, "Failed creating route!", "Route.Error", err)
return err
}
}

return nil
}

func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, instance *rayv1alpha1.RayCluster) error {
headIngresses := networkingv1.IngressList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
if err := r.List(ctx, &headIngresses, client.InNamespace(instance.Namespace), filterLabels); err != nil {
@@ -596,6 +692,23 @@ func (r *RayClusterReconciler) createHeadIngress(ctx context.Context, ingress *n
return nil
}

func (r *RayClusterReconciler) createHeadRoute(ctx context.Context, route *routev1.Route, instance *rayv1alpha1.RayCluster) error {
// making sure the name is valid
route.Name = utils.CheckName(route.Name)

if err := r.Create(ctx, route); err != nil {
if errors.IsAlreadyExists(err) {
r.Log.Info("Route already exists, no need to create")
return nil
}
r.Log.Error(err, "Route create error!", "Route.Error", err)
return err
}
r.Log.Info("Route created successfully", "route name", route.Name)
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Created", "Created route %s", route.Name)
return nil
}

func (r *RayClusterReconciler) createService(ctx context.Context, raySvc *corev1.Service, instance *rayv1alpha1.RayCluster) error {
// making sure the name is valid
raySvc.Name = utils.CheckName(raySvc.Name)