Backport of Fix server shutdown not waiting for worker run completion into release/1.6.x #19635

Merged
3 changes: 3 additions & 0 deletions .changelog/19560.txt
@@ -0,0 +1,3 @@
```release-note:bug
server: Fix server not waiting for workers to submit nacks for dequeued evaluations before shutting down
```
32 changes: 3 additions & 29 deletions client/client.go
@@ -48,6 +48,7 @@ import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/pool"
hstats "github.com/hashicorp/nomad/helper/stats"
@@ -255,7 +256,7 @@ type Client struct {

// shutdownGroup are goroutines that exit when shutdownCh is closed.
// Shutdown() blocks on Wait() after closing shutdownCh.
shutdownGroup group
shutdownGroup group.Group

// tokensClient is Nomad Client's custom Consul client for requesting Consul
// Service Identity tokens through Nomad Server.
@@ -840,7 +841,7 @@ func (c *Client) Shutdown() error {
// Stop Garbage collector
c.garbageCollector.Stop()

arGroup := group{}
arGroup := group.Group{}
if c.GetConfig().DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
@@ -3354,33 +3355,6 @@ func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.Eve
return nil
}

// group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type group struct {
wg sync.WaitGroup
}

// Go starts f in a goroutine and must be called before Wait.
func (g *group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}

func (g *group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}

// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *group) Wait() {
g.wg.Wait()
}

// pendingClientUpdates are the set of allocation updates that the client is
// waiting to send
type pendingClientUpdates struct {
23 changes: 18 additions & 5 deletions helper/broker/notify.go
@@ -4,6 +4,7 @@
package broker

import (
"context"
"time"

"github.com/hashicorp/nomad/helper"
@@ -21,15 +22,18 @@ type GenericNotifier struct {
// subscription membership mapping.
subscribeCh chan chan interface{}
unsubscribeCh chan chan interface{}

ctx context.Context
}

// NewGenericNotifier returns a generic notifier which can be used by a process
// to notify many subscribers when a specific update is triggered.
func NewGenericNotifier() *GenericNotifier {
func NewGenericNotifier(ctx context.Context) *GenericNotifier {
return &GenericNotifier{
publishCh: make(chan interface{}, 1),
subscribeCh: make(chan chan interface{}, 1),
unsubscribeCh: make(chan chan interface{}, 1),
ctx: ctx,
}
}

@@ -46,7 +50,7 @@ func (g *GenericNotifier) Notify(msg interface{}) {
// Run is a long-lived process which handles updating subscribers as well as
// ensuring any update is sent to them. The passed stopCh is used to coordinate
// shutdown.
func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
func (g *GenericNotifier) Run() {

// Store our subscribers inline with a map. This map can only be accessed
// via a single channel update at a time, meaning we can manage without
@@ -55,7 +59,7 @@ func (g *GenericNotifier) Run(stopCh <-chan struct{}) {

for {
select {
case <-stopCh:
case <-g.ctx.Done():
return
case msgCh := <-g.subscribeCh:
subscribers[msgCh] = struct{}{}
@@ -83,7 +87,11 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// Create a channel and subscribe to any update. This channel is buffered
// to ensure we do not block the main broker process.
updateCh := make(chan interface{}, 1)
g.subscribeCh <- updateCh
select {
case <-g.ctx.Done():
return "shutting down"
case g.subscribeCh <- updateCh:
}

// Create a timeout timer and use the helper to ensure this routine doesn't
// panic and making the stop call clear.
@@ -93,14 +101,19 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// subscriber once it has been notified of a change, or reached its wait
// timeout.
defer func() {
g.unsubscribeCh <- updateCh
select {
case <-g.ctx.Done():
case g.unsubscribeCh <- updateCh:
}
close(updateCh)
timeoutStop()
}()

// Enter the main loop which listens for an update or timeout and returns
// this information to the subscriber.
select {
case <-g.ctx.Done():
return "shutting down"
case <-timeoutTimer.C:
return "wait timed out after " + timeout.String()
case update := <-updateCh:
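
For orientation, here is a minimal sketch of the context-aware notifier lifecycle after this change; the `main` wiring and timings below are illustrative and not part of the PR:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/broker"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// The notifier now receives the shutdown context at construction time,
	// and Run exits when that context is cancelled.
	notifier := broker.NewGenericNotifier(ctx)
	go notifier.Run()

	// A subscriber parks in WaitForChange with its own timeout...
	go func() {
		msg := notifier.WaitForChange(10 * time.Second)
		fmt.Println("subscriber saw:", msg) // "shutting down" on cancellation
	}()

	// ...and cancelling the context unblocks it right away instead of
	// leaving it waiting out the full timeout during shutdown.
	time.Sleep(100 * time.Millisecond)
	cancel()
	time.Sleep(100 * time.Millisecond)
}
```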
9 changes: 5 additions & 4 deletions helper/broker/notify_test.go
@@ -4,6 +4,7 @@
package broker

import (
"context"
"sync"
"testing"
"time"
@@ -16,11 +17,11 @@ func TestGenericNotifier(t *testing.T) {
ci.Parallel(t)

// Create the new notifier.
stopChan := make(chan struct{})
defer close(stopChan)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

notifier := NewGenericNotifier()
go notifier.Run(stopChan)
notifier := NewGenericNotifier(ctx)
go notifier.Run()

// Ensure we have buffered channels.
require.Equal(t, 1, cap(notifier.publishCh))
50 changes: 50 additions & 0 deletions helper/group/group.go
@@ -0,0 +1,50 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package group

import (
"context"
"sync"
)

// group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type Group struct {
wg sync.WaitGroup
}

// Go starts f in a goroutine and must be called before Wait.
func (g *Group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}

func (g *Group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}

// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *Group) Wait() {
g.wg.Wait()
}

// Wait for all goroutines to exit, or for the context to finish.
// Must be called after all calls to Go complete.
func (g *Group) WaitWithContext(ctx context.Context) {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
g.Wait()
}()
select {
case <-doneCh:
case <-ctx.Done():
}
}
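
As a quick aside, a minimal usage sketch of the extracted `group.Group` helper; the worker bodies and durations below are illustrative placeholders:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/group"
)

func main() {
	var g group.Group

	// Track a function directly; Go must be called before Wait.
	g.Go(func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("tracked func finished")
	})

	// Track an externally owned completion channel; AddCh parks a goroutine
	// on the receive, so it counts toward Wait as well.
	doneCh := make(chan struct{})
	go func() {
		time.Sleep(3 * time.Second) // a slow or stuck worker
		close(doneCh)
	}()
	g.AddCh(doneCh)

	// Wait() would block until doneCh closes; WaitWithContext returns as
	// soon as either everything has finished or the context has expired.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	g.WaitWithContext(ctx)
	fmt.Println("returned after at most one second")
}
```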
3 changes: 2 additions & 1 deletion helper/raftutil/fsm.go
@@ -4,6 +4,7 @@
package raftutil

import (
"context"
"fmt"
"io"
"path/filepath"
@@ -79,7 +80,7 @@ func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
// use dummy non-enabled FSM dependencies
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
blockedEvals := nomad.NewBlockedEvals(nil, logger)
evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
evalBroker, err := nomad.NewEvalBroker(context.Background(), 1, 1, 1, 1)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions nomad/eval_broker.go
@@ -137,15 +137,15 @@ type PendingEvaluations []*structs.Evaluation
// initialNackDelay is the delay before making a Nacked evaluation available
// again for the first Nack and subsequentNackDelay is the compounding delay
// after the first Nack.
func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
func NewEvalBroker(ctx context.Context, timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
if timeout < 0 {
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
enabledNotifier: broker.NewGenericNotifier(ctx),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
3 changes: 2 additions & 1 deletion nomad/eval_broker_test.go
@@ -5,6 +5,7 @@ package nomad

import (
"container/heap"
"context"
"encoding/json"
"errors"
"fmt"
@@ -54,7 +55,7 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
}

func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
b, err := NewEvalBroker(context.Background(), c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
if err != nil {
t.Fatalf("err: %v", err)
}
47 changes: 33 additions & 14 deletions nomad/server.go
@@ -35,6 +35,7 @@ import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/tlsutil"
@@ -83,6 +84,10 @@ const (
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second

// workerShutdownGracePeriod is the maximum time we will wait for workers to stop
// gracefully when the server shuts down
workerShutdownGracePeriod = 5 * time.Second

// defaultConsulDiscoveryInterval is how often to poll Consul for new
// servers if there is no leader.
defaultConsulDiscoveryInterval time.Duration = 3 * time.Second
@@ -265,6 +270,10 @@ type Server struct {
// aclCache is used to maintain the parsed ACL objects
aclCache *structs.ACLCache[*acl.ACL]

// workerShutdownGroup tracks the running worker goroutines so that Shutdown()
// can wait on their completion
workerShutdownGroup group.Group

// oidcProviderCache maintains a cache of OIDC providers. This is useful as
// the provider performs background HTTP requests. When the Nomad server is
// shutting down, the oidcProviderCache.Shutdown() function must be called.
@@ -299,16 +308,6 @@
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {

// Create an eval broker
evalBroker, err := NewEvalBroker(
config.EvalNackTimeout,
config.EvalNackInitialReenqueueDelay,
config.EvalNackSubsequentReenqueueDelay,
config.EvalDeliveryLimit)
if err != nil {
return nil, err
}

// Configure TLS
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
if err != nil {
@@ -347,9 +346,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
reconcileCh: make(chan serf.Member, 32),
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
reapCancelableEvalsCh: make(chan struct{}),
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
@@ -358,6 +355,21 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()

// Create an eval broker
evalBroker, err := NewEvalBroker(
s.shutdownCtx,
config.EvalNackTimeout,
config.EvalNackInitialReenqueueDelay,
config.EvalNackSubsequentReenqueueDelay,
config.EvalDeliveryLimit)
if err != nil {
return nil, err
}
s.evalBroker = evalBroker

// Create the blocked evals
s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger)

// Create the RPC handler
s.rpcHandler = newRpcHandler(s)

@@ -456,7 +468,7 @@

// Start the eval broker notification system so any subscribers can get
// updates when the processes SetEnabled is triggered.
go s.evalBroker.enabledNotifier.Run(s.shutdownCh)
go s.evalBroker.enabledNotifier.Run()

// Setup the node drainer.
s.setupNodeDrainer()
@@ -673,6 +685,13 @@ func (s *Server) Shutdown() error {
s.shutdown = true
s.shutdownCancel()

s.workerLock.Lock()
defer s.workerLock.Unlock()
s.stopOldWorkers(s.workers)
workerShutdownTimeoutCtx, cancelWorkerShutdownTimeoutCtx := context.WithTimeout(context.Background(), workerShutdownGracePeriod)
defer cancelWorkerShutdownTimeoutCtx()
s.workerShutdownGroup.WaitWithContext(workerShutdownTimeoutCtx)

if s.serf != nil {
s.serf.Shutdown()
}
@@ -1743,7 +1762,7 @@ func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorke
return err
} else {
s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)

s.workerShutdownGroup.AddCh(w.ShutdownCh())
s.workers = append(s.workers, w)
}
}
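
Putting the pieces together, `Shutdown()` now signals the workers and then blocks, bounded by `workerShutdownGracePeriod`, until each worker's shutdown channel closes. A hedged end-to-end sketch of that pattern follows; `runWorker` and its channel wiring stand in for Nomad's actual `Worker` type and are not part of the diff:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/group"
)

// runWorker is a stand-in for a scheduler worker's run loop: it closes
// doneCh once it has finished draining (e.g. nacking dequeued evaluations).
func runWorker(ctx context.Context, doneCh chan<- struct{}) {
	defer close(doneCh)
	<-ctx.Done()                       // shutdown requested
	time.Sleep(100 * time.Millisecond) // simulate submitting a final nack
}

func main() {
	shutdownCtx, shutdownCancel := context.WithCancel(context.Background())

	var shutdownGroup group.Group
	for i := 0; i < 4; i++ {
		doneCh := make(chan struct{})
		go runWorker(shutdownCtx, doneCh)
		shutdownGroup.AddCh(doneCh) // mirrors s.workerShutdownGroup.AddCh(w.ShutdownCh())
	}

	// Shutdown: signal the workers, then wait up to the grace period.
	shutdownCancel()
	waitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	shutdownGroup.WaitWithContext(waitCtx)
	fmt.Println("all workers finished or grace period elapsed")
}
```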