Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job-queue: pass a well-formed job object to the worker #7

Merged
merged 1 commit into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
type Job struct {
ComposeID string
Pipeline pipeline.Pipeline
Target target.Target
Targets []target.Target
}
2 changes: 1 addition & 1 deletion internal/job/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s *Store) UpdateJob(id string, job Job) bool {
req, _ := s.jobs[id]
req.ComposeID = job.ComposeID
req.Pipeline = job.Pipeline
req.Target = job.Target
req.Targets = job.Targets

return true
}
Expand Down
31 changes: 14 additions & 17 deletions internal/jobqueue/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func New(logger *log.Logger, jobs <-chan job.Job) *API {
api.router.NotFound = http.HandlerFunc(notFoundHandler)

api.router.POST("/job-queue/v1/jobs", api.addJobHandler)
api.router.PATCH("/job-queue/v1/jobs/:job-id", api.updateJobHandler)
api.router.PATCH("/job-queue/v1/jobs/:id", api.updateJobHandler)

return api
}
Expand Down Expand Up @@ -77,11 +77,11 @@ func statusResponseError(writer http.ResponseWriter, code int, errors ...string)

func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
type requestBody struct {
JobID string `json:"job-id"`
ID string `json:"id"`
}
type replyBody struct {
Pipeline pipeline.Pipeline `json:"pipeline"`
Target target.Target `json:"target"`
Targets []target.Target `json:"targets"`
}

contentType := request.Header["Content-Type"]
Expand All @@ -93,22 +93,22 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request,
var body requestBody
err := json.NewDecoder(request.Body).Decode(&body)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid job-id: "+err.Error())
statusResponseError(writer, http.StatusBadRequest, "invalid id: "+err.Error())
return
}

id := body.JobID
var req job.Job
id := body.ID
var jobSlot job.Job

if !api.jobStore.AddJob(id, req) {
if !api.jobStore.AddJob(id, jobSlot) {
statusResponseError(writer, http.StatusBadRequest)
return
}

req = <-api.pendingJobs
api.jobStore.UpdateJob(id, req)
nextJob := <-api.pendingJobs
api.jobStore.UpdateJob(id, nextJob)

json.NewEncoder(writer).Encode(replyBody{req.Pipeline, req.Target})
json.NewEncoder(writer).Encode(replyBody{nextJob.Pipeline, nextJob.Targets})

}

Expand All @@ -127,15 +127,12 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque
err := json.NewDecoder(request.Body).Decode(&body)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error())
return
} else if body.Status == "running" {
statusResponseOK(writer)
return
} else if body.Status != "finished" {
} else if body.Status == "finished" {
api.jobStore.DeleteJob(params.ByName("id"))
statusResponseOK(writer)
} else {
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+body.Status)
return
}

api.jobStore.DeleteJob(params.ByName("job-id"))
statusResponseOK(writer)
}
24 changes: 19 additions & 5 deletions internal/jobqueue/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expec
}

