Skip to content

Commit

Permalink
func: add new picker dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Juanadelacuesta committed Feb 22, 2024
1 parent 14280e0 commit 87be0e8
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 43 deletions.
15 changes: 15 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11074,6 +11074,21 @@ func (a *Allocation) NextRescheduleTimeByTime(t time.Time) (time.Time, bool) {
return a.nextRescheduleTime(t, reschedulePolicy)
}

// ShouldBeReplaced tests an alloc for replace in case of disconnection
func (a *Allocation) ShouldBeReplaced() *bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)

if tg == nil {
return nil
}

if tg.Disconnect == nil || tg.Disconnect.Replace == nil {
return nil
}

return tg.Disconnect.Replace
}

// ShouldClientStop tests an alloc for StopAfterClient on the Disconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
Expand Down
55 changes: 13 additions & 42 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
reconnectingpicker "github.com/hashicorp/nomad/scheduler/reconnecting_picker"
)

const (
Expand All @@ -32,6 +33,10 @@ const (
rescheduleWindowSize = 1 * time.Second
)

type ReconnectingPicker interface {
PickReconnectingAlloc(disconnect *structs.DisconnectStrategy, original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation
}

// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
Expand Down Expand Up @@ -102,6 +107,8 @@ type allocReconciler struct {
// defaults to time.Now, and overridden in unit tests
now time.Time

reconnectingPicker ReconnectingPicker

// result is the results of the reconcile. During computation it can be
// used to store intermediate state
result *reconcileResults
Expand Down Expand Up @@ -195,6 +202,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
evalPriority int, supportsDisconnectedClients bool, opts ...AllocReconcilerOption) *allocReconciler {

ar := &allocReconciler{
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
Expand All @@ -216,6 +224,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
taskGroupAllocNameIndexes: make(map[string]*allocNameIndex),
},
reconnectingPicker: reconnectingpicker.New(logger),
}

for _, op := range opts {
Expand Down Expand Up @@ -461,7 +470,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
if len(reconnecting) > 0 {
// Pass all allocations because the replacements we need to find may be
// in any state, including themselves being reconnected.
reconnect, stop := a.reconcileReconnecting(reconnecting, all)
reconnect, stop := a.reconcileReconnecting(reconnecting, all, tg)

// Stop the reconciled allocations and remove them from the other sets
// since they have been already handled.
Expand Down Expand Up @@ -1145,7 +1154,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet) (allocSet, allocSet) {
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet, tg *structs.TaskGroup) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

Expand Down Expand Up @@ -1199,8 +1208,8 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all alloc
continue
}

// Pick which allocation we want to keep.
keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
// Pick which allocation we want to keep using the disconnect reconcile strategy
keepAlloc := a.reconnectingPicker.PickReconnectingAlloc(tg.Disconnect, reconnectingAlloc, replacementAlloc)
if keepAlloc == replacementAlloc {
// The replacement allocation is preferred, so stop the one
// reconnecting if not stopped yet.
Expand Down Expand Up @@ -1235,44 +1244,6 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all alloc
return reconnect, stop
}

// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
// Check if the replacement is newer.
// Always prefer the replacement if true.
replacementIsNewer := replacement.Job.Version > original.Job.Version ||
replacement.Job.CreateIndex > original.Job.CreateIndex
if replacementIsNewer {
return replacement
}

// Check if the replacement has better placement score.
// If any of the scores is not available, only pick the replacement if
// itself does have scores.
originalMaxScoreMeta := original.Metrics.MaxNormScore()
replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()

replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)

// Check if the replacement has better client status.
// Even with a better placement score make sure we don't replace a running
// allocation with one that is not.
replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning

if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
return replacement
}

return original
}

// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
Expand Down
9 changes: 8 additions & 1 deletion scheduler/reconcile_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,14 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri
var eligible bool
switch {
case isDisconnecting:
rescheduleTime, eligible = alloc.NextRescheduleTimeByTime(now)
replace := alloc.ShouldBeReplaced()
if replace != nil {
eligible = *replace
rescheduleTime = now
} else {
// Kept to maintain backwards compatibility with behavior prior to 1.8.0
rescheduleTime, eligible = alloc.NextRescheduleTimeByTime(now)
}

case alloc.ClientStatus == structs.AllocClientStatusUnknown && alloc.FollowupEvalID == evalID:
lastDisconnectTime := alloc.LastUnknown()
Expand Down
108 changes: 108 additions & 0 deletions scheduler/reconnecting_picker/reconnecting_picker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package reconnectingpicker

import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)

// type picker func(*structs.Allocation, *structs.Allocation) *structs.Allocation
type ReconnectingPicker struct {
logger log.Logger
}

func New(logger log.Logger) *ReconnectingPicker {
rp := ReconnectingPicker{
logger: log.L().Named("reconnecting-picker"),
}

return &rp
}

func (rp *ReconnectingPicker) PickReconnectingAlloc(ds *structs.DisconnectStrategy, original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
// Check if the replacement is newer.
// Always prefer the replacement if true.
replacementIsNewer := replacement.Job.Version > original.Job.Version ||
replacement.Job.CreateIndex > original.Job.CreateIndex
if replacementIsNewer {
rp.logger.Debug("replacement has a newer version, keeping replacement")
return replacement
}

// Best score is the default strategy.
strategy := structs.ReconcileOptionBestScore
if ds != nil || ds.Reconcile != "" {
strategy = ds.Reconcile
}

rp.logger.Debug("picking according to strategy", "strategy", strategy)

var picker func(*structs.Allocation, *structs.Allocation) *structs.Allocation
switch strategy {
case structs.ReconcileOptionBestScore:
picker = pickBestScore

case structs.ReconcileOptionKeepOriginal:
picker = pickOriginal

case structs.ReconcileOptionKeepReplacement:
picker = pickReplacement

case structs.ReconcileOptionLongestRunning:
picker = pickLongestRunning
}

return picker(original, replacement)
}

// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
func pickBestScore(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {

// Check if the replacement has better placement score.
// If any of the scores is not available, only pick the replacement if
// itself does have scores.
originalMaxScoreMeta := original.Metrics.MaxNormScore()
replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()

replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)

// Check if the replacement has better client status.
// Even with a better placement score make sure we don't replace a running
// allocation with one that is not.
replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning

if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
return replacement
}

return original
}

func pickOriginal(original *structs.Allocation, _ *structs.Allocation) *structs.Allocation {
return original
}

func pickReplacement(_ *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
return replacement
}

func pickLongestRunning(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
// Check if the replacement has been running longer.
// Always prefer the replacement if true.
replacementIsLongerRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
if replacementIsLongerRunning {
return replacement
}

return original
}

0 comments on commit 87be0e8

Please sign in to comment.