Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prioritized client updates #17354

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17354.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
client: prioritize allocation updates to reduce Raft and RPC load
```
30 changes: 18 additions & 12 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,30 +1420,36 @@ func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) {
}
}

// LastAcknowledgedStateIsCurrent returns true if the current state matches the
// state that was last acknowledged from a server update. This is called from
// the client in the same goroutine that called AcknowledgeState so that we
// can't get a TOCTOU error.
func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool {
// GetUpdatePriority returns the update priority based the difference between
// the current state and the state that was last acknowledged from a server
// update. This is called from the client in the same goroutine that called
// AcknowledgeState so that we can't get a TOCTOU error.
func (ar *allocRunner) GetUpdatePriority(a *structs.Allocation) cstructs.AllocUpdatePriority {
ar.stateLock.RLock()
defer ar.stateLock.RUnlock()

last := ar.lastAcknowledgedState
if last == nil {
return false
return cstructs.AllocUpdatePriorityTypical
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't know what it was before, how can we assume the change is typical? Seems worth a comment especially since all of the other code in this method must check from Highest Priority to Lowest in order to ensure a change to a low priority field doesn't demote an actually high priority update.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that we don't know for sure. In practice an allocation will never become healthy quickly enough that the first update we send is that update. That being said we probably should account for allocations that quickly fail because there's a bunch of things that can go unrecoverably wrong on the client before we ever hit the task runner, and it'd be nice to be able to send those failure states to the server more quickly.

}

switch {
case last.ClientStatus != a.ClientStatus:
return false
return cstructs.AllocUpdatePriorityUrgent
case last.ClientDescription != a.ClientDescription:
return false
return cstructs.AllocUpdatePriorityTypical
case !last.DeploymentStatus.Equal(a.DeploymentStatus):
return false
return cstructs.AllocUpdatePriorityTypical
Comment on lines 1441 to +1442
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically deployments are gated by this field, so it could considered critical since it can cause a scheduling decision...

...but nothing about deployments is concerned with sub-second latencies, so I think it's fine to leave this as Typical.

If you're in this code again maybe add a comment pointing out that while deployment status changes are not urgent, they can affect scheduling but not in a way that sub-second skew is significant.

If you really want to tidy things up the PR description misses this too:

Each allocation sends many updates during its lifetime, but only those that change the ClientStatus field are critical for progressing a deployment or kicking off a reschedule to recover from failures.

DeploymentStatus is critical for progressing a deployment as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically deployments are gated by this field, so it could considered critical since it can cause a scheduling decision...

...but nothing about deployments is concerned with sub-second latencies, so I think it's fine to leave this as Typical

Somehow I missed that, so yeah I would've set it to urgent based on the reasoning I had in the PR. I'll keep (for now at least) and I'll add some commentary here around reasoning for things.

case !last.NetworkStatus.Equal(a.NetworkStatus):
return false
return cstructs.AllocUpdatePriorityTypical
}
return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {

if !maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {
return st.Equal(o)
})

}) {
return cstructs.AllocUpdatePriorityTypical
}

return cstructs.AllocUpdatePriorityNone
}
20 changes: 13 additions & 7 deletions client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration"
regMock "github.com/hashicorp/nomad/client/serviceregistration/mock"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -2458,7 +2459,7 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) {
))
}

func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
func TestAllocRunner_GetUpdatePriority(t *testing.T) {
ci.Parallel(t)

alloc := mock.Alloc()
Expand Down Expand Up @@ -2489,12 +2490,12 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
NetworkStatus: calloc.NetworkStatus,
})

must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc))

// clientAlloc mutates the state, so verify this doesn't break the check
// without state having been updated
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc))

// make a no-op state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
Expand All @@ -2503,14 +2504,19 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc))

// make a state update that should be detected as a change
// make a low priority state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.2.1",
Address: "192.168.1.2",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc))
must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.GetUpdatePriority(calloc))

// make a state update that should be detected as high priority
ar.SetClientStatus(structs.AllocClientStatusFailed)
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.Eq(t, cstructs.AllocUpdatePriorityUrgent, ar.GetUpdatePriority(calloc))
}
2 changes: 1 addition & 1 deletion client/allocrunner/interfaces/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type AllocRunner interface {
AllocState() *state.State
PersistState() error
AcknowledgeState(*state.State)
LastAcknowledgedStateIsCurrent(*structs.Allocation) bool
GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority
SetClientStatus(string)

Signal(taskName, signal string) error
Expand Down
180 changes: 124 additions & 56 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
Expand Down Expand Up @@ -213,8 +214,8 @@ type Client struct {
invalidAllocs map[string]struct{}
invalidAllocsLock sync.Mutex

// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
// pendingUpdates stores allocations that need to be synced to the server.
pendingUpdates *pendingClientUpdates

// consulService is the Consul handler implementation for managing services
// and checks.
Expand Down Expand Up @@ -366,7 +367,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
logger: logger,
rpcLogger: logger.Named("rpc"),
allocs: make(map[string]interfaces.AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
pendingUpdates: newPendingClientUpdates(),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
Expand Down Expand Up @@ -1322,10 +1323,7 @@ func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {

// Mark alloc as failed so server can handle this
failed := makeFailedAlloc(alloc, err)
select {
case c.allocUpdates <- failed:
case <-c.shutdownCh:
}
c.pendingUpdates.add(failed)
}

// saveState is used to snapshot our state into the data dir.
Expand Down Expand Up @@ -2099,10 +2097,7 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
stripped.DeploymentStatus = alloc.DeploymentStatus
stripped.NetworkStatus = alloc.NetworkStatus

select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
c.pendingUpdates.add(stripped)
}

// PutAllocation stores an allocation or returns an error if it could not be stored.
Expand All @@ -2114,39 +2109,27 @@ func (c *Client) PutAllocation(alloc *structs.Allocation) error {
// server.
func (c *Client) allocSync() {
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
updateTicks := 0

for {
select {
case <-c.shutdownCh:
syncTicker.Stop()
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc

case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
continue
}
// Ensure we never send an update before we've had at least one sync
// from the server
select {
case <-c.serversContactedCh:
default:
continue
}

sync := c.filterAcknowledgedUpdates(updates)
if len(sync) == 0 {
// No updates to send
updates = make(map[string]*structs.Allocation, len(updates))
updateTicks++
toSync := c.pendingUpdates.nextBatch(c, updateTicks)

if len(toSync) == 0 {
syncTicker.Reset(allocSyncIntv)
continue
}

// Send to server.
args := structs.AllocUpdateRequest{
Alloc: sync,
Alloc: toSync,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}

Expand All @@ -2156,12 +2139,17 @@ func (c *Client) allocSync() {
// Error updating allocations, do *not* clear
// updates and retry after backoff
c.logger.Error("error updating allocations", "error", err)

// refill the updates queue with updates that we failed to make
c.pendingUpdates.restore(toSync)
syncTicker.Reset(c.retryIntv(allocSyncRetryIntv))
continue
}

// Record that we've successfully synced these updates so that it's
// written to disk
c.allocLock.RLock()
for _, update := range sync {
for _, update := range toSync {
if ar, ok := c.allocs[update.ID]; ok {
ar.AcknowledgeState(&arstate.State{
ClientStatus: update.ClientStatus,
Expand All @@ -2174,35 +2162,15 @@ func (c *Client) allocSync() {
}
c.allocLock.RUnlock()

// Successfully updated allocs, reset map and ticker.
// Always reset ticker to give loop time to receive
// alloc updates. If the RPC took the ticker interval
// we may call it in a tight loop before draining
// buffered updates.
updates = make(map[string]*structs.Allocation, len(updates))
// Successfully updated allocs. Reset ticker to give loop time to
// receive new alloc updates. Otherwise if the RPC took the ticker
// interval we may call it in a tight loop reading empty updates.
updateTicks = 0
syncTicker.Reset(allocSyncIntv)
}
}
}

func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation {
sync := make([]*structs.Allocation, 0, len(updates))
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for allocID, update := range updates {
if ar, ok := c.allocs[allocID]; ok {
if !ar.LastAcknowledgedStateIsCurrent(update) {
sync = append(sync, update)
}
} else {
// no allocrunner (typically a failed placement), so we need
// to send update
sync = append(sync, update)
}
}
return sync
}

// allocUpdates holds the results of receiving updated allocations from the
// servers.
type allocUpdates struct {
Expand Down Expand Up @@ -3306,3 +3274,103 @@ func (g *group) AddCh(ch <-chan struct{}) {
func (g *group) Wait() {
g.wg.Wait()
}

// pendingClientUpdates are the set of allocation updates that the client is
// waiting to send
type pendingClientUpdates struct {
updates map[string]*structs.Allocation
lock sync.Mutex
}

func newPendingClientUpdates() *pendingClientUpdates {
return &pendingClientUpdates{
updates: make(map[string]*structs.Allocation, 64),
}
}

// add overwrites a pending update. The updates we get from the allocrunner are
// lightweight copies of its *structs.Allocation (i.e. just the client state),
// serialized with an internal lock. So the latest update is always the
// authoritative one, and the server only cares about that one.
func (p *pendingClientUpdates) add(alloc *structs.Allocation) {
p.lock.Lock()
defer p.lock.Unlock()
p.updates[alloc.ID] = alloc
}

// restore refills the pending updates map, but only if a newer update hasn't come in
func (p *pendingClientUpdates) restore(toRestore []*structs.Allocation) {
p.lock.Lock()
defer p.lock.Unlock()

for _, alloc := range toRestore {
if _, ok := p.updates[alloc.ID]; !ok {
p.updates[alloc.ID] = alloc
}
}
}

// nextBatch returns a list of client allocation updates we need to make in this
// tick of the allocSync. It returns nil if there's no updates to make yet. The
// caller is responsible for calling restore() if it can't successfully send the
// updates.
func (p *pendingClientUpdates) nextBatch(c *Client, updateTicks int) []*structs.Allocation {
p.lock.Lock()
defer p.lock.Unlock()

// Fast path if there are no pending updates
if len(p.updates) == 0 {
return nil
}

// Ensure we never send an update before we've had at least one sync from
// the server
select {
case <-c.serversContactedCh:
default:
return nil
}

toSync, urgent := p.filterAcknowledgedUpdatesLocked(c)

// Only update every 5th tick if there's no priority updates
if updateTicks%5 != 0 && !urgent {
return nil
}

// Clear here so that allocrunners can queue up the next set of updates
// while we're waiting to hear from the server
maps.Clear(p.updates)

return toSync

}

// filteredAcknowledgedUpdatesLocked returns a list of client alloc updates with
// the already-acknowledged updates removed, and the highest priority of any
// update. note: this method requires that p.lock is held
func (p *pendingClientUpdates) filterAcknowledgedUpdatesLocked(c *Client) ([]*structs.Allocation, bool) {
var urgent bool
sync := make([]*structs.Allocation, 0, len(p.updates))
c.allocLock.RLock()
defer c.allocLock.RUnlock()

for allocID, update := range p.updates {
if ar, ok := c.allocs[allocID]; ok {
switch ar.GetUpdatePriority(update) {
case cstructs.AllocUpdatePriorityUrgent:
sync = append(sync, update)
urgent = true
case cstructs.AllocUpdatePriorityTypical:
sync = append(sync, update)
case cstructs.AllocUpdatePriorityNone:
// update is dropped
}
} else {
// no allocrunner (typically a failed placement), so we need
// to send update
sync = append(sync, update)
}
}
return sync, urgent
}
8 changes: 5 additions & 3 deletions client/client_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ func (ar *emptyAllocRunner) AllocState() *state.State {
return ar.allocState.Copy()
}

func (ar *emptyAllocRunner) PersistState() error { return nil }
func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {}
func (ar *emptyAllocRunner) LastAcknowledgedStateIsCurrent(*structs.Allocation) bool { return false }
func (ar *emptyAllocRunner) PersistState() error { return nil }
func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {}
func (ar *emptyAllocRunner) GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority {
return cstructs.AllocUpdatePriorityUrgent
}

func (ar *emptyAllocRunner) SetClientStatus(status string) {
ar.allocLock.Lock()
Expand Down
Loading