func TestBasic(t *testing.T) {
expected_job := `{"pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/var/lib/osbuild-composer/ffffffff-ffff–ffff-ffff-ffffffffffff"}}]}`
var cases = []struct {
Method string
Path string
Expand All @@ -76,8 +77,8 @@ func TestBasic(t *testing.T) {
{"PATH", "/job-queue/v1/foo", ``, http.StatusNotFound, ``},
{"DELETE", "/job-queue/v1/foo", ``, http.StatusNotFound, ``},

{"POST", "/job-queue/v1/jobs", `{"job-id":"ffffffff-ffff–ffff-ffff-ffffffffffff"}`, http.StatusOK, `{"pipeline":"pipeline","target":"target"}`},
{"POST", "/job-queue/v1/jobs", `{"job-id":"ffffffff-ffff–ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``},
{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff–ffff-ffff-ffffffffffff"}`, http.StatusOK, expected_job},
{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff–ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``},
//{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"finished"}`, http.StatusBadRequest, ``},
{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``},
{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``},
Expand All @@ -90,10 +91,23 @@ func TestBasic(t *testing.T) {
api := jobqueue.New(nil, jobChannel)
for _, c := range cases {
jobChannel <- job.Job{
ComposeID: "ID",
Pipeline: pipeline.Pipeline("pipeline"),
Target: target.Target("target"),
ComposeID: "ffffffff-ffff–ffff-ffff-ffffffffffff",
Pipeline: pipeline.Pipeline{
Assembler: pipeline.Assembler{
Name: "org.osbuild.tar",
Options: pipeline.AssemblerTarOptions{
Filename: "image.tar",
},
},
},
Targets: []target.Target{{
Name: "org.osbuild.local",
Options: target.LocalOptions{
Location: "/var/lib/osbuild-composer/ffffffff-ffff–ffff-ffff-ffffffffffff",
}},
},
}

testRoute(t, api, c.Method, c.Path, c.Body, c.ExpectedStatus, c.ExpectedJSON)
}
}
17 changes: 16 additions & 1 deletion internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
package pipeline

type Pipeline string
type Pipeline struct {
Stages []Stage `json:"stages,omitempty"`
Assembler Assembler `json:"assembler"`
}

type Stage struct {
}

type Assembler struct {
Name string `json:"name"`
Options AssemblerTarOptions `json:"options"`
}

type AssemblerTarOptions struct {
Filename string `json:"filename"`
}
9 changes: 8 additions & 1 deletion internal/target/target.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
package target

type Target string
type Target struct {
Name string `json:"name"`
Options LocalOptions `json:"options"`
}

type LocalOptions struct {
Location string `json:"location"`
}
12 changes: 10 additions & 2 deletions internal/weldr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"osbuild-composer/internal/job"
"osbuild-composer/internal/rpmmd"
"osbuild-composer/internal/target"
)

type API struct {
Expand Down Expand Up @@ -618,11 +619,18 @@ func (api *API) composeHandler(writer http.ResponseWriter, httpRequest *http.Req
changed := false
found := api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed?

uuid := "ffffffff-ffff–ffff-ffff-ffffffffffff" // TODO: generate

if found {
api.pendingJobs <- job.Job{
ComposeID: "TODO",
ComposeID: uuid,
Pipeline: bp.translateToPipeline(cr.ComposeType),
Target: `{"output-path":"/var/cache/osbuild-composer"}`,
Targets: []target.Target{{
Name: "org.osbuild.local",
Options: target.LocalOptions{
Location: "/var/lib/osbuild-composer/" + uuid,
}},
},
}
} else {
statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist")
Expand Down
12 changes: 4 additions & 8 deletions internal/weldr/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"testing"

"osbuild-composer/internal/job"
"osbuild-composer/internal/pipeline"
"osbuild-composer/internal/rpmmd"
"osbuild-composer/internal/target"
"osbuild-composer/internal/weldr"
)

Expand Down Expand Up @@ -157,12 +155,10 @@ func TestCompose(t *testing.T) {
http.StatusOK, `{"status":true}`)

job := <-jobChannel
expected_pipeline := pipeline.Pipeline(`{"pipeline":"string"}`)
expected_target := target.Target(`{"output-path":"/var/cache/osbuild-composer"}`)
if expected_target != job.Target {
t.Errorf("Expected this manifest: %s; got this: %s", expected_target, job.Target)
if job.Pipeline.Assembler.Name != "org.osbuild.tar" {
t.Errorf("Expected tar assembler, got: %s", job.Pipeline.Assembler.Name)
}
if expected_pipeline != job.Pipeline {
t.Errorf("Expected this manifest: %s; got this: %s", expected_pipeline, job.Pipeline)
if job.Targets[0].Name != "org.osbuild.local" {
t.Errorf("Expected local target, got: %s", job.Targets[0].Name)
}
}
10 changes: 8 additions & 2 deletions internal/weldr/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package weldr

import (
"encoding/json"
"fmt"
"log"
"osbuild-composer/internal/pipeline"
"sort"
Expand Down Expand Up @@ -158,5 +157,12 @@ func (s *store) deleteBlueprintFromWorkspace(name string) {
}

func (b *blueprint) translateToPipeline(outputFormat string) pipeline.Pipeline {
return pipeline.Pipeline(fmt.Sprintf("{\"pipeline\":\"%s\"}", "string"))
return pipeline.Pipeline{
Assembler: pipeline.Assembler{
Name: "org.osbuild.tar",
Options: pipeline.AssemblerTarOptions{
Filename: "image.tar",
},
},
}
}