Skip to content

Commit

Permalink
stream: decoupled stream execution with grpc stream
Browse files Browse the repository at this point in the history
  • Loading branch information
criyle committed Feb 3, 2024
1 parent 485aa02 commit 010c30f
Show file tree
Hide file tree
Showing 15 changed files with 483 additions and 281 deletions.
8 changes: 4 additions & 4 deletions cmd/go-judge-ffi/cinterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,14 @@ func FileDelete(e *C.char) C.int {

type nopLogger struct{}

func (nopLogger) Debug(args ...interface{}) {
func (nopLogger) Debug(args ...any) {
}

func (nopLogger) Info(args ...interface{}) {
func (nopLogger) Info(args ...any) {
}

func (nopLogger) Warn(args ...interface{}) {
func (nopLogger) Warn(args ...any) {
}

func (nopLogger) Error(args ...interface{}) {
func (nopLogger) Error(args ...any) {
}
72 changes: 10 additions & 62 deletions cmd/go-judge/grpc_executor/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/criyle/go-judge/cmd/go-judge/model"
Expand All @@ -18,14 +17,6 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

const buffLen = 4096

var buffPool = sync.Pool{
New: func() interface{} {
return make([]byte, buffLen)
},
}

// New creates grpc executor server
func New(worker worker.Worker, fs filestore.FileStore, srcPrefix []string, logger *zap.Logger) pb.ExecutorServer {
return &execServer{
Expand All @@ -45,13 +36,10 @@ type execServer struct {
}

func (e *execServer) Exec(ctx context.Context, req *pb.Request) (*pb.Response, error) {
r, si, so, err := convertPBRequest(req, e.srcPrefix)
r, err := convertPBRequest(req, e.srcPrefix)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if len(si) > 0 || len(so) > 0 {
return nil, status.Error(codes.InvalidArgument, "stream in / out are not available for exec request")
}
e.logger.Sugar().Debugf("request: %+v", r)
rtCh, _ := e.worker.Submit(ctx, r)
rt := <-rtCh
Expand Down Expand Up @@ -166,38 +154,24 @@ func convertPBFileError(fe []envexec.FileError) []*pb.Response_FileError {
return rt
}

func convertPBRequest(r *pb.Request, srcPrefix []string) (req *worker.Request, streamIn []*fileStreamIn, streamOut []*fileStreamOut, err error) {
defer func() {
if err != nil {
for _, fi := range streamIn {
fi.Close()
}
streamIn = nil
for _, fi := range streamOut {
fi.Close()
}
streamOut = nil
}
}()
func convertPBRequest(r *pb.Request, srcPrefix []string) (req *worker.Request, err error) {
req = &worker.Request{
RequestID: r.RequestID,
Cmd: make([]worker.Cmd, 0, len(r.Cmd)),
PipeMapping: make([]worker.PipeMap, 0, len(r.PipeMapping)),
}
for _, c := range r.Cmd {
cm, si, so, err := convertPBCmd(c, srcPrefix)
streamIn = append(streamIn, si...)
streamOut = append(streamOut, so...)
cm, err := convertPBCmd(c, srcPrefix)
if err != nil {
return nil, streamIn, streamOut, err
return nil, err
}
req.Cmd = append(req.Cmd, cm)
}
for _, p := range r.PipeMapping {
pm := convertPBPipeMap(p)
req.PipeMapping = append(req.PipeMapping, pm)
}
return req, streamIn, streamOut, nil
return req, nil
}

func convertPBPipeMap(p *pb.Request_PipeMap) worker.PipeMap {
Expand All @@ -216,19 +190,7 @@ func convertPBPipeMap(p *pb.Request_PipeMap) worker.PipeMap {
}
}

func convertPBCmd(c *pb.Request_CmdType, srcPrefix []string) (cm worker.Cmd, streamIn []*fileStreamIn, streamOut []*fileStreamOut, err error) {
defer func() {
if err != nil {
for _, fi := range streamIn {
fi.Close()
}
streamIn = nil
for _, fi := range streamOut {
fi.Close()
}
streamOut = nil
}
}()
func convertPBCmd(c *pb.Request_CmdType, srcPrefix []string) (cm worker.Cmd, err error) {
cm = worker.Cmd{
Args: c.GetArgs(),
Env: c.GetEnv(),
Expand All @@ -249,23 +211,9 @@ func convertPBCmd(c *pb.Request_CmdType, srcPrefix []string) (cm worker.Cmd, str
Symlinks: c.GetSymlinks(),
}
for _, f := range c.GetFiles() {
var cf worker.CmdFile
switch fi := f.File.(type) {
case *pb.Request_File_StreamIn:
si := newFileStreamIn(fi.StreamIn.GetName(), c.GetTty())
streamIn = append(streamIn, si)
cf = si

case *pb.Request_File_StreamOut:
so := newFileStreamOut(fi.StreamOut.GetName())
streamOut = append(streamOut, so)
cf = so

default:
cf, err = convertPBFile(f, srcPrefix)
}
cf, err := convertPBFile(f, srcPrefix)
if err != nil {
return cm, streamIn, streamOut, err
return cm, err
}
cm.Files = append(cm.Files, cf)
}
Expand All @@ -274,12 +222,12 @@ func convertPBCmd(c *pb.Request_CmdType, srcPrefix []string) (cm worker.Cmd, str
for k, f := range copyIn {
cf, err := convertPBFile(f, srcPrefix)
if err != nil {
return cm, streamIn, streamOut, err
return cm, err
}
cm.CopyIn[k] = cf
}
}
return cm, streamIn, streamOut, nil
return cm, nil
}

func convertPBFile(c *pb.Request_File, srcPrefix []string) (worker.CmdFile, error) {
Expand Down
13 changes: 0 additions & 13 deletions cmd/go-judge/grpc_executor/grpc_other.go

This file was deleted.

Loading

0 comments on commit 010c30f

Please sign in to comment.