Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api server] enable job spec server #416

Merged
merged 10 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apiserver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func startRpcServer(resourceManager *manager.ResourceManager) {
grpc.MaxRecvMsgSize(math.MaxInt32))
api.RegisterClusterServiceServer(s, server.NewClusterServer(resourceManager, &server.ClusterServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterComputeTemplateServiceServer(s, server.NewComputeTemplateServer(resourceManager, &server.ComputeTemplateServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterRayJobServiceServer(s, server.NewRayJobServer(resourceManager, &server.JobServerOptions{CollectMetrics: *collectMetricsFlag}))

// Register reflection service on gRPC server.
reflection.Register(s)
Expand Down Expand Up @@ -92,6 +93,7 @@ func startHttpProxy() {
)
registerHttpHandlerFromEndpoint(api.RegisterClusterServiceHandlerFromEndpoint, "ClusterService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(api.RegisterComputeTemplateServiceHandlerFromEndpoint, "ComputeTemplateService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(api.RegisterRayJobServiceHandlerFromEndpoint, "JobService", ctx, runtimeMux)

// Create a top level mux to include both Http gRPC servers and other endpoints like metrics
topMux := http.NewServeMux()
Expand Down
1 change: 1 addition & 0 deletions apiserver/deploy/base/apiserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ rules:
- ray.io
resources:
- rayclusters
- rayjobs
verbs:
- create
- delete
Expand Down
4 changes: 2 additions & 2 deletions apiserver/pkg/client/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/golang/glog"
"github.com/ray-project/kuberay/apiserver/pkg/util"
rayclusterclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
Expand All @@ -30,6 +30,6 @@ func NewRayClusterClientOrFatal(initConnectionTimeout time.Duration, options uti
cfg.QPS = options.QPS
cfg.Burst = options.Burst

rayClusterClient := rayclusterclient.NewForConfigOrDie(cfg).RayV1alpha1()
rayClusterClient := rayclient.NewForConfigOrDie(cfg).RayV1alpha1()
return &RayClusterClient{client: rayClusterClient}
}
35 changes: 35 additions & 0 deletions apiserver/pkg/client/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package client

import (
"time"

"github.com/golang/glog"
"github.com/ray-project/kuberay/apiserver/pkg/util"
rayclient "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned"
rayiov1alpha1 "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/typed/ray/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

type JobClientInterface interface {
RayJobClient(namespace string) rayiov1alpha1.RayJobInterface
}

type RayJobClient struct {
client rayiov1alpha1.RayV1alpha1Interface
}

func (cc RayJobClient) RayJobClient(namespace string) rayiov1alpha1.RayJobInterface {
return cc.client.RayJobs(namespace)
}

func NewRayJobClientOrFatal(initConnectionTimeout time.Duration, options util.ClientOptions) JobClientInterface {
cfg, err := config.GetConfig()
if err != nil {
glog.Fatalf("Failed to create RayCluster client. Error: %v", err)
}
cfg.QPS = options.QPS
cfg.Burst = options.Burst

rayJobClient := rayclient.NewForConfigOrDie(cfg).RayV1alpha1()
return &RayJobClient{client: rayJobClient}
}
7 changes: 7 additions & 0 deletions apiserver/pkg/manager/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type ClientManagerInterface interface {
ClusterClient() client.ClusterClientInterface
JobClient() client.JobClientInterface
KubernetesClient() client.KubernetesClientInterface
Time() util.TimeInterface
}
Expand All @@ -18,6 +19,7 @@ type ClientManagerInterface interface {
type ClientManager struct {
// Kubernetes clients
clusterClient client.ClusterClientInterface
jobClient client.JobClientInterface
kubernetesClient client.KubernetesClientInterface
// auxiliary tools
time util.TimeInterface
Expand All @@ -27,6 +29,10 @@ func (c *ClientManager) ClusterClient() client.ClusterClientInterface {
return c.clusterClient
}

func (c *ClientManager) JobClient() client.JobClientInterface {
return c.jobClient
}

func (c *ClientManager) KubernetesClient() client.KubernetesClientInterface {
return c.kubernetesClient
}
Expand All @@ -52,6 +58,7 @@ func (c *ClientManager) init() {
// TODO: Potentially, we may need storage layer clients to help persist the data.
// 2. kubernetes client initialization
c.clusterClient = client.NewRayClusterClientOrFatal(initConnectionTimeout, defaultKubernetesClientConfig)
c.jobClient = client.NewRayJobClientOrFatal(initConnectionTimeout, defaultKubernetesClientConfig)
c.kubernetesClient = client.CreateKubernetesCoreOrFatal(initConnectionTimeout, defaultKubernetesClientConfig)

klog.Infof("Client manager initialized successfully")
Expand Down
132 changes: 126 additions & 6 deletions apiserver/pkg/manager/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type ResourceManagerInterface interface {
GetComputeTemplate(ctx context.Context, name string, namespace string) (*v1.ConfigMap, error)
ListComputeTemplates(ctx context.Context, namespace string) ([]*v1.ConfigMap, error)
DeleteComputeTemplate(ctx context.Context, name string, namespace string) error
CreateJob(ctx context.Context, apiJob *api.RayJob) (*v1alpha1.RayJob, error)
GetJob(ctx context.Context, jobName string, namespace string) (*v1alpha1.RayJob, error)
ListJobs(ctx context.Context, namespace string) ([]*v1alpha1.RayJob, error)
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved
ListAllJobs(ctx context.Context) ([]*v1alpha1.RayJob, error)
DeleteJob(ctx context.Context, jobName string, namespace string) error
}

type ResourceManager struct {
Expand All @@ -48,6 +53,10 @@ func (r *ResourceManager) getRayClusterClient(namespace string) rayiov1alpha1.Ra
return r.clientManager.ClusterClient().RayClusterClient(namespace)
}

func (r *ResourceManager) getRayJobClient(namespace string) rayiov1alpha1.RayJobInterface {
return r.clientManager.JobClient().RayJobClient(namespace)
}

func (r *ResourceManager) getKubernetesConfigMapClient(namespace string) clientv1.ConfigMapInterface {
return r.clientManager.KubernetesClient().ConfigMapClient(namespace)
}
Expand All @@ -63,7 +72,7 @@ func (r *ResourceManager) getKubernetesNamespaceClient() clientv1.NamespaceInter
// clusters
func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*v1alpha1.RayCluster, error) {
// populate cluster map
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiCluster)
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiCluster.ClusterSpec, apiCluster.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiCluster.Namespace, apiCluster.Name)
}
Expand All @@ -83,22 +92,22 @@ func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Clu
return newRayCluster, nil
}

func (r *ResourceManager) populateComputeTemplate(ctx context.Context, cluster *api.Cluster) (map[string]*api.ComputeTemplate, error) {
func (r *ResourceManager) populateComputeTemplate(ctx context.Context, clusterSpec *api.ClusterSpec, nameSpace string) (map[string]*api.ComputeTemplate, error) {
dict := map[string]*api.ComputeTemplate{}
// populate head compute template
name := cluster.ClusterSpec.HeadGroupSpec.ComputeTemplate
configMap, err := r.GetComputeTemplate(ctx, name, cluster.Namespace)
name := clusterSpec.HeadGroupSpec.ComputeTemplate
configMap, err := r.GetComputeTemplate(ctx, name, nameSpace)
if err != nil {
return nil, err
}
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
dict[name] = computeTemplate

// populate worker compute template
for _, spec := range cluster.ClusterSpec.WorkerGroupSpec {
for _, spec := range clusterSpec.WorkerGroupSpec {
name := spec.ComputeTemplate
if _, exist := dict[name]; !exist {
configMap, err := r.GetComputeTemplate(ctx, name, cluster.Namespace)
configMap, err := r.GetComputeTemplate(ctx, name, nameSpace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,6 +190,100 @@ func (r *ResourceManager) DeleteCluster(ctx context.Context, clusterName string,
return nil
}

func (r *ResourceManager) CreateJob(ctx context.Context, apiJob *api.RayJob) (*v1alpha1.RayJob, error) {
computeTemplateMap := make(map[string]*api.ComputeTemplate)
var err error

if apiJob.ClusterSpec != nil && len(apiJob.ClusterSelector) == 0 {
// populate cluster map
computeTemplateMap, err = r.populateComputeTemplate(ctx, apiJob.ClusterSpec, apiJob.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiJob.Namespace, apiJob.JobId)
}
}

// convert *api.Cluster to v1alpha1.RayCluster
rayJob := util.NewRayJob(apiJob, computeTemplateMap)
Jeffwan marked this conversation as resolved.
Show resolved Hide resolved

newRayJob, err := r.getRayJobClient(apiJob.Namespace).Create(ctx, rayJob.Get(), metav1.CreateOptions{})
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create a job for (%s/%s)", apiJob.Namespace, apiJob.JobId)
}

return newRayJob, nil
}

func (r *ResourceManager) GetJob(ctx context.Context, jobName string, namespace string) (*v1alpha1.RayJob, error) {
client := r.getRayJobClient(namespace)
return getJobByName(ctx, client, jobName)
}

func (r *ResourceManager) ListJobs(ctx context.Context, namespace string) ([]*v1alpha1.RayJob, error) {
labelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
util.KubernetesManagedByLabelKey: util.ComponentName,
},
}
rayJobList, err := r.getRayJobClient(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
})
if err != nil {
return nil, util.Wrap(err, fmt.Sprintf("List RayCluster failed in %s", namespace))
}

var result []*v1alpha1.RayJob
length := len(rayJobList.Items)
for i := 0; i < length; i++ {
result = append(result, &rayJobList.Items[i])
}

return result, nil
}

func (r *ResourceManager) ListAllJobs(ctx context.Context) ([]*v1alpha1.RayJob, error) {
namespaces, err := r.getKubernetesNamespaceClient().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, util.Wrap(err, "Failed to fetch all Kubernetes namespaces")
}

var result []*v1alpha1.RayJob
labelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{
util.KubernetesManagedByLabelKey: util.ComponentName,
},
}
for _, namespace := range namespaces.Items {
rayJobList, err := r.getRayJobClient(namespace.Name).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
})
if err != nil {
return nil, util.Wrap(err, fmt.Sprintf("List RayCluster failed in %s", namespace.Name))
}

length := len(rayJobList.Items)
for i := 0; i < length; i++ {
result = append(result, &rayJobList.Items[i])
}
}
return result, nil
}

func (r *ResourceManager) DeleteJob(ctx context.Context, jobName string, namespace string) error {
client := r.getRayJobClient(namespace)
job, err := getJobByName(ctx, client, jobName)
if err != nil {
return util.Wrap(err, "Get job failure")
}

// Delete Kubernetes resources
if err := client.Delete(ctx, job.Name, metav1.DeleteOptions{}); err != nil {
// API won't need to delete the ray cluster CR
return util.NewInternalServerError(err, "Failed to delete cluster %v.", jobName)
}

return nil
}

// Compute Runtimes
func (r *ResourceManager) CreateComputeTemplate(ctx context.Context, runtime *api.ComputeTemplate) (*v1.ConfigMap, error) {
_, err := r.GetComputeTemplate(ctx, runtime.Name, runtime.Namespace)
Expand Down Expand Up @@ -277,6 +380,23 @@ func getClusterByName(ctx context.Context, client rayiov1alpha1.RayClusterInterf
return cluster, nil
}

// getJobByName returns the Kubernetes RayJob object by given name and client
func getJobByName(ctx context.Context, client rayiov1alpha1.RayJobInterface, name string) (*v1alpha1.RayJob, error) {
job, err := client.Get(ctx, name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil, util.NewNotFoundError(err, "Job %s not found", name)
}

return nil, util.Wrap(err, "Get Job failed")
}
if managedBy, ok := job.Labels[util.KubernetesManagedByLabelKey]; !ok || managedBy != util.ComponentName {
return nil, fmt.Errorf("RayCluster with name %s not managed by %s", name, util.ComponentName)
}

return job, nil
}

// getComputeTemplateByName returns the Kubernetes configmap object by given name and client
func getComputeTemplateByName(ctx context.Context, client clientv1.ConfigMapInterface, name string) (*v1.ConfigMap, error) {
runtime, err := client.Get(ctx, name, metav1.GetOptions{})
Expand Down
60 changes: 57 additions & 3 deletions apiserver/pkg/model/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"strconv"

"github.com/golang/glog"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/ray-project/kuberay/apiserver/pkg/util"
api "github.com/ray-project/kuberay/proto/go_client"
Expand Down Expand Up @@ -31,9 +33,7 @@ func FromCrdToApiCluster(cluster *v1alpha1.RayCluster, events []v1.Event) *api.C
}

// loop container and find the resource
pbCluster.ClusterSpec = &api.ClusterSpec{}
pbCluster.ClusterSpec.HeadGroupSpec = PopulateHeadNodeSpec(cluster.Spec.HeadGroupSpec)
pbCluster.ClusterSpec.WorkerGroupSpec = PopulateWorkerNodeSpec(cluster.Spec.WorkerGroupSpecs)
pbCluster.ClusterSpec = PopulateRayClusterSpec(cluster.Spec)

// parse events
for _, event := range events {
Expand All @@ -58,6 +58,13 @@ func FromCrdToApiCluster(cluster *v1alpha1.RayCluster, events []v1.Event) *api.C
return pbCluster
}

func PopulateRayClusterSpec(spec v1alpha1.RayClusterSpec) *api.ClusterSpec {
clusterSpec := &api.ClusterSpec{}
clusterSpec.HeadGroupSpec = PopulateHeadNodeSpec(spec.HeadGroupSpec)
clusterSpec.WorkerGroupSpec = PopulateWorkerNodeSpec(spec.WorkerGroupSpecs)
return clusterSpec
}

func PopulateHeadNodeSpec(spec v1alpha1.HeadGroupSpec) *api.HeadGroupSpec {
headNodeSpec := &api.HeadGroupSpec{
RayStartParams: spec.RayStartParams,
Expand Down Expand Up @@ -111,3 +118,50 @@ func FromKubeToAPIComputeTemplates(configMaps []*v1.ConfigMap) []*api.ComputeTem
}
return apiComputeTemplates
}

func FromCrdToApiJobs(jobs []*v1alpha1.RayJob) []*api.RayJob {
apiJobs := make([]*api.RayJob, 0)
for _, job := range jobs {
apiJobs = append(apiJobs, FromCrdToApiJob(job))
}
return apiJobs
}

func FromCrdToApiJob(job *v1alpha1.RayJob) (pbJob *api.RayJob) {
defer func() {
err := recover()
if err != nil {
glog.Errorf("failed to transfer ray job, err: %v, item: %v", err, job)
}
}()

var ttl int32 = -1
if job.Spec.TTLSecondsAfterFinished != nil {
ttl = *job.Spec.TTLSecondsAfterFinished
}

var deleteTime int64 = -1
if job.DeletionTimestamp != nil {
deleteTime = job.DeletionTimestamp.Unix()
}

pbJob = &api.RayJob{
Name: job.Name,
Namespace: job.Namespace,
User: job.Labels[util.RayClusterUserLabelKey],
Entrypoint: job.Spec.Entrypoint,
Metadata: job.Spec.Metadata,
RuntimeEnv: job.Spec.RuntimeEnv,
JobId: job.Status.JobId,
ShutdownAfterJobFinishes: job.Spec.ShutdownAfterJobFinishes,
ClusterSelector: job.Spec.ClusterSelector,
ClusterSpec: PopulateRayClusterSpec(job.Spec.RayClusterSpec),
TtlSecondsAfterFinished: ttl,
CreatedAt: &timestamp.Timestamp{Seconds: job.CreationTimestamp.Unix()},
DeleteAt: &timestamp.Timestamp{Seconds: deleteTime},
JobStatus: string(job.Status.JobStatus),
JobDeploymentStatus: string(job.Status.JobDeploymentStatus),
Message: job.Status.Message,
}
return pbJob
}
Loading