Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scaling: fix state store corruption bug for job scaling events #23673

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/23673.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scaling: Fixed a bug where state store corruption could occur when writing scaling events
```
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR

var jobEvents *structs.JobScalingEvents
if existing != nil {
jobEvents = existing.(*structs.JobScalingEvents)
jobEvents = existing.(*structs.JobScalingEvents).Copy()
} else {
jobEvents = &structs.JobScalingEvents{
Namespace: req.Namespace,
Expand Down
15 changes: 9 additions & 6 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10636,9 +10636,10 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) {
job := mock.Job()
groupName := job.TaskGroups[0].Name

newEvent := structs.NewScalingEvent("message 1").SetMeta(map[string]interface{}{
newEvent := structs.NewScalingEvent("message 1")
newEvent.Meta = map[string]interface{}{
"a": 1,
})
}

wsAll := memdb.NewWatchSet()
all, err := state.ScalingEvents(wsAll)
Expand Down Expand Up @@ -10709,10 +10710,11 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) {

index := uint64(1000)
for i := 1; i <= structs.JobTrackedScalingEvents+10; i++ {
newEvent := structs.NewScalingEvent("").SetMeta(map[string]interface{}{
newEvent := structs.NewScalingEvent("")
newEvent.Meta = map[string]interface{}{
"i": i,
"group": group1,
})
}
err := state.UpsertScalingEvent(index, &structs.ScalingEventRequest{
Namespace: namespace,
JobID: jobID,
Expand All @@ -10722,10 +10724,11 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) {
index++
require.NoError(err)

newEvent = structs.NewScalingEvent("").SetMeta(map[string]interface{}{
newEvent = structs.NewScalingEvent("")
newEvent.Meta = map[string]interface{}{
"i": i,
"group": group2,
})
}
err = state.UpsertScalingEvent(index, &structs.ScalingEventRequest{
Namespace: namespace,
JobID: jobID,
Expand Down
37 changes: 25 additions & 12 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6096,6 +6096,21 @@ type JobScalingEvents struct {
ModifyIndex uint64
}

func (j *JobScalingEvents) Copy() *JobScalingEvents {
if j == nil {
return nil
}
njse := new(JobScalingEvents)
*njse = *j

njse.ScalingEvents = map[string][]*ScalingEvent{}
tgross marked this conversation as resolved.
Show resolved Hide resolved
for taskGroup, events := range j.ScalingEvents {
njse.ScalingEvents[taskGroup] = helper.CopySlice(events)
}

return njse
}

// NewScalingEvent method for ScalingEvent objects.
func NewScalingEvent(message string) *ScalingEvent {
return &ScalingEvent{
Expand Down Expand Up @@ -6131,19 +6146,17 @@ type ScalingEvent struct {
CreateIndex uint64
}

func (e *ScalingEvent) SetError(error bool) *ScalingEvent {
e.Error = error
return e
}

func (e *ScalingEvent) SetMeta(meta map[string]interface{}) *ScalingEvent {
e.Meta = meta
return e
}
func (e *ScalingEvent) Copy() *ScalingEvent {
if e == nil {
return nil
}
ne := new(ScalingEvent)
*ne = *e

func (e *ScalingEvent) SetEvalID(evalID string) *ScalingEvent {
e.EvalID = &evalID
return e
ne.Count = pointer.Copy(e.Count)
ne.Meta = maps.Clone(e.Meta)
ne.EvalID = pointer.Copy(e.EvalID)
return ne
}

// ScalingEventRequest is by for Job.Scale endpoint
Expand Down