Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocked evaluation fixes #4867

Merged
merged 6 commits into from
Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 91 additions & 26 deletions nomad/blocked_evals.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand All @@ -29,6 +31,9 @@ const (
// allocations. It is unblocked when the capacity of a node that could run the
// failed allocation becomes available.
type BlockedEvals struct {
// logger is the logger to use by the blocked eval tracker.
logger log.Logger

evalBroker *EvalBroker
enabled bool
stats *BlockedStats
Expand All @@ -47,7 +52,7 @@ type BlockedEvals struct {

// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string
jobs map[structs.NamespacedID]string

// unblockIndexes maps computed node classes or quota name to the index in
// which they were unblocked. This is used to check if an evaluation could
Expand Down Expand Up @@ -102,12 +107,13 @@ type BlockedStats struct {

// NewBlockedEvals creates a new blocked eval tracker that will enqueue
// unblocked evals into the passed broker.
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
return &BlockedEvals{
logger: logger.Named("blocked_evals"),
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]string),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -176,19 +182,11 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}

// Check if the job already has a blocked evaluation. If it does add it to
// the list of duplicates. We only ever want one blocked evaluation per job,
// otherwise we would create unnecessary work for the scheduler as multiple
// evals for the same job would be run, all producing the same outcome.
if _, existing := b.jobs[eval.JobID]; existing {
b.duplicates = append(b.duplicates, eval)

// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}

// Handle the new evaluation being for a job we are already tracking.
if b.processBlockJobDuplicate(eval) {
// If process block job duplicate returns true, the new evaluation has
// been marked as a duplicate and we have nothing to do, so return
// early.
return
}

Expand All @@ -205,7 +203,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
}

// Mark the job as tracked.
b.jobs[eval.JobID] = eval.ID
b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)] = eval.ID
b.stats.TotalBlocked++

// Track that the evaluation is being added due to reaching the quota limit
Expand Down Expand Up @@ -234,6 +232,71 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
b.captured[eval.ID] = wrapped
}

// processBlockJobDuplicate handles the case where the new eval is for a job
// that we are already tracking. If the eval is a duplicate, we add the older
// evaluation by Raft index to the list of duplicates such that it can be
// cancelled. We only ever want one blocked evaluation per job, otherwise we
// would create unnecessary work for the scheduler as multiple evals for the
// same job would be run, all producing the same outcome. It is critical to
// prefer the newer evaluation, since it will contain the most up to date set of
// class eligibility. The return value is set to true, if the passed evaluation
// is cancelled. This should be called with the lock held.
func (b *BlockedEvals) processBlockJobDuplicate(eval *structs.Evaluation) (newCancelled bool) {
existingID, hasExisting := b.jobs[structs.NewNamespacedID(eval.JobID, eval.Namespace)]
if !hasExisting {
return
}

var dup *structs.Evaluation
existingW, ok := b.captured[existingID]
if ok {
if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.captured, existingID)
b.stats.TotalBlocked--
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
} else {
existingW, ok = b.escaped[existingID]
if !ok {
// This is a programming error
b.logger.Error("existing blocked evaluation is neither tracked as captured or escaped", "existing_id", existingID)
delete(b.jobs, structs.NewNamespacedID(eval.JobID, eval.Namespace))
return
}

if latestEvalIndex(existingW.eval) <= latestEvalIndex(eval) {
delete(b.escaped, existingID)
b.stats.TotalEscaped--
dup = existingW.eval
} else {
dup = eval
newCancelled = true
}
}

b.duplicates = append(b.duplicates, dup)

// Unblock any waiter.
select {
case b.duplicateCh <- struct{}{}:
default:
}

return
}

// latestEvalIndex returns the max of the evaluations create and snapshot index
func latestEvalIndex(eval *structs.Evaluation) uint64 {
if eval == nil {
return 0
}

return helper.Uint64Max(eval.CreateIndex, eval.SnapshotIndex)
}

