Skip to content

Commit

Permalink
One pipeline run now starts exactly one pipeline process. Added sched…
Browse files Browse the repository at this point in the history
…uler tests. Introduced an interface for pipeline which makes testing easier. Removed single job log view.
  • Loading branch information
michelvocks committed Jul 17, 2018
1 parent 22375b5 commit ab48faa
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 261 deletions.
6 changes: 5 additions & 1 deletion cmd/gaia/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/handlers"
"github.com/gaia-pipeline/gaia/pipeline"
"github.com/gaia-pipeline/gaia/plugin"
scheduler "github.com/gaia-pipeline/gaia/scheduler"
"github.com/gaia-pipeline/gaia/store"
hclog "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -132,8 +133,11 @@ func main() {
os.Exit(1)
}

// Create new plugin system
pS := &plugin.Plugin{}

// Initialize scheduler
scheduler := scheduler.NewScheduler(store)
scheduler := scheduler.NewScheduler(store, pS)
err = scheduler.Init()
if err != nil {
gaia.Cfg.Logger.Error("cannot initialize scheduler:", "error", err.Error())
Expand Down
13 changes: 1 addition & 12 deletions frontend/client/views/pipeline/detail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ export default {
}
],
runsRows: [],
job: null,
pipelineViewOptions: {
physics: { stabilization: true },
layout: {
Expand Down Expand Up @@ -333,11 +332,6 @@ export default {
// Create vis network
// We have to move out the instance out of vue because of https://github.com/almende/vis/issues/2567
window.pipelineView = new Vis.Network(container, data, this.pipelineViewOptions)
// Create an selectNode event
window.pipelineView.on('selectNode', function (params) {
this.job = this.nodes.get(params.nodes[0])
}.bind(this))
}
},
Expand All @@ -358,13 +352,8 @@ export default {
},
jobLog () {
var jobid = null
if (this.job) {
jobid = this.job.internalID
}
// Route
this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID, jobid: jobid }})
this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID }})
},
startPipeline (pipelineid) {
Expand Down
51 changes: 16 additions & 35 deletions frontend/client/views/pipeline/log.vue
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ export default {
logText: '',
jobRunning: true,
runID: null,
pipelineID: null,
jobID: null,
currentPath: ''
pipelineID: null
}
},
Expand All @@ -36,64 +34,47 @@ export default {
this.fetchData()
// periodically update dashboard
this.intervalID = setInterval(function () {
var intervalID = setInterval(function () {
this.fetchData()
}.bind(this), 3000)
this.currentPath = this.$route.path
// Append interval id to store
this.$store.commit('appendInterval', intervalID)
},
watch: {
'$route': 'fetchData'
},
destroyed () {
this.$store.commit('clearIntervals')
},
components: {
Message
},
methods: {
fetchData () {
if (this.$route.path !== this.currentPath) {
this.$store.commit('clearIntervals')
}
// look up required url parameters
this.pipelineID = this.$route.query.pipelineid
this.runID = this.$route.query.runid
if (!this.runID || !this.pipelineID) {
return
}
// job id is optional. If ommitted, all logs from all jobs
// are displayed.
this.jobID = this.$route.query.jobid
this.$http
.get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', {
showProgressBar: false,
params: {
jobid: this.jobID
}
})
.get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', { showProgressBar: false })
.then(response => {
if (response.data) {
// Check if we got multiple objects
var finished = true
this.logText = ''
for (let i = 0, l = response.data.length; i < l; i++) {
// We add the received log
this.logText += response.data[i].log
// LF does not work for HTML. Replace with <br />
this.logText = this.logText.replace(/\n/g, '<br />')
// Job not finished?
if (!response.data[i].finished) {
finished = false
}
}
// We add the received log
this.logText = response.data.log
// LF does not work for HTML. Replace with <br />
this.logText = this.logText.replace(/\n/g, '<br />')
// All jobs finished. Stop interval.
if (finished && response.data.length > 0) {
if (response.data.finished) {
this.jobRunning = false
clearInterval(this.intervalID)
}
Expand Down
3 changes: 3 additions & 0 deletions gaia.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ const (

// LogsFolderName represents the Name of the logs folder in pipeline run folder
LogsFolderName = "logs"

// LogsFileName represents the file name of the logs output
LogsFileName = "output.log"
)

// User is the user object
Expand Down
95 changes: 14 additions & 81 deletions handlers/pipeline_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net/http"
"os"
"path/filepath"
"sort"
"strconv"

"github.com/gaia-pipeline/gaia"
Expand Down Expand Up @@ -84,20 +83,15 @@ func PipelineGetLatestRun(c echo.Context) error {
return c.JSON(http.StatusOK, run)
}

// GetJobLogs returns jobs for a given job.
// If no jobID is given, a collection of all jobs logs will be returned.
// GetJobLogs returns logs from a pipeline run.
//
// Required parameters:
// pipelineid - Related pipeline id
// pipelinerunid - Related pipeline run id
//
// Optional parameters:
// jobid - Job id
func GetJobLogs(c echo.Context) error {
// Get parameters and validate
pipelineID := c.Param("pipelineid")
pipelineRunID := c.Param("runid")
jobID := c.QueryParam("jobid")

// Transform pipelineid to int
p, err := strconv.Atoi(pipelineID)
Expand All @@ -111,92 +105,31 @@ func GetJobLogs(c echo.Context) error {
return c.String(http.StatusBadRequest, "invalid pipeline run id given")
}

// Get pipeline run from store
run, err := storeService.PipelineGetRunByPipelineIDAndID(p, r)
if err != nil {
return c.String(http.StatusBadRequest, "cannot find pipeline run with given pipeline id and pipeline run id")
}

// jobID is not empty, just return the logs from this job
if jobID != "" {
for _, job := range run.Jobs {
if strconv.FormatUint(uint64(job.ID), 10) == jobID {
// Get logs
jL, err := getLogs(pipelineID, pipelineRunID, jobID, false)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}

// Check if job is finished
if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed {
jL.Finished = true
}

// We always return an array.
// It makes a bit easier in the frontend.
jobLogsList := []jobLogs{}
jobLogsList = append(jobLogsList, *jL)
return c.JSON(http.StatusOK, jobLogsList)
}
}
// Create return object
jL := jobLogs{}

// Logs for given job id not found
return c.String(http.StatusBadRequest, "cannot find job with given job id")
// Determine if job has been finished
if run.Status == gaia.RunFailed || run.Status == gaia.RunSuccess {
jL.Finished = true
}

// Sort the slice. This is important for the order of the returned logs.
sort.Slice(run.Jobs, func(i, j int) bool {
return run.Jobs[i].Priority < run.Jobs[j].Priority
})

// Return a collection of all logs
jobs := []jobLogs{}
for _, job := range run.Jobs {
// Get logs
jL, err := getLogs(pipelineID, pipelineRunID, strconv.FormatUint(uint64(job.ID), 10), true)
// Check if log file exists
logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, gaia.LogsFileName)
if _, err := os.Stat(logFilePath); err == nil {
content, err := ioutil.ReadFile(logFilePath)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}

// No error but also no job logs. Job must be in the queue.
// We skip it so no error will break things.
if jL == nil {
continue
}

// Check if job is finished
if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed {
jL.Finished = true
return c.String(http.StatusInternalServerError, "cannot read pipeline run log file")
}

jobs = append(jobs, *jL)
// Convert logs
jL.Log = string(content)
}

// Return logs
return c.JSON(http.StatusOK, jobs)
}

func getLogs(pipelineID, pipelineRunID, jobID string, getAllJobLogs bool) (*jobLogs, error) {
// Lookup log file
logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, jobID)

// We only check if logs exist when a specific job log was requested.
// If we don't do this, get all job logs will fail during a pipeline run.
if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
if !getAllJobLogs {
return nil, err
}
return nil, nil
}

// Read file
content, err := ioutil.ReadFile(logFilePath)
if err != nil {
return nil, err
}

// Create return struct
return &jobLogs{
Log: string(content),
}, nil
return c.JSON(http.StatusOK, jL)
}
32 changes: 18 additions & 14 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"

"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/scheduler"
"github.com/gaia-pipeline/protobuf"
plugin "github.com/hashicorp/go-plugin"
)
Expand Down Expand Up @@ -45,23 +46,29 @@ type Plugin struct {

// NewPlugin creates a new instance of Plugin.
// One Plugin instance represents one connection to a plugin.
//
// It expects the start command to start the plugin and the log path (including file)
// where the output should be logged to.
func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
// Allocate
p = &Plugin{}
func (p *Plugin) NewPlugin() scheduler.Plugin {
return &Plugin{}
}

// Connect prepares the log path, starts the plugin, initiates the
// gRPC connection and looks up the plugin.
// It's up to the caller to call plugin.Close to shutdown the plugin
// and close the gRPC connection.
//
// It expects the start command for the plugin and the path where
// the log file should be stored.
func (p *Plugin) Connect(command *exec.Cmd, logPath *string) error {
// Create log file and open it.
// We will close this file in the close method.
if logPath != nil {
var err error
p.logFile, err = os.OpenFile(
*logPath,
os.O_CREATE|os.O_WRONLY,
0666,
)
if err != nil {
return nil, err
return err
}
}

Expand All @@ -77,13 +84,6 @@ func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
Stderr: p.writer,
})

return p, nil
}

// Connect starts the plugin, initiates the gRPC connection and looks up the plugin.
// It's up to the caller to call plugin.Close to shutdown the plugin
// and close the gRPC connection.
func (p *Plugin) Connect() error {
// Connect via gRPC
gRPCClient, err := p.client.Client()
if err != nil {
Expand Down Expand Up @@ -116,6 +116,10 @@ func (p *Plugin) Execute(j *gaia.Job) error {

// Execute the job
_, err := p.pluginConn.ExecuteJob(job)

// Flush logs
p.writer.Flush()

return err
}

Expand Down
Loading

0 comments on commit ab48faa

Please sign in to comment.