Skip to content

Commit

Permalink
func: add new picker dependency (#20029)
Browse files Browse the repository at this point in the history
This commit introduces the new options for reconciling a reconnecting allocation and its replacement:

Best score (Current implementation)
Keep original
Keep replacement
Keep the one that has run the longest time
It is achieved by adding a new dependency to the allocReconciler that calls the corresponding function depending on the task group's disconnect strategy. For more detailed information, refer to the new stanza for disconnected clients RFC.

It resolves #15144
  • Loading branch information
Juanadelacuesta authored Mar 15, 2024
1 parent 13617ee commit ff72248
Show file tree
Hide file tree
Showing 10 changed files with 914 additions and 220 deletions.
3 changes: 3 additions & 0 deletions .changelog/20029.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
server: Add new options for reconciliation in case of disconnected nodes
```
10 changes: 10 additions & 0 deletions nomad/structs/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,13 @@ func (ds *DisconnectStrategy) Canonicalize() {
ds.Reconcile = ReconcileOptionBestScore
}
}

// ReconcileStrategy returns the strategy to be used when reconciling allocations
// after a client reconnects. Best score is the default one.
// ReconcileStrategy returns the strategy to be used when reconciling
// allocations after a client reconnects. If the receiver is nil or no option
// was configured, it falls back to ReconcileOptionBestScore, the default.
func (ds *DisconnectStrategy) ReconcileStrategy() string {
	if ds != nil && ds.Reconcile != "" {
		return ds.Reconcile
	}

	// Best score is the default reconciliation strategy.
	return ReconcileOptionBestScore
}
41 changes: 38 additions & 3 deletions nomad/structs/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestJobConfig_Validate_LostAfter_Disconnect(t *testing.T) {
must.NoError(t, err)
}

func TestDisconnectStategy_Validate(t *testing.T) {
func TestDisconnectStrategy_Validate(t *testing.T) {
ci.Parallel(t)

cases := []struct {
Expand Down Expand Up @@ -117,7 +117,42 @@ func TestDisconnectStategy_Validate(t *testing.T) {
}
}

func TestJobConfig_Validate_StopAferClient_Disconnect(t *testing.T) {
// TestReconcileStrategy verifies that ReconcileStrategy defaults to the
// best-score option for nil or empty disconnect blocks and otherwise returns
// the configured option.
func TestReconcileStrategy(t *testing.T) {
	ci.Parallel(t)

	testCases := []struct {
		name     string
		strategy *DisconnectStrategy
		want     string
	}{
		{
			name:     "nil_disconnect_default_to_best_score",
			strategy: nil,
			want:     ReconcileOptionBestScore,
		},
		{
			name:     "empty_reconcile_default_to_best_score",
			strategy: &DisconnectStrategy{},
			want:     ReconcileOptionBestScore,
		},
		{
			name: "longest_running",
			strategy: &DisconnectStrategy{
				Reconcile: ReconcileOptionLongestRunning,
			},
			want: ReconcileOptionLongestRunning,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			must.Eq(t, tc.want, tc.strategy.ReconcileStrategy())
		})
	}
}

func TestJobConfig_Validate_StopAfterClient_Disconnect(t *testing.T) {
ci.Parallel(t)
// Setup a system Job with Disconnect.StopOnClientAfter set, which is invalid
job := testJob()
Expand Down Expand Up @@ -153,7 +188,7 @@ func TestJobConfig_Validate_StopAferClient_Disconnect(t *testing.T) {

// Test using stop_after_client_disconnect, remove after its deprecated in favor
// of Disconnect.StopOnClientAfter introduced in 1.8.0.
func TestJobConfig_Validate_StopAferClientDisconnect(t *testing.T) {
func TestJobConfig_Validate_StopAfterClientDisconnect(t *testing.T) {
ci.Parallel(t)
// Setup a system Job with stop_after_client_disconnect set, which is invalid
job := testJob()
Expand Down
26 changes: 26 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11082,6 +11082,16 @@ func (a *Allocation) NextRescheduleTimeByTime(t time.Time) (time.Time, bool) {
return a.nextRescheduleTime(t, reschedulePolicy)
}

// RescheduleTimeOnDisconnect returns the earliest time at which the
// allocation may be replaced after its node disconnects, along with whether a
// replacement is wanted at all. When the task group carries an explicit
// Disconnect.Replace setting, that setting decides and the replacement time
// is now; otherwise the pre-1.8.0 reschedule-policy behavior is preserved.
func (a *Allocation) RescheduleTimeOnDisconnect(now time.Time) (time.Time, bool) {
	tg := a.Job.LookupTaskGroup(a.TaskGroup)
	if tg != nil && tg.Disconnect != nil && tg.Disconnect.Replace != nil {
		return now, *tg.Disconnect.Replace
	}

	// Kept to maintain backwards compatibility with behavior prior to 1.8.0
	return a.NextRescheduleTimeByTime(now)
}

// ShouldClientStop tests an alloc for StopAfterClient on the Disconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
Expand Down Expand Up @@ -11436,6 +11446,22 @@ func (a *Allocation) NeedsToReconnect() bool {
return disconnected
}

// LastStartOfTask returns the time of the last start event for the given task
// using the allocations TaskStates. If the task has not started, the zero time
// will be returned.
// LastStartOfTask returns the time of the last start event for the given task
// using the allocation's TaskStates. If the task has never started, the zero
// time is returned.
func (a *Allocation) LastStartOfTask(taskName string) time.Time {
	state := a.TaskStates[taskName]
	if state == nil {
		return time.Time{}
	}

	// A task that has restarted reports its most recent restart rather than
	// its original start time.
	if state.Restarts > 0 {
		return state.LastRestart
	}

	return state.StartedAt
}

// IdentityClaims are the input to a JWT identifying a workload. It
// should never be serialized to msgpack unsigned.
type IdentityClaims struct {
Expand Down
117 changes: 117 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5901,6 +5901,123 @@ func TestAllocation_NeedsToReconnect(t *testing.T) {
}
}

// TestAllocation_RescheduleTimeOnDisconnect exercises the fallback to the
// reschedule policy when the disconnect block is absent, and the immediate
// decision when Disconnect.Replace is set.
func TestAllocation_RescheduleTimeOnDisconnect(t *testing.T) {
	ci.Parallel(t)
	testNow := time.Now()

	testAlloc := MockAlloc()

	testCases := []struct {
		name            string
		taskGroup       string
		disconnectGroup *DisconnectStrategy
		expected        bool
		expectedTime    time.Time
	}{
		{
			name:         "missing_task_group",
			taskGroup:    "missing-task-group",
			expected:     false,
			expectedTime: time.Time{},
		},
		{
			name:            "missing_disconnect_group",
			taskGroup:       "web",
			disconnectGroup: nil,
			expected:        true,
			expectedTime:    testNow.Add(RestartPolicyMinInterval), // RestartPolicyMinInterval is the default value
		},
		{
			name:            "empty_disconnect_group",
			taskGroup:       "web",
			disconnectGroup: &DisconnectStrategy{},
			expected:        true,
			expectedTime:    testNow.Add(RestartPolicyMinInterval), // RestartPolicyMinInterval is the default value
		},
		{
			name:      "replace_enabled",
			taskGroup: "web",
			disconnectGroup: &DisconnectStrategy{
				Replace: pointer.Of(true),
			},
			expected:     true,
			expectedTime: testNow,
		},
		{
			name:      "replace_disabled",
			taskGroup: "web",
			disconnectGroup: &DisconnectStrategy{
				Replace: pointer.Of(false),
			},
			expected:     false,
			expectedTime: testNow,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			alloc := testAlloc.Copy()

			alloc.TaskGroup = tc.taskGroup
			alloc.Job.TaskGroups[0].Disconnect = tc.disconnectGroup

			// Named gotTime (not "time") to avoid shadowing the time package.
			gotTime, eligible := alloc.RescheduleTimeOnDisconnect(testNow)

			must.Eq(t, tc.expected, eligible)
			must.Eq(t, tc.expectedTime, gotTime)
		})
	}
}

// TestAllocation_LastStartOfTask verifies that LastStartOfTask prefers the
// last restart time when the task has restarted, falls back to the start
// time otherwise, and returns the zero time for unknown tasks.
func TestAllocation_LastStartOfTask(t *testing.T) {
	ci.Parallel(t)
	testNow := time.Now()

	alloc := MockAlloc()
	alloc.TaskStates = map[string]*TaskState{
		"task-with-restarts": {
			StartedAt:   testNow.Add(-30 * time.Minute),
			Restarts:    3,
			LastRestart: testNow.Add(-5 * time.Minute),
		},
		"task-without-restarts": {
			StartedAt: testNow.Add(-30 * time.Minute),
			Restarts:  0,
		},
	}

	testCases := []struct {
		name     string
		taskName string
		want     time.Time
	}{
		{
			name:     "missing_task",
			taskName: "missing-task",
			want:     time.Time{},
		},
		{
			name:     "task_with_restarts",
			taskName: "task-with-restarts",
			want:     testNow.Add(-5 * time.Minute),
		},
		{
			name:     "task_without_restarts",
			taskName: "task-without-restarts",
			want:     testNow.Add(-30 * time.Minute),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			alloc.TaskGroup = "web"

			must.Eq(t, tc.want, alloc.LastStartOfTask(tc.taskName))
		})
	}
}

func TestAllocation_Canonicalize_Old(t *testing.T) {
ci.Parallel(t)

Expand Down
55 changes: 13 additions & 42 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
reconnectingpicker "github.com/hashicorp/nomad/scheduler/reconnecting_picker"
)

const (
Expand All @@ -32,6 +33,10 @@ const (
rescheduleWindowSize = 1 * time.Second
)

// ReconnectingPicker is the dependency the reconciler uses to choose which
// allocation to keep when a disconnected allocation reconnects while a
// replacement for it exists. PickReconnectingAlloc returns either the
// original or the replacement allocation, according to the task group's
// disconnect strategy.
type ReconnectingPicker interface {
	PickReconnectingAlloc(disconnect *structs.DisconnectStrategy, original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation
}

// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
Expand Down Expand Up @@ -102,6 +107,8 @@ type allocReconciler struct {
// defaults to time.Now, and overridden in unit tests
now time.Time

reconnectingPicker ReconnectingPicker

// result is the results of the reconcile. During computation it can be
// used to store intermediate state
result *reconcileResults
Expand Down Expand Up @@ -195,6 +202,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
evalPriority int, supportsDisconnectedClients bool, opts ...AllocReconcilerOption) *allocReconciler {

ar := &allocReconciler{
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
Expand All @@ -216,6 +224,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
taskGroupAllocNameIndexes: make(map[string]*allocNameIndex),
},
reconnectingPicker: reconnectingpicker.New(logger),
}

for _, op := range opts {
Expand Down Expand Up @@ -461,7 +470,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
if len(reconnecting) > 0 {
// Pass all allocations because the replacements we need to find may be
// in any state, including themselves being reconnected.
reconnect, stop := a.reconcileReconnecting(reconnecting, all)
reconnect, stop := a.reconcileReconnecting(reconnecting, all, tg)

// Stop the reconciled allocations and remove them from the other sets
// since they have been already handled.
Expand Down Expand Up @@ -1145,7 +1154,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet) (allocSet, allocSet) {
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet, tg *structs.TaskGroup) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

Expand Down Expand Up @@ -1199,8 +1208,8 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all alloc
continue
}

// Pick which allocation we want to keep.
keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
// Pick which allocation we want to keep using the disconnect reconcile strategy
keepAlloc := a.reconnectingPicker.PickReconnectingAlloc(tg.Disconnect, reconnectingAlloc, replacementAlloc)
if keepAlloc == replacementAlloc {
// The replacement allocation is preferred, so stop the one
// reconnecting if not stopped yet.
Expand Down Expand Up @@ -1235,44 +1244,6 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all alloc
return reconnect, stop
}

// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
	// Check if the replacement is newer.
	// Always prefer the replacement if true.
	// A higher job version or create index means the job was updated while
	// the original was disconnected, so the replacement runs the newer spec.
	replacementIsNewer := replacement.Job.Version > original.Job.Version ||
		replacement.Job.CreateIndex > original.Job.CreateIndex
	if replacementIsNewer {
		return replacement
	}

	// Check if the replacement has better placement score.
	// If any of the scores is not available, only pick the replacement if
	// itself does have scores.
	originalMaxScoreMeta := original.Metrics.MaxNormScore()
	replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()

	replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
		(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
			replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)

	// Check if the replacement has better client status.
	// Even with a better placement score make sure we don't replace a running
	// allocation with one that is not.
	replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
	originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning

	// Only a better score combined with a no-worse client status swings the
	// decision to the replacement; otherwise the original wins by default.
	if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
		return replacement
	}

	return original
}

// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
Expand Down
Loading

0 comments on commit ff72248

Please sign in to comment.