From 3ad38e22478b3c034c0ffe522d74edbecf2ea8fb Mon Sep 17 00:00:00 2001 From: Michel Vocks Date: Sun, 12 Aug 2018 12:10:38 +0200 Subject: [PATCH 1/2] Implemented pipeline parameters --- cmd/gaia/main.go | 24 ++--- frontend/client/router/index.js | 5 + frontend/client/views/overview/index.vue | 27 +++-- frontend/client/views/pipeline/detail.vue | 25 ++++- frontend/client/views/pipeline/param.vue | 115 ++++++++++++++++++++++ gaia.go | 19 +++- handlers/pipeline.go | 8 +- handlers/pipeline_test.go | 2 +- plugin/plugin.go | 15 ++- scheduler/scheduler.go | 55 ++++++++--- scheduler/scheduler_test.go | 53 ++++++---- services/service_provider.go | 2 +- 12 files changed, 285 insertions(+), 65 deletions(-) create mode 100644 frontend/client/views/pipeline/param.vue diff --git a/cmd/gaia/main.go b/cmd/gaia/main.go index 0db35fe6..b07716a4 100644 --- a/cmd/gaia/main.go +++ b/cmd/gaia/main.go @@ -146,6 +146,18 @@ func main() { os.Exit(1) } + // Initiating Vault + // Check Vault path + if gaia.Cfg.VaultPath == "" { + // Set default to data folder + gaia.Cfg.VaultPath = gaia.Cfg.DataPath + } + _, err = services.VaultService(nil) + if err != nil { + gaia.Cfg.Logger.Error("error initiating vault") + os.Exit(1) + } + // Initialize scheduler _, err = services.SchedulerService() if err != nil { @@ -159,18 +171,6 @@ func main() { os.Exit(1) } - // Initiating Vault - // Check Vault path - if gaia.Cfg.VaultPath == "" { - // Set default to data folder - gaia.Cfg.VaultPath = gaia.Cfg.DataPath - } - _, err = services.VaultService(nil) - if err != nil { - gaia.Cfg.Logger.Error("error initiating vault") - os.Exit(1) - } - // Start ticker. Periodic job to check for new plugins. pipeline.InitTicker() diff --git a/frontend/client/router/index.js b/frontend/client/router/index.js index 182e4904..2c0180c1 100755 --- a/frontend/client/router/index.js +++ b/frontend/client/router/index.js @@ -23,6 +23,11 @@ export default new Router({ name: 'Pipeline Logs', path: '/pipeline/log', component: lazyLoading('pipeline/log') + }, + { + name: 'Pipeline Parameters', + path: '/pipeline/params', + component: lazyLoading('pipeline/params') } ] }) diff --git a/frontend/client/views/overview/index.vue b/frontend/client/views/overview/index.vue index 7125d73b..5ce52048 100755 --- a/frontend/client/views/overview/index.vue +++ b/frontend/client/views/overview/index.vue @@ -38,7 +38,7 @@ unknown
@@ -62,7 +62,8 @@ import moment from 'moment' export default { data () { return { - pipelines: [] + pipelines: [], + pipeline: null } }, @@ -102,13 +103,27 @@ export default { }) }, - startPipeline (pipelineid) { + checkPipelineArgs (pipeline) { + this.pipeline = pipeline + + // check if this pipeline has args + for (let x = 0, y = pipeline.jobs.length; x < y; x++) { + if (pipeline.jobs[x].args && pipeline.jobs[x].args.type !== 'vault') { + // we found args. Redirect user to params view. + this.$router.push({path: '/pipeline/params', query: { pipelineid: pipeline.id }}) + } + } + // No args. Just start pipeline. + this.startPipeline() + }, + + startPipeline () { // Send start request this.$http - .post('/api/v1/pipeline/' + pipelineid + '/start') + .post('/api/v1/pipeline/' + this.pipeline.id + '/start') .then(response => { if (response.data) { - this.$router.push({path: '/pipeline/detail', query: { pipelineid: pipelineid, runid: response.data.id }}) + this.$router.push({path: '/pipeline/detail', query: { pipelineid: this.pipeline.id, runid: response.data.id }}) } }) .catch((error) => { diff --git a/frontend/client/views/pipeline/detail.vue b/frontend/client/views/pipeline/detail.vue index ef860be0..6447826e 100755 --- a/frontend/client/views/pipeline/detail.vue +++ b/frontend/client/views/pipeline/detail.vue @@ -2,7 +2,7 @@
- + @@ -120,7 +120,8 @@ export default { }, arrows: {to: true} } - } + }, + pipeline: null } }, @@ -183,6 +184,7 @@ export default { this.drawPipelineDetail(pipeline.data, pipelineRun.data) } this.runsRows = pipelineRuns.data + this.pipeline = pipeline.data }.bind(this))) .catch((error) => { this.$store.commit('clearIntervals') @@ -201,6 +203,7 @@ export default { if (pipelineRuns.data) { this.runsRows = pipelineRuns.data } + this.pipeline = pipeline.data }.bind(this))) .catch((error) => { this.$store.commit('clearIntervals') @@ -355,13 +358,25 @@ export default { this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID }}) }, - startPipeline (pipelineid) { + checkPipelineArgs () { + // check if this pipeline has args + for (let x = 0, y = this.pipeline.jobs.length; x < y; x++) { + if (this.pipeline.jobs[x].args && this.pipeline.jobs[x].args.type !== 'vault') { + // we found args. Redirect user to params view. + this.$router.push({path: '/pipeline/params', query: { pipelineid: this.pipeline.id }}) + } + } + // No args. Just start pipeline. + this.startPipeline() + }, + + startPipeline () { // Send start request this.$http - .post('/api/v1/pipeline/' + pipelineid + '/start') + .post('/api/v1/pipeline/' + this.pipeline.id + '/start') .then(response => { if (response.data) { - this.$router.push({path: '/pipeline/detail', query: { pipelineid: pipelineid, runid: response.data.id }}) + this.$router.push({path: '/pipeline/detail', query: { pipelineid: this.pipeline.id, runid: response.data.id }}) } }) .catch((error) => { diff --git a/frontend/client/views/pipeline/param.vue b/frontend/client/views/pipeline/param.vue new file mode 100644 index 00000000..ee2f9f8d --- /dev/null +++ b/frontend/client/views/pipeline/param.vue @@ -0,0 +1,115 @@ + + + + + \ No newline at end of file diff --git a/gaia.go b/gaia.go index 25b13db7..31b66cda 100644 --- a/gaia.go +++ b/gaia.go @@ -110,11 +110,20 @@ type GitRepo struct { // Job represents a single job of a pipeline type Job struct { - ID uint32 `json:"id,omitempty"` - Title string `json:"title,omitempty"` - Description string `json:"desc,omitempty"` - DependsOn []*Job `json:"dependson,omitempty"` - Status JobStatus `json:"status,omitempty"` + ID uint32 `json:"id,omitempty"` + Title string `json:"title,omitempty"` + Description string `json:"desc,omitempty"` + DependsOn []*Job `json:"dependson,omitempty"` + Status JobStatus `json:"status,omitempty"` + Args []Argument `json:"args,omitempty"` +} + +// Argument represents a single argument of a job +type Argument struct { + Description string `json:"desc,omitempty"` + Type string `json:"type,omitempty"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` } // CreatePipeline represents a pipeline which is not yet diff --git a/handlers/pipeline.go b/handlers/pipeline.go index 5e0c07e4..0ae962ee 100644 --- a/handlers/pipeline.go +++ b/handlers/pipeline.go @@ -235,11 +235,17 @@ func PipelineDelete(c echo.Context) error { } // PipelineStart starts a pipeline by the given id. +// It accepts arguments for the given pipeline. // Afterwards it returns the created/scheduled pipeline run. func PipelineStart(c echo.Context) error { schedulerService, _ := services.SchedulerService() pipelineIDStr := c.Param("pipelineid") + // Look for arguments. + // We do not check for errors here cause arguments are optional. + args := []gaia.Argument{} + c.Bind(&args) + // Convert string to int because id is int pipelineID, err := strconv.Atoi(pipelineIDStr) if err != nil { @@ -255,7 +261,7 @@ func PipelineStart(c echo.Context) error { } if foundPipeline.Name != "" { - pipelineRun, err := schedulerService.SchedulePipeline(&foundPipeline) + pipelineRun, err := schedulerService.SchedulePipeline(&foundPipeline, args) if err != nil { return c.String(http.StatusBadRequest, err.Error()) } else if pipelineRun != nil { diff --git a/handlers/pipeline_test.go b/handlers/pipeline_test.go index 97fa0b40..f142ec1a 100644 --- a/handlers/pipeline_test.go +++ b/handlers/pipeline_test.go @@ -27,7 +27,7 @@ type mockScheduleService struct { err error } -func (ms *mockScheduleService) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error) { +func (ms *mockScheduleService) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*gaia.PipelineRun, error) { return ms.pipelineRun, ms.err } diff --git a/plugin/plugin.go b/plugin/plugin.go index c9ae11ca..dc1382bd 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -150,10 +150,21 @@ func (p *Plugin) Connect(command *exec.Cmd, logPath *string) error { // Execute triggers the execution of one single job // for the given plugin. func (p *Plugin) Execute(j *gaia.Job) error { - // Create new proto job object and just set the id. - // The rest is currently not important. + // Transform arguments + args := []*proto.Argument{} + for _, arg := range j.Args { + a := &proto.Argument{ + Key: arg.Key, + Value: arg.Value, + } + + args = append(args, a) + } + + // Create new proto job object. job := &proto.Job{ UniqueId: j.ID, + Args: args, } // Execute the job diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ed759722..94714265 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -25,6 +25,9 @@ const ( // errCircularDep is thrown when a circular dependency has been detected. errCircularDep = "circular dependency detected between %s and %s" + + // argTypeVault is the argument type vault. + argTypeVault = "vault" ) var ( @@ -59,7 +62,7 @@ type Plugin interface { // GaiaScheduler is a job scheduler for gaia pipeline runs. type GaiaScheduler interface { Init() error - SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error) + SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*gaia.PipelineRun, error) SetPipelineJobs(p *gaia.Pipeline) error } @@ -79,16 +82,20 @@ type Scheduler struct { // ca is the instance of the CA used to handle certs. ca security.CAAPI + + // vault is the instance of the vault. + vault security.VaultAPI } // NewScheduler creates a new instance of Scheduler. -func NewScheduler(store store.GaiaStore, pS Plugin, ca security.CAAPI) *Scheduler { +func NewScheduler(store store.GaiaStore, pS Plugin, ca security.CAAPI, vault security.VaultAPI) *Scheduler { // Create new scheduler s := &Scheduler{ scheduledRuns: make(chan gaia.PipelineRun, schedulerBufferLimit), storeService: store, pluginSystem: pS, ca: ca, + vault: vault, } return s @@ -151,16 +158,6 @@ func (s *Scheduler) prepareAndExec(r gaia.PipelineRun) { // Get related pipeline from pipeline run pipeline, _ := s.storeService.PipelineGet(r.PipelineID) - // Get all jobs - r.Jobs, err = s.getPipelineJobs(pipeline) - if err != nil { - gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error()) - - // Finish pipeline run - s.finishPipelineRun(&r, gaia.RunFailed) - return - } - // Check if this pipeline has jobs declared if len(r.Jobs) == 0 { // Finish pipeline run @@ -245,7 +242,7 @@ func (s *Scheduler) schedule() { // SchedulePipeline schedules a pipeline. We create a new schedule object // and save it in our store. The scheduler will later pick this up and will continue the work. -func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error) { +func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*gaia.PipelineRun, error) { // Get highest public id used for this pipeline highestID, err := s.storeService.PipelineGetRunHighestID(p) if err != nil { @@ -263,6 +260,38 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error return nil, err } + // Load secret from vault and set it + err = s.vault.LoadSecrets() + if err != nil { + gaia.Cfg.Logger.Error("cannot load secrets from vault during schedule pipeline", "error", err.Error()) + return nil, err + } + // We have to go through all jobs to find the related arguments. + // We will only pass related arguments to the specific job. + for jobID, job := range jobs { + if job.Args != nil { + for argID, arg := range job.Args { + // check if it's of type vault + if arg.Type == argTypeVault { + // Get & Set argument + s, err := s.vault.Get(arg.Key) + if err != nil { + gaia.Cfg.Logger.Error("cannot find secret with given key in vault", "key", arg.Key, "pipeline", p) + return nil, err + } + jobs[jobID].Args[argID].Value = string(s) + } else { + // Find related argument in given arguments + for _, givenArg := range args { + if arg.Key == givenArg.Key { + jobs[jobID].Args[argID] = givenArg + } + } + } + } + } + } + // Create new not scheduled pipeline run run := gaia.PipelineRun{ UniqueID: uuid.Must(uuid.NewV4(), nil).String(), diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index f132982b..1d29c9c9 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -31,6 +31,15 @@ func (c *CAFake) GenerateTLSConfig(certPath, keyPath string) (*tls.Config, error func (c *CAFake) CleanupCerts(crt, key string) error { return nil } func (c *CAFake) GetCACertPath() (string, string) { return "", "" } +type VaultFake struct{} + +func (v *VaultFake) LoadSecrets() error { return nil } +func (v *VaultFake) GetAll() []string { return []string{} } +func (v *VaultFake) SaveSecrets() error { return nil } +func (v *VaultFake) Add(key string, value []byte) {} +func (v *VaultFake) Remove(key string) {} +func (v *VaultFake) Get(key string) ([]byte, error) { return []byte{}, nil } + func TestInit(t *testing.T) { gaia.Cfg = &gaia.Config{} storeInstance := store.NewBoltStore() @@ -47,9 +56,7 @@ func TestInit(t *testing.T) { if err := storeInstance.Init(); err != nil { t.Fatal(err) } - var ca security.CAAPI - ca = &CAFake{} - s := NewScheduler(storeInstance, &PluginFake{}, ca) + s := NewScheduler(storeInstance, &PluginFake{}, &CAFake{}, &VaultFake{}) err := s.Init() if err != nil { t.Fatal(err) @@ -82,7 +89,7 @@ func TestPrepareAndExecFail(t *testing.T) { } p, r := prepareTestData() storeInstance.PipelinePut(&p) - s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}) + s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{}) s.prepareAndExec(r) // get pipeline run from store @@ -121,9 +128,7 @@ func TestPrepareAndExecInvalidType(t *testing.T) { p, r := prepareTestData() p.Type = gaia.PTypeUnknown storeInstance.PipelinePut(&p) - var ca security.CAAPI - ca = &CAFake{} - s := NewScheduler(storeInstance, &PluginFake{}, ca) + s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{}) s.prepareAndExec(r) // get pipeline run from store @@ -158,7 +163,7 @@ func TestPrepareAndExecJavaType(t *testing.T) { javaExecuteableName = "go" p.Type = gaia.PTypeJava storeInstance.PipelinePut(&p) - s := NewScheduler(storeInstance, &PluginFake{}, &CAFake{}) + s := NewScheduler(storeInstance, &PluginFake{}, &CAFake{}, &VaultFake{}) s.prepareAndExec(r) // get pipeline run from store @@ -200,14 +205,12 @@ func TestSchedulePipeline(t *testing.T) { } p, _ := prepareTestData() storeInstance.PipelinePut(&p) - var ca security.CAAPI - ca = &CAFake{} - s := NewScheduler(storeInstance, &PluginFake{}, ca) + s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{}) err := s.Init() if err != nil { t.Fatal(err) } - _, err = s.SchedulePipeline(&p) + _, err = s.SchedulePipeline(&p, prepareArgs()) if err != nil { t.Fatal(err) } @@ -231,10 +234,8 @@ func TestSchedule(t *testing.T) { } p, _ := prepareTestData() storeInstance.PipelinePut(&p) - var ca security.CAAPI - ca = &CAFake{} - s := NewScheduler(storeInstance, &PluginFake{}, ca) - _, err := s.SchedulePipeline(&p) + s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{}) + _, err := s.SchedulePipeline(&p, prepareArgs()) if err != nil { t.Fatal(err) } @@ -265,9 +266,7 @@ func TestSetPipelineJobs(t *testing.T) { } p, _ := prepareTestData() p.Jobs = nil - var ca security.CAAPI - ca = &CAFake{} - s := NewScheduler(storeInstance, &PluginFake{}, ca) + s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{}) err := s.SetPipelineJobs(&p) if err != nil { t.Fatal(err) @@ -277,12 +276,27 @@ func TestSetPipelineJobs(t *testing.T) { } } +func prepareArgs() []gaia.Argument { + arg1 := gaia.Argument{ + Description: "First Arg", + Key: "firstarg", + Type: "textfield", + } + arg2 := gaia.Argument{ + Description: "Second Arg", + Key: "secondarg", + Type: "textarea", + } + return []gaia.Argument{arg1, arg2} +} + func prepareJobs() []gaia.Job { job1 := gaia.Job{ ID: hash("Job1"), Title: "Job1", DependsOn: []*gaia.Job{}, Status: gaia.JobWaitingExec, + Args: prepareArgs(), } job2 := gaia.Job{ ID: hash("Job2"), @@ -323,6 +337,7 @@ func prepareTestData() (pipeline gaia.Pipeline, pipelineRun gaia.PipelineRun) { PipelineID: 1, Status: gaia.RunNotScheduled, UniqueID: uuid.Must(uuid.NewV4(), nil).String(), + Jobs: pipeline.Jobs, } return } diff --git a/services/service_provider.go b/services/service_provider.go index ef51b8ce..c68f49cc 100644 --- a/services/service_provider.go +++ b/services/service_provider.go @@ -58,7 +58,7 @@ func SchedulerService() (scheduler.GaiaScheduler, error) { return schedulerService, nil } pS := &plugin.Plugin{} - schedulerService = scheduler.NewScheduler(storeService, pS, certificateService) + schedulerService = scheduler.NewScheduler(storeService, pS, certificateService, vaultService) err := schedulerService.Init() if err != nil { gaia.Cfg.Logger.Error("cannot initialize scheduler:", "error", err.Error()) From 531a815cc3a6ccfb0d513679d662ed613aab9bc3 Mon Sep 17 00:00:00 2001 From: Michel Vocks Date: Sun, 12 Aug 2018 12:57:26 +0200 Subject: [PATCH 2/2] Fixed auto redirect to startPipeline. Fixed reset of already set arguments --- frontend/client/views/overview/index.vue | 4 +++- frontend/client/views/pipeline/detail.vue | 5 +++-- .../client/views/pipeline/{param.vue => params.vue} | 3 +++ plugin/plugin.go | 13 +++++++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) rename frontend/client/views/pipeline/{param.vue => params.vue} (98%) diff --git a/frontend/client/views/overview/index.vue b/frontend/client/views/overview/index.vue index 5ce52048..148eea74 100755 --- a/frontend/client/views/overview/index.vue +++ b/frontend/client/views/overview/index.vue @@ -111,9 +111,11 @@ export default { if (pipeline.jobs[x].args && pipeline.jobs[x].args.type !== 'vault') { // we found args. Redirect user to params view. this.$router.push({path: '/pipeline/params', query: { pipelineid: pipeline.id }}) + return } } - // No args. Just start pipeline. + + // No args. Just start pipeline. this.startPipeline() }, diff --git a/frontend/client/views/pipeline/detail.vue b/frontend/client/views/pipeline/detail.vue index 6447826e..e2209551 100755 --- a/frontend/client/views/pipeline/detail.vue +++ b/frontend/client/views/pipeline/detail.vue @@ -364,9 +364,11 @@ export default { if (this.pipeline.jobs[x].args && this.pipeline.jobs[x].args.type !== 'vault') { // we found args. Redirect user to params view. this.$router.push({path: '/pipeline/params', query: { pipelineid: this.pipeline.id }}) + return } } - // No args. Just start pipeline. + + // No args. Just start pipeline. this.startPipeline() }, @@ -385,7 +387,6 @@ export default { }) } } - } diff --git a/frontend/client/views/pipeline/param.vue b/frontend/client/views/pipeline/params.vue similarity index 98% rename from frontend/client/views/pipeline/param.vue rename to frontend/client/views/pipeline/params.vue index ee2f9f8d..bda20b96 100644 --- a/frontend/client/views/pipeline/param.vue +++ b/frontend/client/views/pipeline/params.vue @@ -57,6 +57,9 @@ export default { } this.pipelineID = pipelineID + // reset args + this.args = [] + this.$http .get('/api/v1/pipeline/' + pipelineID, { showProgressBar: false }) .then(response => { diff --git a/plugin/plugin.go b/plugin/plugin.go index dc1382bd..9077dc25 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -201,6 +201,18 @@ func (p *Plugin) GetJobs() ([]gaia.Job, error) { return nil, err } + // Transform arguments + args := []gaia.Argument{} + for _, arg := range job.Args { + a := gaia.Argument{ + Description: arg.Description, + Key: arg.Key, + Type: arg.Type, + } + + args = append(args, a) + } + // add proto object to separate list to rebuild dep later. pList = append(pList, job) @@ -210,6 +222,7 @@ func (p *Plugin) GetJobs() ([]gaia.Job, error) { Title: job.Title, Description: job.Description, Status: gaia.JobWaitingExec, + Args: args, } l = append(l, j) }