Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

func: add new picker dependency #20029

Merged
merged 18 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20029.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
server: Add new options for reconcilation in case of disconnected nodes
```
10 changes: 10 additions & 0 deletions nomad/structs/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,13 @@ func (ds *DisconnectStrategy) Canonicalize() {
ds.Reconcile = ReconcileOptionBestScore
}
}

// ReconcileStrategy returns the strategy to be used when reconciling allocations
// after a client reconnects. Best score is the default one.
func (ds *DisconnectStrategy) ReconcileStrategy() string {
if ds == nil || ds.Reconcile == "" {
return ReconcileOptionBestScore
}

return ds.Reconcile
}
41 changes: 38 additions & 3 deletions nomad/structs/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestJobConfig_Validate_LostAfter_Disconnect(t *testing.T) {
must.NoError(t, err)
}

func TestDisconnectStategy_Validate(t *testing.T) {
func TestDisconnectStrategy_Validate(t *testing.T) {
ci.Parallel(t)

cases := []struct {
Expand Down Expand Up @@ -117,7 +117,42 @@ func TestDisconnectStategy_Validate(t *testing.T) {
}
}

func TestJobConfig_Validate_StopAferClient_Disconnect(t *testing.T) {
func TestReconcileStrategy(t *testing.T) {
ci.Parallel(t)

cases := []struct {
name string
disconnectBlock *DisconnectStrategy
expected string
}{
{
name: "nil_disconnect_default_to_best_score",
disconnectBlock: nil,
expected: ReconcileOptionBestScore,
},
{
name: "empty_reconcile_default_to_best_score",
disconnectBlock: &DisconnectStrategy{},
expected: ReconcileOptionBestScore,
},
{
name: "longest_running",
disconnectBlock: &DisconnectStrategy{
Reconcile: ReconcileOptionLongestRunning,
},
expected: ReconcileOptionLongestRunning,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
rs := c.disconnectBlock.ReconcileStrategy()
must.Eq(t, c.expected, rs)
})
}
}

func TestJobConfig_Validate_StopAfterClient_Disconnect(t *testing.T) {
ci.Parallel(t)
// Setup a system Job with Disconnect.StopOnClientAfter set, which is invalid
job := testJob()
Expand Down Expand Up @@ -153,7 +188,7 @@ func TestJobConfig_Validate_StopAferClient_Disconnect(t *testing.T) {

// Test using stop_after_client_disconnect, remove after its deprecated in favor
// of Disconnect.StopOnClientAfter introduced in 1.8.0.
func TestJobConfig_Validate_StopAferClientDisconnect(t *testing.T) {
func TestJobConfig_Validate_StopAfterClientDisconnect(t *testing.T) {
ci.Parallel(t)
// Setup a system Job with stop_after_client_disconnect set, which is invalid
job := testJob()
Expand Down
26 changes: 26 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11074,6 +11074,16 @@ func (a *Allocation) NextRescheduleTimeByTime(t time.Time) (time.Time, bool) {
return a.nextRescheduleTime(t, reschedulePolicy)
}

func (a *Allocation) RescheduleTimeOnDisconnect(now time.Time) (time.Time, bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have a godoc comment on this one. And do we have a path to deprecate the previous behaviour?

tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil || tg.Disconnect == nil || tg.Disconnect.Replace == nil {
// Kept to maintain backwards compatibility with behavior prior to 1.8.0
return a.NextRescheduleTimeByTime(now)
}

return now, *tg.Disconnect.Replace
}

// ShouldClientStop tests an alloc for StopAfterClient on the Disconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
Expand Down Expand Up @@ -11428,6 +11438,22 @@ func (a *Allocation) NeedsToReconnect() bool {
return disconnected
}

// LastStartOfTask returns the time of the last start event for the given task
// using the allocations TaskStates. If the task has not started, the zero time
// will be returned.
func (a *Allocation) LastStartOfTask(taskName string) time.Time {
task := a.TaskStates[taskName]
if task == nil {
return time.Time{}
}

if task.Restarts > 0 {
return task.LastRestart
}

return task.StartedAt
}

// IdentityClaims are the input to a JWT identifying a workload. It
// should never be serialized to msgpack unsigned.
type IdentityClaims struct {
Expand Down
117 changes: 117 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5891,6 +5891,123 @@ func TestAllocation_NeedsToReconnect(t *testing.T) {
}
}

func TestAllocation_RescheduleTimeOnDisconnect(t *testing.T) {
ci.Parallel(t)
testNow := time.Now()

testAlloc := MockAlloc()

testCases := []struct {
name string
taskGroup string
disconnectGroup *DisconnectStrategy
expected bool
expectedTime time.Time
}{
{
name: "missing_task_group",
taskGroup: "missing-task-group",
expected: false,
expectedTime: time.Time{},
},
{
name: "missing_disconnect_group",
taskGroup: "web",
disconnectGroup: nil,
expected: true,
expectedTime: testNow.Add(RestartPolicyMinInterval), // RestartPolicyMinInterval is the default value
},
{
name: "empty_disconnect_group",
taskGroup: "web",
disconnectGroup: &DisconnectStrategy{},
expected: true,
expectedTime: testNow.Add(RestartPolicyMinInterval), // RestartPolicyMinInterval is the default value
},
{
name: "replace_enabled",
taskGroup: "web",
disconnectGroup: &DisconnectStrategy{
Replace: pointer.Of(true),
},
expected: true,
expectedTime: testNow,
},
{
name: "replace_disabled",
taskGroup: "web",
disconnectGroup: &DisconnectStrategy{
Replace: pointer.Of(false),
},
expected: false,
expectedTime: testNow,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := testAlloc.Copy()
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved

alloc.TaskGroup = tc.taskGroup
alloc.Job.TaskGroups[0].Disconnect = tc.disconnectGroup

time, eligible := alloc.RescheduleTimeOnDisconnect(testNow)

must.Eq(t, tc.expected, eligible)
must.Eq(t, tc.expectedTime, time)
})
}
}

func TestAllocation_LastStartOfTask(t *testing.T) {
ci.Parallel(t)
testNow := time.Now()

alloc := MockAlloc()
alloc.TaskStates = map[string]*TaskState{
"task-with-restarts": {
StartedAt: testNow.Add(-30 * time.Minute),
Restarts: 3,
LastRestart: testNow.Add(-5 * time.Minute),
},
"task-without-restarts": {
StartedAt: testNow.Add(-30 * time.Minute),
Restarts: 0,
},
}

testCases := []struct {
name string
taskName string
expected time.Time
}{
{
name: "missing_task",
taskName: "missing-task",
expected: time.Time{},
},
{
name: "task_with_restarts",
taskName: "task-with-restarts",
expected: testNow.Add(-5 * time.Minute),
},
{
name: "task_without_restarts",
taskName: "task-without-restarts",
expected: testNow.Add(-30 * time.Minute),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc.TaskGroup = "web"
got := alloc.LastStartOfTask(tc.taskName)

must.Eq(t, tc.expected, got)
})
}
}

func TestAllocation_Canonicalize_Old(t *testing.T) {
ci.Parallel(t)

Expand Down
55 changes: 13 additions & 42 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
reconnectingpicker "github.com/hashicorp/nomad/scheduler/reconnecting_picker"
)

const (
Expand All @@ -32,6 +33,10 @@ const (
rescheduleWindowSize = 1 * time.Second
)

type ReconnectingPicker interface {
PickReconnectingAlloc(disconnect *structs.DisconnectStrategy, original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation
}
Comment on lines +36 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this interface? reconnectingpicker.ReconnectingPicker seems to be the only implementation.

I could see a case where the drain strategy could be an interface with one implementation per strategy, but ReconnectingPicker seems to be handling all of them already.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im aiming at removing the responsibility of the picking from the allocReconciler, that is the main reason of te interface :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interfaces in Go are a bit strange, and work differently than in other languages, like Java or C#.

Since they're implicitly defined pre-creating them can often lead to a confusing implementation. Specially for people new to the code base, trying to "jump to definition" in an interface is quite frustrating 😬

Sometimes that's unavoidable because we want to have multiple implementations (like a real thing and a test implementation), or if want to restrict what to expose (like only exposing certain methods of a struct).

But in this case I can't really think of a different implementations of ReconnectingPicker. The entire decision is already encapsulated in the reconnectingpicker.ReconnectingPicker implementation (the fact they have the same name kind of point to that as well 😅).

So we don't need this interface to remove responsibility from the reconciler. Using the concrete struct already accomplishes that. And having it creates unnecessary indirection that makes the code harder to follow.

If, later on, we do find a need for an interface, the implicit nature of Go's interface can help. If the reconciler ends up calling X(), Y(), and Z(), where each behaves differently under some conditions, we can easily add them to a new interface without much code change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having that interface in place gives us a define frontier between the alloc reconciler and the reconnecting picker: Imagine we refactor the reconciler and we want to unit test the logic. If we remove the interface, we remove the possibility of having a look at the internals of the reconciliation, we will need to write tests that also take into account the picker logic, no way to isolate components. If we leave the interface where it is, it is the perfect cutting point where we can add a test implementation of the picker just for the purpose of verifying the behaviour of the reconciliation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic in PickReconnectingAlloc is already pretty self-contained, it doesn't access external data and doesn't cause any side-effects, so I'm not sure we would need to isolate it in tests. I would expect the need would have showed up in the PR already.

But nevertheless, the nice things about implicit interfaces is that, in case we do find ourselves needing alternative implementation, the only line we would need to change is the type of reconnectingPicker, everything else stays the same.

reconnectingPicker ReconnectingPicker

The part of the code that has the biggest impact (and it's not even that big) on the scenario you mentioned is this one:

reconnectingPicker: reconnectingpicker.New(logger),

This is what is preventing us from feeding the reconciler with different implementation because the reconciler itself is making a decision on what to instantiate, instead of the dependency being fed externally, so it's violating the isolation you mentioned.

But, for the same reason we don't need this interface, we don't need to change this logic. If the need does arise to provide alternate implementation, it's pretty straightforward to update the code. But until then, this interfaces creates unnecessary indirection, making the code hard to read.

https://100go.co/5-interface-pollution/ has some good discussion about interfaces in Go.

Copy link
Member Author

@Juanadelacuesta Juanadelacuesta Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is meant to isolate the logic of the reconciler, not of the picker. When we get to the point of refactoring the reconciler, we can add a configuration option to change the picker, as done in other parts of the code. I you ask me, there are not enough interfaces in the nomad code, isolating components is almost impossible in some parts :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're refactoring the reconciler then adding this interface if necessary won't be any extra work. But until then, this interface is not bringing any benefits, and it's making the code harder to read.


// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
Expand Down Expand Up @@ -102,6 +107,8 @@ type allocReconciler struct {
// defaults to time.Now, and overridden in unit tests
now time.Time

reconnectingPicker ReconnectingPicker

// result is the results of the reconcile. During computation it can be
// used to store intermediate state
result *reconcileResults
Expand Down Expand Up @@ -195,6 +202,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
evalPriority int, supportsDisconnectedClients bool, opts ...AllocReconcilerOption) *allocReconciler {

ar := &allocReconciler{
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
Expand All @@ -216,6 +224,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
taskGroupAllocNameIndexes: make(map[string]*allocNameIndex),
},
reconnectingPicker: reconnectingpicker.New(logger),
}

for _, op := range opts {
Expand Down Expand Up @@ -461,7 +470,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
if len(reconnecting) > 0 {
// Pass all allocations because the replacements we need to find may be
// in any state, including themselves being reconnected.
reconnect, stop := a.reconcileReconnecting(reconnecting, all)
reconnect, stop := a.reconcileReconnecting(reconnecting, all, tg)

// Stop the reconciled allocations and remove them from the other sets
// since they have been already handled.
Expand Down Expand Up @@ -1145,7 +1154,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// - If the reconnecting allocation is to be stopped, its replacements may
// not be present in any of the returned sets. The rest of the reconciler
// logic will handle them.
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet) (allocSet, allocSet) {
func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all allocSet, tg *structs.TaskGroup) (allocSet, allocSet) {
stop := make(allocSet)
reconnect := make(allocSet)

Expand Down Expand Up @@ -1199,8 +1208,8 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all alloc
continue
}

// Pick which allocation we want to keep.
keepAlloc := pickReconnectingAlloc(reconnectingAlloc, replacementAlloc)
// Pick which allocation we want to keep using the disconnect reconcile strategy
keepAlloc := a.reconnectingPicker.PickReconnectingAlloc(tg.Disconnect, reconnectingAlloc, replacementAlloc)
if keepAlloc == replacementAlloc {
// The replacement allocation is preferred, so stop the one
// reconnecting if not stopped yet.
Expand Down Expand Up @@ -1235,44 +1244,6 @@ func (a *allocReconciler) reconcileReconnecting(reconnecting allocSet, all alloc
return reconnect, stop
}

// pickReconnectingAlloc returns the allocation to keep between the original
// one that is reconnecting and one of its replacements.
//
// This function is not commutative, meaning that pickReconnectingAlloc(A, B)
// is not the same as pickReconnectingAlloc(B, A). Preference is given to keep
// the original allocation when possible.
func pickReconnectingAlloc(original *structs.Allocation, replacement *structs.Allocation) *structs.Allocation {
// Check if the replacement is newer.
// Always prefer the replacement if true.
replacementIsNewer := replacement.Job.Version > original.Job.Version ||
replacement.Job.CreateIndex > original.Job.CreateIndex
if replacementIsNewer {
return replacement
}

// Check if the replacement has better placement score.
// If any of the scores is not available, only pick the replacement if
// itself does have scores.
originalMaxScoreMeta := original.Metrics.MaxNormScore()
replacementMaxScoreMeta := replacement.Metrics.MaxNormScore()

replacementHasBetterScore := originalMaxScoreMeta == nil && replacementMaxScoreMeta != nil ||
(originalMaxScoreMeta != nil && replacementMaxScoreMeta != nil &&
replacementMaxScoreMeta.NormScore > originalMaxScoreMeta.NormScore)

// Check if the replacement has better client status.
// Even with a better placement score make sure we don't replace a running
// allocation with one that is not.
replacementIsRunning := replacement.ClientStatus == structs.AllocClientStatusRunning
originalNotRunning := original.ClientStatus != structs.AllocClientStatusRunning

if replacementHasBetterScore && (replacementIsRunning || originalNotRunning) {
return replacement
}

return original
}

// computeUpdates determines which allocations for the passed group require
// updates. Three groups are returned:
// 1. Those that require no upgrades
Expand Down
Loading
Loading