-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eval Broker: Prevent redundant enqueues when a node is not a leader #5699
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -161,6 +161,8 @@ func (b *EvalBroker) Enabled() bool { | |
// should only be enabled on the active leader. | ||
func (b *EvalBroker) SetEnabled(enabled bool) { | ||
b.l.Lock() | ||
defer b.l.Unlock() | ||
|
||
prevEnabled := b.enabled | ||
b.enabled = enabled | ||
if !prevEnabled && enabled { | ||
|
@@ -169,7 +171,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) { | |
b.delayedEvalCancelFunc = cancel | ||
go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh) | ||
} | ||
b.l.Unlock() | ||
|
||
if !enabled { | ||
b.flush() | ||
} | ||
|
@@ -208,6 +210,11 @@ func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) { | |
// outstanding, the evaluation is blocked until an Ack/Nack is received. | ||
// processEnqueue must be called with the lock held. | ||
func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { | ||
// If we're not enabled, don't enable more queuing. | ||
if !b.enabled { | ||
return | ||
} | ||
|
||
// Check if already enqueued | ||
if _, ok := b.evals[eval.ID]; ok { | ||
if token == "" { | ||
|
@@ -259,8 +266,10 @@ func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) { | |
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { | ||
b.l.Lock() | ||
defer b.l.Unlock() | ||
|
||
delete(b.timeWait, eval.ID) | ||
b.stats.TotalWaiting -= 1 | ||
|
||
b.enqueueLocked(eval, eval.Type) | ||
} | ||
|
||
|
@@ -678,11 +687,9 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error { | |
return nil | ||
} | ||
|
||
// Flush is used to clear the state of the broker | ||
// Flush is used to clear the state of the broker. It must be called from within | ||
// the lock. | ||
func (b *EvalBroker) flush() { | ||
b.l.Lock() | ||
defer b.l.Unlock() | ||
|
||
// Unblock any waiters | ||
for _, waitCh := range b.waiting { | ||
close(waitCh) | ||
|
@@ -778,13 +785,13 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan | |
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time. | ||
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) { | ||
b.l.RLock() | ||
defer b.l.RUnlock() | ||
|
||
// If there is nothing wait for an update. | ||
if b.delayHeap.Length() == 0 { | ||
b.l.RUnlock() | ||
return nil, time.Time{} | ||
} | ||
nextEval := b.delayHeap.Peek() | ||
b.l.RUnlock() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The lock was originally unlocked here rather than using defer to avoid holding on to it after peeking into the heap. I am not certain this was the root cause of the non-leader enqueues. Was this more of a clarity fix? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Entirely a clarity fix - I can revert; I thought pulling out the eval would be fast enough that it’s not a big deal. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's fine to leave it as is; the new lines of execution included in the lock's scope are not that expensive. |
||
if nextEval == nil { | ||
return nil, time.Time{} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a unit test in eval_broker_test.go? Suggestion: create two eval brokers with one of them enabled, then disable it while enabling the other one in another goroutine. The test should verify that the
flush
method drained everything on the previously enabled eval broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be pretty hard to actually validate this on CI because we set GOMAXPROCS to one.
We’d only need a single broker in a test, though, because they don’t interact with each other?