diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index ee8386cb2e3..9f9ca013fbb 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -6,6 +6,8 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/lib" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) @@ -29,6 +31,9 @@ const ( // allocations. It is unblocked when the capacity of a node that could run the // failed allocation becomes available. type BlockedEvals struct { + // logger is the logger to use by the blocked eval tracker. + logger log.Logger + evalBroker *EvalBroker enabled bool stats *BlockedStats @@ -47,7 +52,7 @@ type BlockedEvals struct { // jobs is the map of blocked job and is used to ensure that only one // blocked eval exists for each job. The value is the blocked evaluation ID. - jobs map[string]string + jobs map[structs.NamespacedID]string // unblockIndexes maps computed node classes or quota name to the index in // which they were unblocked. This is used to check if an evaluation could @@ -102,12 +107,13 @@ type BlockedStats struct { // NewBlockedEvals creates a new blocked eval tracker that will enqueue // unblocked evals into the passed broker. -func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { +func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals { return &BlockedEvals{ + logger: logger.Named("blocked_evals"), evalBroker: evalBroker, captured: make(map[string]wrappedEval), escaped: make(map[string]wrappedEval), - jobs: make(map[string]string), + jobs: make(map[structs.NamespacedID]string), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), duplicateCh: make(chan struct{}, 1), @@ -176,19 +182,11 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { return } - // Check if the job already has a blocked evaluation. If it does add it to - // the list of duplicates. We only ever want one blocked evaluation per job, - // otherwise we would create unnecessary work for the scheduler as multiple - // evals for the same job would be run, all producing the same outcome. - if _, existing := b.jobs[eval.JobID]; existing { - b.duplicates = append(b.duplicates, eval) - - // Unblock any waiter. - select { - case b.duplicateCh <- struct{}{}: - default: - } - + // Handle the new evaluation being for a job we are already tracking. + if b.processBlockJobDuplicate(eval) { + // If process block job duplicate returns true, the new evaluation has + // been marked as a duplicate and we have nothing to do, so return + // early. return } @@ -205,7 +203,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { } // Mark the job as tracked. - b.jobs[eval.JobID] = eval.ID + b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID b.stats.TotalBlocked++ // Track that the evaluation is being added due to reaching the quota limit @@ -234,6 +232,71 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { b.captured[eval.ID] = wrapped } +// processBlockJobDuplicate handles the case where the new eval is for a job +// that we are already tracking. If the eval is a duplicate, we add the older +// evaluation by Raft index to the list of duplicates such that it can be +// cancelled. We only ever want one blocked evaluation per job, otherwise we +// would create unnecessary work for the scheduler as multiple evals for the +// same job would be run, all producing the same outcome. It is critical to +// prefer the newer evaluation, since it will contain the most up to date set of +// class eligibility. The return value is set to true, if the passed evaluation +// is cancelled. This should be called with the lock held. +func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCancelled bool) { + existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] + if !hasExisting { + return + } + + var dup *structs.Evaluation + existingW, ok := b.captured[existingID] + if ok { + if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) { + delete(b.captured, existingID) + b.stats.TotalBlocked-- + dup = existingW.eval + } else { + dup = eval + newCancelled = true + } + } else { + existingW, ok = b.escaped[existingID] + if !ok { + // This is a programming error + b.logger.Error("existing blocked evaluation is neither tracked as captured or escaped", "existing_id", existingID) + delete(b.jobs, structs.NewNamespacedID(eval.JobID, eval.Namespace)) + return + } + + if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) { + delete(b.escaped, existingID) + b.stats.TotalEscaped-- + dup = existingW.eval + } else { + dup = eval + newCancelled = true + } + } + + b.duplicates = append(b.duplicates, dup) + + // Unblock any waiter. + select { + case b.duplicateCh <- struct{}{}: + default: + } + + return +} + +// latestEvalIndex returns the max of the evaluations create and snapshot index +func latestEvalIndex(eval *structs.Evaluation) uint64 { + if eval == nil { + return 0 + } + + return helper.Uint64Max(eval.CreateIndex, eval.SnapshotIndex) +} + // missedUnblock returns whether an evaluation missed an unblock while it was in // the scheduler. Since the scheduler can operate at an index in the past, the // evaluation may have been processed missing data that would allow it to @@ -291,7 +354,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool { // Untrack causes any blocked evaluation for the passed job to be no longer // tracked. Untrack is called when there is a successful evaluation for the job // and a blocked evaluation is no longer needed. -func (b *BlockedEvals) Untrack(jobID string) { +func (b *BlockedEvals) Untrack(jobID, namespace string) { b.l.Lock() defer b.l.Unlock() @@ -300,8 +363,10 @@ func (b *BlockedEvals) Untrack(jobID string) { return } + nsID := structs.NewNamespacedID(jobID, namespace) + // Get the evaluation ID to cancel - evalID, ok := b.jobs[jobID] + evalID, ok := b.jobs[nsID] if !ok { // No blocked evaluation so exit return @@ -309,7 +374,7 @@ func (b *BlockedEvals) Untrack(jobID string) { // Attempt to delete the evaluation if w, ok := b.captured[evalID]; ok { - delete(b.jobs, w.eval.JobID) + delete(b.jobs, nsID) delete(b.captured, evalID) b.stats.TotalBlocked-- if w.eval.QuotaLimitReached != "" { @@ -318,7 +383,7 @@ func (b *BlockedEvals) Untrack(jobID string) { } if w, ok := b.escaped[evalID]; ok { - delete(b.jobs, w.eval.JobID) + delete(b.jobs, nsID) delete(b.escaped, evalID) b.stats.TotalEscaped-- b.stats.TotalBlocked-- @@ -440,7 +505,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) { for id, wrapped := range b.escaped { unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) if wrapped.eval.QuotaLimitReached != "" { numQuotaLimit++ @@ -467,7 +532,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) { // is eligible based on the computed node class, or never seen the // computed node class. unblocked[wrapped.eval] = wrapped.token - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) delete(b.captured, id) if wrapped.eval.QuotaLimitReached != "" { numQuotaLimit++ @@ -502,7 +567,7 @@ func (b *BlockedEvals) UnblockFailed() { if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { unblocked[wrapped.eval] = wrapped.token delete(b.captured, id) - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) if wrapped.eval.QuotaLimitReached != "" { quotaLimit++ } @@ -513,7 +578,7 @@ func (b *BlockedEvals) UnblockFailed() { if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, wrapped.eval.JobID) + delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace)) b.stats.TotalEscaped -= 1 if wrapped.eval.QuotaLimitReached != "" { quotaLimit++ @@ -571,7 +636,7 @@ func (b *BlockedEvals) Flush() { b.stats.TotalQuotaLimit = 0 b.captured = make(map[string]wrappedEval) b.escaped = make(map[string]wrappedEval) - b.jobs = make(map[string]string) + b.jobs = make(map[structs.NamespacedID]string) b.unblockIndexes = make(map[string]uint64) b.timetable = nil b.duplicates = nil diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 009fadceb1c..8fe7db32f06 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -14,7 +15,7 @@ import ( func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) { broker := testBroker(t, 0) broker.SetEnabled(true) - blocked := NewBlockedEvals(broker) + blocked := NewBlockedEvals(broker, testlog.HCLogger(t)) blocked.SetEnabled(true) return blocked, broker } @@ -99,14 +100,20 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { // Create duplicate blocked evals and add them to the blocked tracker. e := mock.Eval() + e.CreateIndex = 100 e2 := mock.Eval() e2.JobID = e.JobID + e2.CreateIndex = 101 e3 := mock.Eval() e3.JobID = e.JobID + e3.CreateIndex = 102 + e4 := mock.Eval() + e4.JobID = e.JobID + e4.CreateIndex = 100 blocked.Block(e) blocked.Block(e2) - // Verify block did track both + // Verify stats such that we are only tracking one bStats := blocked.Stats() if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", bStats) @@ -114,8 +121,8 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { // Get the duplicates. out := blocked.GetDuplicates(0) - if len(out) != 1 || !reflect.DeepEqual(out[0], e2) { - t.Fatalf("bad: %#v %#v", out, e2) + if len(out) != 1 || !reflect.DeepEqual(out[0], e) { + t.Fatalf("bad: %#v %#v", out, e) } // Call block again after a small sleep. @@ -126,9 +133,28 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) { // Get the duplicates. out = blocked.GetDuplicates(1 * time.Second) - if len(out) != 1 || !reflect.DeepEqual(out[0], e3) { + if len(out) != 1 || !reflect.DeepEqual(out[0], e2) { t.Fatalf("bad: %#v %#v", out, e2) } + + // Verify stats such that we are only tracking one + bStats = blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } + + // Add an older evaluation and assert it gets cancelled + blocked.Block(e4) + out = blocked.GetDuplicates(0) + if len(out) != 1 || !reflect.DeepEqual(out[0], e4) { + t.Fatalf("bad: %#v %#v", out, e4) + } + + // Verify stats such that we are only tracking one + bStats = blocked.Stats() + if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", bStats) + } } func TestBlockedEvals_UnblockEscaped(t *testing.T) { @@ -647,7 +673,7 @@ func TestBlockedEvals_Untrack(t *testing.T) { } // Untrack and verify - blocked.Untrack(e.JobID) + blocked.Untrack(e.JobID, e.Namespace) bStats = blocked.Stats() if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { t.Fatalf("bad: %#v", bStats) @@ -672,7 +698,7 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) { } // Untrack and verify - blocked.Untrack(e.JobID) + blocked.Untrack(e.JobID, e.Namespace) bs = blocked.Stats() if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 { t.Fatalf("bad: %#v", bs) diff --git a/nomad/fsm.go b/nomad/fsm.go index 0111f0c35ca..74c1e50f995 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -7,10 +7,9 @@ import ( "sync" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -609,7 +608,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) { len(eval.FailedTGAllocs) == 0 { // If we have a successful evaluation for a node, untrack any // blocked evaluation - n.blockedEvals.Untrack(eval.JobID) + n.blockedEvals.Untrack(eval.JobID, eval.Namespace) } } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index c8802bf9bd3..6b5f0c44b8b 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -48,11 +48,12 @@ func testStateStore(t *testing.T) *state.StateStore { func testFSM(t *testing.T) *nomadFSM { broker := testBroker(t, 0) dispatcher, _ := testPeriodicDispatcher(t) + logger := testlog.HCLogger(t) fsmConfig := &FSMConfig{ EvalBroker: broker, Periodic: dispatcher, - Blocked: NewBlockedEvals(broker), - Logger: testlog.HCLogger(t), + Blocked: NewBlockedEvals(broker, logger), + Logger: logger, Region: "global", } fsm, err := NewFSM(fsmConfig) diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 2475ab84487..dd02761e510 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -627,8 +627,10 @@ func TestLeader_ReapDuplicateEval(t *testing.T) { // Create a duplicate blocked eval eval := mock.Eval() + eval.CreateIndex = 100 eval2 := mock.Eval() eval2.JobID = eval.JobID + eval2.CreateIndex = 102 s1.blockedEvals.Block(eval) s1.blockedEvals.Block(eval2) @@ -636,7 +638,7 @@ func TestLeader_ReapDuplicateEval(t *testing.T) { state := s1.fsm.State() testutil.WaitForResult(func() (bool, error) { ws := memdb.NewWatchSet() - out, err := state.EvalByID(ws, eval2.ID) + out, err := state.EvalByID(ws, eval.ID) if err != nil { return false, err } diff --git a/nomad/server.go b/nomad/server.go index 4bb10f346fb..a3ea18860cb 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -15,14 +15,12 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/consul/agent/consul/autopilot" consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" lru "github.com/hashicorp/golang-lru" - raftboltdb "github.com/hashicorp/raft-boltdb" - - "github.com/hashicorp/consul/agent/consul/autopilot" - "github.com/hashicorp/consul/lib" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/codec" "github.com/hashicorp/nomad/helper/pool" @@ -35,6 +33,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/scheduler" "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/hashicorp/serf/serf" ) @@ -267,9 +266,6 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) return nil, err } - // Create a new blocked eval tracker. - blockedEvals := NewBlockedEvals(evalBroker) - // Configure TLS tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true) if err != nil { @@ -304,7 +300,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) reconcileCh: make(chan serf.Member, 32), eventCh: make(chan serf.Event, 256), evalBroker: evalBroker, - blockedEvals: blockedEvals, + blockedEvals: NewBlockedEvals(evalBroker, logger), rpcTLS: incomingTLS, aclCache: aclCache, shutdownCh: make(chan struct{}), @@ -401,7 +397,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) go s.planQueue.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the blocked eval tracker. - go blockedEvals.EmitStats(time.Second, s.shutdownCh) + go s.blockedEvals.EmitStats(time.Second, s.shutdownCh) // Emit metrics for the Vault client. go s.vault.EmitStats(time.Second, s.shutdownCh) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1f269c4c062..6a48c38ea72 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -163,6 +163,14 @@ type NamespacedID struct { Namespace string } +// NewNamespacedID returns a new namespaced ID given the ID and namespace +func NewNamespacedID(id, ns string) NamespacedID { + return NamespacedID{ + ID: id, + Namespace: ns, + } +} + func (n NamespacedID) String() string { return fmt.Sprintf("", n.Namespace, n.ID) } diff --git a/nomad/worker.go b/nomad/worker.go index 75b20c1c774..19442f47cce 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -6,10 +6,9 @@ import ( "sync" "time" + "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - - "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" ) diff --git a/scheduler/context.go b/scheduler/context.go index 031a3b45a51..406f709c3ec 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -5,7 +5,6 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" ) @@ -240,16 +239,6 @@ func (e *EvalEligibility) HasEscaped() bool { func (e *EvalEligibility) GetClasses() map[string]bool { elig := make(map[string]bool) - // Go through the job. - for class, feas := range e.job { - switch feas { - case EvalComputedClassEligible: - elig[class] = true - case EvalComputedClassIneligible: - elig[class] = false - } - } - // Go through the task groups. for _, classes := range e.taskGroups { for class, feas := range classes { @@ -267,6 +256,21 @@ func (e *EvalEligibility) GetClasses() map[string]bool { } } + // Go through the job. + for class, feas := range e.job { + switch feas { + case EvalComputedClassEligible: + // Only mark as eligible if it hasn't been marked before. This + // prevents the job marking a class as eligible when it is ineligible + // to all the task groups. + if _, ok := elig[class]; !ok { + elig[class] = true + } + case EvalComputedClassIneligible: + elig[class] = false + } + } + return elig } diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 28416b04773..da12e5fcd9f 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "reflect" "testing" "github.com/hashicorp/nomad/helper/testlog" @@ -9,6 +8,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) func testContext(t testing.TB) (*state.StateStore, *EvalContext) { @@ -259,7 +259,7 @@ func TestEvalEligibility_GetClasses(t *testing.T) { e.SetTaskGroupEligibility(false, "fizz", "v1:3") expClasses := map[string]bool{ - "v1:1": true, + "v1:1": false, "v1:2": false, "v1:3": true, "v1:4": false, @@ -267,7 +267,27 @@ func TestEvalEligibility_GetClasses(t *testing.T) { } actClasses := e.GetClasses() - if !reflect.DeepEqual(actClasses, expClasses) { - t.Fatalf("GetClasses() returned %#v; want %#v", actClasses, expClasses) + require.Equal(t, expClasses, actClasses) +} +func TestEvalEligibility_GetClasses_JobEligible_TaskGroupIneligible(t *testing.T) { + e := NewEvalEligibility() + e.SetJobEligibility(true, "v1:1") + e.SetTaskGroupEligibility(false, "foo", "v1:1") + + e.SetJobEligibility(true, "v1:2") + e.SetTaskGroupEligibility(false, "foo", "v1:2") + e.SetTaskGroupEligibility(true, "bar", "v1:2") + + e.SetJobEligibility(true, "v1:3") + e.SetTaskGroupEligibility(false, "foo", "v1:3") + e.SetTaskGroupEligibility(false, "bar", "v1:3") + + expClasses := map[string]bool{ + "v1:1": false, + "v1:2": true, + "v1:3": false, } + + actClasses := e.GetClasses() + require.Equal(t, expClasses, actClasses) }