Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eval Broker: Prevent redundant enqueue's when a node is not a leader #5699

Merged
merged 4 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func (b *EvalBroker) Enabled() bool {
// should only be enabled on the active leader.
func (b *EvalBroker) SetEnabled(enabled bool) {
b.l.Lock()
defer b.l.Unlock()

prevEnabled := b.enabled
b.enabled = enabled
if !prevEnabled && enabled {
Expand All @@ -169,7 +171,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
b.delayedEvalCancelFunc = cancel
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh)
}
b.l.Unlock()

if !enabled {
b.flush()
}
Expand Down Expand Up @@ -208,6 +210,11 @@ func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
// outstanding, the evaluation is blocked until an Ack/Nack is received.
// processEnqueue must be called with the lock held.
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
// If we're not enabled, don't enable more queuing.
if !b.enabled {
return
}

// Check if already enqueued
if _, ok := b.evals[eval.ID]; ok {
if token == "" {
Expand Down Expand Up @@ -259,8 +266,10 @@ func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
defer b.l.Unlock()

delete(b.timeWait, eval.ID)
b.stats.TotalWaiting -= 1

b.enqueueLocked(eval, eval.Type)
}

Expand Down Expand Up @@ -678,11 +687,9 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
return nil
}

// Flush is used to clear the state of the broker
// Flush is used to clear the state of the broker. It must be called from within
// the lock.
func (b *EvalBroker) flush() {
b.l.Lock()
defer b.l.Unlock()

// Unblock any waiters
for _, waitCh := range b.waiting {
close(waitCh)
Expand Down Expand Up @@ -778,13 +785,13 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
b.l.RLock()
defer b.l.RUnlock()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a unit test in eval_broker_test.go. Suggestion - could create two eval brokers with one of them enabled, and then switch to disabling while enabling the other one in another goroutine. It should verify that the flush method drained everything on the previously enabled eval broker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be pretty hard to actually validate this on CI because we set GOMAXPROCS to one.

We’d only need a single broker in a test though bc they don’t interact with each other?

// If there is nothing wait for an update.
if b.delayHeap.Length() == 0 {
b.l.RUnlock()
return nil, time.Time{}
}
nextEval := b.delayHeap.Peek()
b.l.RUnlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock was originally unlocked here rather than using defer to avoid holding on to it after peeking into the heap. I am not certain this was the root cause of the non leader enqueues. Was this more of a clarity fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entirely a clarity fix - I can revert, I thought pulling out the eval would be fast enough that it’s not a big deal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its fine to leave it as is, the new lines of execution included in the lock's scope are not that expensive.

if nextEval == nil {
return nil, time.Time{}
}
Expand Down
58 changes: 58 additions & 0 deletions nomad/eval_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,64 @@ func TestEvalBroker_Enqueue_Disable(t *testing.T) {
}
}

func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) {
t.Parallel()
b := testBroker(t, 0)
baseEval := mock.Eval()
b.SetEnabled(true)

{
// Enqueue
b.Enqueue(baseEval.Copy())

delayedEval := baseEval.Copy()
delayedEval.Wait = 30
b.Enqueue(delayedEval)

waitEval := baseEval.Copy()
waitEval.WaitUntil = time.Now().Add(30 * time.Second)
b.Enqueue(waitEval)
}

// Flush via SetEnabled
b.SetEnabled(false)

{
// Check the stats
stats := b.Stats()
require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
_, ok := stats.ByScheduler[baseEval.Type]
require.False(t, ok, "Expected scheduler to have no stats")
}

{
// Enqueue again now we're disabled
b.Enqueue(baseEval.Copy())

delayedEval := baseEval.Copy()
delayedEval.Wait = 30 * time.Second
b.Enqueue(delayedEval)

waitEval := baseEval.Copy()
waitEval.WaitUntil = time.Now().Add(30 * time.Second)
b.Enqueue(waitEval)
}

{
// Check the stats again
stats := b.Stats()
require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed")
require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed")
require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed")
require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed")
_, ok := stats.ByScheduler[baseEval.Type]
require.False(t, ok, "Expected scheduler to have no stats")
}
}

func TestEvalBroker_Dequeue_Timeout(t *testing.T) {
t.Parallel()
b := testBroker(t, 0)
Expand Down