Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): add syncer service server #333

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deployments/server/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ data:
httpPort: {{ .Values.httpPort }}
grpcPort: {{ .Values.grpcPort }}
workerServiceGrpcPort: {{ .Values.workerServiceGrpcPort }}
syncerServiceGrpcPort: {{ .Values.syncerServiceGrpcPort }}
fileManagerServerAddr: {{ .Values.fileManagerServerAddr }}
modelManagerServerAddr: {{ .Values.modelManagerServerAddr }}
sessionManagerServerEndpoint: {{ .Values.sessionManagerServerEndpoint }}
Expand Down
3 changes: 3 additions & 0 deletions deployments/server/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ spec:
- name: ws-grpc
containerPort: {{ .Values.workerServiceGrpcPort }}
protocol: TCP
- name: syncer-grpc
containerPort: {{ .Values.syncerServiceGrpcPort }}
protocol: TCP
volumeMounts:
- name: config
mountPath: /etc/config
Expand Down
35 changes: 33 additions & 2 deletions deployments/server/templates/ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ spec:
port:
number: {{ .Values.httpPort }}

---

{{- if .Values.global.workerServiceIngress.create -}}
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
Expand Down Expand Up @@ -115,3 +114,35 @@ spec:
port:
number: {{ .Values.workerServiceGrpcPort }}
{{- end -}}

{{- if .Values.syncerServiceIngress.create -}}
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ include "job-manager-server.fullname" . }}-syncer-service-grpc
labels:
{{- include "job-manager-server.labels" . | nindent 4 }}
annotations:
{{- toYaml .Values.syncerServiceIngress.annotations | nindent 4 }}
spec:
ingressClassName: {{ .Values.global.ingress.ingressClassName }}
{{- with .Values.global.ingress.tls }}
tls:
- hosts:
{{- toYaml .hosts | nindent 6 }}
{{- if .secretName }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
rules:
- http:
paths:
- path: /llmariner.syncer.server.v1.SyncerService
pathType: Prefix
backend:
service:
name: {{ include "job-manager-server.fullname" . }}-syncer-service-grpc
port:
number: {{ .Values.syncerServiceGrpcPort }}
{{- end -}}
20 changes: 20 additions & 0 deletions deployments/server/templates/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,23 @@ spec:
targetPort: ws-grpc
selector:
{{- include "job-manager-server.selectorLabels" . | nindent 4 }}

---

apiVersion: v1
kind: Service
metadata:
name: {{ include "job-manager-server.fullname" . }}-syncer-service-grpc
labels:
{{- include "job-manager-server.labels" . | nindent 4 }}
annotations:
{{- toYaml .Values.syncerServiceGrpcService.annotations | nindent 4 }}
spec:
type: ClusterIP
ports:
- name: syncer-grpc
port: {{ .Values.syncerServiceGrpcPort }}
protocol: TCP
targetPort: syncer-grpc
selector:
{{- include "job-manager-server.selectorLabels" . | nindent 4 }}
2 changes: 1 addition & 1 deletion deployments/server/values.schema.json

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions deployments/server/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ grpcPort: 8081
# The GRPC port number for the worker service.
# +docs:type=number
workerServiceGrpcPort: 8082
# The GRPC port number for the syncer service.
# +docs:type=number
syncerServiceGrpcPort: 8083

# The address of the file-manager-server to call public file APIs.
fileManagerServerAddr: file-manager-server-grpc:8081
Expand All @@ -124,6 +127,22 @@ modelManagerServerAddr: model-manager-server-grpc:8081
# The endpoint of the session-manager-server to call the Kubernetes APIs of the worker.
sessionManagerServerEndpoint: http://session-manager-server-http:8080/v1

syncerServiceGrpcService:
# Optional additional annotations to add to Service of the
# job-manager-server syncer service.
annotations: {}

# Specify the syncer Ingress configuration for syncer components to
# connect to the job-manager-server's syncer service. This is only
# necessary when installing LLMariner in a multi-cluster mode.
# For more information, see [Install across Multiple Clusters](https://llmariner.ai/docs/setup/install/multi_cluster_production/).
syncerServiceIngress:
# Specify whether to create an Ingress.
create: false
# Optional additional annotations to add to the syncer Ingress.
# +docs:property
# annotations: {}

# TODO(aya): build own image and think the way to switch the driver version

# Specify the Jupyter Notebook settings.
Expand Down
9 changes: 9 additions & 0 deletions hack/values-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ cluster-manager-server:

job-manager-server:
enable: true

syncerServiceGrpcService:
annotations:
konghq.com/protocol: grpc
syncerServiceIngress:
create: true
annotations:
konghq.com/protocols: grpc,grpcs

version: latest
image:
repository: llmariner/job-manager-server
Expand Down
8 changes: 7 additions & 1 deletion server/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ func run(ctx context.Context, c *config.Config) error {
usageSetter = sender.NoopUsageSetter{}
}

sched := scheduler.New(st, logger.WithName("scheduler"))

go func() {
sched := scheduler.New(st, logger.WithName("scheduler"))
s := server.New(
st,
fclient,
Expand All @@ -149,5 +150,10 @@ func run(ctx context.Context, c *config.Config) error {
errCh <- s.Run(ctx, c.WorkerServiceGRPCPort, c.AuthConfig)
}()

go func() {
s := server.NewSyncerServiceServer(logger, k8sClientFactory, sched)
errCh <- s.Run(ctx, c.SyncerServiceGRPCPort)
}()

return <-errCh
}
1 change: 1 addition & 0 deletions server/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type Config struct {
GRPCPort int `yaml:"grpcPort"`
WorkerServiceGRPCPort int `yaml:"workerServiceGrpcPort"`
SyncerServiceGRPCPort int `yaml:"syncerServiceGrpcPort"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: add validation for SyncerServiceGRPCPort?

HTTPPort int `yaml:"httpPort"`

FileManagerServerAddr string `yaml:"fileManagerServerAddr"`
Expand Down
57 changes: 57 additions & 0 deletions server/internal/server/syncer_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package server

import (
"context"
"fmt"
"net"

"github.com/go-logr/logr"
v1 "github.com/llmariner/job-manager/api/v1"
"github.com/llmariner/job-manager/server/internal/k8s"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

// NewSyncerServiceServer creates a new syncer service server.
func NewSyncerServiceServer(logger logr.Logger, k8sClientFactory k8s.ClientFactory, scheduler schedulerI) *SS {
return &SS{
logger: logger.WithName("syncer"),
k8sClientFactory: k8sClientFactory,
scheduler: scheduler,
}
}

// SS is a server for syncer services.
type SS struct {
v1.UnimplementedSyncerServiceServer

srv *grpc.Server
k8sClientFactory k8s.ClientFactory
scheduler schedulerI
logger logr.Logger
}

// Run runs the syncer service server.
func (ss *SS) Run(ctx context.Context, port int) error {
ss.logger.Info("Starting syncer service server...", "port", port)

// TODO: support auth
fakeAuth := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
return handler(fakeAuthInto(ctx), req)
}
opt := grpc.ChainUnaryInterceptor(fakeAuth)

srv := grpc.NewServer(opt)
v1.RegisterSyncerServiceServer(srv, ss)
reflection.Register(srv)
ss.srv = srv

l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return fmt.Errorf("listen: %w", err)
}
if err := srv.Serve(l); err != nil {
return fmt.Errorf("serve: %w", err)
}
return nil
}
Loading