diff --git a/client/allocrunner/taskrunner/script_check_hook.go b/client/allocrunner/taskrunner/script_check_hook.go new file mode 100644 index 00000000000..a7d8935ba79 --- /dev/null +++ b/client/allocrunner/taskrunner/script_check_hook.go @@ -0,0 +1,370 @@ +package taskrunner + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/hashicorp/consul/api" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/taskenv" + agentconsul "github.com/hashicorp/nomad/command/agent/consul" + "github.com/hashicorp/nomad/nomad/structs" +) + +var _ interfaces.TaskPoststartHook = &scriptCheckHook{} +var _ interfaces.TaskUpdateHook = &scriptCheckHook{} +var _ interfaces.TaskStopHook = &scriptCheckHook{} + +// default max amount of time to wait for all scripts on shutdown. +const defaultShutdownWait = time.Minute + +type scriptCheckHookConfig struct { + alloc *structs.Allocation + task *structs.Task + consul consul.ConsulServiceAPI + logger log.Logger + shutdownWait time.Duration +} + +// scriptCheckHook implements a task runner hook for running script +// checks in the context of a task +type scriptCheckHook struct { + consul consul.ConsulServiceAPI + allocID string + taskName string + logger log.Logger + shutdownWait time.Duration // max time to wait for scripts to shutdown + shutdownCh chan struct{} // closed when all scripts should shutdown + + // The following fields can be changed by Update() + driverExec tinterfaces.ScriptExecutor + taskEnv *taskenv.TaskEnv + + // These maintain state + scripts map[string]*scriptCheck + runningScripts map[string]*taskletHandle + + // Since Update() may be called concurrently with any other hook all + // hook methods must be fully serialized + mu sync.Mutex +} + +func newScriptCheckHook(c scriptCheckHookConfig) *scriptCheckHook { + scriptChecks := make(map[string]*scriptCheck) + for _, service := range c.task.Services { + for _, check := range service.Checks { + if check.Type != structs.ServiceCheckScript { + continue + } + sc := newScriptCheck(&scriptCheckConfig{ + allocID: c.alloc.ID, + taskName: c.task.Name, + check: check, + service: service, + agent: c.consul, + }) + scriptChecks[sc.id] = sc + } + } + + // Walk back through the task group to see if there are script checks + // associated with the task. If so, we'll create scriptCheck tasklets + // for them. The group-level service and any check restart behaviors it + // needs are entirely encapsulated within the group service hook which + // watches Consul for status changes. 
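For illustration only, this is the shape of a hypothetical group-level service that the walk-back below picks up; the service name, command, and timings are invented, and only checks whose TaskName equals c.task.Name become tasklets:

    groupSvc := &structs.Service{
        Name:      "web",
        PortLabel: "http",
        Checks: []*structs.ServiceCheck{{
            Name:     "healthz",
            Type:     structs.ServiceCheckScript,
            TaskName: "server", // must match c.task.Name to be picked up here
            Command:  "/local/check.sh",
            Interval: 10 * time.Second,
            Timeout:  2 * time.Second,
        }},
    }
    _ = groupSvc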
+ tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup) + for _, service := range tg.Services { + for _, check := range service.Checks { + if check.Type != structs.ServiceCheckScript { + continue + } + if check.TaskName != c.task.Name { + continue + } + groupTaskName := "group-" + tg.Name + sc := newScriptCheck(&scriptCheckConfig{ + allocID: c.alloc.ID, + taskName: groupTaskName, + service: service, + check: check, + agent: c.consul, + }) + scriptChecks[sc.id] = sc + } + } + + h := &scriptCheckHook{ + consul: c.consul, + allocID: c.alloc.ID, + taskName: c.task.Name, + scripts: scriptChecks, + runningScripts: make(map[string]*taskletHandle), + shutdownWait: defaultShutdownWait, + shutdownCh: make(chan struct{}), + } + + if c.shutdownWait != 0 { + h.shutdownWait = c.shutdownWait // override for testing + } + h.logger = c.logger.Named(h.Name()) + return h +} + +func (h *scriptCheckHook) Name() string { + return "script_checks" +} + +// PostStart implements interfaces.TaskPoststartHook. It adds the current +// task context (driver and env) to the script checks and starts up the +// scripts. +func (h *scriptCheckHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error { + h.mu.Lock() + defer h.mu.Unlock() + + if req.DriverExec == nil { + return fmt.Errorf("driver doesn't support script checks") + } + + // Store the TaskEnv for interpolating now and when Updating + h.driverExec = req.DriverExec + h.taskEnv = req.TaskEnv + h.scripts = h.getTaskScriptChecks() + + // Handle starting scripts + for checkID, script := range h.scripts { + // If it's already running, cancel and replace + if oldScript, running := h.runningScripts[checkID]; running { + oldScript.cancel() + } + // Start and store the handle + h.runningScripts[checkID] = script.run() + } + return nil +} + +// Updated implements interfaces.TaskUpdateHook. It adds the current +// task context (driver and env) to the script checks and replaces any +// that have been changed. +func (h *scriptCheckHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error { + h.mu.Lock() + defer h.mu.Unlock() + + // Get current script checks with request's driver metadata as it + // can't change due to Updates + oldScriptChecks := h.getTaskScriptChecks() + + task := req.Alloc.LookupTask(h.taskName) + if task == nil { + return fmt.Errorf("task %q not found in updated alloc", h.taskName) + } + + // Update service hook fields + h.taskEnv = req.TaskEnv + + // Create new script checks struct with those new values + newScriptChecks := h.getTaskScriptChecks() + + // Handle starting scripts + for checkID, script := range newScriptChecks { + if _, ok := oldScriptChecks[checkID]; ok { + // If it's already running, cancel and replace + if oldScript, running := h.runningScripts[checkID]; running { + oldScript.cancel() + } + // Start and store the handle + h.runningScripts[checkID] = script.run() + } + } + + // Cancel scripts we no longer want + for checkID := range oldScriptChecks { + if _, ok := newScriptChecks[checkID]; !ok { + if oldScript, running := h.runningScripts[checkID]; running { + oldScript.cancel() + } + } + } + return nil +} + +// Stop implements interfaces.TaskStopHook and blocks waiting for running +// scripts to finish (or for the shutdownWait timeout to expire). 
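As a rough usage sketch (the ctx, alloc, task, consulClient, logger, driverExec, env, updatedAlloc, and newEnv values are placeholders, not how the task runner actually constructs them), the three interface assertions at the top of this file translate into the following lifecycle:

    h := newScriptCheckHook(scriptCheckHookConfig{alloc: alloc, task: task, consul: consulClient, logger: logger})
    _ = h.Poststart(ctx, &interfaces.TaskPoststartRequest{DriverExec: driverExec, TaskEnv: env}, nil) // start tasklets
    _ = h.Update(ctx, &interfaces.TaskUpdateRequest{Alloc: updatedAlloc, TaskEnv: newEnv}, nil)       // re-interpolate and replace
    _ = h.Stop(ctx, &interfaces.TaskStopRequest{}, nil)                                               // close shutdownCh and wait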
+func (h *scriptCheckHook) Stop(ctx context.Context, req *interfaces.TaskStopRequest, resp *interfaces.TaskStopResponse) error { + h.mu.Lock() + defer h.mu.Unlock() + close(h.shutdownCh) + deadline := time.After(h.shutdownWait) + err := fmt.Errorf("timed out waiting for script checks to exit") + for _, script := range h.runningScripts { + select { + case <-script.wait(): + case <-ctx.Done(): + // the caller is passing the background context, so + // we should never really see this outside of testing + case <-deadline: + // at this point the Consul client has been cleaned + // up so we don't want to hang onto this. + return err + } + } + return nil +} + +// getTaskScriptChecks returns an interpolated copy of services and checks with +// values from the task's environment. +func (h *scriptCheckHook) getTaskScriptChecks() map[string]*scriptCheck { + // Guard against not having a valid taskEnv. This can be the case if the + // PreKilling or Exited hook is run before Poststart. + if h.taskEnv == nil || h.driverExec == nil { + return nil + } + newChecks := make(map[string]*scriptCheck) + for _, orig := range h.scripts { + sc := orig.Copy() + sc.exec = h.driverExec + sc.logger = h.logger + sc.shutdownCh = h.shutdownCh + sc.callback = newScriptCheckCallback(sc) + sc.Command = h.taskEnv.ReplaceEnv(orig.Command) + sc.Args = h.taskEnv.ParseAndReplace(orig.Args) + newChecks[sc.id] = sc + } + return newChecks +} + +// heartbeater is the subset of consul agent functionality needed by script +// checks to heartbeat +type heartbeater interface { + UpdateTTL(id, output, status string) error +} + +// scriptCheck runs script checks via a interfaces.ScriptExecutor and updates the +// appropriate check's TTL when the script succeeds. +type scriptCheck struct { + id string + agent heartbeater + lastCheckOk bool // true if the last check was ok; otherwise false + tasklet +} + +// scriptCheckConfig is a parameter struct for newScriptCheck +type scriptCheckConfig struct { + allocID string + taskName string + service *structs.Service + check *structs.ServiceCheck + agent heartbeater +} + +// newScriptCheck constructs a scriptCheck. we're only going to +// configure the immutable fields of scriptCheck here, with the +// rest being configured during the Poststart hook so that we have +// the rest of the task execution environment +func newScriptCheck(config *scriptCheckConfig) *scriptCheck { + serviceID := agentconsul.MakeTaskServiceID( + config.allocID, config.taskName, config.service) + checkID := agentconsul.MakeCheckID(serviceID, config.check) + + sc := &scriptCheck{ + id: checkID, + agent: config.agent, + lastCheckOk: true, // start logging on first failure + } + // we can't use the promoted fields of tasklet in the struct literal + sc.Command = config.check.Command + sc.Args = config.check.Args + sc.Interval = config.check.Interval + sc.Timeout = config.check.Timeout + return sc +} + +// Copy does a *shallow* copy of script checks. +func (sc *scriptCheck) Copy() *scriptCheck { + newSc := sc + return newSc +} + +// closes over the script check and returns the taskletCallback for +// when the script check executes. 
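The heartbeater interface above narrows the Consul dependency to a single method, so the agent's ServiceClient and the fakeHeartbeater in the tests both satisfy it implicitly. A minimal sketch, with an invented type name, of what any implementation needs:

    // logOnlyHeartbeater is hypothetical; any type with UpdateTTL satisfies heartbeater
    type logOnlyHeartbeater struct{ logger log.Logger }

    func (l logOnlyHeartbeater) UpdateTTL(id, output, status string) error {
        l.logger.Debug("would heartbeat check", "check_id", id, "status", status)
        return nil
    }

    var _ heartbeater = logOnlyHeartbeater{}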
+func newScriptCheckCallback(s *scriptCheck) taskletCallback { + + return func(ctx context.Context, params execResult) { + output := params.output + code := params.code + err := params.err + + state := api.HealthCritical + switch code { + case 0: + state = api.HealthPassing + case 1: + state = api.HealthWarning + } + + var outputMsg string + if err != nil { + state = api.HealthCritical + outputMsg = err.Error() + } else { + outputMsg = string(output) + } + + // heartbeat the check to Consul + err = s.updateTTL(ctx, s.id, outputMsg, state) + select { + case <-ctx.Done(): + // check has been removed; don't report errors + return + default: + } + + if err != nil { + if s.lastCheckOk { + s.lastCheckOk = false + s.logger.Warn("updating check failed", "error", err) + } else { + s.logger.Debug("updating check still failing", "error", err) + } + + } else if !s.lastCheckOk { + // Succeeded for the first time or after failing; log + s.lastCheckOk = true + s.logger.Info("updating check succeeded") + } + } +} + +const ( + updateTTLBackoffBaseline = 1 * time.Second + updateTTLBackoffLimit = 3 * time.Second +) + +// updateTTL updates the state to Consul, performing an expontential backoff +// in the case where the check isn't registered in Consul to avoid a race between +// service registration and the first check. +func (s *scriptCheck) updateTTL(ctx context.Context, id, msg, state string) error { + for attempts := 0; ; attempts++ { + err := s.agent.UpdateTTL(id, msg, state) + if err == nil { + return nil + } + + // Handle the retry case + backoff := (1 << (2 * uint64(attempts))) * updateTTLBackoffBaseline + if backoff > updateTTLBackoffLimit { + return err + } + + // Wait till retrying + select { + case <-ctx.Done(): + return err + case <-time.After(backoff): + } + } +} diff --git a/client/allocrunner/taskrunner/script_check_hook_test.go b/client/allocrunner/taskrunner/script_check_hook_test.go new file mode 100644 index 00000000000..4373674fbae --- /dev/null +++ b/client/allocrunner/taskrunner/script_check_hook_test.go @@ -0,0 +1,215 @@ +package taskrunner + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/consul/api" + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func newScriptMock(hb heartbeater, exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *scriptCheck { + script := newScriptCheck(&scriptCheckConfig{ + allocID: "allocid", + taskName: "testtask", + agent: hb, + service: &structs.Service{Name: "xx"}, + check: &structs.ServiceCheck{}, + }) + script.exec = exec + script.logger = logger + script.Interval = interval + script.Timeout = timeout + script.callback = newScriptCheckCallback(script) + script.lastCheckOk = true + return script +} + +// fakeHeartbeater implements the heartbeater interface to allow mocking out +// Consul in script executor tests. 
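For reference, the retry schedule implied by updateTTL's exponential backoff above (baseline 1s, limit 3s) amounts to a single retry: the first failure waits 1s, and the next computed backoff of 4s exceeds the limit, so the error is returned. A sketch of the same arithmetic:

    for attempts := 0; ; attempts++ {
        backoff := (1 << (2 * uint64(attempts))) * updateTTLBackoffBaseline
        fmt.Println("attempt", attempts, "next backoff", backoff) // 1s, then 4s (over the limit)
        if backoff > updateTTLBackoffLimit {
            break // updateTTL returns the last error at this point
        }
    }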
+type fakeHeartbeater struct { + heartbeats chan heartbeat +} + +func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error { + f.heartbeats <- heartbeat{checkID: checkID, output: output, status: status} + return nil +} + +func newFakeHeartbeater() *fakeHeartbeater { + return &fakeHeartbeater{heartbeats: make(chan heartbeat)} +} + +type heartbeat struct { + checkID string + output string + status string +} + +// TestScript_Exec_Cancel asserts cancelling a script check shortcircuits +// any running scripts. +func TestScript_Exec_Cancel(t *testing.T) { + exec, cancel := newBlockingScriptExec() + defer cancel() + + logger := testlog.HCLogger(t) + script := newScriptMock(nil, // heartbeater should never be called + exec, logger, time.Hour, time.Hour) + + handle := script.run() + <-exec.running // wait until Exec is called + handle.cancel() // cancel now that we're blocked in exec + + select { + case <-handle.wait(): + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + require.NotEqual(t, atomic.LoadInt32(&exec.exited), 1, + "expected script executor to still be running after timeout") +} + +// TestScript_Exec_TimeoutBasic asserts a script will be killed when the +// timeout is reached. +func TestScript_Exec_TimeoutBasic(t *testing.T) { + t.Parallel() + exec, cancel := newBlockingScriptExec() + defer cancel() + + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock(hb, exec, logger, time.Hour, time.Second) + + handle := script.run() + defer handle.cancel() // cleanup + <-exec.running // wait until Exec is called + + // Check for UpdateTTL call + select { + case update := <-hb.heartbeats: + require.Equal(t, update.output, context.DeadlineExceeded.Error()) + require.Equal(t, update.status, api.HealthCritical) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + require.NotEqual(t, atomic.LoadInt32(&exec.exited), 1, + "expected script executor to still be running after timeout") + + // Cancel and watch for exit + handle.cancel() + select { + case <-handle.wait(): // ok! + case update := <-hb.heartbeats: + t.Errorf("unexpected UpdateTTL call on exit with status=%q", update) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } +} + +// TestScript_Exec_TimeoutCritical asserts a script will be killed when +// the timeout is reached and always set a critical status regardless of what +// Exec returns. +func TestScript_Exec_TimeoutCritical(t *testing.T) { + t.Parallel() + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock(hb, sleeperExec{}, logger, time.Hour, time.Nanosecond) + + handle := script.run() + defer handle.cancel() // cleanup + + // Check for UpdateTTL call + select { + case update := <-hb.heartbeats: + require.Equal(t, update.output, context.DeadlineExceeded.Error()) + require.Equal(t, update.status, api.HealthCritical) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to timeout") + } +} + +// TestScript_Exec_Shutdown asserts a script will be executed once more +// when told to shutdown. 
+func TestScript_Exec_Shutdown(t *testing.T) { + shutdown := make(chan struct{}) + exec := newSimpleExec(0, nil) + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock(hb, exec, logger, time.Hour, 3*time.Second) + script.shutdownCh = shutdown + + handle := script.run() + defer handle.cancel() // cleanup + close(shutdown) // tell scriptCheck to exit + + select { + case update := <-hb.heartbeats: + require.Equal(t, update.output, "code=0 err=") + require.Equal(t, update.status, api.HealthPassing) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + select { + case <-handle.wait(): // ok! + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } +} + +// TestScript_Exec_Codes asserts script exit codes are translated to their +// corresponding Consul health check status. +func TestScript_Exec_Codes(t *testing.T) { + + exec := newScriptedExec([]execResult{ + {[]byte("output"), 1, nil}, + {[]byte("output"), 0, nil}, + {[]byte("output"), 0, context.DeadlineExceeded}, + {[]byte("output"), 0, nil}, + {[]byte(""), 2, fmt.Errorf("some error")}, + {[]byte("output"), 0, nil}, + {[]byte("error9000"), 9000, nil}, + }) + logger := testlog.HCLogger(t) + hb := newFakeHeartbeater() + script := newScriptMock( + hb, exec, logger, time.Nanosecond, 3*time.Second) + + handle := script.run() + defer handle.cancel() // cleanup + deadline := time.After(3 * time.Second) + + expected := []heartbeat{ + {script.id, "output", api.HealthWarning}, + {script.id, "output", api.HealthPassing}, + {script.id, context.DeadlineExceeded.Error(), api.HealthCritical}, + {script.id, "output", api.HealthPassing}, + {script.id, "some error", api.HealthCritical}, + {script.id, "output", api.HealthPassing}, + {script.id, "error9000", api.HealthCritical}, + } + + for i := 0; i <= 6; i++ { + select { + case update := <-hb.heartbeats: + require.Equal(t, update, expected[i], + "expected update %d to be '%s' but received '%s'", + i, expected[i], update) + case <-deadline: + t.Fatalf("timed out waiting for all script checks to finish") + } + } +} diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 9083e9ecf69..25d1b59bccd 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -105,6 +105,15 @@ func (tr *TaskRunner) initHooks() { logger: hookLogger, })) } + + // If there are any script checks, add the hook + scriptCheckHook := newScriptCheckHook(scriptCheckHookConfig{ + alloc: tr.Alloc(), + task: tr.Task(), + consul: tr.consulClient, + logger: hookLogger, + }) + tr.runnerHooks = append(tr.runnerHooks, scriptCheckHook) } func (tr *TaskRunner) emitHookError(err error, hookName string) { diff --git a/client/allocrunner/taskrunner/tasklet.go b/client/allocrunner/taskrunner/tasklet.go new file mode 100644 index 00000000000..0f6d2e578c2 --- /dev/null +++ b/client/allocrunner/taskrunner/tasklet.go @@ -0,0 +1,158 @@ +package taskrunner + +import ( + "context" + "time" + + metrics "github.com/armon/go-metrics" + log "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" +) + +// contextExec allows canceling a interfaces.ScriptExecutor with a context. +type contextExec struct { + // pctx is the parent context. A subcontext will be created with Exec's + // timeout. 
+ pctx context.Context + + // exec to be wrapped in a context + exec interfaces.ScriptExecutor +} + +func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec { + return &contextExec{ + pctx: ctx, + exec: exec, + } +} + +// execResult are the outputs of an Exec +type execResult struct { + output []byte + code int + err error +} + +// Exec a command until the timeout expires, the context is canceled, or the +// underlying Exec returns. +func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { + resCh := make(chan execResult, 1) + + // Don't trust the underlying implementation to obey timeout + ctx, cancel := context.WithTimeout(c.pctx, timeout) + defer cancel() + + go func() { + output, code, err := c.exec.Exec(timeout, cmd, args) + select { + case resCh <- execResult{output, code, err}: + case <-ctx.Done(): + } + }() + + select { + case res := <-resCh: + return res.output, res.code, res.err + case <-ctx.Done(): + return nil, 0, ctx.Err() + } +} + +// tasklet is an abstraction around periodically running a script within +// the context of a Task. The interfaces.ScriptExecutor is fired at least +// once and on each interval, and fires a callback whenever the script +// is complete. +type tasklet struct { + Command string // Command is the command to run for tasklet + Args []string // Args is a list of arguments for tasklet + Interval time.Duration // Interval of the tasklet + Timeout time.Duration // Timeout of the tasklet + exec interfaces.ScriptExecutor + callback taskletCallback + logger log.Logger + shutdownCh <-chan struct{} +} + +// taskletHandle is returned by tasklet.run by cancelling a tasklet and +// waiting for it to shutdown. +type taskletHandle struct { + // cancel the script + cancel func() + exitCh chan struct{} +} + +// wait returns a chan that's closed when the tasklet exits +func (t taskletHandle) wait() <-chan struct{} { + return t.exitCh +} + +// taskletCallback is called with a cancellation context and the output of a +// tasklet's Exec whenever it runs. +type taskletCallback func(context.Context, execResult) + +// run this tasklet check and return its cancel func. The tasklet's +// callback will be called each time it completes. If the shutdownCh is +// closed the check will be run once more before exiting. +func (t *tasklet) run() *taskletHandle { + ctx, cancel := context.WithCancel(context.Background()) + exitCh := make(chan struct{}) + + // Wrap the original interfaces.ScriptExecutor in one that obeys context + // cancelation. 
+ ctxExec := newContextExec(ctx, t.exec) + + go func() { + defer close(exitCh) + timer := time.NewTimer(0) + defer timer.Stop() + for { + // Block until tasklet is removed, Nomad is shutting + // down, or the tasklet interval is up + select { + case <-ctx.Done(): + // tasklet has been removed + return + case <-t.shutdownCh: + // unblock but don't exit until after we run once more + case <-timer.C: + timer.Reset(t.Interval) + } + + metrics.IncrCounter([]string{ + "client", "allocrunner", "taskrunner", "tasklet_runs"}, 1) + + // Execute check script with timeout + t.logger.Trace("tasklet executing") + output, code, err := ctxExec.Exec(t.Timeout, t.Command, t.Args) + switch err { + case context.Canceled: + // check removed during execution; exit + return + case context.DeadlineExceeded: + metrics.IncrCounter([]string{ + "client", "allocrunner", "taskrunner", + "tasklet_timeouts"}, 1) + // If no error was returned, set one to make sure the tasklet + // is marked as failed + if err == nil { + err = context.DeadlineExceeded + } + + // Log deadline exceeded every time as it's a + // distinct issue from the tasklet returning failure + t.logger.Warn("tasklet timed out", "timeout", t.Timeout) + } + + t.callback(ctx, execResult{output, code, err}) + + select { + case <-t.shutdownCh: + // We've been told to exit and just ran so exit + return + default: + } + } + }() + return &taskletHandle{cancel: cancel, exitCh: exitCh} +} diff --git a/client/allocrunner/taskrunner/tasklet_test.go b/client/allocrunner/taskrunner/tasklet_test.go new file mode 100644 index 00000000000..4dc8f36f2c3 --- /dev/null +++ b/client/allocrunner/taskrunner/tasklet_test.go @@ -0,0 +1,268 @@ +package taskrunner + +import ( + "context" + "fmt" + "os" + "os/exec" + "sync/atomic" + "testing" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/testtask" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + if !testtask.Run() { + os.Exit(m.Run()) + } +} + +func TestTasklet_Exec_HappyPath(t *testing.T) { + results := []execResult{ + {[]byte("output"), 0, nil}, + {[]byte("output"), 1, nil}, + {[]byte("output"), 0, context.DeadlineExceeded}, + {[]byte(""), 2, fmt.Errorf("some error")}, + {[]byte("error9000"), 9000, nil}, + } + exec := newScriptedExec(results) + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Nanosecond, 3*time.Second) + + handle := tm.run() + defer handle.cancel() // just-in-case cleanup + + deadline := time.After(3 * time.Second) + for i := 0; i <= 4; i++ { + select { + case result := <-tm.calls: + // for the happy path without cancelations or shutdowns, we expect + // to get the results passed to the callback in order and without + // modification + assert.Equal(t, result, results[i]) + case <-deadline: + t.Fatalf("timed out waiting for all script checks to finish") + } + } +} + +// TestTasklet_Exec_Cancel asserts cancelling a tasklet short-circuits +// any running executions the tasklet +func TestTasklet_Exec_Cancel(t *testing.T) { + exec, cancel := newBlockingScriptExec() + defer cancel() + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, time.Hour) + + handle := tm.run() + <-exec.running // wait until Exec is called + handle.cancel() // cancel now that we're blocked in exec + + select { + case <-handle.wait(): + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for tasklet check to exit") + } + + 
// The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + if atomic.LoadInt32(&exec.exited) == 1 { + t.Errorf("expected script executor to still be running after timeout") + } + // No tasklets finished, so no callbacks should have gotten a + // chance to fire + select { + case call := <-tm.calls: + t.Errorf("expected 0 calls of tasklet, got %v", call) + default: + break + } +} + +// TestTasklet_Exec_Timeout asserts a tasklet script will be killed +// when the timeout is reached. +func TestTasklet_Exec_Timeout(t *testing.T) { + t.Parallel() + exec, cancel := newBlockingScriptExec() + defer cancel() + + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, time.Second) + + handle := tm.run() + defer handle.cancel() // just-in-case cleanup + <-exec.running // wait until Exec is called + + // We should get a timeout + select { + case update := <-tm.calls: + if update.err != context.DeadlineExceeded { + t.Errorf("expected context.DeadlineExceeed but received %+v", update) + } + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be + // canceled. Only a wrapper around it obeys the context cancelation. + if atomic.LoadInt32(&exec.exited) == 1 { + t.Errorf("expected executor to still be running after timeout") + } + + // Cancel and watch for exit + handle.cancel() + select { + case <-handle.wait(): // ok! + case update := <-tm.calls: + t.Errorf("unexpected extra callback on exit with status=%v", update) + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for tasklet to exit") + } +} + +// TestTasklet_Exec_Shutdown asserts a script will be executed once more +// when told to shutdown. +func TestTasklet_Exec_Shutdown(t *testing.T) { + exec := newSimpleExec(0, nil) + shutdown := make(chan struct{}) + tm := newTaskletMock(exec, testlog.HCLogger(t), time.Hour, 3*time.Second) + tm.shutdownCh = shutdown + handle := tm.run() + + defer handle.cancel() // just-in-case cleanup + close(shutdown) // tell script to exit + + select { + case update := <-tm.calls: + if update.err != nil { + t.Errorf("expected clean shutdown but received %q", update.err) + } + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } + + select { + case <-handle.wait(): // ok + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for script check to exit") + } +} + +// test helpers + +type taskletMock struct { + tasklet + calls chan execResult +} + +func newTaskletMock(exec interfaces.ScriptExecutor, logger hclog.Logger, interval, timeout time.Duration) *taskletMock { + tm := &taskletMock{calls: make(chan execResult)} + tm.exec = exec + tm.logger = logger + tm.Interval = interval + tm.Timeout = timeout + tm.callback = func(ctx context.Context, params execResult) { + tm.calls <- params + } + return tm +} + +// blockingScriptExec implements ScriptExec by running a subcommand that never +// exits. +type blockingScriptExec struct { + // pctx is canceled *only* for test cleanup. Just like real + // ScriptExecutors its Exec method cannot be canceled directly -- only + // with a timeout. 
+ pctx context.Context + + // running is ticked before blocking to allow synchronizing operations + running chan struct{} + + // set to 1 with atomics if Exec is called and has exited + exited int32 +} + +// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the +// caller recvs on the b.running chan. It also returns a CancelFunc for test +// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout +// expires. +func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + exec := &blockingScriptExec{ + pctx: ctx, + running: make(chan struct{}), + } + return exec, cancel +} + +func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) { + b.running <- struct{}{} + ctx, cancel := context.WithTimeout(b.pctx, dur) + defer cancel() + cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h") + testtask.SetCmdEnv(cmd) + err := cmd.Run() + code := 0 + if exitErr, ok := err.(*exec.ExitError); ok { + if !exitErr.Success() { + code = 1 + } + } + atomic.StoreInt32(&b.exited, 1) + return []byte{}, code, err +} + +// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions +type sleeperExec struct{} + +func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) { + time.Sleep(100 * time.Millisecond) + return []byte{}, 0, nil +} + +// simpleExec is a fake ScriptExecutor that returns whatever is specified. +type simpleExec struct { + code int + err error +} + +func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) { + return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err +} + +// newSimpleExec creates a new ScriptExecutor that returns the given code and err. +func newSimpleExec(code int, err error) simpleExec { + return simpleExec{code: code, err: err} +} + +// scriptedExec is a fake ScriptExecutor with a predetermined sequence +// of results. 
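To see why newScriptedExec below closes over the index rather than storing it on the struct: Exec is defined on a value receiver, so a field increment would only mutate a copy and the sequence would never advance. A deliberately broken, hypothetical variant for contrast:

    type badScriptedExec struct{ index int }

    func (s badScriptedExec) Exec(time.Duration, string, []string) ([]byte, int, error) {
        s.index++ // mutates a copy; every call still sees index == 0
        return nil, 0, nil
    }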
+type scriptedExec struct { + fn func() ([]byte, int, error) +} + +// For each call to Exec, scriptedExec returns the next result in its +// sequence of results +func (s scriptedExec) Exec(time.Duration, string, []string) ([]byte, int, error) { + return s.fn() +} + +func newScriptedExec(results []execResult) scriptedExec { + index := 0 + s := scriptedExec{} + // we have to close over the index because the interface we're + // mocking expects a value and not a pointer, which prevents + // us from updating the index + fn := func() ([]byte, int, error) { + result := results[index] + // prevents us from iterating off the end of the results + if index+1 < len(results) { + index = index + 1 + } + return result.output, result.code, result.err + } + s.fn = fn + return s +} diff --git a/client/consul/consul.go b/client/consul/consul.go index a789c4c34f5..df1c455cfe9 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -15,4 +15,5 @@ type ConsulServiceAPI interface { RemoveTask(*consul.TaskServices) UpdateTask(old, newTask *consul.TaskServices) error AllocRegistrations(allocID string) (*consul.AllocRegistration, error) + UpdateTTL(id, output, status string) error } diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go index e38cfc6c17c..ce276b02a90 100644 --- a/client/consul/consul_testing.go +++ b/client/consul/consul_testing.go @@ -21,7 +21,7 @@ type MockConsulOp struct { func NewMockConsulOp(op, allocID, name string) MockConsulOp { switch op { case "add", "remove", "update", "alloc_registrations", - "add_group", "remove_group", "update_group": + "add_group", "remove_group", "update_group", "update_ttl": default: panic(fmt.Errorf("invalid consul op: %s", op)) } @@ -123,6 +123,15 @@ func (m *MockConsulServiceClient) AllocRegistrations(allocID string) (*consul.Al return nil, nil } +func (m *MockConsulServiceClient) UpdateTTL(checkID, output, status string) error { + // TODO(tgross): this method is here so we can implement the + // interface but the locking we need for testing creates a lot + // of opportunities for deadlocks in testing that will never + // appear in live code. + m.logger.Trace("UpdateTTL", "check_id", checkID, "status", status) + return nil +} + func (m *MockConsulServiceClient) GetOps() []MockConsulOp { m.mu.Lock() defer m.mu.Unlock() diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 7c8ca34d939..fc34eacee26 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -105,10 +105,8 @@ func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.Agen // operations are submitted to the main loop via commit() for synchronizing // with Consul. 
type operations struct { - regServices []*api.AgentServiceRegistration - regChecks []*api.AgentCheckRegistration - scripts []*scriptCheck - + regServices []*api.AgentServiceRegistration + regChecks []*api.AgentCheckRegistration deregServices []string deregChecks []string } @@ -230,10 +228,8 @@ type ServiceClient struct { opCh chan *operations - services map[string]*api.AgentServiceRegistration - checks map[string]*api.AgentCheckRegistration - scripts map[string]*scriptCheck - runningScripts map[string]*scriptHandle + services map[string]*api.AgentServiceRegistration + checks map[string]*api.AgentCheckRegistration explicitlyDeregisteredServices map[string]bool explicitlyDeregisteredChecks map[string]bool @@ -284,8 +280,6 @@ func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bo opCh: make(chan *operations, 8), services: make(map[string]*api.AgentServiceRegistration), checks: make(map[string]*api.AgentCheckRegistration), - scripts: make(map[string]*scriptCheck), - runningScripts: make(map[string]*scriptHandle), explicitlyDeregisteredServices: make(map[string]bool), explicitlyDeregisteredChecks: make(map[string]bool), allocRegistrations: make(map[string]*AllocRegistration), @@ -439,25 +433,16 @@ func (c *ServiceClient) merge(ops *operations) { for _, check := range ops.regChecks { c.checks[check.ID] = check } - for _, s := range ops.scripts { - c.scripts[s.id] = s - } for _, sid := range ops.deregServices { delete(c.services, sid) c.explicitlyDeregisteredServices[sid] = true } for _, cid := range ops.deregChecks { - if script, ok := c.runningScripts[cid]; ok { - script.cancel() - delete(c.scripts, cid) - delete(c.runningScripts, cid) - } delete(c.checks, cid) c.explicitlyDeregisteredChecks[cid] = true } metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services))) metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks))) - metrics.SetGauge([]string{"client", "consul", "script_checks"}, float32(len(c.runningScripts))) } // sync enqueued operations. 
@@ -593,16 +578,6 @@ func (c *ServiceClient) sync() error { } creg++ metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1) - - // Handle starting scripts - if script, ok := c.scripts[id]; ok { - // If it's already running, cancel and replace - if oldScript, running := c.runningScripts[id]; running { - oldScript.cancel() - } - // Start and store the handle - c.runningScripts[id] = script.run() - } } // Only log if something was actually synced @@ -649,7 +624,7 @@ func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) ops.regServices = append(ops.regServices, serviceReg) for _, check := range service.Checks { - checkID := makeCheckID(id, check) + checkID := MakeCheckID(id, check) if check.Type == structs.ServiceCheckScript { return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name) } @@ -782,17 +757,9 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st checkIDs := make([]string, 0, numChecks) for _, check := range service.Checks { - checkID := makeCheckID(serviceID, check) + checkID := MakeCheckID(serviceID, check) checkIDs = append(checkIDs, checkID) if check.Type == structs.ServiceCheckScript { - if task.DriverExec == nil { - return nil, fmt.Errorf("driver doesn't support script checks") - } - - sc := newScriptCheck(task.AllocID, task.Name, checkID, check, task.DriverExec, - c.client, c.logger, c.shutdownCh) - ops.scripts = append(ops.scripts, sc) - // Skip getAddress for script checks checkReg, err := createCheckReg(serviceID, checkID, check, "", 0) if err != nil { @@ -977,7 +944,7 @@ func (c *ServiceClient) RegisterTask(task *TaskServices) error { serviceID := MakeTaskServiceID(task.AllocID, task.Name, service) for _, check := range service.Checks { if check.TriggersRestarts() { - checkID := makeCheckID(serviceID, check) + checkID := MakeCheckID(serviceID, check) c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter) } } @@ -1012,7 +979,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { // Existing service entry removed ops.deregServices = append(ops.deregServices, existingID) for _, check := range existingSvc.Checks { - cid := makeCheckID(existingID, check) + cid := MakeCheckID(existingID, check) ops.deregChecks = append(ops.deregChecks, cid) // Unwatch watched checks @@ -1040,12 +1007,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { // See if any checks were updated existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks)) for _, check := range existingSvc.Checks { - existingChecks[makeCheckID(existingID, check)] = check + existingChecks[MakeCheckID(existingID, check)] = check } // Register new checks for _, check := range newSvc.Checks { - checkID := makeCheckID(existingID, check) + checkID := MakeCheckID(existingID, check) if _, exists := existingChecks[checkID]; exists { // Check is still required. Remove it from the map so it doesn't get // deleted later. 
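makeCheckID is exported as MakeCheckID because the new script-check hook in client/allocrunner/taskrunner rebuilds the same deterministic IDs (see newScriptCheck in that package), so its TTL heartbeats land on exactly the checks this client registers. A small illustrative helper, not part of the patch, showing the correspondence:

    func exampleIDs(allocID, taskName string, service *structs.Service, check *structs.ServiceCheck) (serviceID, checkID string) {
        serviceID = MakeTaskServiceID(allocID, taskName, service)
        checkID = MakeCheckID(serviceID, check)
        return serviceID, checkID
    }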
@@ -1101,7 +1068,7 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service) for _, check := range service.Checks { if check.TriggersRestarts() { - checkID := makeCheckID(serviceID, check) + checkID := MakeCheckID(serviceID, check) c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter) } } @@ -1120,7 +1087,7 @@ func (c *ServiceClient) RemoveTask(task *TaskServices) { ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { - cid := makeCheckID(id, check) + cid := MakeCheckID(id, check) ops.deregChecks = append(ops.deregChecks, cid) if check.TriggersRestarts() { @@ -1177,6 +1144,11 @@ func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, return reg, nil } +// TODO(tgross): make sure this is properly nil-checked, etc. +func (c *ServiceClient) UpdateTTL(id, output, status string) error { + return c.client.UpdateTTL(id, output, status) +} + // Shutdown the Consul client. Update running task registrations and deregister // agent from Consul. On first call blocks up to shutdownWait before giving up // on syncing operations. @@ -1220,14 +1192,6 @@ func (c *ServiceClient) Shutdown() error { } } - // Give script checks time to exit (no need to lock as Run() has exited) - for _, h := range c.runningScripts { - select { - case <-h.wait(): - case <-deadline: - return fmt.Errorf("timed out waiting for script checks to run") - } - } return nil } @@ -1285,10 +1249,10 @@ func MakeTaskServiceID(allocID, taskName string, service *structs.Service) strin return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel) } -// makeCheckID creates a unique ID for a check. +// MakeCheckID creates a unique ID for a check. // // Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d -func makeCheckID(serviceID string, check *structs.ServiceCheck) string { +func MakeCheckID(serviceID string, check *structs.ServiceCheck) string { return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID)) } diff --git a/command/agent/consul/script.go b/command/agent/consul/script.go deleted file mode 100644 index b4a99c84761..00000000000 --- a/command/agent/consul/script.go +++ /dev/null @@ -1,215 +0,0 @@ -package consul - -import ( - "context" - "time" - - metrics "github.com/armon/go-metrics" - log "github.com/hashicorp/go-hclog" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" - "github.com/hashicorp/nomad/nomad/structs" -) - -// heartbeater is the subset of consul agent functionality needed by script -// checks to heartbeat -type heartbeater interface { - UpdateTTL(id, output, status string) error -} - -// contextExec allows canceling a ScriptExecutor with a context. -type contextExec struct { - // pctx is the parent context. A subcontext will be created with Exec's - // timeout. - pctx context.Context - - // exec to be wrapped in a context - exec interfaces.ScriptExecutor -} - -func newContextExec(ctx context.Context, exec interfaces.ScriptExecutor) *contextExec { - return &contextExec{ - pctx: ctx, - exec: exec, - } -} - -type execResult struct { - buf []byte - code int - err error -} - -// Exec a command until the timeout expires, the context is canceled, or the -// underlying Exec returns. 
-func (c *contextExec) Exec(timeout time.Duration, cmd string, args []string) ([]byte, int, error) { - resCh := make(chan execResult, 1) - - // Don't trust the underlying implementation to obey timeout - ctx, cancel := context.WithTimeout(c.pctx, timeout) - defer cancel() - - go func() { - output, code, err := c.exec.Exec(timeout, cmd, args) - select { - case resCh <- execResult{output, code, err}: - case <-ctx.Done(): - } - }() - - select { - case res := <-resCh: - return res.buf, res.code, res.err - case <-ctx.Done(): - return nil, 0, ctx.Err() - } -} - -// scriptHandle is returned by scriptCheck.run by cancelling a scriptCheck and -// waiting for it to shutdown. -type scriptHandle struct { - // cancel the script - cancel func() - exitCh chan struct{} -} - -// wait returns a chan that's closed when the script exits -func (s *scriptHandle) wait() <-chan struct{} { - return s.exitCh -} - -// scriptCheck runs script checks via a ScriptExecutor and updates the -// appropriate check's TTL when the script succeeds. -type scriptCheck struct { - allocID string - taskName string - - id string - check *structs.ServiceCheck - exec interfaces.ScriptExecutor - agent heartbeater - - // lastCheckOk is true if the last check was ok; otherwise false - lastCheckOk bool - - logger log.Logger - shutdownCh <-chan struct{} -} - -// newScriptCheck creates a new scriptCheck. run() should be called once the -// initial check is registered with Consul. -func newScriptCheck(allocID, taskName, checkID string, check *structs.ServiceCheck, - exec interfaces.ScriptExecutor, agent heartbeater, logger log.Logger, - shutdownCh <-chan struct{}) *scriptCheck { - - logger = logger.ResetNamed("consul.checks").With("task", taskName, "alloc_id", allocID, "check", check.Name) - return &scriptCheck{ - allocID: allocID, - taskName: taskName, - id: checkID, - check: check, - exec: exec, - agent: agent, - lastCheckOk: true, // start logging on first failure - logger: logger, - shutdownCh: shutdownCh, - } -} - -// run this script check and return its cancel func. If the shutdownCh is -// closed the check will be run once more before exiting. -func (s *scriptCheck) run() *scriptHandle { - ctx, cancel := context.WithCancel(context.Background()) - exitCh := make(chan struct{}) - - // Wrap the original ScriptExecutor in one that obeys context - // cancelation. 
- ctxExec := newContextExec(ctx, s.exec) - - go func() { - defer close(exitCh) - timer := time.NewTimer(0) - defer timer.Stop() - for { - // Block until check is removed, Nomad is shutting - // down, or the check interval is up - select { - case <-ctx.Done(): - // check has been removed - return - case <-s.shutdownCh: - // unblock but don't exit until after we heartbeat once more - case <-timer.C: - timer.Reset(s.check.Interval) - } - metrics.IncrCounter([]string{"client", "consul", "script_runs"}, 1) - - // Execute check script with timeout - output, code, err := ctxExec.Exec(s.check.Timeout, s.check.Command, s.check.Args) - switch err { - case context.Canceled: - // check removed during execution; exit - return - case context.DeadlineExceeded: - metrics.IncrCounter([]string{"client", "consul", "script_timeouts"}, 1) - // If no error was returned, set one to make sure the task goes critical - if err == nil { - err = context.DeadlineExceeded - } - - // Log deadline exceeded every time as it's a - // distinct issue from checks returning - // failures - s.logger.Warn("check timed out", "timeout", s.check.Timeout) - } - - state := api.HealthCritical - switch code { - case 0: - state = api.HealthPassing - case 1: - state = api.HealthWarning - } - - var outputMsg string - if err != nil { - state = api.HealthCritical - outputMsg = err.Error() - } else { - outputMsg = string(output) - } - - // Actually heartbeat the check - err = s.agent.UpdateTTL(s.id, outputMsg, state) - select { - case <-ctx.Done(): - // check has been removed; don't report errors - return - default: - } - - if err != nil { - if s.lastCheckOk { - s.lastCheckOk = false - s.logger.Warn("updating check failed", "error", err) - } else { - s.logger.Debug("updating check still failing", "error", err) - } - - } else if !s.lastCheckOk { - // Succeeded for the first time or after failing; log - s.lastCheckOk = true - s.logger.Info("updating check succeeded") - } - - select { - case <-s.shutdownCh: - // We've been told to exit and just heartbeated so exit - return - default: - } - } - }() - return &scriptHandle{cancel: cancel, exitCh: exitCh} -} diff --git a/command/agent/consul/script_test.go b/command/agent/consul/script_test.go deleted file mode 100644 index 25b6329b308..00000000000 --- a/command/agent/consul/script_test.go +++ /dev/null @@ -1,309 +0,0 @@ -package consul - -import ( - "context" - "fmt" - "os" - "os/exec" - "sync/atomic" - "testing" - "time" - - "github.com/hashicorp/consul/api" - "github.com/hashicorp/nomad/helper/testlog" - "github.com/hashicorp/nomad/helper/testtask" - "github.com/hashicorp/nomad/nomad/structs" -) - -func TestMain(m *testing.M) { - if !testtask.Run() { - os.Exit(m.Run()) - } -} - -// blockingScriptExec implements ScriptExec by running a subcommand that never -// exits. -type blockingScriptExec struct { - // pctx is canceled *only* for test cleanup. Just like real - // ScriptExecutors its Exec method cannot be canceled directly -- only - // with a timeout. - pctx context.Context - - // running is ticked before blocking to allow synchronizing operations - running chan struct{} - - // set to 1 with atomics if Exec is called and has exited - exited int32 -} - -// newBlockingScriptExec returns a ScriptExecutor that blocks Exec() until the -// caller recvs on the b.running chan. It also returns a CancelFunc for test -// cleanup only. The runtime cannot cancel ScriptExecutors before their timeout -// expires. 
-func newBlockingScriptExec() (*blockingScriptExec, context.CancelFunc) { - ctx, cancel := context.WithCancel(context.Background()) - exec := &blockingScriptExec{ - pctx: ctx, - running: make(chan struct{}), - } - return exec, cancel -} - -func (b *blockingScriptExec) Exec(dur time.Duration, _ string, _ []string) ([]byte, int, error) { - b.running <- struct{}{} - ctx, cancel := context.WithTimeout(b.pctx, dur) - defer cancel() - cmd := exec.CommandContext(ctx, testtask.Path(), "sleep", "9000h") - testtask.SetCmdEnv(cmd) - err := cmd.Run() - code := 0 - if exitErr, ok := err.(*exec.ExitError); ok { - if !exitErr.Success() { - code = 1 - } - } - atomic.StoreInt32(&b.exited, 1) - return []byte{}, code, err -} - -// TestConsulScript_Exec_Cancel asserts cancelling a script check shortcircuits -// any running scripts. -func TestConsulScript_Exec_Cancel(t *testing.T) { - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: time.Hour, - } - exec, cancel := newBlockingScriptExec() - defer cancel() - - // pass nil for heartbeater as it shouldn't be called - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, nil, testlog.HCLogger(t), nil) - handle := check.run() - - // wait until Exec is called - <-exec.running - - // cancel now that we're blocked in exec - handle.cancel() - - select { - case <-handle.wait(): - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } - - // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be - // canceled. Only a wrapper around it obeys the context cancelation. - if atomic.LoadInt32(&exec.exited) == 1 { - t.Errorf("expected script executor to still be running after timeout") - } -} - -type execStatus struct { - checkID string - output string - status string -} - -// fakeHeartbeater implements the heartbeater interface to allow mocking out -// Consul in script executor tests. -type fakeHeartbeater struct { - updates chan execStatus -} - -func (f *fakeHeartbeater) UpdateTTL(checkID, output, status string) error { - f.updates <- execStatus{checkID: checkID, output: output, status: status} - return nil -} - -func newFakeHeartbeater() *fakeHeartbeater { - return &fakeHeartbeater{updates: make(chan execStatus)} -} - -// TestConsulScript_Exec_TimeoutBasic asserts a script will be killed when the -// timeout is reached. -func TestConsulScript_Exec_TimeoutBasic(t *testing.T) { - t.Parallel() - - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: time.Second, - } - - exec, cancel := newBlockingScriptExec() - defer cancel() - - hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), nil) - handle := check.run() - defer handle.cancel() // just-in-case cleanup - <-exec.running - - // Check for UpdateTTL call - select { - case update := <-hb.updates: - if update.status != api.HealthCritical { - t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } - - // The underlying ScriptExecutor (newBlockScriptExec) *cannot* be - // canceled. Only a wrapper around it obeys the context cancelation. - if atomic.LoadInt32(&exec.exited) == 1 { - t.Errorf("expected script executor to still be running after timeout") - } - - // Cancel and watch for exit - handle.cancel() - select { - case <-handle.wait(): - // ok! 
- case update := <-hb.updates: - t.Errorf("unexpected UpdateTTL call on exit with status=%q", update) - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } -} - -// sleeperExec sleeps for 100ms but returns successfully to allow testing timeout conditions -type sleeperExec struct{} - -func (sleeperExec) Exec(time.Duration, string, []string) ([]byte, int, error) { - time.Sleep(100 * time.Millisecond) - return []byte{}, 0, nil -} - -// TestConsulScript_Exec_TimeoutCritical asserts a script will be killed when -// the timeout is reached and always set a critical status regardless of what -// Exec returns. -func TestConsulScript_Exec_TimeoutCritical(t *testing.T) { - t.Parallel() - - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: time.Nanosecond, - } - hb := newFakeHeartbeater() - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, sleeperExec{}, hb, testlog.HCLogger(t), nil) - handle := check.run() - defer handle.cancel() // just-in-case cleanup - - // Check for UpdateTTL call - select { - case update := <-hb.updates: - if update.status != api.HealthCritical { - t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update) - } - if update.output != context.DeadlineExceeded.Error() { - t.Errorf("expected output=%q but found: %q", context.DeadlineExceeded.Error(), update.output) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to timeout") - } -} - -// simpleExec is a fake ScriptExecutor that returns whatever is specified. -type simpleExec struct { - code int - err error -} - -func (s simpleExec) Exec(time.Duration, string, []string) ([]byte, int, error) { - return []byte(fmt.Sprintf("code=%d err=%v", s.code, s.err)), s.code, s.err -} - -// newSimpleExec creates a new ScriptExecutor that returns the given code and err. -func newSimpleExec(code int, err error) simpleExec { - return simpleExec{code: code, err: err} -} - -// TestConsulScript_Exec_Shutdown asserts a script will be executed once more -// when told to shutdown. -func TestConsulScript_Exec_Shutdown(t *testing.T) { - serviceCheck := structs.ServiceCheck{ - Name: "sleeper", - Interval: time.Hour, - Timeout: 3 * time.Second, - } - - hb := newFakeHeartbeater() - shutdown := make(chan struct{}) - exec := newSimpleExec(0, nil) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown) - handle := check.run() - defer handle.cancel() // just-in-case cleanup - - // Tell scriptCheck to exit - close(shutdown) - - select { - case update := <-hb.updates: - if update.status != api.HealthPassing { - t.Errorf("expected %q due to timeout but received %q", api.HealthCritical, update) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } - - select { - case <-handle.wait(): - // ok! 
- case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exit") - } -} - -func TestConsulScript_Exec_Codes(t *testing.T) { - run := func(code int, err error, expected string) func(t *testing.T) { - return func(t *testing.T) { - t.Parallel() - serviceCheck := structs.ServiceCheck{ - Name: "test", - Interval: time.Hour, - Timeout: 3 * time.Second, - } - - hb := newFakeHeartbeater() - shutdown := make(chan struct{}) - exec := newSimpleExec(code, err) - check := newScriptCheck("allocid", "testtask", "checkid", &serviceCheck, exec, hb, testlog.HCLogger(t), shutdown) - handle := check.run() - defer handle.cancel() - - select { - case update := <-hb.updates: - if update.status != expected { - t.Errorf("expected %q but received %q", expected, update) - } - // assert output is being reported - expectedOutput := fmt.Sprintf("code=%d err=%v", code, err) - if err != nil { - expectedOutput = err.Error() - } - if update.output != expectedOutput { - t.Errorf("expected output=%q but found: %q", expectedOutput, update.output) - } - case <-time.After(3 * time.Second): - t.Fatalf("timed out waiting for script check to exec") - } - } - } - - // Test exit codes with errors - t.Run("Passing", run(0, nil, api.HealthPassing)) - t.Run("Warning", run(1, nil, api.HealthWarning)) - t.Run("Critical-2", run(2, nil, api.HealthCritical)) - t.Run("Critical-9000", run(9000, nil, api.HealthCritical)) - - // Errors should always cause Critical status - err := fmt.Errorf("test error") - t.Run("Error-0", run(0, err, api.HealthCritical)) - t.Run("Error-1", run(1, err, api.HealthCritical)) - t.Run("Error-2", run(2, err, api.HealthCritical)) - t.Run("Error-9000", run(9000, err, api.HealthCritical)) -} diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index d61dc30e881..5c3effeb3d1 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -14,7 +14,6 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" - "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,40 +45,9 @@ func testTask() *TaskServices { }, }, }, - DriverExec: newMockExec(), } } -// mockExec implements the ScriptExecutor interface and will use an alternate -// implementation t.ExecFunc if non-nil. -type mockExec struct { - // Ticked whenever a script is called - execs chan int - - // If non-nil will be called by script checks - ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error) -} - -func newMockExec() *mockExec { - return &mockExec{ - execs: make(chan int, 100), - } -} - -func (m *mockExec) Exec(dur time.Duration, cmd string, args []string) ([]byte, int, error) { - select { - case m.execs <- 1: - default: - } - if m.ExecFunc == nil { - // Default impl is just "ok" - return []byte("ok"), 0, nil - } - ctx, cancel := context.WithTimeout(context.Background(), dur) - defer cancel() - return m.ExecFunc(ctx, cmd, args) -} - // restartRecorder is a minimal TaskRestarter implementation that simply // counts how many restarts were triggered. 
diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go
index d61dc30e881..5c3effeb3d1 100644
--- a/command/agent/consul/unit_test.go
+++ b/command/agent/consul/unit_test.go
@@ -14,7 +14,6 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/drivers"
-	"github.com/hashicorp/nomad/testutil"
 	"github.com/kr/pretty"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -46,40 +45,9 @@ func testTask() *TaskServices {
 				},
 			},
 		},
-		DriverExec: newMockExec(),
 	}
 }
 
-// mockExec implements the ScriptExecutor interface and will use an alternate
-// implementation t.ExecFunc if non-nil.
-type mockExec struct {
-	// Ticked whenever a script is called
-	execs chan int
-
-	// If non-nil will be called by script checks
-	ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
-}
-
-func newMockExec() *mockExec {
-	return &mockExec{
-		execs: make(chan int, 100),
-	}
-}
-
-func (m *mockExec) Exec(dur time.Duration, cmd string, args []string) ([]byte, int, error) {
-	select {
-	case m.execs <- 1:
-	default:
-	}
-	if m.ExecFunc == nil {
-		// Default impl is just "ok"
-		return []byte("ok"), 0, nil
-	}
-	ctx, cancel := context.WithTimeout(context.Background(), dur)
-	defer cancel()
-	return m.ExecFunc(ctx, cmd, args)
-}
-
 // restartRecorder is a minimal TaskRestarter implementation that simply
 // counts how many restarts were triggered.
 type restartRecorder struct {
@@ -96,7 +64,6 @@ type testFakeCtx struct {
 	ServiceClient *ServiceClient
 	FakeConsul    *MockAgent
 	Task          *TaskServices
-	MockExec      *mockExec
 }
 
 var errNoOps = fmt.Errorf("testing error: no pending operations")
@@ -131,7 +98,6 @@ func setupFake(t *testing.T) *testFakeCtx {
 		ServiceClient: sc,
 		FakeConsul:    fc,
 		Task:          tt,
-		MockExec:      tt.DriverExec.(*mockExec),
 	}
 }
 
@@ -226,13 +192,6 @@ func TestConsul_ChangePorts(t *testing.T) {
 			require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
 		case "c2":
 			origScriptKey = k
-			select {
-			case <-ctx.MockExec.execs:
-				// Here we validate there is nothing left on the channel
-				require.Equal(0, len(ctx.MockExec.execs))
-			case <-time.After(3 * time.Second):
-				t.Fatalf("script not called in time")
-			}
 		case "c3":
 			origHTTPKey = k
 			require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
@@ -678,291 +637,104 @@ func TestConsul_RegServices(t *testing.T) {
 
 func TestConsul_ShutdownOK(t *testing.T) {
 	require := require.New(t)
 	ctx := setupFake(t)
-
-	// Add a script check to make sure its TTL gets updated
-	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
-		{
-			Name:    "scriptcheck",
-			Type:    "script",
-			Command: "true",
-			// Make check block until shutdown
-			Interval:      9000 * time.Hour,
-			Timeout:       10 * time.Second,
-			InitialStatus: "warning",
-		},
-	}
-
 	go ctx.ServiceClient.Run()
 
-	// Register a task and agent
-	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
-		t.Fatalf("unexpected error registering task: %v", err)
-	}
-
+	// register the Nomad agent service and check
 	agentServices := []*structs.Service{
 		{
 			Name:      "http",
 			Tags:      []string{"nomad"},
 			PortLabel: "localhost:2345",
+			Checks: []*structs.ServiceCheck{
+				{
+					Name:          "nomad-tcp",
+					Type:          "tcp",
+					Interval:      9000 * time.Hour, // make check block
+					Timeout:       10 * time.Second,
+					InitialStatus: "warning",
+				},
+			},
 		},
 	}
-	if err := ctx.ServiceClient.RegisterAgent("client", agentServices); err != nil {
-		t.Fatalf("unexpected error registering agent: %v", err)
-	}
-
-	testutil.WaitForResult(func() (bool, error) {
-		return ctx.ServiceClient.hasSeen(), fmt.Errorf("error contacting Consul")
-	}, func(err error) {
-		t.Fatalf("err: %v", err)
-	})
-
-	// Shutdown should block until scripts finish
-	if err := ctx.ServiceClient.Shutdown(); err != nil {
-		t.Errorf("unexpected error shutting down client: %v", err)
-	}
-
-	// UpdateTTL should have been called once for the script check and once
-	// for shutdown
-	if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
-		t.Fatalf("expected 1 checkTTL entry but found: %d", n)
-	}
-	for _, v := range ctx.FakeConsul.checkTTLs {
-		require.Equalf(2, v, "expected 2 updates but found %d", v)
-	}
-	for _, v := range ctx.FakeConsul.checks {
-		if v.Status != "passing" {
-			t.Fatalf("expected check to be passing but found %q", v.Status)
-		}
-	}
-}
-
-// TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in
-// ServiceClient.
-func TestConsul_ShutdownSlow(t *testing.T) {
-	t.Parallel()
-	ctx := setupFake(t)
-
-	// Add a script check to make sure its TTL gets updated
-	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
-		{
-			Name:    "scriptcheck",
-			Type:    "script",
-			Command: "true",
-			// Make check block until shutdown
-			Interval:      9000 * time.Hour,
-			Timeout:       5 * time.Second,
-			InitialStatus: "warning",
-		},
-	}
-
-	// Make Exec slow, but not too slow
-	waiter := make(chan struct{})
-	ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
-		select {
-		case <-waiter:
-		default:
-			close(waiter)
-		}
-		time.Sleep(time.Second)
-		return []byte{}, 0, nil
-	}
-
-	// Make shutdown wait time just a bit longer than ctx.Exec takes
-	ctx.ServiceClient.shutdownWait = 3 * time.Second
-
-	go ctx.ServiceClient.Run()
-
-	// Register a task and agent
-	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
-		t.Fatalf("unexpected error registering task: %v", err)
-	}
-
-	// wait for Exec to get called before shutting down
-	<-waiter
-
-	// Shutdown should block until all enqueued operations finish.
-	preShutdown := time.Now()
-	if err := ctx.ServiceClient.Shutdown(); err != nil {
-		t.Errorf("unexpected error shutting down client: %v", err)
-	}
+	require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices))
+	require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond)
 
-	// Shutdown time should have taken: ~1s <= shutdown <= 3s
-	// actual timing might be less than 1s, to account for shutdown invocation overhead
-	shutdownTime := time.Now().Sub(preShutdown)
-	if shutdownTime < 900*time.Millisecond || shutdownTime > ctx.ServiceClient.shutdownWait {
-		t.Errorf("expected shutdown to take >1s and <%s but took: %s", ctx.ServiceClient.shutdownWait, shutdownTime)
-	}
+	// assert successful registration
+	require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered")
+	require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered")
+	require.Contains(ctx.FakeConsul.services,
+		makeAgentServiceID("client", agentServices[0]))
 
-	// UpdateTTL should have been called once for the script check
-	if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
-		t.Fatalf("expected 1 checkTTL entry but found: %d", n)
-	}
-	for _, v := range ctx.FakeConsul.checkTTLs {
-		if v != 1 {
-			t.Fatalf("expected script check to be updated once but found %d", v)
-		}
-	}
-	for _, v := range ctx.FakeConsul.checks {
-		if v.Status != "passing" {
-			t.Fatalf("expected check to be passing but found %q", v.Status)
-		}
-	}
+	// Shutdown() should block until Nomad agent service/check is deregistered
+	require.NoError(ctx.ServiceClient.Shutdown())
+	require.Len(ctx.FakeConsul.services, 0, "expected agent service to be deregistered")
+	require.Len(ctx.FakeConsul.checks, 0, "expected agent check to be deregistered")
 }
 
 // TestConsul_ShutdownBlocked tests the blocked past deadline path for the
 // shutdown logic in ServiceClient.
 func TestConsul_ShutdownBlocked(t *testing.T) {
+	require := require.New(t)
 	t.Parallel()
 	ctx := setupFake(t)
+	// can be short because we're intentionally blocking, but needs to
+	// be longer than the time we'll block Consul so we can be sure
+	// we're not delayed either.
+	ctx.ServiceClient.shutdownWait = time.Second
+	go ctx.ServiceClient.Run()
 
-	// Add a script check to make sure its TTL gets updated
-	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
+	// register the Nomad agent service and check
+	agentServices := []*structs.Service{
 		{
-			Name:    "scriptcheck",
-			Type:    "script",
-			Command: "true",
-			// Make check block until shutdown
-			Interval:      9000 * time.Hour,
-			Timeout:       9000 * time.Hour,
-			InitialStatus: "warning",
+			Name:      "http",
+			Tags:      []string{"nomad"},
+			PortLabel: "localhost:2345",
+			Checks: []*structs.ServiceCheck{
+				{
+					Name:          "nomad-tcp",
+					Type:          "tcp",
+					Interval:      9000 * time.Hour, // make check block
+					Timeout:       10 * time.Second,
+					InitialStatus: "warning",
+				},
+			},
 		},
 	}
+	require.NoError(ctx.ServiceClient.RegisterAgent("client", agentServices))
+	require.Eventually(ctx.ServiceClient.hasSeen, time.Second, 10*time.Millisecond)
+	require.Len(ctx.FakeConsul.services, 1, "expected agent service to be registered")
+	require.Len(ctx.FakeConsul.checks, 1, "expected agent check to be registered")
 
-	block := make(chan struct{})
-	defer close(block) // cleanup after test
-
-	// Make Exec block forever
+	// prevent normal shutdown by blocking Consul. the shutdown should wait
+	// until agent deregistration has finished
 	waiter := make(chan struct{})
-	ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
+	result := make(chan error)
+	go func() {
+		ctx.FakeConsul.mu.Lock()
 		close(waiter)
-		<-block
-		return []byte{}, 0, nil
-	}
-
-	// Use a short shutdown deadline since we're intentionally blocking forever
-	ctx.ServiceClient.shutdownWait = time.Second
-
-	go ctx.ServiceClient.Run()
-
-	// Register a task and agent
-	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
-		t.Fatalf("unexpected error registering task: %v", err)
-	}
+		result <- ctx.ServiceClient.Shutdown()
+	}()
 
-	// Wait for exec to be called
-	<-waiter
+	<-waiter // wait for lock to be hit
 
 	// Shutdown should block until all enqueued operations finish.
 	preShutdown := time.Now()
-	err := ctx.ServiceClient.Shutdown()
-	if err == nil {
-		t.Errorf("expected a timed out error from shutdown")
-	}
-
-	// Shutdown time should have taken shutdownWait; to avoid timing
-	// related errors simply test for wait <= shutdown <= wait+3s
-	shutdownTime := time.Now().Sub(preShutdown)
-	maxWait := ctx.ServiceClient.shutdownWait + (3 * time.Second)
-	if shutdownTime < ctx.ServiceClient.shutdownWait || shutdownTime > maxWait {
-		t.Errorf("expected shutdown to take >%s and <%s but took: %s", ctx.ServiceClient.shutdownWait, maxWait, shutdownTime)
-	}
-
-	// UpdateTTL should not have been called for the script check
-	if n := len(ctx.FakeConsul.checkTTLs); n != 0 {
-		t.Fatalf("expected 0 checkTTL entry but found: %d", n)
-	}
-	for _, v := range ctx.FakeConsul.checks {
-		if expected := "warning"; v.Status != expected {
-			t.Fatalf("expected check to be %q but found %q", expected, v.Status)
-		}
-	}
-}
-
-// TestConsul_RemoveScript assert removing a script check removes all objects
-// related to that check.
-func TestConsul_CancelScript(t *testing.T) {
-	ctx := setupFake(t)
-	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
-		{
-			Name:     "scriptcheckDel",
-			Type:     "script",
-			Interval: 9000 * time.Hour,
-			Timeout:  9000 * time.Hour,
-		},
-		{
-			Name:     "scriptcheckKeep",
-			Type:     "script",
-			Interval: 9000 * time.Hour,
-			Timeout:  9000 * time.Hour,
-		},
-	}
-
-	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
-		t.Fatalf("unexpected error registering task: %v", err)
-	}
-
-	if err := ctx.syncOnce(); err != nil {
-		t.Fatalf("unexpected error syncing task: %v", err)
-	}
-
-	if len(ctx.FakeConsul.checks) != 2 {
-		t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
-	}
-
-	if len(ctx.ServiceClient.scripts) != 2 && len(ctx.ServiceClient.runningScripts) != 2 {
-		t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d",
-			len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
-	}
-
-	for i := 0; i < 2; i++ {
-		select {
-		case <-ctx.MockExec.execs:
-			// Script ran as expected!
-		case <-time.After(3 * time.Second):
-			t.Fatalf("timed out waiting for script check to run")
-		}
-	}
-
-	// Remove a check and update the task
-	origTask := ctx.Task.Copy()
-	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
-		{
-			Name:     "scriptcheckKeep",
-			Type:     "script",
-			Interval: 9000 * time.Hour,
-			Timeout:  9000 * time.Hour,
-		},
-	}
-
-	if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
-		t.Fatalf("unexpected error registering task: %v", err)
-	}
-
-	if err := ctx.syncOnce(); err != nil {
-		t.Fatalf("unexpected error syncing task: %v", err)
-	}
-
-	if len(ctx.FakeConsul.checks) != 1 {
-		t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
-	}
-
-	if len(ctx.ServiceClient.scripts) != 1 && len(ctx.ServiceClient.runningScripts) != 1 {
-		t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
-			len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
-	}
-
-	// Make sure exec wasn't called again
 	select {
-	case <-ctx.MockExec.execs:
-		t.Errorf("unexpected execution of script; was goroutine not cancelled?")
-	case <-time.After(100 * time.Millisecond):
-		// No unexpected script execs
-	}
-
-	// Don't leak goroutines
-	for _, scriptHandle := range ctx.ServiceClient.runningScripts {
-		scriptHandle.cancel()
-	}
+	case <-time.After(200 * time.Millisecond):
+		ctx.FakeConsul.mu.Unlock()
+		require.NoError(<-result)
+	case <-result:
+		t.Fatal("should not have received result until Consul unblocked")
+	}
+	shutdownTime := time.Now().Sub(preShutdown).Seconds()
+	require.Less(shutdownTime, time.Second.Seconds(),
+		"expected shutdown to take >200ms and <1s")
+	require.Greater(shutdownTime, 200*time.Millisecond.Seconds(),
+		"expected shutdown to take >200ms and <1s")
+	require.Len(ctx.FakeConsul.services, 0,
+		"expected agent service to be deregistered")
+	require.Len(ctx.FakeConsul.checks, 0,
+		"expected agent check to be deregistered")
 }
 
 // TestConsul_DriverNetwork_AutoUse asserts that if a driver network has
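The rewritten TestConsul_ShutdownBlocked above relies on a small concurrency pattern: the blocking call runs in a goroutine that delivers its result on a channel, a mutex on the fake keeps it blocked, and a select first proves nothing arrives too early, then releases the fake and waits for the real result. A self-contained sketch of that pattern, with blockedResource standing in for the fake Consul agent (all names here are illustrative, not taken from the Nomad code):

package main

import (
	"fmt"
	"sync"
	"time"
)

// blockedResource stands in for the fake Consul agent: while its mutex is
// held, shutdown() cannot complete.
type blockedResource struct {
	mu sync.Mutex
}

func (b *blockedResource) shutdown() error {
	b.mu.Lock() // blocks until whoever holds the lock releases it
	defer b.mu.Unlock()
	return nil
}

func main() {
	res := &blockedResource{}

	// Hold the lock so shutdown() blocks, mirroring how the test grabs
	// FakeConsul.mu before calling ServiceClient.Shutdown().
	res.mu.Lock()

	result := make(chan error, 1)
	go func() { result <- res.shutdown() }()

	select {
	case <-result:
		fmt.Println("unexpected: shutdown returned while still blocked")
	case <-time.After(200 * time.Millisecond):
		// Still blocked after 200ms, as expected; release and wait for the result.
		res.mu.Unlock()
		fmt.Println("shutdown result after unblocking:", <-result)
	}
}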
@@ -1771,7 +1543,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
 	require.Len(ctx.ServiceClient.checks, 3)
 
 	delete(ctx.ServiceClient.services, outofbandTaskServiceID)
-	delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
+	delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
 
 	require.Len(ctx.ServiceClient.services, 2)
 	require.Len(ctx.ServiceClient.checks, 2)
@@ -1788,9 +1560,9 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
 	require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
 	require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
 
-	require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
-	require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
-	require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
+	require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
+	require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
+	require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
 }
 
 // TestConsul_ServiceDeregistration_InProbation asserts that during initialization
@@ -1880,7 +1652,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
 	require.Len(ctx.ServiceClient.checks, 3)
 
 	delete(ctx.ServiceClient.services, outofbandTaskServiceID)
-	delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
+	delete(ctx.ServiceClient.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
 
 	require.Len(ctx.ServiceClient.services, 2)
 	require.Len(ctx.ServiceClient.checks, 2)
@@ -1897,9 +1669,9 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
 	require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
 	require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
 
-	require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
-	require.Contains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
-	require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
+	require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
+	require.Contains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
+	require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
 
 	// after probation, outofband services and checks are removed
 	ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
@@ -1912,8 +1684,8 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
 	require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
 	require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
 
-	require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
-	require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
-	require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
+	require.Contains(ctx.FakeConsul.checks, MakeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
+	require.NotContains(ctx.FakeConsul.checks, MakeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
+	require.NotContains(ctx.FakeConsul.checks, MakeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
 }
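The remaining hunks only rename makeCheckID to the exported MakeCheckID, so code outside the agentconsul package can derive the same check IDs that ServiceClient uses as map keys. A hedged usage sketch follows; the wrapper name, the placeholder service ID, and the assumption that MakeCheckID takes a service ID string plus a *structs.ServiceCheck and returns a string are inferred from the call sites above rather than confirmed elsewhere in this diff.

package main

import (
	"fmt"

	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/nomad/structs"
)

// checkIDFor derives the ID a service's check is registered under, using the
// now-exported MakeCheckID (signature assumed from the call sites in the diff).
func checkIDFor(serviceID string, check *structs.ServiceCheck) string {
	return agentconsul.MakeCheckID(serviceID, check)
}

func main() {
	check := &structs.ServiceCheck{Name: "nomad-tcp", Type: "tcp"}
	// "example-service-id" is a placeholder; real IDs come from the service
	// registration path in ServiceClient.
	fmt.Println(checkIDFor("example-service-id", check))
}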