diff --git a/agents/go-agents/core/activity/activity.go b/agents/go-agents/core/activity/activity.go index 68c7e5f0fa1..c9464d1ac16 100644 --- a/agents/go-agents/core/activity/activity.go +++ b/agents/go-agents/core/activity/activity.go @@ -21,8 +21,6 @@ import ( const threshold int64 = 60 var ( - // ActivityTrackingEnabled defines whether activity tracking should be used - ActivityTrackingEnabled = false // Tracker provides workspace activity notification client Tracker WorkspaceActivityTracker = &NoOpActivityTracker{} ) diff --git a/agents/go-agents/exec-agent/exec/events.go b/agents/go-agents/core/process/events.go similarity index 99% rename from agents/go-agents/exec-agent/exec/events.go rename to agents/go-agents/core/process/events.go index cd987e51bc2..7cead79062a 100644 --- a/agents/go-agents/exec-agent/exec/events.go +++ b/agents/go-agents/core/process/events.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec +package process import ( "time" diff --git a/agents/go-agents/core/process/file_logger.go b/agents/go-agents/core/process/file_logger.go index 8842dad993c..e9eb874c691 100644 --- a/agents/go-agents/core/process/file_logger.go +++ b/agents/go-agents/core/process/file_logger.go @@ -83,14 +83,16 @@ func (fl *FileLogger) writeLine(message *LogMessage) { } func (fl *FileLogger) doFlush() { - f, err := os.OpenFile(fl.filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - if err != nil { - log.Printf("Couldn't open file '%s' for flushing the buffer. %s \n", fl.filename, err.Error()) - } else { - defer closeFile(f) - _, err = fl.buffer.WriteTo(f) + if fl.buffer.Len() > 0 { + f, err := os.OpenFile(fl.filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { - log.Printf("Error appears on flushing data to file '%s'. %s \n", fl.filename, err.Error()) + log.Printf("Couldn't open file '%s' for flushing the buffer. %s \n", fl.filename, err.Error()) + } else { + defer closeFile(f) + _, err = fl.buffer.WriteTo(f) + if err != nil { + log.Printf("Error appears on flushing data to file '%s'. %s \n", fl.filename, err.Error()) + } } } } diff --git a/agents/go-agents/core/process/file_logger_test.go b/agents/go-agents/core/process/file_logger_test.go index c688b3d43cd..db46df39bb6 100644 --- a/agents/go-agents/core/process/file_logger_test.go +++ b/agents/go-agents/core/process/file_logger_test.go @@ -15,7 +15,6 @@ import ( "encoding/json" "io/ioutil" "log" - "math/rand" "os" "testing" "time" @@ -23,8 +22,6 @@ import ( "github.com/eclipse/che/agents/go-agents/core/process" ) -var alphabet = []byte("abcdefgh123456789") - func TestFileLoggerCreatesFileWhenFileDoesNotExist(t *testing.T) { filename := os.TempDir() + string(os.PathSeparator) + randomName(10) defer removeFile(filename) @@ -113,15 +110,6 @@ func TestLogsAreFlushedOnClose(t *testing.T) { failIfDifferent(t, expectedStderr, stderr) } -func randomName(length int) string { - rand.Seed(time.Now().UnixNano()) - bytes := make([]byte, length) - for i := 0; i < length; i++ { - bytes[i] = alphabet[rand.Intn(len(alphabet))] - } - return string(bytes) -} - func removeFile(path string) { if err := os.Remove(path); err != nil { log.Printf("Can't remove file %s. Error: %s", path, err) diff --git a/agents/go-agents/exec-agent/exec/logs_distributor.go b/agents/go-agents/core/process/logs_distributor.go similarity index 99% rename from agents/go-agents/exec-agent/exec/logs_distributor.go rename to agents/go-agents/core/process/logs_distributor.go index 37dc06777ec..f3c195ce0c3 100644 --- a/agents/go-agents/exec-agent/exec/logs_distributor.go +++ b/agents/go-agents/core/process/logs_distributor.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec +package process import ( "errors" diff --git a/agents/go-agents/exec-agent/exec/logs_distributor_test.go b/agents/go-agents/core/process/logs_distributor_test.go similarity index 90% rename from agents/go-agents/exec-agent/exec/logs_distributor_test.go rename to agents/go-agents/core/process/logs_distributor_test.go index 524cfbe1112..9e75d368859 100644 --- a/agents/go-agents/exec-agent/exec/logs_distributor_test.go +++ b/agents/go-agents/core/process/logs_distributor_test.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec_test +package process_test import ( "fmt" @@ -18,14 +18,14 @@ import ( "os" "testing" - "github.com/eclipse/che/agents/go-agents/exec-agent/exec" + "github.com/eclipse/che/agents/go-agents/core/process" ) func TestLogsDistributorCreatesSubdirectories(t *testing.T) { baseDir := os.TempDir() + string(os.PathSeparator) + randomName(10) defer removeAll(baseDir) - distributor := exec.DefaultLogsDistributor{ + distributor := process.DefaultLogsDistributor{ MaxDirsCount: 4, } @@ -45,7 +45,7 @@ func TestLogsDistribution(t *testing.T) { baseDir := os.TempDir() + string(os.PathSeparator) + randomName(10) defer removeAll(baseDir) - distributor := exec.DefaultLogsDistributor{ + distributor := process.DefaultLogsDistributor{ MaxDirsCount: 4, } diff --git a/agents/go-agents/exec-agent/exec/process.go b/agents/go-agents/core/process/process.go similarity index 76% rename from agents/go-agents/exec-agent/exec/process.go rename to agents/go-agents/core/process/process.go index a5384c9e29f..2d7c84a29d7 100644 --- a/agents/go-agents/exec-agent/exec/process.go +++ b/agents/go-agents/core/process/process.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec +package process import ( "errors" @@ -22,7 +22,6 @@ import ( "syscall" "time" - proc "github.com/eclipse/che/agents/go-agents/core/process" "github.com/eclipse/che/agents/go-agents/core/rpc" ) @@ -43,15 +42,45 @@ const ( var ( prevPid uint64 - // LogsDir directory where logs of processes should be stored - LogsDir string + // the directory under which all the logs are written + logsDir string + + // in memory storage of alive & dead processes processes = &processesMap{items: make(map[uint64]*MachineProcess)} - logsDist = NewLogsDistributor() - // ShellInterpreter is shell that executes commands - ShellInterpreter = DefaultShellInterpreter + + // used by process to point to the file for a process with given pid + logsDistributor LogsDistributor = NewLogsDistributor() + + // shell that executes commands + shellInterpreter = DefaultShellInterpreter ) +// SetLogsDir sets the path to the directory to write logs to. +func SetLogsDir(dir string) { + logsDir = dir +} + +// WipeLogs removes logs dir and all the files and directories under it. +func WipeLogs() error { + return os.RemoveAll(logsDir) +} + +// SetLogsDistributor changes the default strategy of logs distribution to a given one. +func SetLogsDistributor(ld LogsDistributor) { + if ld != nil { + logsDistributor = ld + } +} + +// SetShellInterpreter changes the default +// shell interpreter which is '/bin/bash' to the given one. +func SetShellInterpreter(si string) { + if si != "" { + shellInterpreter = si + } +} + // Command represents command that is used in command execution API type Command struct { Name string `json:"name"` @@ -61,7 +90,6 @@ type Command struct { // MachineProcess defines machine process model type MachineProcess struct { - // The virtual id of the process, it is guaranteed that pid // is always unique, while NativePid may occur twice or more(when including dead processes) Pid uint64 `json:"pid"` @@ -96,22 +124,22 @@ type MachineProcess struct { // Stdout/stderr pumper. // If process is not alive then the pumper value is set to nil - pumper *proc.LogsPumper + pumper *LogsPumper // Process subscribers, all the outgoing events are go through those subscribers. // If process is not alive then the subscribers value is set to nil subs []*Subscriber - // Process file logger - fileLogger *proc.FileLogger + // Process file logger. + // The value is set only if process logs directory is configured + fileLogger *FileLogger // Process mutex should be used to sync process data // or block on process related operations such as events publications mutex *sync.RWMutex - // When the process was last time used by client - lastUsed time.Time - lastUsedLock sync.RWMutex + // The time when the process died + deathTime time.Time // Called once before any of process events is published // and after process is started @@ -144,95 +172,101 @@ type processesMap struct { } // Start starts MachineProcess -func Start(process MachineProcess) (MachineProcess, error) { +func Start(newProcess MachineProcess) (MachineProcess, error) { // wrap command to be able to kill child processes see https://github.com/golang/go/issues/8854 - cmd := exec.Command("setsid", ShellInterpreter, "-c", process.CommandLine) + cmd := exec.Command("setsid", shellInterpreter, "-c", newProcess.CommandLine) // getting stdout pipe stdout, err := cmd.StdoutPipe() if err != nil { - return process, err + return newProcess, err } // getting stderr pipe stderr, err := cmd.StderrPipe() if err != nil { - return process, err + return newProcess, err } // starting a new process err = cmd.Start() if err != nil { - return process, err + return newProcess, err } // increment current pid & assign it to the value pid := atomic.AddUint64(&prevPid, 1) - // Figure out the place for logs file - dir, err := logsDist.DirForPid(LogsDir, pid) + // set shared data + newProcess.Pid = pid + newProcess.Alive = true + newProcess.NativePid = cmd.Process.Pid + + // create an internal copy of the new process + internalProcess := newProcess + + pumper := NewPumper(stdout, stderr) + fileLogger, err := newFileLogger(pid) if err != nil { - return process, err + return newProcess, err } - filename := fmt.Sprintf("%s%cpid-%d", dir, os.PathSeparator, pid) - fileLogger, err := proc.NewLogger(filename) - if err != nil { - return process, err + // set internal data + internalProcess.command = cmd + internalProcess.pumper = pumper + internalProcess.mutex = &sync.RWMutex{} + if fileLogger != nil { + internalProcess.fileLogger = fileLogger + internalProcess.logfileName = fileLogger.filename } - // save process - process.Pid = pid - process.Alive = true - process.NativePid = cmd.Process.Pid - process.command = cmd - process.pumper = proc.NewPumper(stdout, stderr) - process.logfileName = filename - process.fileLogger = fileLogger - process.mutex = &sync.RWMutex{} - process.updateLastUsedTime() + // register logs consumers + if fileLogger != nil { + pumper.AddConsumer(fileLogger) + } + pumper.AddConsumer(&internalProcess) + // save(publish) process instance processes.Lock() - processes.items[pid] = &process + processes.items[pid] = &internalProcess processes.Unlock() - // register logs consumers - process.pumper.AddConsumer(fileLogger) - process.pumper.AddConsumer(&process) - - if process.beforeEventsHook != nil { - process.beforeEventsHook(process) + if newProcess.beforeEventsHook != nil { + newProcess.beforeEventsHook(newProcess) } // before pumping is started publish process_started event startPublished := make(chan bool) go func() { - process.notifySubs(newStartedEvent(process), ProcessStatusBit) + internalProcess.notifySubs(newStartedEvent(newProcess), ProcessStatusBit) startPublished <- true }() // start pumping after start event is published 'pumper.Pump' is blocking go func() { <-startPublished - process.pumper.Pump() + pumper.Pump() }() - return process, nil + return newProcess, nil } // Get retrieves process by pid. // If process doesn't exist then error of type NoProcessError is returned. func Get(pid uint64) (MachineProcess, error) { p, ok := directGet(pid) - if ok { - return *p, nil + if !ok { + return MachineProcess{}, noProcess(pid) } - return MachineProcess{}, noProcess(pid) + p.mutex.RLock() + defer p.mutex.RUnlock() + return *p, nil + } // GetProcesses retrieves list of processes. // If parameter all is true then returns all processes, -// otherwice returns only live processes +// otherwise returns only live processes func GetProcesses(all bool) []MachineProcess { processes.RLock() defer processes.RUnlock() @@ -270,22 +304,26 @@ func Kill(pid uint64) error { // ReadLogs reads process logs between [from, till] inclusive. // Returns an error if any error occurs during logs reading. // If process doesn't exist error of type NoProcessError is returned. -func ReadLogs(pid uint64, from time.Time, till time.Time) ([]*proc.LogMessage, error) { +func ReadLogs(pid uint64, from time.Time, till time.Time) ([]*LogMessage, error) { p, ok := directGet(pid) if !ok { return nil, noProcess(pid) } - fl := p.fileLogger - if p.Alive { - fl.Flush() + + p.mutex.RLock() + reader, err := newLogsReader(p, from, till) + p.mutex.RUnlock() + + if err != nil { + return nil, err } - return proc.NewLogsReader(p.logfileName).From(from).Till(till).ReadLogs() + return reader.ReadLogs() } // ReadAllLogs reads all process logs. // Returns an error if any error occurs during logs reading. // If process doesn't exist error of type NoProcessError is returned. -func ReadAllLogs(pid uint64) ([]*proc.LogMessage, error) { +func ReadAllLogs(pid uint64) ([]*LogMessage, error) { return ReadLogs(pid, time.Time{}, time.Now()) } @@ -296,11 +334,13 @@ func RemoveSubscriber(pid uint64, id string) error { if !ok { return noProcess(pid) } + + p.mutex.Lock() + defer p.mutex.Unlock() + if !p.Alive { return notAlive(pid) } - p.mutex.Lock() - defer p.mutex.Unlock() for idx, sub := range p.subs { if sub.ID == id { p.subs = append(p.subs[0:idx], p.subs[idx+1:]...) @@ -348,9 +388,12 @@ func RestoreSubscriber(pid uint64, subscriber Subscriber, after time.Time) error defer p.mutex.Unlock() // Read logs between after and now - logs, err := ReadLogs(pid, after, time.Now()) - if err != nil { - return err + var logs []*LogMessage + reader, err := newLogsReader(p, after, time.Now()) + if err == nil { + if logs, err = reader.ReadLogs(); err != nil { + return err + } } // If process is dead there is no need to subscribe to it @@ -370,7 +413,7 @@ func RestoreSubscriber(pid uint64, subscriber Subscriber, after time.Time) error for i := 0; i < len(logs); i++ { message := logs[i] if message.Time.After(after) { - if message.Kind == proc.StdoutKind { + if message.Kind == StdoutKind { subscriber.Channel <- newStdoutEvent(p.Pid, message.Text, message.Time) } else { subscriber.Channel <- newStderrEvent(p.Pid, message.Text, message.Time) @@ -430,8 +473,10 @@ func (process *MachineProcess) Close() { // Cleanup machine process resources before dead event is sent process.mutex.Lock() process.Alive = false + process.deathTime = time.Now() process.command = nil process.pumper = nil + process.fileLogger = nil process.mutex.Unlock() process.notifySubs(newDiedEvent(*process), ProcessStatusBit) @@ -439,8 +484,6 @@ func (process *MachineProcess) Close() { process.mutex.Lock() process.subs = nil process.mutex.Unlock() - - process.updateLastUsedTime() } func (process *MachineProcess) notifySubs(event *rpc.Event, typeBit uint64) { @@ -457,12 +500,6 @@ func (process *MachineProcess) notifySubs(event *rpc.Event, typeBit uint64) { process.mutex.RUnlock() } -func (process *MachineProcess) updateLastUsedTime() { - process.lastUsedLock.Lock() - process.lastUsed = time.Now() - process.lastUsedLock.Unlock() -} - // Writes to a channel and returns true if write is successful, // otherwise if write to the channel failed e.g. channel is closed then returns false func tryWrite(eventsChan chan *rpc.Event, event *rpc.Event) (ok bool) { @@ -479,12 +516,41 @@ func directGet(pid uint64) (*MachineProcess, bool) { processes.RLock() defer processes.RUnlock() item, ok := processes.items[pid] - if ok { - item.updateLastUsedTime() - } return item, ok } +// Creates a new logs reader for given process. +func newLogsReader(p *MachineProcess, from time.Time, till time.Time) (*LogsReader, error) { + if p.logfileName == "" { + return nil, fmt.Errorf("Logs file for process '%d' is missing", p.Pid) + } + if p.Alive { + p.fileLogger.Flush() + } + return NewLogsReader(p.logfileName).From(from).Till(till), nil +} + +// Creates a new file logger for given pid. +func newFileLogger(pid uint64) (*FileLogger, error) { + if logsDir == "" { + return nil, nil + } + + // Figure out the place for logs file + dir, err := logsDistributor.DirForPid(logsDir, pid) + if err != nil { + return nil, err + } + + // TODO extract file process name generation to the strategy interface(consider LogsDistributor) + filename := fmt.Sprintf("%s%cpid-%d", dir, os.PathSeparator, pid) + fileLogger, err := NewLogger(filename) + if err != nil { + return nil, err + } + return fileLogger, nil +} + // Returns an error indicating that process with given pid doesn't exist func noProcess(pid uint64) *NoProcessError { return &NoProcessError{ diff --git a/agents/go-agents/exec-agent/exec/process_builder.go b/agents/go-agents/core/process/process_builder.go similarity index 99% rename from agents/go-agents/exec-agent/exec/process_builder.go rename to agents/go-agents/core/process/process_builder.go index 7df809c9652..191846984c5 100644 --- a/agents/go-agents/exec-agent/exec/process_builder.go +++ b/agents/go-agents/core/process/process_builder.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec +package process //ProcessBuilder simplifies creation of MachineProcess type ProcessBuilder struct { diff --git a/agents/go-agents/exec-agent/exec/process_cleaner.go b/agents/go-agents/core/process/process_cleaner.go similarity index 67% rename from agents/go-agents/exec-agent/exec/process_cleaner.go rename to agents/go-agents/core/process/process_cleaner.go index 2b1829cd607..932dfc5b7bf 100644 --- a/agents/go-agents/exec-agent/exec/process_cleaner.go +++ b/agents/go-agents/core/process/process_cleaner.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec +package process import ( "log" @@ -33,7 +33,7 @@ func NewCleaner(period int, threshold int) *Cleaner { // CleanPeriodically schedules cleanups of processes that exited // more than CleanupThreshold time before cleanup. -// This function is syncronious. +// This function is synchronous. func (pc *Cleaner) CleanPeriodically() { ticker := time.NewTicker(pc.CleanupPeriod) defer ticker.Stop() @@ -42,13 +42,24 @@ func (pc *Cleaner) CleanPeriodically() { } } -// CleanOnce cleanups processes that exited more than CleanupThreshold time ago +// CleanOnce cleanups processes that died before Time.Now() minus CleanupThreshold. +// +// process1.deathTime = 2 +// process2.deathTime = 5 +// +// death bound now +// v v +// timeline -> 1 --- 2 --- 3 --- 4 --- 5 --- 6 --- 7 -> +// ^ ^ +// process1 process2 +// +// the method execution will remove the process1. func (pc *Cleaner) CleanOnce() { - deadPoint := time.Now().Add(-pc.CleanupThreshold) + deathBound := time.Now().Add(-pc.CleanupThreshold) processes.Lock() for _, mp := range processes.items { - mp.lastUsedLock.RLock() - if !mp.Alive && mp.lastUsed.Before(deadPoint) { + mp.mutex.RLock() + if !mp.Alive && mp.deathTime.Before(deathBound) { delete(processes.items, mp.Pid) if err := os.Remove(mp.logfileName); err != nil { if !os.IsNotExist(err) { @@ -56,7 +67,7 @@ func (pc *Cleaner) CleanOnce() { } } } - mp.lastUsedLock.RUnlock() + mp.mutex.RUnlock() } processes.Unlock() } diff --git a/agents/go-agents/core/process/process_cleaner_test.go b/agents/go-agents/core/process/process_cleaner_test.go new file mode 100644 index 00000000000..843866cd98b --- /dev/null +++ b/agents/go-agents/core/process/process_cleaner_test.go @@ -0,0 +1,74 @@ +// +// Copyright (c) 2012-2017 Codenvy, S.A. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// which accompanies this distribution, and is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// Contributors: +// Codenvy, S.A. - initial API and implementation +// + +package process + +import ( + "sync" + "testing" + "time" +) + +func TestCleanWithZeroThreshold(t *testing.T) { + p1 := &MachineProcess{Pid: 1, mutex: &sync.RWMutex{}, Alive: false, deathTime: time.Now().Add(-time.Hour)} + p2 := &MachineProcess{Pid: 2, mutex: &sync.RWMutex{}, Alive: false, deathTime: time.Now()} + p3 := &MachineProcess{Pid: 3, mutex: &sync.RWMutex{}, Alive: true} + + processes.Lock() + processes.items[1] = p1 + processes.items[2] = p2 + processes.items[3] = p3 + processes.Unlock() + + NewCleaner(0, 0).CleanOnce() + + processMustNotExist(p1.Pid, t) + processMustNotExist(p2.Pid, t) + processMustExist(p3.Pid, t) +} + +func TestCleansOnlyExpiredProcesses(t *testing.T) { + p1 := &MachineProcess{Pid: 1, Alive: false, mutex: &sync.RWMutex{}, deathTime: time.Now().Add(-time.Hour)} + p2 := &MachineProcess{Pid: 2, Alive: false, mutex: &sync.RWMutex{}, deathTime: time.Now().Add(-time.Minute * 45)} + p3 := &MachineProcess{Pid: 3, Alive: false, mutex: &sync.RWMutex{}, deathTime: time.Now().Add(-time.Minute * 15)} + p4 := &MachineProcess{Pid: 4, Alive: true, mutex: &sync.RWMutex{}} + + processes.Lock() + processes.items[1] = p1 + processes.items[2] = p2 + processes.items[3] = p3 + processes.items[4] = p4 + processes.Unlock() + + // cleanup immediately + (&Cleaner{CleanupPeriod: 0, CleanupThreshold: time.Minute * 30}).CleanOnce() + + processMustNotExist(p1.Pid, t) + processMustNotExist(p2.Pid, t) + processMustExist(p3.Pid, t) + processMustExist(p4.Pid, t) +} + +func processMustNotExist(pid uint64, t *testing.T) { + _, err := Get(pid) + if err == nil { + t.Fatalf("Process with id '%d' must not exist", pid) + } + if _, ok := err.(*NoProcessError); !ok { + t.Fatalf("The error must be of type NoProcessError, error message: %s", err.Error()) + } +} + +func processMustExist(pid uint64, t *testing.T) { + if _, err := Get(pid); err != nil { + t.Fatalf("Process with pid '%d' must exist, but error occurred '%s'", pid, err.Error()) + } +} diff --git a/agents/go-agents/exec-agent/exec/process_test.go b/agents/go-agents/core/process/process_test.go similarity index 55% rename from agents/go-agents/exec-agent/exec/process_test.go rename to agents/go-agents/core/process/process_test.go index d9ae8ab0c0e..40453ffaf43 100644 --- a/agents/go-agents/exec-agent/exec/process_test.go +++ b/agents/go-agents/core/process/process_test.go @@ -9,7 +9,7 @@ // Codenvy, S.A. - initial API and implementation // -package exec_test +package process_test import ( "log" @@ -19,9 +19,9 @@ import ( "testing" "time" + "fmt" "github.com/eclipse/che/agents/go-agents/core/process" "github.com/eclipse/che/agents/go-agents/core/rpc" - "github.com/eclipse/che/agents/go-agents/exec-agent/exec" ) const ( @@ -31,11 +31,11 @@ const ( var alphabet = []byte("abcdefgh123456789") func TestOneLineOutput(t *testing.T) { - defer cleanupLogsDir() + defer wipeLogs() // create and start a process - p := startAndWaitTestProcess("echo test", t) + p := startAndWaitTestProcessWritingLogsToTmpDir("echo test", t) - logs, _ := exec.ReadAllLogs(p.Pid) + logs, _ := process.ReadAllLogs(p.Pid) if len(logs) != 1 { t.Fatalf("Expected logs size to be 1, but got %d", len(logs)) @@ -47,13 +47,13 @@ func TestOneLineOutput(t *testing.T) { } func TestEmptyLinesOutput(t *testing.T) { - defer cleanupLogsDir() - p := startAndWaitTestProcess("printf \"\n\n\n\n\n\"", t) + p := startAndWaitTestProcessWritingLogsToTmpDir("printf \"\n\n\n\n\n\"", t) + defer process.WipeLogs() - logs, _ := exec.ReadAllLogs(p.Pid) + logs, _ := process.ReadAllLogs(p.Pid) if len(logs) != 5 { - t.Fatal("Expected logs to be 4 sized") + t.Fatalf("Expected logs to be 5 sized, but the size is '%d'", len(logs)) } for _, value := range logs { @@ -64,22 +64,19 @@ func TestEmptyLinesOutput(t *testing.T) { } func TestAddSubscriber(t *testing.T) { - exec.LogsDir = TmpFile() - defer cleanupLogsDir() - outputLines := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} // create and start a process - pb := exec.NewBuilder(). + pb := process.NewBuilder(). CmdName("test"). CmdType("test"). CmdLine("printf \"" + strings.Join(outputLines, "\n") + "\"") // add a new subscriber eventsChan := make(chan *rpc.Event) - pb.FirstSubscriber(exec.Subscriber{ + pb.FirstSubscriber(process.Subscriber{ ID: "test", - Mask: exec.DefaultMask, + Mask: process.DefaultMask, Channel: eventsChan, }) @@ -93,9 +90,9 @@ func TestAddSubscriber(t *testing.T) { var received []string go func() { event := <-eventsChan - for event.EventType != exec.DiedEventType { - if event.EventType == exec.StdoutEventType { - out := event.Body.(*exec.ProcessOutputEventBody) + for event.EventType != process.DiedEventType { + if event.EventType == process.StdoutEventType { + out := event.Body.(*process.ProcessOutputEventBody) received = append(received, out.Text) } event = <-eventsChan @@ -118,10 +115,9 @@ func TestAddSubscriber(t *testing.T) { } func TestRestoreSubscriberForDeadProcess(t *testing.T) { - exec.LogsDir = TmpFile() - defer cleanupLogsDir() beforeStart := time.Now() - p := startAndWaitTestProcess("echo test", t) + p := startAndWaitTestProcessWritingLogsToTmpDir("echo test", t) + defer process.WipeLogs() // Read all the data from channel channel := make(chan *rpc.Event) @@ -134,7 +130,7 @@ func TestRestoreSubscriberForDeadProcess(t *testing.T) { select { case v := <-channel: received = append(received, v) - if v.EventType == exec.DiedEventType { + if v.EventType == process.DiedEventType { statusReceived = true } case <-time.After(time.Second): @@ -144,9 +140,9 @@ func TestRestoreSubscriberForDeadProcess(t *testing.T) { done <- true }() - _ = exec.RestoreSubscriber(p.Pid, exec.Subscriber{ + _ = process.RestoreSubscriber(p.Pid, process.Subscriber{ ID: "test", - Mask: exec.DefaultMask, + Mask: process.DefaultMask, Channel: channel, }, beforeStart) @@ -156,20 +152,19 @@ func TestRestoreSubscriberForDeadProcess(t *testing.T) { t.Fatalf("Expected to recieve 2 events but got %d", len(received)) } e1Type := received[0].EventType - e1Text := received[0].Body.(*exec.ProcessOutputEventBody).Text - if received[0].EventType != exec.StdoutEventType || e1Text != "test" { + e1Text := received[0].Body.(*process.ProcessOutputEventBody).Text + if received[0].EventType != process.StdoutEventType || e1Text != "test" { t.Fatalf("Expected to receieve output event with text 'test', but got '%s' event with text %s", e1Type, e1Text) } - if received[1].EventType != exec.DiedEventType { + if received[1].EventType != process.DiedEventType { t.Fatal("Expected to get 'process_died' event") } } func TestMachineProcessIsNotAliveAfterItIsDead(t *testing.T) { p := startAndWaitTestProcess(testCmd, t) - defer cleanupLogsDir() if p.Alive { t.Fatal("Process should not be alive") } @@ -177,16 +172,15 @@ func TestMachineProcessIsNotAliveAfterItIsDead(t *testing.T) { func TestItIsNotPossibleToAddSubscriberToDeadProcess(t *testing.T) { p := startAndWaitTestProcess(testCmd, t) - defer cleanupLogsDir() - if err := exec.AddSubscriber(p.Pid, exec.Subscriber{}); err == nil { + if err := process.AddSubscriber(p.Pid, process.Subscriber{}); err == nil { t.Fatal("Should not be able to add subscriber") } } func TestReadProcessLogs(t *testing.T) { - p := startAndWaitTestProcess(testCmd, t) - defer cleanupLogsDir() - logs, err := exec.ReadLogs(p.Pid, time.Time{}, time.Now()) + p := startAndWaitTestProcessWritingLogsToTmpDir(testCmd, t) + defer wipeLogs() + logs, err := process.ReadLogs(p.Pid, time.Time{}, time.Now()) if err != nil { t.Fatal(err) } @@ -202,63 +196,102 @@ func TestReadProcessLogs(t *testing.T) { } } -func startAndWaitTestProcess(cmd string, t *testing.T) *exec.MachineProcess { - exec.LogsDir = TmpFile() - events := make(chan *rpc.Event) - done := make(chan bool) +func TestLogsAreNotWrittenIfLogsDirIsNotSet(t *testing.T) { + p := doStartAndWaitTestProcess(testCmd, "", t) + + _, err := process.ReadAllLogs(p.Pid) + if err == nil { + t.Fatal("Error must be returned in the case when the process doesn't write logs") + } + + expected := fmt.Sprintf("Logs file for process '%d' is missing", p.Pid) + if err.Error() != expected { + t.Fatalf("Expected to get '%s' error but got '%s'", err.Error(), expected) + } +} + +func startAndWaitTestProcess(cmd string, t *testing.T) process.MachineProcess { + return doStartAndWaitTestProcess(cmd, "", t); +} - // Create and start process - pb := exec.NewBuilder(). +func startAndWaitTestProcessWritingLogsToTmpDir(cmd string, t *testing.T) process.MachineProcess { + return doStartAndWaitTestProcess(cmd, tmpFile(), t); +} + +func doStartAndWaitTestProcess(cmd string, logsDir string, t *testing.T) process.MachineProcess { + process.SetLogsDir(logsDir) + p, err := process.NewBuilder(). CmdName("test"). CmdType("test"). CmdLine(cmd). - FirstSubscriber(exec.Subscriber{ - ID: "test", - Mask: exec.DefaultMask, - Channel: events, - }) + Start() + if err != nil { + t.Fatal(err) + } + waitProcessDied(p, t) + + // Check process state after it is finished + result, err := process.Get(p.Pid) + if err != nil { + t.Fatal(err) + } + return result +} +func waitProcessDiedOrFailIfTimeoutReached(mp process.MachineProcess, timeout time.Duration) error { + events := make(chan *rpc.Event) + subscriber := process.Subscriber{ + ID: "test", + Mask: process.DefaultMask, + Channel: events, + } + if err := process.RestoreSubscriber(mp.Pid, subscriber, time.Now()); err != nil { + return err + } + + // wait process.DiedEventType + processDied := make(chan bool) go func() { - statusReceived := false - timeoutReached := false - for !statusReceived && !timeoutReached { - select { - case event := <-events: - if event.EventType == exec.DiedEventType { - statusReceived = true - } - case <-time.After(time.Second): - timeoutReached = true + for { + event, ok := <-events + if !ok { + return + } + if event.EventType == process.DiedEventType { + processDied <- true + return } } - done <- true }() - p, err := pb.Start() - if err != nil { - t.Fatal(err) - } - - // Wait until process is finished or timeout is reached - if ok := <-done; !ok { - t.Fatalf("Expected to receive %s process event", exec.DiedEventType) + // wait either process died or timeout reached + select { + case <-processDied: + return nil + case <-time.After(timeout): + close(processDied) + return fmt.Errorf( + "Process pid='%d' cmd='%s' didn't publish '%s' event before timeout was reached", + mp.Pid, + mp.CommandLine, + process.DiedEventType, + ) } +} - // Check process state after it is finished - result, err := exec.Get(p.Pid) - if err != nil { +func waitProcessDied(mp process.MachineProcess, t *testing.T) { + if err := waitProcessDiedOrFailIfTimeoutReached(mp, time.Second*5); err != nil { t.Fatal(err) } - return &result } -func TmpFile() string { +func tmpFile() string { return os.TempDir() + string(os.PathSeparator) + randomName(10) } -func cleanupLogsDir() { - if err := os.RemoveAll(exec.LogsDir); err != nil { - log.Printf("Can't remove folder %s. Error: %s", exec.LogsDir, err) +func wipeLogs() { + if err := process.WipeLogs(); err != nil { + log.Printf("Could not wipe process logs dir. %s", err.Error()) } } diff --git a/agents/go-agents/exec-agent/exec/common.go b/agents/go-agents/exec-agent/exec/common.go index 65d67d4c7d5..7acd8ade7cd 100644 --- a/agents/go-agents/exec-agent/exec/common.go +++ b/agents/go-agents/exec-agent/exec/common.go @@ -15,6 +15,7 @@ import ( "errors" "strconv" "strings" + "github.com/eclipse/che/agents/go-agents/core/process" ) const ( @@ -27,18 +28,18 @@ func maskFromTypes(types string) uint64 { for _, t := range strings.Split(types, ",") { switch strings.ToLower(strings.TrimSpace(t)) { case "stderr": - mask |= StderrBit + mask |= process.StderrBit case "stdout": - mask |= StdoutBit + mask |= process.StdoutBit case "process_status": - mask |= ProcessStatusBit + mask |= process.ProcessStatusBit } } return mask } func parseTypes(types string) uint64 { - var mask uint64 = DefaultMask + var mask uint64 = process.DefaultMask if types != "" { mask = maskFromTypes(types) } @@ -58,7 +59,7 @@ func parsePid(strPid string) (uint64, error) { } // Checks whether command is valid -func checkCommand(command *Command) error { +func checkCommand(command *process.Command) error { if command.Name == "" { return errors.New("Command name required") } diff --git a/agents/go-agents/exec-agent/exec/process_cleaner_test.go b/agents/go-agents/exec-agent/exec/process_cleaner_test.go deleted file mode 100644 index 2e9077a93d9..00000000000 --- a/agents/go-agents/exec-agent/exec/process_cleaner_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// -// Copyright (c) 2012-2017 Codenvy, S.A. -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// which accompanies this distribution, and is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// Contributors: -// Codenvy, S.A. - initial API and implementation -// - -package exec_test - -import ( - "testing" - "time" - - "github.com/eclipse/che/agents/go-agents/exec-agent/exec" -) - -func TestCleanWithZeroThreshold(t *testing.T) { - p := startAndWaitTestProcess(testCmd, t) - defer cleanupLogsDir() - - exec.NewCleaner(0, 0).CleanOnce() - - _, err := exec.Get(p.Pid) - if err == nil { - t.Fatal("Must not exist") - } - if _, ok := err.(*exec.NoProcessError); !ok { - t.Fatal(err) - } -} - -func TestCleansOnlyUnusedProcesses(t *testing.T) { - p1 := startAndWaitTestProcess(testCmd, t) - p2 := startAndWaitTestProcess(testCmd, t) - - time.Sleep(500 * time.Millisecond) - - // use one of the processes, so it is used now - _, _ = exec.Get(p1.Pid) - - // cleanup immediately - (&exec.Cleaner{CleanupPeriod: 0, CleanupThreshold: 500 * time.Millisecond}).CleanOnce() - - _, err1 := exec.Get(p1.Pid) - _, err2 := exec.Get(p2.Pid) - - // process 1 must be cleaned - if err1 != nil { - t.Fatalf("Expected process 2 to exist, but got an error: %s", err1.Error()) - } - - // process 2 must exist - if _, ok := err2.(*exec.NoProcessError); !ok { - t.Fatal("Expected process 2 to be cleaned") - } -} diff --git a/agents/go-agents/exec-agent/exec/rest_service.go b/agents/go-agents/exec-agent/exec/rest_service.go index 21fa1f5c5c6..267b9f81402 100644 --- a/agents/go-agents/exec-agent/exec/rest_service.go +++ b/agents/go-agents/exec-agent/exec/rest_service.go @@ -66,7 +66,7 @@ var HTTPRoutes = rest.RoutesGroup{ } func startProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { - command := Command{} + command := process.Command{} if err := restutil.ReadJSON(r, &command); err != nil { return err } @@ -76,7 +76,7 @@ func startProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error // If channel is provided then check whether it is ready to be // first process subscriber and use it if it is - var subscriber *Subscriber + var subscriber *process.Subscriber channelID := r.URL.Query().Get("channel") if channelID != "" { channel, ok := rpc.GetChannel(channelID) @@ -84,24 +84,24 @@ func startProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error m := fmt.Sprintf("Channel with id '%s' doesn't exist. Process won't be started", channelID) return rest.NotFound(errors.New(m)) } - subscriber = &Subscriber{ + subscriber = &process.Subscriber{ ID: channelID, Mask: parseTypes(r.URL.Query().Get("types")), Channel: channel.Events, } } - pb := NewBuilder().Cmd(command) + pb := process.NewBuilder().Cmd(command) if subscriber != nil { pb.FirstSubscriber(*subscriber) } - process, err := pb.Start() + proc, err := pb.Start() if err != nil { return err } - return restutil.WriteJSON(w, process) + return restutil.WriteJSON(w, proc) } func getProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { @@ -110,11 +110,11 @@ func getProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { return rest.BadRequest(err) } - process, err := Get(pid) + proc, err := process.Get(pid) if err != nil { return asHTTPError(err) } - return restutil.WriteJSON(w, process) + return restutil.WriteJSON(w, proc) } func killProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error { @@ -122,7 +122,7 @@ func killProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error if err != nil { return rest.BadRequest(err) } - if err := Kill(pid); err != nil { + if err := process.Kill(pid); err != nil { return asHTTPError(err) } return nil @@ -143,7 +143,7 @@ func getProcessLogsHF(w http.ResponseWriter, r *http.Request, p rest.Params) err return err } - logs, err := ReadLogs(logsParams.pid, logsParams.from, logsParams.till) + logs, err := process.ReadLogs(logsParams.pid, logsParams.from, logsParams.till) if err != nil { return asHTTPError(err) } @@ -216,12 +216,12 @@ func getProcessesHF(w http.ResponseWriter, r *http.Request, _ rest.Params) error if err != nil { all = false } - return restutil.WriteJSON(w, GetProcesses(all)) + return restutil.WriteJSON(w, process.GetProcesses(all)) } func asHTTPError(err error) error { - if npErr, ok := err.(*NoProcessError); ok { - return rest.NotFound(npErr.error) + if npErr, ok := err.(*process.NoProcessError); ok { + return rest.NotFound(npErr) } return err } diff --git a/agents/go-agents/exec-agent/exec/ws_service.go b/agents/go-agents/exec-agent/exec/ws_service.go index 9c8df1ce6c4..b7ce307ff45 100644 --- a/agents/go-agents/exec-agent/exec/ws_service.go +++ b/agents/go-agents/exec-agent/exec/ws_service.go @@ -134,7 +134,7 @@ type StartParams struct { func startProcessReqHF(params interface{}, t *rpc.Transmitter) error { startParams := params.(StartParams) - command := Command{ + command := process.Command{ Name: startParams.Name, CommandLine: startParams.CommandLine, Type: startParams.Type, @@ -143,14 +143,14 @@ func startProcessReqHF(params interface{}, t *rpc.Transmitter) error { return rpc.NewArgsError(err) } - _, err := NewBuilder(). + _, err := process.NewBuilder(). Cmd(command). - FirstSubscriber(Subscriber{ + FirstSubscriber(process.Subscriber{ ID: t.Channel.ID, Mask: parseTypes(startParams.EventTypes), Channel: t.Channel.Events, }). - BeforeEventsHook(func(process MachineProcess) { + BeforeEventsHook(func(process process.MachineProcess) { t.Send(process) }). Start() @@ -165,7 +165,7 @@ type KillParams struct { func killProcessReqHF(params interface{}, t *rpc.Transmitter) error { killParams := params.(KillParams) - if err := Kill(killParams.Pid); err != nil { + if err := process.Kill(killParams.Pid); err != nil { return asRPCError(err) } t.Send(&ProcessResult{ @@ -197,14 +197,14 @@ func subscribeReqHF(params interface{}, t *rpc.Transmitter) error { return rpc.NewArgsError(errors.New("Required at least 1 valid event type")) } - subscriber := Subscriber{ + subscriber := process.Subscriber{ ID: t.Channel.ID, Mask: mask, Channel: t.Channel.Events, } // Check whether subscriber should see previous logs or not if subscribeParams.After == "" { - if err := AddSubscriber(subscribeParams.Pid, subscriber); err != nil { + if err := process.AddSubscriber(subscribeParams.Pid, subscriber); err != nil { return asRPCError(err) } } else { @@ -212,7 +212,7 @@ func subscribeReqHF(params interface{}, t *rpc.Transmitter) error { if err != nil { return rpc.NewArgsError(errors.New("Bad format of 'after', " + err.Error())) } - if err := RestoreSubscriber(subscribeParams.Pid, subscriber, after); err != nil { + if err := process.RestoreSubscriber(subscribeParams.Pid, subscriber, after); err != nil { return err } } @@ -231,7 +231,7 @@ type UnsubscribeParams struct { func unsubscribeReqHF(params interface{}, t *rpc.Transmitter) error { unsubscribeParams := params.(UnsubscribeParams) - if err := RemoveSubscriber(unsubscribeParams.Pid, t.Channel.ID); err != nil { + if err := process.RemoveSubscriber(unsubscribeParams.Pid, t.Channel.ID); err != nil { return asRPCError(err) } t.Send(&ProcessResult{ @@ -252,7 +252,7 @@ func updateSubscriberReqHF(params interface{}, t *rpc.Transmitter) error { if updateParams.EventTypes == "" { return rpc.NewArgsError(errors.New("'eventTypes' required for subscriber update")) } - if err := UpdateSubscriber(updateParams.Pid, t.Channel.ID, maskFromTypes(updateParams.EventTypes)); err != nil { + if err := process.UpdateSubscriber(updateParams.Pid, t.Channel.ID, maskFromTypes(updateParams.EventTypes)); err != nil { return asRPCError(err) } t.Send(&SubscribeResult{ @@ -292,7 +292,7 @@ func getProcessLogsReqHF(params interface{}, t *rpc.Transmitter) error { return rpc.NewArgsError(errors.New("Bad format of 'till', " + err.Error())) } - logs, err := ReadLogs(getLogsParams.Pid, from, till) + logs, err := process.ReadLogs(getLogsParams.Pid, from, till) if err != nil { return asRPCError(err) } @@ -328,7 +328,7 @@ type GetProcessParams struct { func getProcessReqHF(body interface{}, t *rpc.Transmitter) error { params := body.(GetProcessParams) - p, err := Get(params.Pid) + p, err := process.Get(params.Pid) if err != nil { return asRPCError(err) } @@ -343,15 +343,15 @@ type GetProcessesParams struct { func getProcessesReqHF(body interface{}, t *rpc.Transmitter) error { params := body.(GetProcessesParams) - t.Send(GetProcesses(params.All)) + t.Send(process.GetProcesses(params.All)) return nil } func asRPCError(err error) error { - if npErr, ok := err.(*NoProcessError); ok { - return rpc.NewError(npErr.error, NoSuchProcessErrorCode) - } else if naErr, ok := err.(*NotAliveError); ok { - return rpc.NewError(naErr.error, ProcessNotAliveErrorCode) + if npErr, ok := err.(*process.NoProcessError); ok { + return rpc.NewError(npErr, NoSuchProcessErrorCode) + } else if naErr, ok := err.(*process.NotAliveError); ok { + return rpc.NewError(naErr, ProcessNotAliveErrorCode) } return err } diff --git a/agents/go-agents/exec-agent/main.go b/agents/go-agents/exec-agent/main.go index a329c7aaef0..f4925227f69 100644 --- a/agents/go-agents/exec-agent/main.go +++ b/agents/go-agents/exec-agent/main.go @@ -20,12 +20,95 @@ import ( "time" "github.com/eclipse/che/agents/go-agents/core/auth" + "github.com/eclipse/che/agents/go-agents/core/process" "github.com/eclipse/che/agents/go-agents/core/rest" "github.com/eclipse/che/agents/go-agents/core/rpc" "github.com/eclipse/che/agents/go-agents/exec-agent/exec" ) var ( + config = &execAgentConfig{} +) + +func init() { + config.registerFlags() +} + +func main() { + flag.Parse() + + log.SetOutput(os.Stdout) + + config.printAll() + + process.SetLogsDir(config.processLogsDir) + process.SetShellInterpreter(config.processShellInterpreter) + + // remove old logs + if err := process.WipeLogs(); err != nil { + log.Fatal(err) + } + + // start cleaner routine + if config.processCleanupPeriodInMinutes > 0 { + if config.processCleanupThresholdInMinutes < 0 { + log.Fatal("Expected process cleanup threshold to be non negative value") + } + cleaner := process.NewCleaner(config.processCleanupPeriodInMinutes, config.processCleanupThresholdInMinutes) + go cleaner.CleanPeriodically() + } + + appHTTPRoutes := []rest.RoutesGroup{ + exec.HTTPRoutes, + rpc.HTTPRoutes, + } + + appOpRoutes := []rpc.RoutesGroup{ + exec.RPCRoutes, + } + + // register routes and http handlers + r := rest.NewDefaultRouter(config.basePath, appHTTPRoutes) + rest.PrintRoutes(appHTTPRoutes) + rpc.RegisterRoutes(appOpRoutes) + rpc.PrintRoutes(appOpRoutes) + + var handler = getHandler(r) + http.Handle("/", handler) + + server := &http.Server{ + Handler: handler, + Addr: config.serverAddress, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + } + log.Fatal(server.ListenAndServe()) +} + +func getHandler(h http.Handler) http.Handler { + // required authentication for all the requests, if it is configured + if config.authEnabled { + cache := auth.NewCache(time.Minute*time.Duration(config.tokensExpirationTimeoutInMinutes), time.Minute*5) + return auth.NewCachingHandler(h, config.apiEndpoint, droppingRPCChannelsUnauthorizedHandler, cache) + } + + return h +} + +func droppingRPCChannelsUnauthorizedHandler(w http.ResponseWriter, req *http.Request, err error) { + token := req.URL.Query().Get("token") + for _, c := range rpc.GetChannels() { + if u, err1 := url.ParseRequestURI(c.RequestURI); err1 != nil { + log.Printf("Couldn't parse the RequestURI '%s' of channel '%s'", c.RequestURI, c.ID) + } else if u.Query().Get("token") == token { + log.Printf("Token for channel '%s' is expired, trying to drop the channel", c.ID) + rpc.DropChannel(c.ID) + } + } + http.Error(w, err.Error(), http.StatusUnauthorized) +} + +type execAgentConfig struct { serverAddress string basePath string apiEndpoint string @@ -33,20 +116,22 @@ var ( authEnabled bool tokensExpirationTimeoutInMinutes uint + processShellInterpreter string + processLogsDir string processCleanupThresholdInMinutes int processCleanupPeriodInMinutes int -) +} -func init() { +func (cfg *execAgentConfig) registerFlags() { // server configuration flag.StringVar( - &serverAddress, + &cfg.serverAddress, "addr", ":9000", "IP:PORT or :PORT the address to start the server on", ) flag.StringVar( - &basePath, + &cfg.basePath, "path", "", `the base path for all the rpc & rest routes, so route paths are treated not @@ -59,7 +144,7 @@ func init() { // workspace master server configuration flag.StringVar( - &apiEndpoint, + &cfg.apiEndpoint, "api-endpoint", os.Getenv("CHE_API"), `api-endpoint used by exec-agent modules(such as authentication) @@ -68,13 +153,13 @@ func init() { // auth configuration flag.BoolVar( - &authEnabled, + &cfg.authEnabled, "enable-auth", false, "whether authenicate requests on workspace master before allowing them to proceed", ) flag.UintVar( - &tokensExpirationTimeoutInMinutes, + &cfg.tokensExpirationTimeoutInMinutes, "tokens-expiration-timeout", auth.DefaultTokensExpirationTimeoutInMinutes, "how much time machine tokens stay in cache(if auth is enabled)", @@ -82,18 +167,18 @@ func init() { // process executor configuration flag.StringVar( - &exec.ShellInterpreter, + &cfg.processShellInterpreter, "cmd", - "/bin/bash", + process.DefaultShellInterpreter, "shell interpreter", ) flag.IntVar( - &processCleanupPeriodInMinutes, + &cfg.processCleanupPeriodInMinutes, "process-cleanup-period", -1, "how often processs cleanup job will be executed(in minutes)", ) - flag.IntVar(&processCleanupThresholdInMinutes, + flag.IntVar(&cfg.processCleanupThresholdInMinutes, "process-cleanup-threshold", -1, `how much time will dead and unused process stay(in minutes), @@ -106,103 +191,30 @@ func init() { } curDir += string(os.PathSeparator) + "logs" flag.StringVar( - &exec.LogsDir, + &cfg.processLogsDir, "logs-dir", curDir, "base directory for process logs", ) } -func main() { - flag.Parse() - - log.SetOutput(os.Stdout) - - printConfiguration() - - // remove old logs - if err := os.RemoveAll(exec.LogsDir); err != nil { - log.Fatal(err) - } - - // start cleaner routine - if processCleanupPeriodInMinutes > 0 { - if processCleanupThresholdInMinutes < 0 { - log.Fatal("Expected process cleanup threshold to be non negative value") - } - cleaner := exec.NewCleaner(processCleanupPeriodInMinutes, processCleanupThresholdInMinutes) - go cleaner.CleanPeriodically() - } - - appHTTPRoutes := []rest.RoutesGroup{ - exec.HTTPRoutes, - rpc.HTTPRoutes, - } - - appOpRoutes := []rpc.RoutesGroup{ - exec.RPCRoutes, - } - - // register routes and http handlers - r := rest.NewDefaultRouter(basePath, appHTTPRoutes) - rest.PrintRoutes(appHTTPRoutes) - rpc.RegisterRoutes(appOpRoutes) - rpc.PrintRoutes(appOpRoutes) - - var handler = getHandler(r) - http.Handle("/", handler) - - server := &http.Server{ - Handler: handler, - Addr: serverAddress, - WriteTimeout: 10 * time.Second, - ReadTimeout: 10 * time.Second, - } - log.Fatal(server.ListenAndServe()) -} - -func getHandler(h http.Handler) http.Handler { - // required authentication for all the requests, if it is configured - if authEnabled { - cache := auth.NewCache(time.Minute*time.Duration(tokensExpirationTimeoutInMinutes), time.Minute*5) - return auth.NewCachingHandler(h, apiEndpoint, droppingRPCChannelsUnauthorizedHandler, cache) - } - - return h -} - -func droppingRPCChannelsUnauthorizedHandler(w http.ResponseWriter, req *http.Request, err error) { - token := req.URL.Query().Get("token") - for _, c := range rpc.GetChannels() { - if u, err1 := url.ParseRequestURI(c.RequestURI); err1 != nil { - log.Printf("Couldn't parse the RequestURI '%s' of channel '%s'", c.RequestURI, c.ID) - } else if u.Query().Get("token") == token { - log.Printf("Token for channel '%s' is expired, trying to drop the channel", c.ID) - rpc.DropChannel(c.ID) - } - } - http.Error(w, err.Error(), http.StatusUnauthorized) -} - -func printConfiguration() { +func (cfg *execAgentConfig) printAll() { log.Println("Exec-agent configuration") log.Println(" Server") - log.Printf(" - Address: %s\n", serverAddress) - log.Printf(" - Base path: '%s'\n", basePath) - if authEnabled { + log.Printf(" - Address: %s\n", cfg.serverAddress) + log.Printf(" - Base path: '%s'\n", cfg.basePath) + if cfg.authEnabled { log.Println(" Authentication") - log.Printf(" - Enabled: %t\n", authEnabled) - log.Printf(" - Tokens expiration timeout: %dm\n", tokensExpirationTimeoutInMinutes) + log.Printf(" - Enabled: %t\n", cfg.authEnabled) + log.Printf(" - Tokens expiration timeout: %dm\n", cfg.tokensExpirationTimeoutInMinutes) + log.Println(" Workspace master server") + log.Printf(" - API endpoint: %s\n", cfg.apiEndpoint) } log.Println(" Process executor") - log.Printf(" - Logs dir: %s\n", exec.LogsDir) - if processCleanupPeriodInMinutes > 0 { - log.Printf(" - Cleanup job period: %dm\n", processCleanupPeriodInMinutes) - log.Printf(" - Not used & dead processes stay for: %dm\n", processCleanupThresholdInMinutes) - } - if authEnabled { - log.Println(" Workspace master server") - log.Printf(" - API endpoint: %s\n", apiEndpoint) + log.Printf(" - Logs dir: %s\n", cfg.processLogsDir) + if cfg.processCleanupPeriodInMinutes > 0 { + log.Printf(" - Cleanup job period: %dm\n", cfg.processCleanupPeriodInMinutes) + log.Printf(" - Not used & dead processes stay for: %dm\n", cfg.processCleanupThresholdInMinutes) } log.Println() } diff --git a/agents/go-agents/terminal-agent/main.go b/agents/go-agents/terminal-agent/main.go index 5ab645245a4..7ee1ef83995 100644 --- a/agents/go-agents/terminal-agent/main.go +++ b/agents/go-agents/terminal-agent/main.go @@ -35,25 +35,85 @@ import ( ) var ( + config = &terminalAgentConfig{} +) + +func init() { + config.registerFlags() +} + +func main() { + flag.Parse() + + log.SetOutput(os.Stdout) + + config.printAll() + + term.Cmd = config.shellInterpreter + + if config.activityTrackingEnabled { + activity.Tracker = activity.NewTracker(config.workspaceID, config.apiEndpoint) + go activity.Tracker.StartTracking() + } + + appHTTPRoutes := []rest.RoutesGroup{ + term.HTTPRoutes, + } + + // register routes and http handlers + r := rest.NewDefaultRouter(config.basePath, appHTTPRoutes) + rest.PrintRoutes(appHTTPRoutes) + + var handler = getHandler(r) + http.Handle("/", handler) + + server := &http.Server{ + Handler: handler, + Addr: config.serverAddress, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + } + log.Fatal(server.ListenAndServe()) +} + +func getHandler(h http.Handler) http.Handler { + // required authentication for all the requests, if it is configured + if config.authEnabled { + cache := auth.NewCache(time.Minute*time.Duration(config.tokensExpirationTimeoutInMinutes), time.Minute*5) + return auth.NewCachingHandler(h, config.apiEndpoint, droppingTerminalConnectionsUnauthorizedHandler, cache) + } + + return h +} + +func droppingTerminalConnectionsUnauthorizedHandler(w http.ResponseWriter, req *http.Request, err error) { + // TODO disconnect all the clients with the same token if authentication returned unauthorized. +} + +type terminalAgentConfig struct { serverAddress string basePath string apiEndpoint string + activityTrackingEnabled bool + + shellInterpreter string + workspaceID string authEnabled bool tokensExpirationTimeoutInMinutes uint -) +} -func init() { +func (cfg *terminalAgentConfig) registerFlags() { // server configuration flag.StringVar( - &serverAddress, + &cfg.serverAddress, "addr", ":9000", "IP:PORT or :PORT the address to start the server on", ) flag.StringVar( - &basePath, + &cfg.basePath, "path", "", `the base path for all the rpc & rest routes, so route paths are treated not @@ -66,13 +126,13 @@ func init() { // terminal configuration flag.StringVar( - &term.Cmd, + &cfg.shellInterpreter, "cmd", "/bin/bash", "shell interpreter and command to execute on slave side of the pty", ) flag.BoolVar( - &activity.ActivityTrackingEnabled, + &cfg.activityTrackingEnabled, "enable-activity-tracking", false, "whether workspace master will be notified about workspace activity", @@ -80,7 +140,7 @@ func init() { // workspace master server configuration flag.StringVar( - &apiEndpoint, + &cfg.apiEndpoint, "api-endpoint", os.Getenv("CHE_API"), `api-endpoint used by terminal-agent modules(such as activity checker or authentication) @@ -89,83 +149,37 @@ func init() { // auth configuration flag.BoolVar( - &authEnabled, + &cfg.authEnabled, "enable-auth", false, "whether authenicate requests on workspace master before allowing them to proceed", ) flag.UintVar( - &tokensExpirationTimeoutInMinutes, + &cfg.tokensExpirationTimeoutInMinutes, "tokens-expiration-timeout", auth.DefaultTokensExpirationTimeoutInMinutes, "how much time machine tokens stay in cache(if auth is enabled)", ) - workspaceID = os.Getenv("CHE_WORKSPACE_ID") -} - -func main() { - flag.Parse() - - log.SetOutput(os.Stdout) - - printConfiguration() - - if activity.ActivityTrackingEnabled { - activity.Tracker = activity.NewTracker(workspaceID, apiEndpoint) - go activity.Tracker.StartTracking() - } - - appHTTPRoutes := []rest.RoutesGroup{ - term.HTTPRoutes, - } - - // register routes and http handlers - r := rest.NewDefaultRouter(basePath, appHTTPRoutes) - rest.PrintRoutes(appHTTPRoutes) - - var handler = getHandler(r) - http.Handle("/", handler) - - server := &http.Server{ - Handler: handler, - Addr: serverAddress, - WriteTimeout: 10 * time.Second, - ReadTimeout: 10 * time.Second, - } - log.Fatal(server.ListenAndServe()) -} - -func getHandler(h http.Handler) http.Handler { - // required authentication for all the requests, if it is configured - if authEnabled { - cache := auth.NewCache(time.Minute*time.Duration(tokensExpirationTimeoutInMinutes), time.Minute*5) - return auth.NewCachingHandler(h, apiEndpoint, droppingTerminalConnectionsUnauthorizedHandler, cache) - } - - return h -} - -func droppingTerminalConnectionsUnauthorizedHandler(w http.ResponseWriter, req *http.Request, err error) { - // TODO disconnect all the clients with the same token if authentication returned unauthorized. + cfg.workspaceID = os.Getenv("CHE_WORKSPACE_ID") } -func printConfiguration() { +func (cfg *terminalAgentConfig) printAll() { log.Println("Terminal-agent configuration") log.Println(" Server") - log.Printf(" - Address: %s\n", serverAddress) - log.Printf(" - Base path: '%s'\n", basePath) + log.Printf(" - Address: %s\n", cfg.serverAddress) + log.Printf(" - Base path: '%s'\n", cfg.basePath) log.Println(" Terminal") log.Printf(" - Slave command: '%s'\n", term.Cmd) - log.Printf(" - Activity tracking enabled: %t\n", activity.ActivityTrackingEnabled) - if authEnabled { + log.Printf(" - Activity tracking enabled: %t\n", cfg.activityTrackingEnabled) + if cfg.authEnabled { log.Println(" Authentication") - log.Printf(" - Enabled: %t\n", authEnabled) - log.Printf(" - Tokens expiration timeout: %dm\n", tokensExpirationTimeoutInMinutes) + log.Printf(" - Enabled: %t\n", cfg.authEnabled) + log.Printf(" - Tokens expiration timeout: %dm\n", cfg.tokensExpirationTimeoutInMinutes) } - if authEnabled || activity.ActivityTrackingEnabled { + if cfg.authEnabled || cfg.activityTrackingEnabled { log.Println(" Workspace master server") - log.Printf(" - API endpoint: %s\n", apiEndpoint) + log.Printf(" - API endpoint: %s\n", cfg.apiEndpoint) } log.Println() }