diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d681c510ed2..3794277441c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -48,12 +48,7 @@ type AllocRunner struct { restored map[string]struct{} taskLock sync.RWMutex - // taskReceivedTimer is used to mitigate updates sent to the server because - // we expect that shortly after receiving an alloc it will transistion - // state. We use a timer to send the update if this hasn't happened after a - // reasonable time. - taskReceivedTimer *time.Timer - taskStatusLock sync.RWMutex + taskStatusLock sync.RWMutex updateCh chan *structs.Allocation @@ -312,25 +307,13 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv taskState.State = state r.appendTaskEvent(taskState, event) - // We don't immediately mark ourselves as dirty, since in most cases there - // will immediately be another state transistion. This reduces traffic to - // the server. - if event != nil && event.Type == structs.TaskReceived { - if r.taskReceivedTimer == nil { - r.taskReceivedTimer = time.AfterFunc(taskReceivedSyncLimit, func() { - // Send a dirty signal to sync our state. - select { - case r.dirtyCh <- struct{}{}: - default: - } - }) + // If the task failed, we should kill all the other tasks in the task group. + if state == structs.TaskStateDead && taskState.Failed() { + for task, tr := range r.tasks { + if task != taskName { + tr.Destroy() + } } - return - } - - // Cancel any existing received state timer. - if r.taskReceivedTimer != nil { - r.taskReceivedTimer.Stop() } select { diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index f9c90577d3b..b0df609dd13 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -283,7 +283,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] task.Config["command"] = "/bin/sleep" - task.Config["args"] = []string{"10"} + task.Config["args"] = []string{"1000"} go ar.Run() testutil.WaitForResult(func() (bool, error) { diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index 93a4be4f0e6..af8beaa0c69 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -36,9 +36,8 @@ type FileRotator struct { logger *log.Logger purgeCh chan struct{} doneCh chan struct{} - - closed bool - closedLock sync.Mutex + closed bool + closedLock sync.Mutex } // NewFileRotator returns a new file rotator @@ -208,6 +207,8 @@ func (f *FileRotator) Close() { } // Stop the purge go routine + f.closedLock.Lock() + defer f.closedLock.Unlock() if !f.closed { f.doneCh <- struct{}{} close(f.purgeCh) diff --git a/client/task_runner.go b/client/task_runner.go index 7c50ba8e0f5..c435922532c 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -270,9 +270,8 @@ func (r *TaskRunner) run() { err := fmt.Errorf("task directory couldn't be found") r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err)) r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name) - - // Non-restartable error - return + r.restartTracker.SetStartError(err) + goto RESTART } for _, artifact := range r.task.Artifacts {