Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create jobs queue for scheduling new builds #3

Merged
merged 1 commit into from
Sep 25, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/osbuild-composer/main.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"os/signal"
"path/filepath"

"osbuild-composer/internal/queue"
"osbuild-composer/internal/rpmmd"
"osbuild-composer/internal/weldr"
)
@@ -59,7 +60,8 @@ func main() {
}

stateChannel := make(chan []byte, 10)
api := weldr.New(repo, packages, logger, state, stateChannel)
buildChannel := make(chan queue.Build, 200)
api := weldr.New(repo, packages, logger, state, stateChannel, buildChannel)
go func() {
for {
err := writeFileAtomically(StateFile, <-stateChannel, 0755)
62 changes: 62 additions & 0 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package queue

import "sync"

// Build is a request waiting for a worker
type Build struct {
Pipeline string `json:"pipeline"`
Manifest string `json:"manifest"`
}

// Manifest contains additional metadata attached do a pipeline that are necessary for workers
type Manifest struct {
destination string
}

// Job is an image build already in progress
type Job struct {
UUID string `json:"uuid"`
Build Build `json:"build"`
}

// JobQueue contains already running jobs waiting for
type JobQueue struct {
sync.Mutex
incomingBuilds chan Build // Channel of incoming builds form Weldr API, we never want to block on this
waitingBuilds []Build // Unbounded FIFO queue of waiting builds
runningJobs map[string]Job // Already running jobs, key is UUID
}

// NewJobQueue creates object of type JobQueue
func NewJobQueue(timeout int, builds chan Build) *JobQueue {
jobs := &JobQueue{
incomingBuilds: builds,
waitingBuilds: make([]Build, 0),
runningJobs: make(map[string]Job),
}
go func() {
for {
// This call will block, do not put it inside the locked zone
newBuild := <-jobs.incomingBuilds
// Locking the whole job queue => as short as possible
jobs.Lock()
jobs.waitingBuilds = append(jobs.waitingBuilds, newBuild)
jobs.Unlock()
}
}()
return jobs
}

// StartNewJob starts a new job
func (j *JobQueue) StartNewJob(id string, worker string) Job {
j.Lock()
newBuild := j.waitingBuilds[0] // Take the first element
j.waitingBuilds = j.waitingBuilds[1:] // Discart 1st element
j.Unlock()
job := Job{
UUID: id,
Build: newBuild,
}
j.runningJobs[id] = job
return job
}
58 changes: 52 additions & 6 deletions internal/weldr/api.go
Original file line number Diff line number Diff line change
@@ -9,11 +9,13 @@ import (

"github.com/julienschmidt/httprouter"

"osbuild-composer/internal/queue"
"osbuild-composer/internal/rpmmd"
)

type API struct {
store *store
store *store
pendingBuilds chan queue.Build

repo rpmmd.RepoConfig
packages rpmmd.PackageList
@@ -22,12 +24,15 @@ type API struct {
router *httprouter.Router
}

func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte) *API {
func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte, builds chan queue.Build) *API {
// This needs to be shared with the worker API so that they can communicate with each other
// builds := make(chan queue.Build, 200)
api := &API{
store: newStore(initialState, stateChannel),
repo: repo,
packages: packages,
logger: logger,
store: newStore(initialState, stateChannel),
pendingBuilds: builds,
repo: repo,
packages: packages,
logger: logger,
}

// sample blueprint on first run
@@ -67,6 +72,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger,
api.router.DELETE("/api/v0/blueprints/delete/:blueprint", api.blueprintDeleteHandler)
api.router.DELETE("/api/v0/blueprints/workspace/:blueprint", api.blueprintDeleteWorkspaceHandler)

api.router.POST("/api/v0/compose", api.composeHandler)
api.router.GET("/api/v0/compose/queue", api.composeQueueHandler)
api.router.GET("/api/v0/compose/finished", api.composeFinishedHandler)
api.router.GET("/api/v0/compose/failed", api.composeFailedHandler)
@@ -510,6 +516,46 @@ func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, requ
statusResponseOK(writer)
}

// Schedule new compose by first translating the appropriate blueprint into a pipeline and then
// pushing it into the channel for waiting builds.
func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
// https://weldr.io/lorax/pylorax.api.html#pylorax.api.v0.v0_compose_start
type ComposeRequest struct {
BlueprintName string `json:"blueprint_name"`
ComposeType string `json:"compose_type"`
Branch string `json:"branch"`
}

contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {
statusResponseError(writer, http.StatusUnsupportedMediaType, "blueprint must be json")
return
}

var cr ComposeRequest
err := json.NewDecoder(request.Body).Decode(&cr)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid request format: "+err.Error())
return
}

bp := blueprint{}
changed := false
found := api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed?

if found {
api.pendingBuilds <- queue.Build{
Pipeline: bp.translateToPipeline(cr.ComposeType),
Manifest: "{\"output-path\": \"/var/cache/osbuild\"}",
}
} else {
statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist")
return
}

statusResponseOK(writer)
}

