Skip to content

Commit

Permalink
Bug fixing for pipeline run. Looks already ok :-)
Browse files Browse the repository at this point in the history
  • Loading branch information
michelvocks committed Mar 6, 2018
1 parent 70ca7eb commit 3c17fc6
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 67 deletions.
2 changes: 1 addition & 1 deletion frontend/client/views/overview/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export default {
.get('/api/v1/pipelines/start/' + pipelineid)
.then(response => {
if (response.data) {
this.$router.push({path: '/pipelines/detail', query: { pipelineid: pipelineid, runid: response.data.id }})
this.$router.push({path: '/pipelines/detail', query: { pipelineid: pipelineid, runid: response.data.id }})
}
})
.catch(error => {
Expand Down
150 changes: 89 additions & 61 deletions frontend/client/views/pipelines/detail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,41 @@ export default {
data () {
return {
pipelineView: null
pipelineView: null,
nodes: null,
edges: null,
lastRedraw: false,
pipelineViewOptions: {
physics: { stabilization: true },
layout: {
hierarchical: {
enabled: true,
levelSeparation: 200,
direction: 'LR',
sortMethod: 'directed'
}
},
nodes: {
borderWidth: 4,
size: 40,
color: {
border: '#222222'
},
font: { color: '#eeeeee' }
},
edges: {
smooth: {
type: 'cubicBezier',
forceDirection: 'vertical',
roundness: 0.4
},
color: {
color: 'whitesmoke',
highlight: '#4da2fc'
},
arrows: {to: true}
}
}
}
},
Expand All @@ -43,26 +77,37 @@ export default {
// runID is optional
var runID = this.$route.query.runid
// Get all information from this specific pipeline
var pipeline = null
this.$http
.get('/api/v1/pipelines/detail/' + pipelineID)
.then(response => {
this.pipeline = response.data
})
// If runid was set, look up this run
var pipelineRun = null
if (runID) {
this.$http
.get('/api/v1/pipelines/detail/' + pipelineID + '/' + runID)
.then(response => {
this.pipelineRun = response.data
// Run ID specified. Do concurrent request
this.$http.all([this.getPipeline(pipelineID), this.getPipelineRun(pipelineID, runID)])
.then(this.$http.spread(function (pipeline, pipelineRun) {
// We only redraw the pipeline if pipeline is running
if (pipelineRun.data.status !== 'running' && !this.lastRedraw) {
this.drawPipelineDetail(pipeline.data, pipelineRun.data)
this.lastRedraw = true
} else if (pipelineRun.data.status === 'running') {
this.lastRedraw = false
this.drawPipelineDetail(pipeline.data, pipelineRun.data)
}
}.bind(this)))
} else {
this.getPipeline(pipelineID)
.then((response) => {
if (!this.lastRedraw) {
this.drawPipelineDetail(response.data, null)
this.lastRedraw = true
}
})
}
},
getPipeline (pipelineID) {
return this.$http.get('/api/v1/pipelines/detail/' + pipelineID, { showProgressBar: false })
},
// Draw pipeline view
this.drawPipelineDetail(pipeline, pipelineRun)
getPipelineRun (pipelineID, runID) {
return this.$http.get('/api/v1/pipelines/detail/' + pipelineID + '/' + runID, { showProgressBar: false })
},
drawPipelineDetail (pipeline, pipelineRun) {
Expand All @@ -73,23 +118,25 @@ export default {
} else {
jobs = pipeline.jobs
}
// prepare data object for vis
var data = {
nodes: [],
edges: []
}
console.log(pipeline)
console.log(pipelineRun)
console.log(jobs)
// Initiate data structure
var nodesArray = []
var edgesArray = []
// Iterate all jobs of the pipeline
for (let i = 0, l = jobs.length; i < l; i++) {
// Choose the image for this node
var nodeImage = require('assets/questionmark.png')
if (jobs[i].status) {
switch (jobs[i].status) {
case 'success':
case 'success':
nodeImage = require('assets/success.png')
break
case 'failed':
nodeImage = require('assets/failed.png')
nodeImage = require('assets/fail.png')
break
}
}
Expand All @@ -102,7 +149,7 @@ export default {
}
// Add node to nodes list
data.nodes.push(node)
nodesArray.push(node)
// Iterate all jobs again to find the next highest job priority
var highestPrio = null
Expand All @@ -123,53 +170,34 @@ export default {
}
// add edge to edges list
data.edges.push(edge)
edgesArray.push(edge)
}
}
}
}
// Define vis options
var options = {
physics: { stabilization: true },
layout: {
hierarchical: {
enabled: true,
levelSeparation: 200,
direction: 'LR',
sortMethod: 'directed'
}
},
nodes: {
borderWidth: 4,
size: 40,
color: {
border: '#222222'
},
font: { color: '#eeeeee' }
},
edges: {
smooth: {
type: 'cubicBezier',
forceDirection: 'vertical',
roundness: 0.4
},
color: {
color: 'whitesmoke',
highlight: '#4da2fc'
},
arrows: {to: true}
}
}
// If pipelineView already exist, just update it
if (this.pipelineView) {
// Redraw
this.pipelineView.Redraw()
this.nodes.clear()
this.edges.clear()
this.nodes.add(nodesArray)
this.edges.add(edgesArray)
this.pipelineView.stabilize()
} else {
// translate to vis data structure
this.nodes = new Vis.DataSet(nodesArray)
this.edges = new Vis.DataSet(edgesArray)
// prepare data object for vis
var data = {
nodes: this.nodes,
edges: this.edges
}
// Find container
var container = document.getElementById('pipeline-detail')
this.pipelineView = new Vis.Network(container, data, options)
this.pipelineView = new Vis.Network(container, data, this.pipelineViewOptions)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions gaia.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const (

// JobFailed status
JobFailed JobStatus = "failed"

// JobRunning status
JobRunning JobStatus = "running"
)

// User is the user object
Expand Down
2 changes: 1 addition & 1 deletion handlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func InitHandlers(i *iris.Application, store *store.Store, scheduler *scheduler.
i.Post(p+"pipelines/name", PipelineNameAvailable)
i.Get(p+"pipelines", PipelineGetAll)
i.Get(p+"pipelines/detail/{id:string}", PipelineGet)
i.Get(p+"pipelines/detail/{pipelineid:string}/{runid:string}", PipelineRunGet)
i.Get(p+"pipelines/start/{id:string}", PipelineStart)
i.Get(p+"pipelines/run/{pipelineid:string}/{runid:string}", PipelineRunGet)

return nil
}
23 changes: 19 additions & 4 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,20 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error
// increment by one
highestID++

// Get jobs
jobs, err := s.getPipelineJobs(p)
if err != nil {
gaia.Cfg.Logger.Error("cannot get pipeline jobs during schedule", "error", err.Error(), "pipeline", p)
return nil, err
}

// Create new not scheduled pipeline run
run := gaia.PipelineRun{
UniqueID: uuid.Must(uuid.NewV4()).String(),
ID: highestID,
PipelineID: p.ID,
ScheduleDate: time.Now(),
Jobs: jobs,
Status: gaia.RunNotScheduled,
}

Expand All @@ -170,8 +178,7 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error

// executePipeline executes the given pipeline and updates it status periodically.
func (s *Scheduler) executePipeline(p *gaia.Pipeline, r *gaia.PipelineRun) {
// Set pessimistic values
r.Status = gaia.RunFailed
// Set start time
r.RunDate = time.Now()

// Get all jobs
Expand All @@ -181,6 +188,7 @@ func (s *Scheduler) executePipeline(p *gaia.Pipeline, r *gaia.PipelineRun) {
gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error())

// Update store
r.Status = gaia.RunFailed
s.storeService.PipelinePutRun(r)
return
}
Expand Down Expand Up @@ -210,13 +218,14 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave
return
}

// Lets be pessimistic
job.Status = gaia.JobFailed
// Set Job to running
job.Status = gaia.JobRunning

// Create the start command for the pipeline
c := createPipelineCmd(p)
if c == nil {
gaia.Cfg.Logger.Debug("cannot execute pipeline job", "error", errCreateCMDForPipeline.Error(), "job", job)
job.Status = gaia.JobFailed
return
}

Expand All @@ -226,6 +235,7 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave
// Connect to plugin(pipeline)
if err := pC.Connect(); err != nil {
gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p)
job.Status = gaia.JobFailed
return
}
defer pC.Close()
Expand All @@ -234,6 +244,7 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave
if err := pC.Execute(job); err != nil {
// TODO: Show it to user
gaia.Cfg.Logger.Debug("error during job execution", "error", err.Error(), "job", job)
job.Status = gaia.JobFailed
}

// If we are here, the job execution was ok
Expand Down Expand Up @@ -287,6 +298,8 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline
for _, job := range r.Jobs {
switch job.Status {
case gaia.JobFailed:
r.Status = gaia.RunFailed
s.storeService.PipelinePutRun(r)
return
case gaia.JobWaitingExec:
notExecJob = true
Expand All @@ -295,6 +308,8 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline

// All jobs have been executed
if !notExecJob {
r.Status = gaia.RunSuccess
s.storeService.PipelinePutRun(r)
return
}

Expand Down

0 comments on commit 3c17fc6

Please sign in to comment.