From 116ad2a3ac0907159c00219126814d77f233209c Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 24 May 2023 09:34:14 -0400 Subject: [PATCH 1/3] prioritized client updates The allocrunner sends several updates to the server during the early lifecycle of an allocation and its tasks. Clients batch-up allocation updates every 200ms, but experiments like the C2M challenge has shown that even with this batching, servers can be overwhelmed with client updates during high volume deployments. Benchmarking done in #9451 has shown that client updates can easily represent ~70% of all Nomad Raft traffic. Each allocation sends many updates during its lifetime, but only those that change the `ClientStatus` field are critical for progressing a deployment or kicking off a reschedule to recover from failures. Add a priority to the client allocation sync and update the `syncTicker` receiver so that we only send an update if there's a high priority update waiting, or on every 5th tick. This means when there are no high priority updates, the client will send updates at most every 1s instead of 200ms. Benchmarks have shown this can reduce overall Raft traffic by 10%, as well as reduce client-to-server RPC traffic. This changeset also switches from a channel-based collection of updates to a shared buffer, so as to split batching from sending and prevent backpressure onto the allocrunner when the RPC is slow. This doesn't have a major performance benefit in the benchmarks but makes the implementation of the prioritized update simpler. Fixes: #9451 --- .changelog/17354.txt | 3 + client/allocrunner/alloc_runner.go | 30 +++--- client/allocrunner/alloc_runner_test.go | 20 ++-- client/allocrunner/interfaces/runner.go | 2 +- client/client.go | 130 ++++++++++++++++-------- client/client_interface_test.go | 8 +- client/client_test.go | 78 +++++++++----- client/structs/enum.go | 11 ++ 8 files changed, 193 insertions(+), 89 deletions(-) create mode 100644 .changelog/17354.txt create mode 100644 client/structs/enum.go diff --git a/.changelog/17354.txt b/.changelog/17354.txt new file mode 100644 index 00000000000..92a6e613f2a --- /dev/null +++ b/.changelog/17354.txt @@ -0,0 +1,3 @@ +```release-note:improvement +client: prioritize allocation updates to reduce Raft and RPC load +``` diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index d82b238ef4e..04de68ab442 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -1420,30 +1420,36 @@ func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) { } } -// LastAcknowledgedStateIsCurrent returns true if the current state matches the -// state that was last acknowledged from a server update. This is called from -// the client in the same goroutine that called AcknowledgeState so that we -// can't get a TOCTOU error. -func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool { +// GetUpdatePriority returns the update priority based the difference between +// the current state and the state that was last acknowledged from a server +// update. This is called from the client in the same goroutine that called +// AcknowledgeState so that we can't get a TOCTOU error. +func (ar *allocRunner) GetUpdatePriority(a *structs.Allocation) cstructs.AllocUpdatePriority { ar.stateLock.RLock() defer ar.stateLock.RUnlock() last := ar.lastAcknowledgedState if last == nil { - return false + return cstructs.AllocUpdatePriorityTypical } switch { case last.ClientStatus != a.ClientStatus: - return false + return cstructs.AllocUpdatePriorityUrgent case last.ClientDescription != a.ClientDescription: - return false + return cstructs.AllocUpdatePriorityTypical case !last.DeploymentStatus.Equal(a.DeploymentStatus): - return false + return cstructs.AllocUpdatePriorityTypical case !last.NetworkStatus.Equal(a.NetworkStatus): - return false + return cstructs.AllocUpdatePriorityTypical } - return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { + + if !maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { return st.Equal(o) - }) + + }) { + return cstructs.AllocUpdatePriorityTypical + } + + return cstructs.AllocUpdatePriorityNone } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index a477ae2432a..775b1d5ff6c 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -28,6 +28,7 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" "github.com/hashicorp/nomad/client/state" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -2458,7 +2459,7 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) { )) } -func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { +func TestAllocRunner_GetUpdatePriority(t *testing.T) { ci.Parallel(t) alloc := mock.Alloc() @@ -2489,12 +2490,12 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { NetworkStatus: calloc.NetworkStatus, }) - must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc)) // clientAlloc mutates the state, so verify this doesn't break the check // without state having been updated calloc = ar.clientAlloc(map[string]*structs.TaskState{}) - must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc)) // make a no-op state update ar.SetNetworkStatus(&structs.AllocNetworkStatus{ @@ -2503,14 +2504,19 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { DNS: &structs.DNSConfig{}, }) calloc = ar.clientAlloc(map[string]*structs.TaskState{}) - must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityNone, ar.GetUpdatePriority(calloc)) - // make a state update that should be detected as a change + // make a low priority state update ar.SetNetworkStatus(&structs.AllocNetworkStatus{ InterfaceName: "eth0", - Address: "192.168.2.1", + Address: "192.168.1.2", DNS: &structs.DNSConfig{}, }) calloc = ar.clientAlloc(map[string]*structs.TaskState{}) - must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + must.Eq(t, cstructs.AllocUpdatePriorityTypical, ar.GetUpdatePriority(calloc)) + + // make a state update that should be detected as high priority + ar.SetClientStatus(structs.AllocClientStatusFailed) + calloc = ar.clientAlloc(map[string]*structs.TaskState{}) + must.Eq(t, cstructs.AllocUpdatePriorityUrgent, ar.GetUpdatePriority(calloc)) } diff --git a/client/allocrunner/interfaces/runner.go b/client/allocrunner/interfaces/runner.go index d7889426643..6ed59e2582c 100644 --- a/client/allocrunner/interfaces/runner.go +++ b/client/allocrunner/interfaces/runner.go @@ -35,7 +35,7 @@ type AllocRunner interface { AllocState() *state.State PersistState() error AcknowledgeState(*state.State) - LastAcknowledgedStateIsCurrent(*structs.Allocation) bool + GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority SetClientStatus(string) Signal(taskName, signal string) error diff --git a/client/client.go b/client/client.go index daf7512a675..9fc93f8aec4 100644 --- a/client/client.go +++ b/client/client.go @@ -43,6 +43,7 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -213,8 +214,10 @@ type Client struct { invalidAllocs map[string]struct{} invalidAllocsLock sync.Mutex - // allocUpdates stores allocations that need to be synced to the server. - allocUpdates chan *structs.Allocation + // allocUpdates stores allocations that need to be synced to the server, and + // allocUpdatesLock guards access to it for concurrent updates. + allocUpdates map[string]*structs.Allocation + allocUpdatesLock sync.Mutex // consulService is the Consul handler implementation for managing services // and checks. @@ -366,7 +369,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie logger: logger, rpcLogger: logger.Named("rpc"), allocs: make(map[string]interfaces.AllocRunner), - allocUpdates: make(chan *structs.Allocation, 64), + allocUpdates: make(map[string]*structs.Allocation, 64), shutdownCh: make(chan struct{}), triggerDiscoveryCh: make(chan struct{}), triggerNodeUpdate: make(chan struct{}, 8), @@ -1322,10 +1325,11 @@ func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) { // Mark alloc as failed so server can handle this failed := makeFailedAlloc(alloc, err) - select { - case c.allocUpdates <- failed: - case <-c.shutdownCh: - } + + c.allocUpdatesLock.Lock() + defer c.allocUpdatesLock.Unlock() + + c.allocUpdates[alloc.ID] = failed } // saveState is used to snapshot our state into the data dir. @@ -2099,10 +2103,10 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) { stripped.DeploymentStatus = alloc.DeploymentStatus stripped.NetworkStatus = alloc.NetworkStatus - select { - case c.allocUpdates <- stripped: - case <-c.shutdownCh: - } + c.allocUpdatesLock.Lock() + defer c.allocUpdatesLock.Unlock() + + c.allocUpdates[stripped.ID] = stripped } // PutAllocation stores an allocation or returns an error if it could not be stored. @@ -2114,39 +2118,27 @@ func (c *Client) PutAllocation(alloc *structs.Allocation) error { // server. func (c *Client) allocSync() { syncTicker := time.NewTicker(allocSyncIntv) - updates := make(map[string]*structs.Allocation) + updateTicks := 0 + for { select { case <-c.shutdownCh: syncTicker.Stop() return - case alloc := <-c.allocUpdates: - // Batch the allocation updates until the timer triggers. - updates[alloc.ID] = alloc + case <-syncTicker.C: - // Fast path if there are no updates - if len(updates) == 0 { - continue - } - // Ensure we never send an update before we've had at least one sync - // from the server - select { - case <-c.serversContactedCh: - default: - continue - } - sync := c.filterAcknowledgedUpdates(updates) - if len(sync) == 0 { - // No updates to send - updates = make(map[string]*structs.Allocation, len(updates)) + updateTicks++ + toSync := c.updatesToSync(updateTicks) + + if len(toSync) == 0 { syncTicker.Reset(allocSyncIntv) continue } // Send to server. args := structs.AllocUpdateRequest{ - Alloc: sync, + Alloc: toSync, WriteRequest: structs.WriteRequest{Region: c.Region()}, } @@ -2156,12 +2148,25 @@ func (c *Client) allocSync() { // Error updating allocations, do *not* clear // updates and retry after backoff c.logger.Error("error updating allocations", "error", err) + + // refill the updates queue with updates that we failed to make, + // but only if a newer update for that alloc hasn't come in. + c.allocUpdatesLock.Lock() + for _, unsynced := range toSync { + if _, ok := c.allocUpdates[unsynced.ID]; !ok { + c.allocUpdates[unsynced.ID] = unsynced + } + } + c.allocUpdatesLock.Unlock() + syncTicker.Reset(c.retryIntv(allocSyncRetryIntv)) continue } + // Record that we've successfully synced these updates so that it's + // written to disk c.allocLock.RLock() - for _, update := range sync { + for _, update := range toSync { if ar, ok := c.allocs[update.ID]; ok { ar.AcknowledgeState(&arstate.State{ ClientStatus: update.ClientStatus, @@ -2174,25 +2179,68 @@ func (c *Client) allocSync() { } c.allocLock.RUnlock() - // Successfully updated allocs, reset map and ticker. - // Always reset ticker to give loop time to receive - // alloc updates. If the RPC took the ticker interval - // we may call it in a tight loop before draining - // buffered updates. - updates = make(map[string]*structs.Allocation, len(updates)) + // Successfully updated allocs. Reset ticker to give loop time to + // receive new alloc updates. Otherwise if the RPC took the ticker + // interval we may call it in a tight loop reading empty updates. + updateTicks = 0 syncTicker.Reset(allocSyncIntv) } } } -func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation { +// updatesToSync returns a list of client allocation updates we need to make in +// this tick of the allocSync. It returns nil if there's no updates to make +// yet. The caller is responsible for restoring the c.allocUpdates map if it +// can't successfully send the updates. +func (c *Client) updatesToSync(updateTicks int) []*structs.Allocation { + + c.allocUpdatesLock.Lock() + defer c.allocUpdatesLock.Unlock() + + // Fast path if there are no pending updates + if len(c.allocUpdates) == 0 { + return nil + } + + // Ensure we never send an update before we've had at least one sync from + // the server + select { + case <-c.serversContactedCh: + default: + return nil + } + + toSync, urgent := c.filterAcknowledgedUpdates(c.allocUpdates) + + // Only update every 5th tick if there's no priority updates + if updateTicks%5 != 0 && !urgent { + return nil + } + + // Clear here so that allocrunners can queue up the next set of updates + // while we're waiting to hear from the server + c.allocUpdates = make(map[string]*structs.Allocation, len(c.allocUpdates)) + + return toSync +} + +// filteredAcknowledgedUpdates returns a list of client alloc updates with the +// already-acknowledged updates removed, and the highest priority of any update. +func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) ([]*structs.Allocation, bool) { + var urgent bool sync := make([]*structs.Allocation, 0, len(updates)) c.allocLock.RLock() defer c.allocLock.RUnlock() for allocID, update := range updates { if ar, ok := c.allocs[allocID]; ok { - if !ar.LastAcknowledgedStateIsCurrent(update) { + switch ar.GetUpdatePriority(update) { + case cstructs.AllocUpdatePriorityUrgent: + sync = append(sync, update) + urgent = true + case cstructs.AllocUpdatePriorityTypical: sync = append(sync, update) + case cstructs.AllocUpdatePriorityNone: + // update is dropped } } else { // no allocrunner (typically a failed placement), so we need @@ -2200,7 +2248,7 @@ func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocatio sync = append(sync, update) } } - return sync + return sync, urgent } // allocUpdates holds the results of receiving updated allocations from the diff --git a/client/client_interface_test.go b/client/client_interface_test.go index 5a985b7bce4..326cd5484f5 100644 --- a/client/client_interface_test.go +++ b/client/client_interface_test.go @@ -120,9 +120,11 @@ func (ar *emptyAllocRunner) AllocState() *state.State { return ar.allocState.Copy() } -func (ar *emptyAllocRunner) PersistState() error { return nil } -func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {} -func (ar *emptyAllocRunner) LastAcknowledgedStateIsCurrent(*structs.Allocation) bool { return false } +func (ar *emptyAllocRunner) PersistState() error { return nil } +func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {} +func (ar *emptyAllocRunner) GetUpdatePriority(*structs.Allocation) cstructs.AllocUpdatePriority { + return cstructs.AllocUpdatePriorityUrgent +} func (ar *emptyAllocRunner) SetClientStatus(status string) { ar.allocLock.Lock() diff --git a/client/client_test.go b/client/client_test.go index 04148854c73..719d81c9edf 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -765,6 +765,24 @@ func TestClient_SaveRestoreState(t *testing.T) { return fmt.Errorf("expected running client status, got %v", ar.AllocState().ClientStatus) } + + case alloc3: + if ar.AllocState().ClientStatus != structs.AllocClientStatusComplete { + return fmt.Errorf("expected complete client status, got %v", + ar.AllocState().ClientStatus) + } + + // because the client's update will be batched, we need to + // ensure we wait for the server update too + a3, err := store.AllocByID(nil, alloc3) + must.NoError(t, err) + must.NotNil(t, a3) + if alloc3AllocModifyIndex != a3.AllocModifyIndex || + alloc3ModifyIndex >= a3.ModifyIndex { + return fmt.Errorf( + "alloc %s stopped during shutdown should have updated", a3.ID[:8]) + } + default: if ar.AllocState().ClientStatus != structs.AllocClientStatusComplete { return fmt.Errorf("expected complete client status, got %v", @@ -778,33 +796,43 @@ func TestClient_SaveRestoreState(t *testing.T) { wait.Gap(time.Millisecond*30), )) - a1, err = store.AllocByID(nil, alloc1) - must.NoError(t, err) - must.NotNil(t, a1) - test.Eq(t, alloc1AllocModifyIndex, a1.AllocModifyIndex) - test.Eq(t, alloc1ModifyIndex, a1.ModifyIndex, - test.Sprint("alloc still running should not have updated")) + // Because we're asserting that no changes have been made, we have to wait a + // sufficient amount of time to verify that + must.Wait(t, wait.ContinualSuccess( + wait.ErrorFunc(func() error { + a1, err = store.AllocByID(nil, alloc1) + must.NoError(t, err) + must.NotNil(t, a1) - a2, err := store.AllocByID(nil, alloc2) - must.NoError(t, err) - must.NotNil(t, a2) - test.Eq(t, alloc2AllocModifyIndex, a2.AllocModifyIndex) - test.Eq(t, alloc2ModifyIndex, a2.ModifyIndex, - test.Sprintf("alloc %s stopped before shutdown should not have updated", a2.ID[:8])) + if alloc1AllocModifyIndex != a1.AllocModifyIndex || + alloc1ModifyIndex != a1.ModifyIndex { + return fmt.Errorf("alloc still running should not have updated") + } + + a2, err := store.AllocByID(nil, alloc2) + must.NoError(t, err) + must.NotNil(t, a2) + if alloc2AllocModifyIndex != a2.AllocModifyIndex || + alloc2ModifyIndex != a2.ModifyIndex { + return fmt.Errorf( + "alloc %s stopped before shutdown should not have updated", a2.ID[:8]) + } + + // TODO: the alloc has been GC'd so the server will reject any + // update. It'd be nice if we could instrument the server here to + // ensure we didn't send one either. + a4, err := store.AllocByID(nil, alloc4) + must.NoError(t, err) + if a4 != nil { + return fmt.Errorf("garbage collected alloc should not exist") + } + + return nil + }), + wait.Timeout(time.Second*3), + wait.Gap(time.Millisecond*100), + )) - a3, err := store.AllocByID(nil, alloc3) - must.NoError(t, err) - must.NotNil(t, a3) - test.Eq(t, alloc3AllocModifyIndex, a3.AllocModifyIndex) - test.Greater(t, alloc3ModifyIndex, a3.ModifyIndex, - test.Sprintf("alloc %s stopped during shutdown should have updated", a3.ID[:8])) - - // TODO: the alloc has been GC'd so the server will reject any update. It'd - // be nice if we could instrument the server here to ensure we didn't send - // one either. - a4, err := store.AllocByID(nil, alloc4) - must.NoError(t, err) - test.Nil(t, a4, test.Sprint("garbage collected alloc should not exist")) } func TestClient_AddAllocError(t *testing.T) { diff --git a/client/structs/enum.go b/client/structs/enum.go new file mode 100644 index 00000000000..ed11bb1aa97 --- /dev/null +++ b/client/structs/enum.go @@ -0,0 +1,11 @@ +package structs + +// AllocUpdatePriority indicates the urgency of an allocation update so that the +// client can decide whether to wait longer +type AllocUpdatePriority int + +const ( + AllocUpdatePriorityNone AllocUpdatePriority = iota + AllocUpdatePriorityTypical + AllocUpdatePriorityUrgent +) From eae68543bce6f9d2d951cad9e7b9ab2a8dba1dbc Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 31 May 2023 10:27:21 -0400 Subject: [PATCH 2/3] refactor to improve ergnomics of allocUpdates lock --- client/client.go | 195 ++++++++++++++++++++++++++--------------------- 1 file changed, 107 insertions(+), 88 deletions(-) diff --git a/client/client.go b/client/client.go index 9fc93f8aec4..92e6e61af7a 100644 --- a/client/client.go +++ b/client/client.go @@ -214,10 +214,8 @@ type Client struct { invalidAllocs map[string]struct{} invalidAllocsLock sync.Mutex - // allocUpdates stores allocations that need to be synced to the server, and - // allocUpdatesLock guards access to it for concurrent updates. - allocUpdates map[string]*structs.Allocation - allocUpdatesLock sync.Mutex + // pendingUpdates stores allocations that need to be synced to the server. + pendingUpdates *pendingClientUpdates // consulService is the Consul handler implementation for managing services // and checks. @@ -369,7 +367,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie logger: logger, rpcLogger: logger.Named("rpc"), allocs: make(map[string]interfaces.AllocRunner), - allocUpdates: make(map[string]*structs.Allocation, 64), + pendingUpdates: newPendingClientUpdates(), shutdownCh: make(chan struct{}), triggerDiscoveryCh: make(chan struct{}), triggerNodeUpdate: make(chan struct{}, 8), @@ -1325,11 +1323,7 @@ func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) { // Mark alloc as failed so server can handle this failed := makeFailedAlloc(alloc, err) - - c.allocUpdatesLock.Lock() - defer c.allocUpdatesLock.Unlock() - - c.allocUpdates[alloc.ID] = failed + c.pendingUpdates.add(failed) } // saveState is used to snapshot our state into the data dir. @@ -2103,10 +2097,7 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) { stripped.DeploymentStatus = alloc.DeploymentStatus stripped.NetworkStatus = alloc.NetworkStatus - c.allocUpdatesLock.Lock() - defer c.allocUpdatesLock.Unlock() - - c.allocUpdates[stripped.ID] = stripped + c.pendingUpdates.add(stripped) } // PutAllocation stores an allocation or returns an error if it could not be stored. @@ -2129,7 +2120,7 @@ func (c *Client) allocSync() { case <-syncTicker.C: updateTicks++ - toSync := c.updatesToSync(updateTicks) + toSync := c.pendingUpdates.nextBatch(c, updateTicks) if len(toSync) == 0 { syncTicker.Reset(allocSyncIntv) @@ -2149,16 +2140,8 @@ func (c *Client) allocSync() { // updates and retry after backoff c.logger.Error("error updating allocations", "error", err) - // refill the updates queue with updates that we failed to make, - // but only if a newer update for that alloc hasn't come in. - c.allocUpdatesLock.Lock() - for _, unsynced := range toSync { - if _, ok := c.allocUpdates[unsynced.ID]; !ok { - c.allocUpdates[unsynced.ID] = unsynced - } - } - c.allocUpdatesLock.Unlock() - + // refill the updates queue with updates that we failed to make + c.pendingUpdates.restore(toSync) syncTicker.Reset(c.retryIntv(allocSyncRetryIntv)) continue } @@ -2188,69 +2171,6 @@ func (c *Client) allocSync() { } } -// updatesToSync returns a list of client allocation updates we need to make in -// this tick of the allocSync. It returns nil if there's no updates to make -// yet. The caller is responsible for restoring the c.allocUpdates map if it -// can't successfully send the updates. -func (c *Client) updatesToSync(updateTicks int) []*structs.Allocation { - - c.allocUpdatesLock.Lock() - defer c.allocUpdatesLock.Unlock() - - // Fast path if there are no pending updates - if len(c.allocUpdates) == 0 { - return nil - } - - // Ensure we never send an update before we've had at least one sync from - // the server - select { - case <-c.serversContactedCh: - default: - return nil - } - - toSync, urgent := c.filterAcknowledgedUpdates(c.allocUpdates) - - // Only update every 5th tick if there's no priority updates - if updateTicks%5 != 0 && !urgent { - return nil - } - - // Clear here so that allocrunners can queue up the next set of updates - // while we're waiting to hear from the server - c.allocUpdates = make(map[string]*structs.Allocation, len(c.allocUpdates)) - - return toSync -} - -// filteredAcknowledgedUpdates returns a list of client alloc updates with the -// already-acknowledged updates removed, and the highest priority of any update. -func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) ([]*structs.Allocation, bool) { - var urgent bool - sync := make([]*structs.Allocation, 0, len(updates)) - c.allocLock.RLock() - defer c.allocLock.RUnlock() - for allocID, update := range updates { - if ar, ok := c.allocs[allocID]; ok { - switch ar.GetUpdatePriority(update) { - case cstructs.AllocUpdatePriorityUrgent: - sync = append(sync, update) - urgent = true - case cstructs.AllocUpdatePriorityTypical: - sync = append(sync, update) - case cstructs.AllocUpdatePriorityNone: - // update is dropped - } - } else { - // no allocrunner (typically a failed placement), so we need - // to send update - sync = append(sync, update) - } - } - return sync, urgent -} - // allocUpdates holds the results of receiving updated allocations from the // servers. type allocUpdates struct { @@ -3354,3 +3274,102 @@ func (g *group) AddCh(ch <-chan struct{}) { func (g *group) Wait() { g.wg.Wait() } + +// pendingClientUpdates are the set of allocation updates that the client is +// waiting to send +type pendingClientUpdates struct { + updates map[string]*structs.Allocation + lock sync.Mutex +} + +func newPendingClientUpdates() *pendingClientUpdates { + return &pendingClientUpdates{ + updates: make(map[string]*structs.Allocation, 64), + } +} + +// add overwrites a pending update. The updates we get from the allocrunner are +// lightweight copies of its *structs.Allocation (i.e. just the client state), +// serialized with an internal lock. So the latest update is always the +// authoritative one, and the server only cares about that one. +func (p *pendingClientUpdates) add(alloc *structs.Allocation) { + p.lock.Lock() + defer p.lock.Unlock() + p.updates[alloc.ID] = alloc +} + +// restore refills the pending updates map, but only if a newer update hasn't come in +func (p *pendingClientUpdates) restore(toRestore []*structs.Allocation) { + p.lock.Lock() + defer p.lock.Unlock() + + for _, alloc := range toRestore { + if _, ok := p.updates[alloc.ID]; !ok { + p.updates[alloc.ID] = alloc + } + } +} + +// nextBatch returns a list of client allocation updates we need to make in this +// tick of the allocSync. It returns nil if there's no updates to make yet. The +// caller is responsible for calling restore() if it can't successfully send the +// updates. +func (p *pendingClientUpdates) nextBatch(c *Client, updateTicks int) []*structs.Allocation { + p.lock.Lock() + defer p.lock.Unlock() + + // Fast path if there are no pending updates + if len(p.updates) == 0 { + return nil + } + + // Ensure we never send an update before we've had at least one sync from + // the server + select { + case <-c.serversContactedCh: + default: + return nil + } + + toSync, urgent := p.filterAcknowledgedUpdatesLocked(c) + + // Only update every 5th tick if there's no priority updates + if updateTicks%5 != 0 && !urgent { + return nil + } + + // Clear here so that allocrunners can queue up the next set of updates + // while we're waiting to hear from the server + maps.Clear(p.updates) + + return toSync + +} + +// filteredAcknowledgedUpdatesLocked returns a list of client alloc updates with the +// already-acknowledged updates removed, and the highest priority of any update. +func (p *pendingClientUpdates) filterAcknowledgedUpdatesLocked(c *Client) ([]*structs.Allocation, bool) { + var urgent bool + sync := make([]*structs.Allocation, 0, len(p.updates)) + c.allocLock.RLock() + defer c.allocLock.RUnlock() + + for allocID, update := range p.updates { + if ar, ok := c.allocs[allocID]; ok { + switch ar.GetUpdatePriority(update) { + case cstructs.AllocUpdatePriorityUrgent: + sync = append(sync, update) + urgent = true + case cstructs.AllocUpdatePriorityTypical: + sync = append(sync, update) + case cstructs.AllocUpdatePriorityNone: + // update is dropped + } + } else { + // no allocrunner (typically a failed placement), so we need + // to send update + sync = append(sync, update) + } + } + return sync, urgent +} From 88527e563aef15d19038fd5cc2c8e7a3bd84e52e Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 31 May 2023 14:44:46 -0400 Subject: [PATCH 3/3] add note to method about lock --- client/client.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index 92e6e61af7a..18e4cbe86ff 100644 --- a/client/client.go +++ b/client/client.go @@ -3346,8 +3346,9 @@ func (p *pendingClientUpdates) nextBatch(c *Client, updateTicks int) []*structs. } -// filteredAcknowledgedUpdatesLocked returns a list of client alloc updates with the -// already-acknowledged updates removed, and the highest priority of any update. +// filteredAcknowledgedUpdatesLocked returns a list of client alloc updates with +// the already-acknowledged updates removed, and the highest priority of any +// update. note: this method requires that p.lock is held func (p *pendingClientUpdates) filterAcknowledgedUpdatesLocked(c *Client) ([]*structs.Allocation, bool) { var urgent bool sync := make([]*structs.Allocation, 0, len(p.updates))