Fix server shutdown not waiting for worker run completion #19560

32 changes: 3 additions & 29 deletions client/client.go
@@ -52,6 +52,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/hashicorp/nomad/helper/goruntime"
"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/tlsutil"
@@ -259,7 +260,7 @@ type Client struct {

// shutdownGroup are goroutines that exit when shutdownCh is closed.
// Shutdown() blocks on Wait() after closing shutdownCh.
shutdownGroup group
shutdownGroup group.Group

// tokensClient is Nomad Client's custom Consul client for requesting Consul
// Service Identity tokens through Nomad Server.
@@ -876,7 +877,7 @@ func (c *Client) Shutdown() error {
// Stop Garbage collector
c.garbageCollector.Stop()

arGroup := group{}
arGroup := group.Group{}
if c.GetConfig().DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
@@ -3390,33 +3391,6 @@ func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.Eve
return nil
}

// group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type group struct {
wg sync.WaitGroup
}

// Go starts f in a goroutine and must be called before Wait.
func (g *group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}

func (g *group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}

// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *group) Wait() {
g.wg.Wait()
}

// pendingClientUpdates are the set of allocation updates that the client is
// waiting to send
type pendingClientUpdates struct {
23 changes: 18 additions & 5 deletions helper/broker/notify.go
@@ -4,6 +4,7 @@
package broker

import (
"context"
"time"

"github.com/hashicorp/nomad/helper"
@@ -21,15 +22,18 @@ type GenericNotifier struct {
// subscription membership mapping.
subscribeCh chan chan interface{}
unsubscribeCh chan chan interface{}

ctx context.Context
}

// NewGenericNotifier returns a generic notifier which can be used by a process
// to notify many subscribers when a specific update is triggered.
func NewGenericNotifier() *GenericNotifier {
func NewGenericNotifier(ctx context.Context) *GenericNotifier {
return &GenericNotifier{
publishCh: make(chan interface{}, 1),
subscribeCh: make(chan chan interface{}, 1),
unsubscribeCh: make(chan chan interface{}, 1),
ctx: ctx,
Contributor:

This may be a question for the general Nomad maintainers, but my prior is that the Notifier is designed to be context-free and to focus on channels. The shutdown channel being passed into Run was in line with that design decision, and I think we could continue with it here. Is there a deeper reason for passing the context in that I'm missing?

Contributor (Author):

The problem I ran into with that was that I needed a way to coordinate between WaitForChange and Run.

In particular, the race condition that I encountered was:

  • WaitForChange tries to write to the unsubscribeCh and blocks because it is full
  • Run returns because shutdownCh was closed
  • Since Run has terminated, nothing reads from unsubscribeCh, which causes WaitForChange to block indefinitely

So, I think we need a way to have WaitForChange unblock when it detects that the notifier has shut down. I suppose we could also represent this with another shutdownCh for the notifier itself, but I'm not sure which one is more idiomatic. Hopefully maintainers can chime in on which approach they prefer (or if there is a better way to do this!)
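
To make the hang concrete, here is a minimal, self-contained sketch (toy code with illustrative names, not the Nomad implementation) of a producer blocking on a full buffered channel after the consumer goroutine has exited:

```go
package main

import "time"

func main() {
	unsubscribeCh := make(chan chan interface{}, 1) // buffered, capacity 1
	shutdownCh := make(chan struct{})

	// Consumer: stands in for Run. It services unsubscribeCh until shutdown.
	go func() {
		for {
			select {
			case <-shutdownCh:
				return // after this, nothing drains unsubscribeCh
			case <-unsubscribeCh:
			}
		}
	}()

	close(shutdownCh)                 // shutdown begins; the consumer returns
	time.Sleep(10 * time.Millisecond) // give it time to exit

	// Producer: stands in for WaitForChange's deferred unsubscribe.
	unsubscribeCh <- make(chan interface{}) // fills the buffer; succeeds
	unsubscribeCh <- make(chan interface{}) // blocks forever: buffer full, no reader
	// The Go runtime aborts here with "all goroutines are asleep - deadlock!"
}
```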

Contributor (@stswidwinski, Dec 28, 2023):

I'm not sure that I understand why using ctx here resolves the race which you describe. Doesn't it still occur based on subscribeCh in the same way, except on a different channel:

  1. Run shuts down because shutdownCtx was closed
  2. WaitForChange tries to write to subscribeCh and blocks because it is full (line 90)
  3. We block on line 90 forever

I think the scenario in which the context does help, as you imply, is one in which we may miss the notification on the shutdownCh and thus hang awaiting that event. That is the scenario of:

  1. WaitForChange times out
  2. shutdownCh is signalled
  3. Run quits
  4. WaitForChange is entered again
  5. We block as you imply

In this scenario, step 4 misses the notification from step 2 because it wasn't actually awaiting a signal (my limited understanding of channels is that they do not buffer events but rather notify the subscribers at the time of emission). This is where the context allows you to carry the signal in a more persistent way, which ensures that this other race cannot happen.

Contributor (Author):

Ah yeah, I missed the other write to subscribeCh on L90. I'll push a fix.

> we may miss the notification on the shutdownCh and thus hang awaiting that event

shutdownCh is closed when shutdown starts (rather than signaled). So, I don't think that's possible.

The race I was thinking of is more along the lines of the producer not realizing that the consumer has stopped, and thus blocking indefinitely while waiting for the consumer to do work.

I think in the scenario you described, the sequence of events I'm worried about is:

  1. WaitForChange blocks waiting for timeout
  2. shutdownCh is closed
  3. Run quits
  4. WaitForChange hits the timeout and unblocks, then as part of the deferred function it tries to write to unsubscribeCh and gets blocked because it is full
  5. Because Run has quit, nothing reads from unsubscribeCh, and so WaitForChange blocks forever

So, the shared context acts as a way for the producer to detect that the consumer has shut down (or soon will) and to stop blocking while waiting for it to do work.
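
A minimal sketch of the resulting pattern (the same guarded send the diff applies in WaitForChange; the function and parameter names here are illustrative):

```go
package notify

import "context"

// unsubscribe shows the guarded-send pattern: the producer races the handoff
// against the shared context, so if Run has exited (ctx is done) it gives up
// instead of blocking forever on a channel that nothing reads.
func unsubscribe(ctx context.Context, unsubscribeCh chan<- chan interface{}, updateCh chan interface{}) {
	select {
	case <-ctx.Done():
		// Notifier is shutting down; abandon the unsubscribe.
	case unsubscribeCh <- updateCh:
		// Run accepted the unsubscribe before shutdown.
	}
}
```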

Contributor:

> Ah yeah, I missed the other write to subscribeCh on L90. I'll push a fix.

👍

> shutdownCh is closed when shutdown starts (rather than signaled). So, I don't think that's possible.

Right, my bad. Makes sense :)

> (my limited understanding of channels is such that they do not buffer events but rather notify the subscribers at the time of emission). This is where the context allows you to carry a signal in a more persistent way which ensures that this other race cannot happen

Yup, +1. I think this is what I had in mind, but I failed to describe it (you've captured it better). Agreed on the problem; it's up to the Hashi team to decide which way they would rather go :-)

Contributor:

> This may be a question for the general Nomad maintainers, but my prior is that the Notifier is designed to be context-free and focus on channels

FWIW the channel-heavy pattern is an unfortunate result of Nomad predating the context package by about a year. It would be nice to go back and clean things up, but that was never a priority, and now it's channels all the way down.

}
}

@@ -46,7 +50,7 @@ func (g *GenericNotifier) Notify(msg interface{}) {
// Run is a long-lived process which handles updating subscribers as well as
// ensuring any update is sent to them. The notifier's context is used to
// coordinate shutdown.
func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
func (g *GenericNotifier) Run() {

// Store our subscribers inline with a map. This map can only be accessed
// via a single channel update at a time, meaning we can manage without
@@ -55,7 +59,7 @@ func (g *GenericNotifier) Run(stopCh <-chan struct{}) {

for {
select {
case <-stopCh:
case <-g.ctx.Done():
return
case msgCh := <-g.subscribeCh:
subscribers[msgCh] = struct{}{}
@@ -83,7 +87,11 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// Create a channel and subscribe to any update. This channel is buffered
// to ensure we do not block the main broker process.
updateCh := make(chan interface{}, 1)
g.subscribeCh <- updateCh
select {
case <-g.ctx.Done():
return "shutting down"
case g.subscribeCh <- updateCh:
}

// Create a timeout timer and use the helper to ensure this routine doesn't
// panic and to make the stop call clear.
@@ -93,14 +101,19 @@
// subscriber once it has been notified of a change, or reached its wait
// timeout.
defer func() {
g.unsubscribeCh <- updateCh
select {
case <-g.ctx.Done():
case g.unsubscribeCh <- updateCh:
}
close(updateCh)
timeoutStop()
}()

// Enter the main loop which listens for an update or timeout and returns
// this information to the subscriber.
select {
case <-g.ctx.Done():
return "shutting down"
case <-timeoutTimer.C:
return "wait timed out after " + timeout.String()
case update := <-updateCh:
9 changes: 5 additions & 4 deletions helper/broker/notify_test.go
@@ -4,6 +4,7 @@
package broker

import (
"context"
"sync"
"testing"
"time"
@@ -16,11 +17,11 @@ func TestGenericNotifier(t *testing.T) {
ci.Parallel(t)

// Create the new notifier.
stopChan := make(chan struct{})
defer close(stopChan)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

notifier := NewGenericNotifier()
go notifier.Run(stopChan)
notifier := NewGenericNotifier(ctx)
go notifier.Run()

// Ensure we have buffered channels.
require.Equal(t, 1, cap(notifier.publishCh))
50 changes: 50 additions & 0 deletions helper/group/group.go
@@ -0,0 +1,50 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package group

import (
"context"
"sync"
)

// Group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type Group struct {
wg sync.WaitGroup
}

// Go starts f in a goroutine and must be called before Wait.
func (g *Group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}

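// AddCh adds a goroutine to the group that waits for ch to be closed (or to
// receive a value); Wait will not return until that happens.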
func (g *Group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}

// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *Group) Wait() {
g.wg.Wait()
}

// WaitWithContext waits for all goroutines to exit, or for the context to be
// done. Must be called after all calls to Go complete.
func (g *Group) WaitWithContext(ctx context.Context) {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
g.Wait()
}()
select {
case <-doneCh:
case <-ctx.Done():
}
}
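
A short usage sketch of the new helper (assuming the package above is importable as github.com/hashicorp/nomad/helper/group; the two-second grace period here is illustrative, not Nomad's actual value):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/group"
)

func main() {
	var g group.Group

	// A worker that outlives any reasonable shutdown grace period.
	g.Go(func() { time.Sleep(time.Hour) })

	// Bound the wait, the way Server.Shutdown does with
	// workerShutdownGracePeriod.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	g.WaitWithContext(ctx) // returns once the context times out
	fmt.Println("grace period elapsed; continuing shutdown")
}
```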
3 changes: 2 additions & 1 deletion helper/raftutil/fsm.go
@@ -4,6 +4,7 @@
package raftutil

import (
"context"
"fmt"
"io"
"path/filepath"
@@ -79,7 +80,7 @@ func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
// use dummy non-enabled FSM dependencies
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
blockedEvals := nomad.NewBlockedEvals(nil, logger)
evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
evalBroker, err := nomad.NewEvalBroker(context.Background(), 1, 1, 1, 1)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions nomad/eval_broker.go
@@ -137,15 +137,15 @@ type PendingEvaluations []*structs.Evaluation
// initialNackDelay is the delay before making a Nacked evaluation available
// again for the first Nack and subsequentNackDelay is the compounding delay
// after the first Nack.
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
func NewEvalBroker(ctx context.Context, timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
if timeout < 0 {
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
enabledNotifier: broker.NewGenericNotifier(ctx),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
3 changes: 2 additions & 1 deletion nomad/eval_broker_test.go
@@ -5,6 +5,7 @@ package nomad

import (
"container/heap"
"context"
"encoding/json"
"errors"
"fmt"
@@ -54,7 +55,7 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
}

func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
b, err := NewEvalBroker(context.Background(), c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
if err != nil {
t.Fatalf("err: %v", err)
}
48 changes: 33 additions & 15 deletions nomad/server.go
@@ -35,6 +35,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/goruntime"
"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/iterator"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/tlsutil"
@@ -86,6 +87,10 @@ const (
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second

// workerShutdownGracePeriod is the maximum time we will wait for workers to stop
// gracefully when the server shuts down
workerShutdownGracePeriod = 5 * time.Second

// defaultConsulDiscoveryInterval is how often to poll Consul for new
// servers if there is no leader.
defaultConsulDiscoveryInterval time.Duration = 3 * time.Second
@@ -263,6 +268,10 @@ type Server struct {
workerConfigLock sync.RWMutex
workersEventCh chan interface{}

// workerShutdownGroup tracks the running worker goroutines so that Shutdown()
// can wait on their completion
workerShutdownGroup group.Group

// oidcProviderCache maintains a cache of OIDC providers. This is useful as
// the provider performs background HTTP requests. When the Nomad server is
// shutting down, the oidcProviderCache.Shutdown() function must be called.
@@ -315,17 +324,6 @@ type Server struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc consul.ConfigAPIFunc, consulACLs consul.ACLsAPI) (*Server, error) {

// Create an eval broker
evalBroker, err := NewEvalBroker(
config.EvalNackTimeout,
config.EvalNackInitialReenqueueDelay,
config.EvalNackSubsequentReenqueueDelay,
config.EvalDeliveryLimit)
if err != nil {
return nil, err
}

// Configure TLS
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
if err != nil {
@@ -361,9 +359,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
reconcileCh: make(chan serf.Member, 32),
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
reapCancelableEvalsCh: make(chan struct{}),
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
workersEventCh: make(chan interface{}, 1),
lockTTLTimer: lock.NewTTLTimer(),
@@ -373,6 +369,21 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()

// Create an eval broker
evalBroker, err := NewEvalBroker(
s.shutdownCtx,
config.EvalNackTimeout,
config.EvalNackInitialReenqueueDelay,
config.EvalNackSubsequentReenqueueDelay,
config.EvalDeliveryLimit)
if err != nil {
return nil, err
}
s.evalBroker = evalBroker

// Create the blocked evals
s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger)

// Create the RPC handler
s.rpcHandler = newRpcHandler(s)

@@ -494,7 +505,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc

// Start the eval broker notification system so any subscribers can get
// updates when the process's SetEnabled is triggered.
go s.evalBroker.enabledNotifier.Run(s.shutdownCh)
go s.evalBroker.enabledNotifier.Run()

// Setup the node drainer.
s.setupNodeDrainer()
@@ -711,6 +722,13 @@ func (s *Server) Shutdown() error {
s.shutdown = true
s.shutdownCancel()

s.workerLock.Lock()
defer s.workerLock.Unlock()
s.stopOldWorkers(s.workers)
workerShutdownTimeoutCtx, cancelWorkerShutdownTimeoutCtx := context.WithTimeout(context.Background(), workerShutdownGracePeriod)
defer cancelWorkerShutdownTimeoutCtx()
s.workerShutdownGroup.WaitWithContext(workerShutdownTimeoutCtx)

if s.serf != nil {
s.serf.Shutdown()
}
@@ -1788,7 +1806,7 @@ func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorke
return err
} else {
s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)

s.workerShutdownGroup.AddCh(w.ShutdownCh())
s.workers = append(s.workers, w)
}
}