Skip to content

Commit

Permalink
stream: rename types to avoid stutters
Browse files Browse the repository at this point in the history
  • Loading branch information
criyle committed Feb 5, 2024
1 parent c29d0ad commit 8dd368a
Show file tree
Hide file tree
Showing 16 changed files with 79 additions and 54 deletions.
12 changes: 9 additions & 3 deletions cmd/go-judge-grpc-proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (

"github.com/criyle/go-judge/pb"
"github.com/gin-gonic/gin"
"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"
)

Expand All @@ -28,7 +29,12 @@ type execProxy struct {

func (p *execProxy) Exec(c *gin.Context) {
req := new(pb.Request)
if err := jsonpb.Unmarshal(c.Request.Body, req); err != nil {
b, err := io.ReadAll(c.Request.Body)
if err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
if err := protojson.Unmarshal(b, req); err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
Expand Down Expand Up @@ -125,7 +131,7 @@ func (p *execProxy) FileDelete(c *gin.Context) {
func main() {
flag.Parse()
token := os.Getenv("TOKEN")
opts := []grpc.DialOption{grpc.WithInsecure()}
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if token != "" {
opts = append(opts, grpc.WithPerRPCCredentials(newTokenAuth(token)))
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/go-judge-shell/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"golang.org/x/term"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

var (
Expand Down Expand Up @@ -43,7 +44,7 @@ func main() {
}

token := os.Getenv("TOKEN")
opts := []grpc.DialOption{grpc.WithInsecure()}
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if token != "" {
opts = append(opts, grpc.WithPerRPCCredentials(newTokenAuth(token)))
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/go-judge/grpc_executor/grpc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type streamWrapper struct {
es pb.Executor_ExecStreamServer
}

func (sw *streamWrapper) Send(r stream.StreamResponse) error {
func (sw *streamWrapper) Send(r stream.Response) error {
res := &pb.StreamResponse{}
switch {
case r.Response != nil:
Expand All @@ -34,29 +34,29 @@ func (sw *streamWrapper) Send(r stream.StreamResponse) error {
return sw.es.Send(res)
}

func (sw *streamWrapper) Recv() (*stream.StreamRequest, error) {
func (sw *streamWrapper) Recv() (*stream.Request, error) {
req, err := sw.es.Recv()
if err != nil {
return nil, err
}
switch i := req.Request.(type) {
case *pb.StreamRequest_ExecRequest:
return &stream.StreamRequest{Request: convertPBStreamRequest(i.ExecRequest)}, nil
return &stream.Request{Request: convertPBStreamRequest(i.ExecRequest)}, nil
case *pb.StreamRequest_ExecInput:
return &stream.StreamRequest{Input: &stream.InputRequest{
return &stream.Request{Input: &stream.InputRequest{
Name: i.ExecInput.Name,
Content: i.ExecInput.Content,
}}, nil
case *pb.StreamRequest_ExecResize:
return &stream.StreamRequest{Resize: &stream.ResizeRequest{
return &stream.Request{Resize: &stream.ResizeRequest{
Name: i.ExecResize.Name,
Rows: int(i.ExecResize.Rows),
Cols: int(i.ExecResize.Cols),
X: int(i.ExecResize.X),
Y: int(i.ExecResize.Y),
}}, nil
case *pb.StreamRequest_ExecCancel:
return &stream.StreamRequest{Cancel: &struct{}{}}, nil
return &stream.Request{Cancel: &struct{}{}}, nil
}
return nil, errors.ErrUnsupported
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/go-judge/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newListener(addr string) (net.Listener, error) {
if host == "" {
return net.Listen("tcp", addr)
} else if host == "localhost" {
ips, err = getLocalhostIp()
ips, err = getLocalhostIP()
if err != nil {
return nil, err
}
Expand All @@ -51,7 +51,7 @@ func newListener(addr string) (net.Listener, error) {
return newMultiListener(ips, iPort)
}

func getLocalhostIp() ([]net.IP, error) {
func getLocalhostIP() ([]net.IP, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions cmd/go-judge/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Response struct {
mmap bool
}

// Close need to be called when mmap specified to be true
func (r *Response) Close() {
if !r.mmap {
return
Expand All @@ -127,6 +128,7 @@ func (r *Response) Close() {
}
}

// Close need to be called when mmap specified to be true
func (r *Result) Close() {
// remove temporary files
for _, f := range r.files {
Expand Down Expand Up @@ -318,6 +320,7 @@ func convertCmdFile(f *CmdFile, srcPrefix []string) (worker.CmdFile, error) {
}
}

// CheckPathPrefixes ensure path is allowed by prefixes
func CheckPathPrefixes(path string, prefixes []string) (bool, error) {
for _, p := range prefixes {
ok, err := checkPathPrefix(path, p)
Expand Down
8 changes: 4 additions & 4 deletions cmd/go-judge/model/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ func fileToByteGeneric(f *os.File) ([]byte, error) {
if _, err := f.Seek(0, 0); err != nil {
return nil, err
}
var s int64
if fi, err := f.Stat(); err != nil {
fi, err := f.Stat()
if err != nil {
return nil, err
} else {
s = fi.Size()
}
s := fi.Size()

c := make([]byte, s)
if _, err := io.ReadFull(f, c); err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions cmd/go-judge/model/util_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ func fileToByte(f *os.File, mmap bool) ([]byte, error) {
func fileToByteMmap(f *os.File) ([]byte, error) {
defer f.Close()

var s int64
if fi, err := f.Stat(); err != nil {
fi, err := f.Stat()
if err != nil {
return nil, err
} else {
s = fi.Size()
}
s := fi.Size()
if s == 0 {
return []byte{}, nil
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/go-judge/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,28 @@ const (
minBuffLen = 4 << 10
)

// Stream defines the transport layer for the stream execution that
// stream input and output interactively
type Stream interface {
Send(StreamResponse) error
Recv() (*StreamRequest, error)
Send(Response) error
Recv() (*Request, error)
}

type StreamRequest struct {
// Request defines operations receive from the remote
type Request struct {
Request *model.Request
Resize *ResizeRequest
Input *InputRequest
Cancel *struct{}
}

type StreamResponse struct {
// Response defines response to the remote
type Response struct {
Response *model.Response
Output *OutputResponse
}

// ResizeRequest defines resize operation to the virtual terminal
type ResizeRequest struct {
Name string
Rows int
Expand All @@ -42,11 +47,13 @@ type ResizeRequest struct {
Y int
}

// InputRequest defines input operation from the remote
type InputRequest struct {
Name string
Content []byte
}

// OutputResponse defines output result to the remote
type OutputResponse struct {
Name string
Content []byte
Expand All @@ -56,6 +63,7 @@ var (
errFirstMustBeExec = errors.New("the first stream request must be exec request")
)

// Start initiate a interactive execution on the worker and transmit the request and response over Stream transport layer
func Start(baseCtx context.Context, s Stream, w worker.Worker, srcPrefix []string, logger *zap.Logger) error {
req, err := s.Recv()
if err != nil {
Expand Down Expand Up @@ -122,7 +130,7 @@ func sendLoop(ctx context.Context, s Stream, outCh chan *OutputResponse, rtCh <-
return ctx.Err()

case o := <-outCh:
err := s.Send(StreamResponse{Output: o})
err := s.Send(Response{Output: o})
if err != nil {
return fmt.Errorf("send output: %w", err)
}
Expand All @@ -133,7 +141,7 @@ func sendLoop(ctx context.Context, s Stream, outCh chan *OutputResponse, rtCh <-
if err != nil {
return fmt.Errorf("convert response: %w", err)
}
return s.Send(StreamResponse{Response: &model.Response{Results: ret.Results}})
return s.Send(Response{Response: &model.Response{Results: ret.Results}})
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/go-judge/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
//go:embed version.*
var versions embed.FS

// Version defines the version of go-judge
var Version string = "unable to get version"

func init() {
Expand Down
24 changes: 12 additions & 12 deletions cmd/go-judge/ws_executor/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type wsHandle struct {

type wsRequest struct {
model.Request
CancelRequestId string `json:"cancelRequestId"`
CancelRequestID string `json:"cancelRequestId"`
}

func (h *wsHandle) Register(r *gin.Engine) {
Expand All @@ -66,9 +66,9 @@ func (h *wsHandle) handleWS(c *gin.Context) {
cm := newContextMap()

handleRequest := func(baseCtx context.Context, req *wsRequest) error {
if req.CancelRequestId != "" {
h.logger.Sugar().Debugf("ws cancel: %s", req.CancelRequestId)
cm.Remove(req.CancelRequestId)
if req.CancelRequestID != "" {
h.logger.Sugar().Debugf("ws cancel: %s", req.CancelRequestID)
cm.Remove(req.CancelRequestID)
return nil
}
r, err := model.ConvertRequest(&req.Request, h.srcPrefix)
Expand Down Expand Up @@ -185,26 +185,26 @@ func newContextMap() *contextMap {
return &contextMap{m: make(map[string]context.CancelFunc)}
}

func (c *contextMap) Add(reqId string, cancel context.CancelFunc) error {
if reqId == "" {
func (c *contextMap) Add(reqID string, cancel context.CancelFunc) error {
if reqID == "" {
return fmt.Errorf("empty request id")
}
c.mu.Lock()
defer c.mu.Unlock()

if _, exist := c.m[reqId]; exist {
return fmt.Errorf("duplicated request id: %v", reqId)
if _, exist := c.m[reqID]; exist {
return fmt.Errorf("duplicated request id: %v", reqID)
}
c.m[reqId] = cancel
c.m[reqID] = cancel
return nil
}

func (c *contextMap) Remove(reqId string) {
func (c *contextMap) Remove(reqID string) {
c.mu.Lock()
defer c.mu.Unlock()

if cancel, exist := c.m[reqId]; exist {
delete(c.m, reqId)
if cancel, exist := c.m[reqID]; exist {
delete(c.m, reqID)
cancel()
}
}
2 changes: 1 addition & 1 deletion env/env_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func newCgroup(c Config) (cgroup.Cgroup, error) {
c.Error("Failed to get available controllers", err)
return nil, err
}
if t == cgroup.CgroupTypeV2 {
if t == cgroup.TypeV2 {
// Check if running on a systemd enabled system
c.Info("Running with cgroup v2, connecting systemd dbus to create cgroup")
var conn *dbus.Conn
Expand Down
11 changes: 8 additions & 3 deletions envexec/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ type Result struct {
FileError []FileError
}

// FileErrorType defines the location that file operation fails
type FileErrorType int

// FileError enums
const (
ErrCopyInOpenFile FileErrorType = iota
ErrCopyInCreateDir
Expand All @@ -102,6 +104,7 @@ const (
ErrSymlink
)

// FileError defines the location, file name and the detailed message for a failed file operation
type FileError struct {
Name string `json:"name"`
Type FileErrorType `json:"type"`
Expand Down Expand Up @@ -131,17 +134,19 @@ func (t FileErrorType) String() string {
return ""
}

// MarshalJSON encodes file error into json string
func (t FileErrorType) MarshalJSON() ([]byte, error) {
return []byte(`"` + t.String() + `"`), nil
}

// UnmarshalJSON decodes file error from json string
func (t *FileErrorType) UnmarshalJSON(b []byte) error {
str := string(b)
if v, ok := fileErrorStringReverse[str]; ok {
v, ok := fileErrorStringReverse[str]
if ok {
return fmt.Errorf("%s is not file error type", str)
} else {
*t = v
}
*t = v
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions envexec/doc.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Package envexec provides utility function to run program in restricted environments
// through container and cgroup.
//
// Cmd
// # Cmd
//
// Cmd defines single program to run, including copyin files before exec, run the program and copy
// out files after exec
//
// Single
// ## Single
//
// Single defines single Cmd with Environment and Cgroup Pool
// Single defines single Cmd with Environment and Cgroup Pool
//
// Group
// ## Group
//
// Group defines multiple Cmd with Environment and Cgroup Pool, together with Pipe mapping between
// different Cmd
Expand Down
4 changes: 3 additions & 1 deletion envexec/file_util_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func readerToFile(reader io.Reader) (*os.File, error) {

func copyDir(src *os.File, dst string) error {
// make sure dir exists
os.MkdirAll(dst, 0777)
if err := os.MkdirAll(dst, 0777); err != nil {
return err
}
newDir, err := os.Open(dst)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 8dd368a

Please sign in to comment.