// missedUnblock returns whether an evaluation missed an unblock while it was in
// the scheduler. Since the scheduler can operate at an index in the past, the
// evaluation may have been processed missing data that would allow it to
Expand Down Expand Up @@ -291,7 +354,7 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
// Untrack causes any blocked evaluation for the passed job to be no longer
// tracked. Untrack is called when there is a successful evaluation for the job
// and a blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
func (b *BlockedEvals) Untrack(jobID, namespace string) {
b.l.Lock()
defer b.l.Unlock()

Expand All @@ -300,16 +363,18 @@ func (b *BlockedEvals) Untrack(jobID string) {
return
}

nsID := structs.NewNamespacedID(jobID, namespace)

// Get the evaluation ID to cancel
evalID, ok := b.jobs[jobID]
evalID, ok := b.jobs[nsID]
if !ok {
// No blocked evaluation so exit
return
}

// Attempt to delete the evaluation
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
if w.eval.QuotaLimitReached != "" {
Expand All @@ -318,7 +383,7 @@ func (b *BlockedEvals) Untrack(jobID string) {
}

if w, ok := b.escaped[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.jobs, nsID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
Expand Down Expand Up @@ -440,7 +505,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
for id, wrapped := range b.escaped {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))

if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
Expand All @@ -467,7 +532,7 @@ func (b *BlockedEvals) unblock(computedClass, quota string, index uint64) {
// is eligible based on the computed node class, or never seen the
// computed node class.
unblocked[wrapped.eval] = wrapped.token
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
delete(b.captured, id)
if wrapped.eval.QuotaLimitReached != "" {
numQuotaLimit++
Expand Down Expand Up @@ -502,7 +567,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.captured, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
}
Expand All @@ -513,7 +578,7 @@ func (b *BlockedEvals) UnblockFailed() {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, wrapped.eval.JobID)
delete(b.jobs, structs.NewNamespacedID(wrapped.eval.JobID, wrapped.eval.Namespace))
b.stats.TotalEscaped -= 1
if wrapped.eval.QuotaLimitReached != "" {
quotaLimit++
Expand Down Expand Up @@ -571,7 +636,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalQuotaLimit = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]string)
b.jobs = make(map[structs.NamespacedID]string)
b.unblockIndexes = make(map[string]uint64)
b.timetable = nil
b.duplicates = nil
Expand Down
40 changes: 33 additions & 7 deletions nomad/blocked_evals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
Expand All @@ -14,7 +15,7 @@ import (
func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) {
broker := testBroker(t, 0)
broker.SetEnabled(true)
blocked := NewBlockedEvals(broker)
blocked := NewBlockedEvals(broker, testlog.HCLogger(t))
blocked.SetEnabled(true)
return blocked, broker
}
Expand Down Expand Up @@ -99,23 +100,29 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Create duplicate blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.CreateIndex = 100
e2 := mock.Eval()
e2.JobID = e.JobID
e2.CreateIndex = 101
e3 := mock.Eval()
e3.JobID = e.JobID
e3.CreateIndex = 102
e4 := mock.Eval()
e4.JobID = e.JobID
e4.CreateIndex = 100
blocked.Block(e)
blocked.Block(e2)

// Verify block did track both
// Verify stats such that we are only tracking one
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}

// Get the duplicates.
out := blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
if len(out) != 1 || !reflect.DeepEqual(out[0], e) {
t.Fatalf("bad: %#v %#v", out, e)
}

// Call block again after a small sleep.
Expand All @@ -126,9 +133,28 @@ func TestBlockedEvals_GetDuplicates(t *testing.T) {

// Get the duplicates.
out = blocked.GetDuplicates(1 * time.Second)
if len(out) != 1 || !reflect.DeepEqual(out[0], e3) {
if len(out) != 1 || !reflect.DeepEqual(out[0], e2) {
t.Fatalf("bad: %#v %#v", out, e2)
}

// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}

// Add an older evaluation and assert it gets cancelled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test case for the other way around (add an eval thats older than the current one so that its marked as the duplicate)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e1-e3 are that, e4 tests that we can cancel the incoming

blocked.Block(e4)
out = blocked.GetDuplicates(0)
if len(out) != 1 || !reflect.DeepEqual(out[0], e4) {
t.Fatalf("bad: %#v %#v", out, e4)
}

// Verify stats such that we are only tracking one
bStats = blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}

func TestBlockedEvals_UnblockEscaped(t *testing.T) {
Expand Down Expand Up @@ -647,7 +673,7 @@ func TestBlockedEvals_Untrack(t *testing.T) {
}

// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bStats = blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
Expand All @@ -672,7 +698,7 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) {
}

// Untrack and verify
blocked.Untrack(e.JobID)
blocked.Untrack(e.JobID, e.Namespace)
bs = blocked.Stats()
if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 {
t.Fatalf("bad: %#v", bs)
Expand Down
5 changes: 2 additions & 3 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -609,7 +608,7 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) {
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
n.blockedEvals.Untrack(eval.JobID, eval.Namespace)
}
}

Expand Down
5 changes: 3 additions & 2 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ func testStateStore(t *testing.T) *state.StateStore {
func testFSM(t *testing.T) *nomadFSM {
broker := testBroker(t, 0)
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker),
Logger: testlog.HCLogger(t),
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
}
fsm, err := NewFSM(fsmConfig)
Expand Down
4 changes: 3 additions & 1 deletion nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,16 +627,18 @@ func TestLeader_ReapDuplicateEval(t *testing.T) {

// Create a duplicate blocked eval
eval := mock.Eval()
eval.CreateIndex = 100
eval2 := mock.Eval()
eval2.JobID = eval.JobID
eval2.CreateIndex = 102
s1.blockedEvals.Block(eval)
s1.blockedEvals.Block(eval2)

// Wait for the evaluation to marked as cancelled
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval2.ID)
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
return false, err
}
Expand Down
Loading