From cc671b35fcb6867478f0d0a18b84ce8d71101b0c Mon Sep 17 00:00:00 2001 From: Martin Sehnoutka Date: Tue, 24 Sep 2019 15:58:16 +0200 Subject: [PATCH] create jobs queue for scheduling new builds --- internal/weldr/api.go | 51 +++++++++++++++++++++++++--- internal/weldr/store.go | 5 +++ queue/manifest.go | 63 ++++++++++++++++++++++++++++++++++ queue/queue.go | 75 +++++++++++++++++++++++++++++++++++++++++ queue/queue_test.go | 31 +++++++++++++++++ 5 files changed, 220 insertions(+), 5 deletions(-) create mode 100644 queue/manifest.go create mode 100644 queue/queue.go create mode 100644 queue/queue_test.go diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 472c38ffbc0..eaae4e702d2 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -10,10 +10,12 @@ import ( "github.com/julienschmidt/httprouter" "osbuild-composer/internal/rpmmd" + "osbuild-composer/queue" ) type API struct { - store *store + store *store + pendingBuilds chan queue.Build repo rpmmd.RepoConfig packages rpmmd.PackageList @@ -23,11 +25,14 @@ type API struct { } func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte) *API { + // This needs to be shared with the worker API so that they can communicate with each other + builds := make(chan queue.Build, 200) api := &API{ - store: newStore(initialState, stateChannel), - repo: repo, - packages: packages, - logger: logger, + store: newStore(initialState, stateChannel), + pendingBuilds: builds, + repo: repo, + packages: packages, + logger: logger, } // sample blueprint on first run @@ -67,6 +72,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, api.router.DELETE("/api/v0/blueprints/delete/:blueprint", api.blueprintDeleteHandler) api.router.DELETE("/api/v0/blueprints/workspace/:blueprint", api.blueprintDeleteWorkspaceHandler) + api.router.POST("/api/v0/compose", api.composeHandler) api.router.GET("/api/v0/compose/queue", api.composeQueueHandler) api.router.GET("/api/v0/compose/finished", api.composeFinishedHandler) api.router.GET("/api/v0/compose/failed", api.composeFailedHandler) @@ -510,6 +516,41 @@ func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, requ statusResponseOK(writer) } +// Schedule new compose by first translating the appropriate blueprint into a pipeline and then +// pushing it into the channel for waiting builds. +func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { + // https://weldr.io/lorax/pylorax.api.html#pylorax.api.v0.v0_compose_start + type ComposeRequest struct { + BlueprintName string `json:"blueprint_name"` + ComposeType string `json:"compose_type"` + Branch string `json:"branch"` + } + + contentType := request.Header["Content-Type"] + if len(contentType) != 1 || contentType[0] != "application/json" { + statusResponseError(writer, http.StatusUnsupportedMediaType, "blueprint must be json") + return + } + + var cr ComposeRequest + err := json.NewDecoder(request.Body).Decode(&cr) + if err != nil { + statusResponseError(writer, http.StatusBadRequest, "invalid request format: "+err.Error()) + return + } + + bp := blueprint{} + changed := false + api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed? + + api.pendingBuilds <- queue.Build{ + Pipeline: bp.translateToPipeline(cr.ComposeType), + Manifest: "{\"output-path\": \"/var/cache/osbuild\"}", + } + + statusResponseOK(writer) +} + func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { var reply struct { New []interface{} `json:"new"` diff --git a/internal/weldr/store.go b/internal/weldr/store.go index 9774d98f025..58763a7056c 100644 --- a/internal/weldr/store.go +++ b/internal/weldr/store.go @@ -2,6 +2,7 @@ package weldr import ( "encoding/json" + "fmt" "log" "sort" "sync" @@ -133,3 +134,7 @@ func (s *store) deleteBlueprintFromWorkspace(name string) { delete(s.Workspace, name) }) } + +func (b *blueprint) translateToPipeline(outputFormat string) string { + return fmt.Sprintf("{\"pipeline\": \"%s\"}", "string") +} diff --git a/queue/manifest.go b/queue/manifest.go new file mode 100644 index 00000000000..065a94560cc --- /dev/null +++ b/queue/manifest.go @@ -0,0 +1,63 @@ +package queue + +import ( + "fmt" + "io" + "os" +) + +// Manifest contains additional metadata attached do a pipeline that are necessary for workers +type Manifest struct { + destination string +} + +// SaveImage saves "src" into provided destination +func (m *Manifest) SaveImage(src string) error { + BUFFERSIZE := 4096 // Magic :) + + sourceFileStat, err := os.Stat(src) + if err != nil { + return err + } + + if !sourceFileStat.Mode().IsRegular() { + return fmt.Errorf("%s is not a regular file", src) + } + + source, err := os.Open(src) + if err != nil { + return err + } + defer source.Close() + + _, err = os.Stat(m.destination) + if err == nil { + return fmt.Errorf("file %s already exists", m.destination) + } + + destination, err := os.Create(m.destination) + if err != nil { + return err + } + defer destination.Close() + + if err != nil { + panic(err) + } + + buf := make([]byte, BUFFERSIZE) + for { + n, err := source.Read(buf) + if err != nil && err != io.EOF { + return err + } + if n == 0 { + break + } + + if _, err := destination.Write(buf[:n]); err != nil { + return err + } + } + return err +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 00000000000..08e9978e9fa --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,75 @@ +package queue + +import ( + "sync" + "time" +) + +// Build is a request waiting for a worker +type Build struct { + Pipeline string `json:"pipeline"` + Manifest string `json:"manifest"` +} + +// Job is an image build already in progress +type Job struct { + UUID string `json:"uuid"` + WorkerID string `json:"worker-id"` + Build Build `json:"build"` +} + +// JobQueue contains already running jobs waiting for +type JobQueue struct { + sync.Mutex + incomingBuilds chan Build + queue []Job + timeoutSeconds int +} + +// NewJobQueue creates object of type JobQueue +func NewJobQueue(timeout int, builds chan Build) *JobQueue { + return &JobQueue{ + incomingBuilds: builds, + queue: make([]Job, 10), + timeoutSeconds: timeout, + } +} + +// StartNewJob starts a new job +func (j *JobQueue) StartNewJob(id string, worker string) Job { + newBuild := <-j.incomingBuilds + job := Job{ + UUID: id, + WorkerID: worker, + Build: newBuild, + } + + j.Lock() + j.queue = append(j.queue, job) + go func() { + var jobs *JobQueue = j + // just to make it explicit, that we want a pointer + // to that queue + time.Sleep(time.Duration(jobs.timeoutSeconds) * time.Second) + stillRunning := false + idx := 0 + jobs.Lock() + for i := range jobs.queue { + // I we iterated over elements in the queue, Go would copy each + // element into the iterator, therefore we iterate over indexes only + if jobs.queue[i].UUID == id { + stillRunning = true + idx = i + } + } + if stillRunning { + // Reschedule the build + jobs.incomingBuilds <- jobs.queue[idx].Build + // Skip this element in the queue => remove it + copy(jobs.queue[idx:], jobs.queue[idx+1:]) + } + jobs.Unlock() + }() + j.Unlock() + return job +} diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 00000000000..964e57bf6c6 --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,31 @@ +package queue_test + +import ( + "osbuild-composer/queue" + "testing" + "time" +) + +func TestJobTimeout(t *testing.T) { + builds := make(chan queue.Build, 10) + jobs := queue.NewJobQueue(1, builds) + b := queue.Build{ + Pipeline: "pipeline", + Manifest: "manifest", + } + builds <- b + jobs.StartNewJob("uuid1", "worker1") + time.Sleep(2 * time.Second) + select { + case build, ok := <-builds: + if !ok { + t.Error("Channel is not supposed to be closed.") + } else { + if build.Pipeline != "pipeline" || build.Manifest != "manifest" { + t.Error("Unexpected build in the channel.") + } + } + default: + t.Error("Channel is not supposed to be empty.") + } +}