Skip to content

Commit

Permalink
Merge pull request #4847 from hashicorp/b-blocked-eval
Browse files Browse the repository at this point in the history
Blocked evaluation fixes
  • Loading branch information
dadgar authored Nov 8, 2018
2 parents ad15649 + 361d384 commit 1e3a20f
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 66 deletions.
111 changes: 83 additions & 28 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -29,6 +31,9 @@ const (
// allocations. It is unblocked when the capacity of a node that could run the
// failed allocation becomes available.
type BlockedEvals struct {
// logger is the logger used by the blocked eval tracker.
logger log.Logger

evalBroker *EvalBroker
enabled bool
stats *BlockedStats
Expand All @@ -47,7 +52,7 @@ type BlockedEvals struct {

// jobs is the map of blocked jobs and is used to ensure that only one
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string
jobs map[structs.NamespacedID]string

// unblockIndexes maps computed node classes or quota name to the index in
// which they were unblocked. This is used to check if an evaluation could
Expand Down Expand Up @@ -102,12 +107,13 @@ type BlockedStats struct {

// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
return &BlockedEvals{
logger: logger.Named("blocked_evals"),
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]string),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -176,21 +182,8 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}

// Check if the job already has a blocked evaluation. If it does add it to
// the list of duplicates. We only ever want one blocked evaluation per job,
// otherwise we would create unnecessary work for the scheduler as multiple
// evals for the same job would be run, all producing the same outcome.
if _, existing := b.jobs[eval.JobID]; existing {
b.duplicates = append(b.duplicates, eval)

// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}

return
}
// Handle the new evaluation being for a job we are already tracking.
b.processBlockJobDuplicate(eval)

// Check if the eval missed an unblock while it was in the scheduler at an
// older index. The scheduler could have been invoked with a snapshot of
Expand All @@ -205,7 +198,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
}

// Mark the job as tracked.
b.jobs[eval.JobID] = eval.ID
b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID
b.stats.TotalBlocked++

// Track that the evaluation is being added due to reaching the quota limit
Expand Down Expand Up @@ -234,6 +227,66 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
b.captured[eval.ID] = wrapped
}

// processBlockJobDuplicate handles the case where the new eval is for a job
// that is already tracked in b.jobs. We only ever want one blocked evaluation
// per job, otherwise we would create unnecessary work for the scheduler as
// multiple evals for the same job would be run, all producing the same outcome.
//
// Of the two evaluations, the one with the lower latestEvalIndex is appended to
// b.duplicates so that it can be cancelled, and any waiter on b.duplicateCh is
// woken. On a tie the existing evaluation is the one cancelled: it is critical
// to prefer the newer evaluation, since it will contain the most up to date set
// of class eligibility.
//
// When the existing evaluation is the one cancelled, it is removed from the set
// that holds it (captured or escaped) and every counter it contributed to is
// rolled back: TotalBlocked for both sets, TotalEscaped for escaped evals, and
// TotalQuotaLimit when the eval had QuotaLimitReached set. The b.jobs entry is
// left in place for the caller to overwrite.
//
// NOTE(review): this function does not report which evaluation was chosen as
// the duplicate. When the incoming eval is the older one, the caller should
// presumably stop processing instead of tracking it — confirm in processBlock.
//
// This should be called with the lock held.
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) {
	nsID := structs.NewNamespacedID(eval.JobID, eval.Namespace)
	existingID, hasExisting := b.jobs[nsID]
	if !hasExisting {
		return
	}

	// Locate the tracked evaluation in the captured set first, then fall back
	// to the escaped set.
	existingW, captured := b.captured[existingID]
	if !captured {
		var escaped bool
		existingW, escaped = b.escaped[existingID]
		if !escaped {
			// This is a programming error
			b.logger.Error("existing blocked evaluation is neither tracked as captured or escaped", "existing_id", existingID)
			delete(b.jobs, nsID)
			return
		}
	}

	// Default to cancelling the incoming eval; switch to the existing one only
	// if it is not newer than the incoming eval.
	dup := eval
	if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
		dup = existingW.eval

		if captured {
			delete(b.captured, existingID)
		} else {
			delete(b.escaped, existingID)
			b.stats.TotalEscaped--
		}

		// Escaped evals are counted in TotalBlocked as well, so the blocked
		// counter is rolled back regardless of which set held the eval.
		b.stats.TotalBlocked--
		if dup != nil && dup.QuotaLimitReached != "" {
			b.stats.TotalQuotaLimit--
		}
	}

	b.duplicates = append(b.duplicates, dup)

	// Unblock any waiter. The send is non-blocking so a pending signal is not
	// duplicated and this never stalls while the lock is held.
	select {
	case b.duplicateCh <- struct{}{}:
	default:
	}
}