func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
var reply struct {
New []interface{} `json:"new"`
30 changes: 28 additions & 2 deletions internal/weldr/api_test.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"strings"
"testing"

"osbuild-composer/internal/queue"
"osbuild-composer/internal/rpmmd"
"osbuild-composer/internal/weldr"
)
@@ -111,13 +112,13 @@ func TestBasic(t *testing.T) {
}

for _, c := range cases {
api := weldr.New(repo, packages, nil, nil, nil)
api := weldr.New(repo, packages, nil, nil, nil, nil)
testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON)
}
}

func TestBlueprints(t *testing.T) {
api := weldr.New(repo, packages, nil, nil, nil)
api := weldr.New(repo, packages, nil, nil, nil, nil)

testRoute(t, api, "POST", "/api/v0/blueprints/new",
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`,
@@ -135,3 +136,28 @@ func TestBlueprints(t *testing.T) {
http.StatusOK, `{"blueprints":[{"name":"test","description":"Test","modules":[],"packages":[{"name":"systemd","version":"123"}],"version":"0"}],
"changes":[{"name":"test","changed":true}], "errors":[]}`)
}

func TestCompose(t *testing.T) {
buildChannel := make(chan queue.Build, 200)
api := weldr.New(repo, packages, nil, nil, nil, buildChannel)

testRoute(t, api, "POST", "/api/v0/blueprints/new",
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`,
http.StatusOK, `{"status":true}`)

testRoute(t, api, "POST", "/api/v0/compose", `{"blueprint_name": "http-server","compose_type": "tar","branch": "master"}`,
http.StatusBadRequest, `{"status":false,"errors":["blueprint does not exist"]}`)

testRoute(t, api, "POST", "/api/v0/compose", `{"blueprint_name": "test","compose_type": "tar","branch": "master"}`,
http.StatusOK, `{"status":true}`)

build := <-buildChannel
expected_pipeline := `{"pipeline": "string"}`
expected_manifest := `{"output-path": "/var/cache/osbuild"}`
if expected_manifest != build.Manifest {
t.Errorf("Expected this manifest: %s; got this: %s", expected_manifest, build.Manifest)
}
if expected_pipeline != build.Pipeline {
t.Errorf("Expected this manifest: %s; got this: %s", expected_pipeline, build.Pipeline)
}
}
5 changes: 5 additions & 0 deletions internal/weldr/store.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package weldr

import (
"encoding/json"
"fmt"
"log"
"sort"
"sync"
@@ -133,3 +134,7 @@ func (s *store) deleteBlueprintFromWorkspace(name string) {
delete(s.Workspace, name)
})
}

func (b *blueprint) translateToPipeline(outputFormat string) string {
return fmt.Sprintf("{\"pipeline\": \"%s\"}", "string")
}