Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: wait for batched driver updates before registering nodes #5585

Merged
merged 4 commits into from
Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ const (
allocSyncRetryIntv = 5 * time.Second
)

var (
// grace period to allow for batch fingerprint processing
batchFirstFingerprintsProcessingGrace = batchFirstFingerprintsTimeout + 5*time.Second
)

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
type ClientStatsReporter interface {
Expand Down Expand Up @@ -419,6 +424,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("failed to setup vault client: %v", err)
}

// wait until drivers are healthy before restoring or registering with servers
select {
case <-c.Ready():
case <-time.After(batchFirstFingerprintsProcessingGrace):
logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
}

// Restore the state
if err := c.restoreState(); err != nil {
logger.Error("failed to restore state", "error", err)
Expand Down Expand Up @@ -1456,13 +1468,7 @@ func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent

// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
timer := stoppedTimer()
defer timer.Stop()

for {
Expand Down Expand Up @@ -1918,7 +1924,8 @@ func (c *Client) updateNodeLocked() {
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
var hasChanged bool
timer := time.NewTimer(c.retryIntv(nodeUpdateRetryIntv))

timer := stoppedTimer()
defer timer.Stop()

for {
Expand Down
2 changes: 1 addition & 1 deletion client/node_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var (
// batchFirstFingerprintsTimeout is the maximum amount of time to wait for
// initial fingerprinting to complete before sending a batched Node update
batchFirstFingerprintsTimeout = 5 * time.Second
batchFirstFingerprintsTimeout = 50 * time.Second
)

// batchFirstFingerprints waits for the first fingerprint response from all
Expand Down
20 changes: 13 additions & 7 deletions client/pluginmanager/drivermanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,26 +243,32 @@ func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.Ca
}

var mu sync.Mutex
var availDrivers []string
driversByStatus := map[drivers.HealthState][]string{}

var wg sync.WaitGroup

recordDriver := func(name string, lastHeath drivers.HealthState) {
mu.Lock()
defer mu.Unlock()

updated := append(driversByStatus[lastHeath], name)
driversByStatus[lastHeath] = updated
}

// loop through instances and wait for each to finish initial fingerprint
m.instancesMu.RLock()
for n, i := range m.instances {
wg.Add(1)
go func(name string, instance *instanceManager) {
defer wg.Done()
instance.WaitForFirstFingerprint(ctx)
if instance.getLastHealth() != drivers.HealthStateUndetected {
mu.Lock()
availDrivers = append(availDrivers, name)
mu.Unlock()
}
recordDriver(name, instance.getLastHealth())
}(n, i)
}
m.instancesMu.RUnlock()
wg.Wait()
m.logger.Debug("detected drivers", "drivers", availDrivers)

m.logger.Debug("detected drivers", "drivers", driversByStatus)
}

func (m *manager) loadReattachConfigs() error {
Expand Down
11 changes: 11 additions & 0 deletions client/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"fmt"
"math/rand"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -63,3 +64,13 @@ func shuffleStrings(list []string) {
list[i], list[j] = list[j], list[i]
}
}

// stoppedTimer returns a timer that's stopped and wouldn't fire until
// it's reset
func stoppedTimer() *time.Timer {
timer := time.NewTimer(0)
if !timer.Stop() {
<-timer.C
}
return timer
}