Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): implement syncer service APIs #336

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions server/internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -14,7 +18,8 @@ const fieldManager = "job-manager-server"

// ClientFactory is a factory to create a Client.
type ClientFactory interface {
NewClient(clusterID string, token string) (Client, error)
NewClient(clusterID, token string) (Client, error)
NewDynamicClient(clusterID, token string) (DynamicClient, error)
}

// NewClientFactory creates a new ClientFactory.
Expand All @@ -27,17 +32,30 @@ type defaultClientFactory struct {
}

// NewK8sClient creates a new Client.
func (f *defaultClientFactory) NewClient(clusterID string, token string) (Client, error) {
client, err := kubernetes.NewForConfig(&rest.Config{
Host: fmt.Sprintf("%s/sessions/%s", f.endpoint, clusterID),
BearerToken: token,
})
func (f *defaultClientFactory) NewClient(clusterID, token string) (Client, error) {
client, err := kubernetes.NewForConfig(f.getRestConfig(clusterID, token))
if err != nil {
return nil, err
}
return &defaultClient{client: client}, nil
}

// NewDynamicK8sClient creates a new dynamic Client.
func (f *defaultClientFactory) NewDynamicClient(clusterID, token string) (DynamicClient, error) {
client, err := dynamic.NewForConfig(f.getRestConfig(clusterID, token))
if err != nil {
return nil, err
}
return NewDefaultDynamicClient(client), nil
}

func (f *defaultClientFactory) getRestConfig(clusterID, token string) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("%s/sessions/%s", f.endpoint, clusterID),
BearerToken: token,
}
}

// Client is a client to mange worker Kubernetes resources.
type Client interface {
CreateSecret(ctx context.Context, name, namespace string, data map[string][]byte) error
Expand All @@ -63,3 +81,36 @@ func (c *defaultClient) CreateConfigMap(ctx context.Context, name, namespace str
_, err := c.client.CoreV1().ConfigMaps(namespace).Apply(ctx, conf, opts)
return err
}

// DynamicClient is a dynamic client to mange worker Kubernetes resources.
type DynamicClient interface {
PatchResource(ctx context.Context, name, namespace string, gvr schema.GroupVersionResource, data []byte) (*unstructured.Unstructured, error)
DeleteResource(ctx context.Context, name, namespace string, gvr schema.GroupVersionResource) error
}

// NewDefaultDynamicClient creates a new DynamicClient.
func NewDefaultDynamicClient(client dynamic.Interface) DynamicClient {
return &defaultDynamicClient{client: client}
}

type defaultDynamicClient struct {
client dynamic.Interface
}

// PatchResource patches a Kubernetes resources.
func (c *defaultDynamicClient) PatchResource(ctx context.Context, name, namespace string, gvr schema.GroupVersionResource, data []byte) (*unstructured.Unstructured, error) {
dr := c.getResourceInterface(namespace, gvr)
return dr.Patch(ctx, name, types.ApplyPatchType, data, metav1.PatchOptions{FieldManager: fieldManager})
}

func (c *defaultDynamicClient) DeleteResource(ctx context.Context, name, namespace string, gvr schema.GroupVersionResource) error {
dr := c.getResourceInterface(namespace, gvr)
return dr.Delete(ctx, name, metav1.DeleteOptions{})
}

func (c *defaultDynamicClient) getResourceInterface(namespace string, gvr schema.GroupVersionResource) dynamic.ResourceInterface {
if namespace != "" {
return c.client.Resource(gvr).Namespace(namespace)
}
return c.client.Resource(gvr)
}
17 changes: 16 additions & 1 deletion server/internal/server/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)

