Skip to content

Commit

Permalink
create jobs queue for scheduling new builds
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Sehnoutka committed Sep 24, 2019
1 parent 7df735e commit cc671b3
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 5 deletions.
51 changes: 46 additions & 5 deletions internal/weldr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/julienschmidt/httprouter"

"osbuild-composer/internal/rpmmd"
"osbuild-composer/queue"
)

type API struct {
store *store
store *store
pendingBuilds chan queue.Build

repo rpmmd.RepoConfig
packages rpmmd.PackageList
Expand All @@ -23,11 +25,14 @@ type API struct {
}

func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte) *API {
// This needs to be shared with the worker API so that they can communicate with each other
builds := make(chan queue.Build, 200)
api := &API{
store: newStore(initialState, stateChannel),
repo: repo,
packages: packages,
logger: logger,
store: newStore(initialState, stateChannel),
pendingBuilds: builds,
repo: repo,
packages: packages,
logger: logger,
}

// sample blueprint on first run
Expand Down Expand Up @@ -67,6 +72,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger,
api.router.DELETE("/api/v0/blueprints/delete/:blueprint", api.blueprintDeleteHandler)
api.router.DELETE("/api/v0/blueprints/workspace/:blueprint", api.blueprintDeleteWorkspaceHandler)

api.router.POST("/api/v0/compose", api.composeHandler)
api.router.GET("/api/v0/compose/queue", api.composeQueueHandler)
api.router.GET("/api/v0/compose/finished", api.composeFinishedHandler)
api.router.GET("/api/v0/compose/failed", api.composeFailedHandler)
Expand Down Expand Up @@ -510,6 +516,41 @@ func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, requ
statusResponseOK(writer)
}

// Schedule new compose by first translating the appropriate blueprint into a pipeline and then
// pushing it into the channel for waiting builds.
func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
// https://weldr.io/lorax/pylorax.api.html#pylorax.api.v0.v0_compose_start
type ComposeRequest struct {
BlueprintName string `json:"blueprint_name"`
ComposeType string `json:"compose_type"`
Branch string `json:"branch"`
}

contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {
statusResponseError(writer, http.StatusUnsupportedMediaType, "blueprint must be json")
return
}

var cr ComposeRequest
err := json.NewDecoder(request.Body).Decode(&cr)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid request format: "+err.Error())
return
}

bp := blueprint{}
changed := false
api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed?

api.pendingBuilds <- queue.Build{
Pipeline: bp.translateToPipeline(cr.ComposeType),
Manifest: "{\"output-path\": \"/var/cache/osbuild\"}",
}

statusResponseOK(writer)
}

func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
var reply struct {
New []interface{} `json:"new"`
Expand Down
5 changes: 5 additions & 0 deletions internal/weldr/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package weldr

import (
"encoding/json"
"fmt"
"log"
"sort"
"sync"
Expand Down Expand Up @@ -133,3 +134,7 @@ func (s *store) deleteBlueprintFromWorkspace(name string) {
delete(s.Workspace, name)
})
}

func (b *blueprint) translateToPipeline(outputFormat string) string {
return fmt.Sprintf("{\"pipeline\": \"%s\"}", "string")
}
63 changes: 63 additions & 0 deletions queue/manifest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package queue

import (
"fmt"
"io"
"os"
)

// Manifest contains additional metadata attached do a pipeline that are necessary for workers
type Manifest struct {
destination string
}

// SaveImage saves "src" into provided destination
func (m *Manifest) SaveImage(src string) error {
BUFFERSIZE := 4096 // Magic :)

sourceFileStat, err := os.Stat(src)
if err != nil {
return err
}

if !sourceFileStat.Mode().IsRegular() {
return fmt.Errorf("%s is not a regular file", src)
}

source, err := os.Open(src)
if err != nil {
return err
}
defer source.Close()

_, err = os.Stat(m.destination)
if err == nil {
return fmt.Errorf("file %s already exists", m.destination)
}

destination, err := os.Create(m.destination)
if err != nil {
return err
}
defer destination.Close()

if err != nil {
panic(err)
}

buf := make([]byte, BUFFERSIZE)
for {
n, err := source.Read(buf)
if err != nil && err != io.EOF {
return err
}
if n == 0 {
break
}

if _, err := destination.Write(buf[:n]); err != nil {
return err
}
}
return err
}
75 changes: 75 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package queue

import (
"sync"
"time"
)

// Build is a request waiting for a worker
type Build struct {
Pipeline string `json:"pipeline"`
Manifest string `json:"manifest"`
}

// Job is an image build already in progress
type Job struct {
UUID string `json:"uuid"`
WorkerID string `json:"worker-id"`
Build Build `json:"build"`
}

// JobQueue contains already running jobs waiting for
type JobQueue struct {
sync.Mutex
incomingBuilds chan Build
queue []Job
timeoutSeconds int
}

// NewJobQueue creates object of type JobQueue
func NewJobQueue(timeout int, builds chan Build) *JobQueue {
return &JobQueue{
incomingBuilds: builds,
queue: make([]Job, 10),
timeoutSeconds: timeout,
}
}

// StartNewJob starts a new job
func (j *JobQueue) StartNewJob(id string, worker string) Job {
newBuild := <-j.incomingBuilds
job := Job{
UUID: id,
WorkerID: worker,
Build: newBuild,
}

j.Lock()
j.queue = append(j.queue, job)
go func() {
var jobs *JobQueue = j
// just to make it explicit, that we want a pointer
// to that queue
time.Sleep(time.Duration(jobs.timeoutSeconds) * time.Second)
stillRunning := false
idx := 0
jobs.Lock()
for i := range jobs.queue {
// I we iterated over elements in the queue, Go would copy each
// element into the iterator, therefore we iterate over indexes only
if jobs.queue[i].UUID == id {
stillRunning = true
idx = i
}
}
if stillRunning {
// Reschedule the build
jobs.incomingBuilds <- jobs.queue[idx].Build
// Skip this element in the queue => remove it
copy(jobs.queue[idx:], jobs.queue[idx+1:])
}
jobs.Unlock()
}()
j.Unlock()
return job
}
31 changes: 31 additions & 0 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package queue_test

import (
"osbuild-composer/queue"
"testing"
"time"
)

func TestJobTimeout(t *testing.T) {
builds := make(chan queue.Build, 10)
jobs := queue.NewJobQueue(1, builds)
b := queue.Build{
Pipeline: "pipeline",
Manifest: "manifest",
}
builds <- b
jobs.StartNewJob("uuid1", "worker1")
time.Sleep(2 * time.Second)
select {
case build, ok := <-builds:
if !ok {
t.Error("Channel is not supposed to be closed.")
} else {
if build.Pipeline != "pipeline" || build.Manifest != "manifest" {
t.Error("Unexpected build in the channel.")
}
}
default:
t.Error("Channel is not supposed to be empty.")
}
}

0 comments on commit cc671b3

Please sign in to comment.