diff --git a/.circleci/config.yml b/.circleci/config.yml index e5eae7f6..d0f23b1a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -32,7 +32,7 @@ jobs: echo "" > coverage.txt for d in $(go list ./... | grep -v vendor | grep -v /testacc); do - go test -v -timeout 20s -race -coverprofile=profile.out -covermode=atomic $d + go test -v -timeout 50s -race -coverprofile=profile.out -covermode=atomic $d if [ -f profile.out ]; then cat profile.out >> coverage.txt rm profile.out diff --git a/Makefile b/Makefile index 2eb42ac9..f6dc0999 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ RELEASE_NAME=${NAME} HELM_DIR=$(shell pwd)/helm TEST=$$(go list ./... | grep -v /vendor/ | grep /testacc) TEST_TIMEOUT_ACC?=20m -TEST_TIMEOUT?=40s +TEST_TIMEOUT?=50s default: dev diff --git a/plugin/plugin.go b/plugin/plugin.go index 58ab9995..8148f6b5 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -8,6 +8,7 @@ import ( "io" "os" "os/exec" + "sync" "time" "github.com/gaia-pipeline/gaia" @@ -39,6 +40,34 @@ var pluginMap = map[string]plugin.Plugin{ // timeFormat is the logging time format. const timeFormat = "2006/01/02 15:04:05" +// GaiaLogWriter represents a concurrent safe log writer which can be shared with go-plugin. +type GaiaLogWriter struct { + mu sync.RWMutex + buffer *bytes.Buffer + writer *bufio.Writer +} + +// Write locks and writes to the underlying writer. +func (g *GaiaLogWriter) Write(p []byte) (n int, err error) { + g.mu.Lock() + defer g.mu.Unlock() + return g.writer.Write(p) +} + +// Flush locks and flushes the underlying writer. +func (g *GaiaLogWriter) Flush() error { + g.mu.Lock() + defer g.mu.Unlock() + return g.writer.Flush() +} + +// WriteString locks and passes on the string to write to the underlying writer. +func (g *GaiaLogWriter) WriteString(s string) (int, error) { + g.mu.Lock() + defer g.mu.Unlock() + return g.writer.WriteString(s) +} + // GoPlugin represents a single plugin instance which uses gRPC // to connect to exactly one plugin. type GoPlugin struct { @@ -55,8 +84,7 @@ type GoPlugin struct { logFile *os.File // Writer used to write logs from execution to file or buffer - writer *bufio.Writer - buffer *bytes.Buffer + logger GaiaLogWriter // CA instance used to handle certificates ca security.CAAPI @@ -108,6 +136,9 @@ func (p *GoPlugin) NewPlugin(ca security.CAAPI) Plugin { // It's up to the caller to call plugin.Close to shutdown the plugin // and close the gRPC connection. func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error { + // Initialise the logger + p.logger = GaiaLogWriter{} + // Create log file and open it. // We will close this file in the close method. if logPath != nil { @@ -122,11 +153,11 @@ func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error { } // Create new writer - p.writer = bufio.NewWriter(p.logFile) + p.logger.writer = bufio.NewWriter(p.logFile) } else { // If no path is provided, write output to buffer - p.buffer = new(bytes.Buffer) - p.writer = bufio.NewWriter(p.buffer) + p.logger.buffer = new(bytes.Buffer) + p.logger.writer = bufio.NewWriter(p.logger.buffer) } // Create and sign a new pair of certificates for the server @@ -161,15 +192,15 @@ func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error { Plugins: pluginMap, Cmd: command, AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, - Stderr: p.writer, + Stderr: &p.logger, TLSConfig: tlsConfig, }) // Connect via gRPC p.clientProtocol, err = p.client.Client() if err != nil { - _ = p.writer.Flush() - return fmt.Errorf("%s\n\n--- output ---\n%s", err.Error(), p.buffer.String()) + _ = p.logger.Flush() + return fmt.Errorf("%s\n\n--- output ---\n%s", err.Error(), p.logger.buffer.String()) } return nil @@ -227,7 +258,7 @@ func (p *GoPlugin) Execute(j *gaia.Job) error { // Generate error message and attach it to logs. timeString := time.Now().Format(timeFormat) - _, _ = p.writer.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, resultObj.Message)) + _, _ = p.logger.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, resultObj.Message)) } else if err != nil { // An error occurred during the send or somewhere else. // The job itself usually does not return an error here. @@ -236,7 +267,7 @@ func (p *GoPlugin) Execute(j *gaia.Job) error { // Generate error message and attach it to logs. timeString := time.Now().Format(timeFormat) - _, _ = p.writer.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, err.Error())) + _, _ = p.logger.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, err.Error())) } else { j.Status = gaia.JobSuccess } @@ -319,7 +350,7 @@ func (p *GoPlugin) GetJobs() ([]*gaia.Job, error) { // FlushLogs flushes the logs. func (p *GoPlugin) FlushLogs() error { - return p.writer.Flush() + return p.logger.Flush() } // Close shutdown the plugin and kills the gRPC connection. @@ -332,7 +363,7 @@ func (p *GoPlugin) Close() { p.client.Kill() // Flush the writer - _ = p.writer.Flush() + _ = p.logger.Flush() // Close log file _ = p.logFile.Close() diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index 433b11b6..20d64010 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -124,7 +124,8 @@ func TestExecute(t *testing.T) { }) p := &GoPlugin{pluginConn: new(fakeGaiaPlugin)} buf := new(bytes.Buffer) - p.writer = bufio.NewWriter(buf) + p.logger = GaiaLogWriter{} + p.logger.writer = bufio.NewWriter(buf) j := &gaia.Job{ Args: []*gaia.Argument{ { @@ -148,7 +149,8 @@ func TestGetJobs(t *testing.T) { }) p := &GoPlugin{pluginConn: new(fakeGaiaPlugin)} buf := new(bytes.Buffer) - p.writer = bufio.NewWriter(buf) + p.logger = GaiaLogWriter{} + p.logger.writer = bufio.NewWriter(buf) _, err := p.GetJobs() if err != nil { t.Fatal(err) diff --git a/workers/pipeline/update_pipeline.go b/workers/pipeline/update_pipeline.go index 40a23a90..b24eb1a4 100644 --- a/workers/pipeline/update_pipeline.go +++ b/workers/pipeline/update_pipeline.go @@ -18,7 +18,7 @@ var ( // pythonPipInstallCmd is the command used to install the python distribution // package. - pythonPipInstallCmd = ". bin/activate; python -m pip install %s.tar.gz" + pythonPipInstallCmd = ". bin/activate; python -m pip install '%s.tar.gz'" // Ruby gem binary name. rubyGemName = "gem"