// latestEvalIndex returns the larger of the evaluation's create index and
// snapshot index. A nil evaluation yields 0.
func latestEvalIndex(eval *structs.Evaluation) uint64 {
	var latest uint64
	if eval != nil {
		latest = helper.Uint64Max(eval.CreateIndex, eval.SnapshotIndex)
	}
	return latest
}

// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
Expand Down Expand Up @@ -291,7 +344,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
// Untrack causes any blocked evaluation for the passed job to be no longer
// tracked. Untrack is called when there is a successful evaluation for the job
// and a blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
func (b *BlockedEvals) Untrack(jobID, namespace string) {
b.l.Lock()
defer b.l.Unlock()

Expand All @@ -300,16 +353,18 @@ func (b *BlockedEvals) Untrack(jobID string) {
return
}

nsID := structs.NewNamespacedID(jobID, namespace)

// Get the evaluation ID to cancel
evalID, ok := b.jobs[jobID]
evalID, ok := b.jobs[nsID]
if !ok {
// No blocked evaluation so exit
return
}

// Attempt to delete the evaluation
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
if w.eval.QuotaLimitReached != "" {
Expand All @@ -318,7 +373,7 @@ func (b *BlockedEvals) Untrack(jobID string) {
}

if w, ok := b.escaped[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
Expand Down Expand Up @@ -440,7 +495,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
for id, wrapped := range b.escaped {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))

if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
Expand All @@ -467,7 +522,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
// is eligible based on the computed node class, or never seen the
// computed node class.
unblocked[wrapped.eval] = wrapped.token
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
delete(b.captured, id)
if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
Expand Down Expand Up @@ -502,7 +557,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.captured, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
}
Expand All @@ -513,7 +568,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
b.stats.TotalEscaped -= 1
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
Expand Down Expand Up @@ -571,7 +626,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalQuotaLimit = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]string)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.duplicates = nil
Expand Down
26 changes: 20 additions & 6 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
Expand All @@ -14,7 +15,7 @@ import (
func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) {
broker := testBroker(t, 0)
broker.SetEnabled(true)
blocked := NewBlockedEvals(broker)
blocked := NewBlockedEvals(broker, testlog.HCLogger(t))
blocked.SetEnabled(true)
return blocked, broker
}
Expand Down Expand Up @@ -99,10 +100,16 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Create duplicate blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.CreateIndex = 100
e2 := mock.Eval()
e2.JobID = e.JobID
e2.CreateIndex = 101
e3 := mock.Eval()
e3.JobID = e.JobID
e3.CreateIndex = 102
e4 := mock.Eval()
e4.JobID = e.JobID
e4.CreateIndex = 100
blocked.Block(e)
blocked.Block(e2)

Expand All @@ -114,8 +121,8 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Get the duplicates.
out := blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
if len(out) != 1 || !reflect.DeepEqual(out[0], e) {
t.Fatalf("bad: %#v %#v", out, e)
}

// Call block again after a small sleep.
Expand All @@ -126,9 +133,16 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Get the duplicates.
out = blocked.GetDuplicates(1 * time.Second)
if len(out) != 1 || !reflect.DeepEqual(out[0], e3) {
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
}

// Add an older evaluation and assert it gets cancelled
blocked.Block(e4)
out = blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e4) {
t.Fatalf("bad: %#v %#v", out, e4)
}
}

func TestBlockedEvals_UnblockEscaped(t *testing.T) {
Expand Down Expand Up @@ -647,7 +661,7 @@ func TestBlockedEvals_Untrack(t *testing.T) {
}

// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bStats = blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
Expand All @@ -672,7 +686,7 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) {
}

// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bs = blocked.Stats()
if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 {
t.Fatalf("bad: %#v", bs)
Expand Down
5 changes: 2 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -609,7 +608,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
n.blockedEvals.Untrack(eval.JobID, eval.Namespace)
}
}

Expand Down
5 changes: 3 additions & 2 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ func testStateStore(t *testing.T) *state.StateStore {
func testFSM(t *testing.T) *nomadFSM {
broker := testBroker(t, 0)
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker),
Logger: testlog.HCLogger(t),
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
}
fsm, err := NewFSM(fsmConfig)
Expand Down
4 changes: 3 additions & 1 deletion nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,16 +627,18 @@ func TestLeader_ReapDuplicateEval(t *testing.T) {

// Create a duplicate blocked eval
eval := mock.Eval()
eval.CreateIndex = 100
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.CreateIndex = 102
s1.blockedEvals.Block(eval)
s1.blockedEvals.Block(eval2)

// Wait for the evaluation to be marked as cancelled
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval2.ID)
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 1e3a20f

Please sign in to comment.