func TestCreateJob(t *testing.T) {
Expand Down Expand Up @@ -466,6 +468,10 @@ func (f *noopK8sClientFactory) NewClient(clusterID string, token string) (k8s.Cl
return &noopK8sClient{}, nil
}

func (f *noopK8sClientFactory) NewDynamicClient(clusterID, token string) (k8s.DynamicClient, error) {
return &noopDynClient{}, nil
}

type noopK8sClient struct{}

func (c *noopK8sClient) CreateSecret(ctx context.Context, name, namespace string, data map[string][]byte) error {
Expand All @@ -476,9 +482,18 @@ func (c *noopK8sClient) CreateConfigMap(ctx context.Context, name, namespace str
return nil
}

type fakeScheduler struct {
type noopDynClient struct{}

func (c *noopDynClient) PatchResource(ctx context.Context, name, namespace string, gvr schema.GroupVersionResource, data []byte) (*unstructured.Unstructured, error) {
return nil, nil
}

func (c *noopDynClient) DeleteResource(ctx context.Context, name, namespace string, gvr schema.GroupVersionResource) error {
return nil
}

type fakeScheduler struct{}

func (s *fakeScheduler) Schedule(userInfo *auth.UserInfo) (scheduler.SchedulingResult, error) {
if len(userInfo.AssignedKubernetesEnvs) == 0 {
return scheduler.SchedulingResult{}, fmt.Errorf("no kuberentes cluster/namespace")
Expand Down
96 changes: 96 additions & 0 deletions server/internal/server/syncer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ import (
"github.com/go-logr/logr"
v1 "github.com/llmariner/job-manager/api/v1"
"github.com/llmariner/job-manager/server/internal/k8s"
"github.com/llmariner/rbac-manager/pkg/auth"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// NewSyncerServiceServer creates a new syncer service server.
Expand Down Expand Up @@ -55,3 +59,95 @@ func (ss *SS) Run(ctx context.Context, port int) error {
}
return nil
}

// PatchKubernetesObject applies a kubernetes object.
func (ss *SS) PatchKubernetesObject(ctx context.Context, req *v1.PatchKubernetesObjectRequest) (*v1.PatchKubernetesObjectResponse, error) {
if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "name is required")
}
if req.Version == "" {
return nil, status.Errorf(codes.InvalidArgument, "version is required")
}
if req.Resource == "" {
return nil, status.Errorf(codes.InvalidArgument, "resource is required")
}
if len(req.Data) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "data is required")
}

userInfo, ok := auth.ExtractUserInfoFromContext(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about adding the request validation?

if !ok {
return nil, fmt.Errorf("failed to extract user info from context")
}
apikey, err := auth.ExtractTokenFromContext(ctx)
if err != nil {
return nil, err
}

// TODO(aya): Schedule to the cluster where it was created If the resource is not newly created.
sresult, err := ss.scheduler.Schedule(userInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Is this correct that PatchKubernetesObject is currently called when a new resource is created? My understanding is that we need to find out which cluster a resource is located when the resource is being updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have only tested the creation path currently. Eventually, to support creation and updating in this same function, I plan to store cluster info as a resource annotation in the tenant cluster and use it for updating. I'll leave a TODO comment here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

if err != nil {
return nil, status.Errorf(codes.Internal, "schedule: %s", err)
}
clusterID := sresult.ClusterID

if sresult.Namespace != req.Namespace {
// TODO(aya): rethink the namespace handling
return nil, status.Errorf(codes.NotFound, "not found the namespace")
}

client, err := ss.k8sClientFactory.NewDynamicClient(clusterID, apikey)
if err != nil {
return nil, status.Errorf(codes.Internal, "create k8s client: %s", err)
}

gvr := schema.GroupVersionResource{
Group: req.Group,
Version: req.Version,
Resource: req.Resource,
}
uobj, err := client.PatchResource(ctx, req.Name, req.Namespace, gvr, req.Data)
if err != nil {
return nil, status.Errorf(codes.Internal, "patch k8s object: %s", err)
}
return &v1.PatchKubernetesObjectResponse{
ClusterId: clusterID,
Uid: string(uobj.GetUID()),
}, nil
}

// DeleteKubernetesObject deletes a kubernetes object.
func (ss *SS) DeleteKubernetesObject(ctx context.Context, req *v1.DeleteKubernetesObjectRequest) (*v1.DeleteKubernetesObjectResponse, error) {
if req.ClusterId == "" {
return nil, status.Errorf(codes.InvalidArgument, "cluster ID is required")
}
if req.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "name is required")
}
if req.Version == "" {
return nil, status.Errorf(codes.InvalidArgument, "version is required")
}
if req.Resource == "" {
return nil, status.Errorf(codes.InvalidArgument, "resource is required")
}

apikey, err := auth.ExtractTokenFromContext(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about adding the request validation (e.g., return an error if req.ClusterId is empty)?

if err != nil {
return nil, err
}

client, err := ss.k8sClientFactory.NewDynamicClient(req.ClusterId, apikey)
if err != nil {
return nil, status.Errorf(codes.Internal, "create k8s client: %s", err)
}

gvr := schema.GroupVersionResource{
Group: req.Group,
Version: req.Version,
Resource: req.Resource,
}
if err := client.DeleteResource(ctx, req.Name, req.Namespace, gvr); err != nil {
return nil, status.Errorf(codes.Internal, "delete k8s object: %s", err)
}
return &v1.DeleteKubernetesObjectResponse{}, nil
}
Loading