Skip to content

Commit

Permalink
Add globalPruningGracePeriod flag which defaults to a long time for n…
Browse files Browse the repository at this point in the history
…on-breaking opt-in change

Signed-off-by: Max Cao <[email protected]>
  • Loading branch information
maxcao13 committed Jan 2, 2025
1 parent e6a8d24 commit 72ae5d7
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 66 deletions.
15 changes: 4 additions & 11 deletions vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,17 +373,10 @@ spec:
- "Off"
type: string
pruningGracePeriod:
description: PruningGracePeriod is the duration to wait
before pruning recommendations for containers that no
longer exist. This is useful for containers created by
Jobs from CronJobs, which are frequently created and deleted.
By setting a grace period, recommendations for these containers
are not pruned immediately after they are removed, providing
recommendations to new containers created by subsequent
Jobs. If not specified, recommendations for non-existent
containers are pruned the next time a recommendation loop
is run. However, if the targetRef points to a CronJob,
the default value is 24 hours.
description: |-
PruningGracePeriod is the duration to wait before pruning recommendations for containers that no longer exist.
By default, recommendations for non-existent containers are never pruned until its top-most controller is deleted,
after which the recommendations are subject to the VPA's recommendation garbage collector.
type: string
type: object
type: array
Expand Down
2 changes: 2 additions & 0 deletions vertical-pod-autoscaler/e2e/v1/recommender.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ var _ = RecommenderE2eDescribe("VPA CRD object", func() {
WithNamespace(f.Namespace.Name).
WithTargetRef(hamsterTargetRef).
WithContainer("*").
WithPruningGracePeriod("*", 0).
Get()

InstallVPA(f, vpaCRD)
Expand Down Expand Up @@ -477,6 +478,7 @@ var _ = RecommenderE2eDescribe("VPA CRD object", func() {
WithNamespace(f.Namespace.Name).
WithTargetRef(hamsterTargetRef).
WithContainer("*").
WithPruningGracePeriod("*", 0).
Get()

InstallVPA(f, vpaCRD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,8 @@ type ContainerResourcePolicy struct {
ControlledValues *ContainerControlledValues `json:"controlledValues,omitempty" protobuf:"bytes,6,rep,name=controlledValues"`

// PruningGracePeriod is the duration to wait before pruning recommendations for containers that no longer exist.
// This is useful for containers created by Jobs from CronJobs, which are frequently created and deleted.
// By setting a grace period, recommendations for these containers are not pruned immediately
// after they are removed, providing recommendations to new containers created by subsequent Jobs.
// If not specified, recommendations for non-existent containers are pruned the next time a recommendation
// loop is run. However, if the targetRef points to a CronJob, the default value is 24 hours.
// By default, recommendations for non-existent containers are never pruned until its top-most controller is deleted,
// after which the recommendations are subject to the VPA's recommendation garbage collector.
// +optional
PruningGracePeriod *metav1.Duration `json:"pruningGracePeriod,omitempty" protobuf:"bytes,4,opt,name=pruningGracePeriod"`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() {
now := time.Now()
for _, vpa := range feeder.clusterState.Vpas {
for containerKey, container := range vpa.AggregateContainerStates() {
if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) {
if !container.IsUnderVPA && container.IsAggregateStale(now) {
klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
Expand All @@ -454,7 +454,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() {
}
}
for containerKey, container := range vpa.ContainersInitialAggregateState {
if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) {
if !container.IsUnderVPA && container.IsAggregateStale(now) {
klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ limitations under the License.
package model

import (
"flag"
"fmt"
"math"
"time"
Expand All @@ -46,6 +47,22 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util"
)

var (
globalPruningGracePeriodDuration = flag.String("pruning-grace-period-duration", "", `The grace period for deleting stale aggregates and recommendations. An empty duration will disable the grace period for all containers by default.`)
parsedPruningGracePeriodDuration = parsePruningGracePeriodDuration()
)

func parsePruningGracePeriodDuration() *time.Duration {
if globalPruningGracePeriodDuration == nil || *globalPruningGracePeriodDuration == "" {
return nil
}
duration, err := time.ParseDuration(*globalPruningGracePeriodDuration)
if err != nil {
panic(fmt.Sprintf("Failed to parse --pruning-grace-period-duration: %v", err))
}
return &duration
}

// ContainerNameToAggregateStateMap maps a container name to AggregateContainerState
// that aggregates state of containers with that name.
type ContainerNameToAggregateStateMap map[string]*AggregateContainerState
Expand Down Expand Up @@ -111,7 +128,7 @@ type AggregateContainerState struct {
ScalingMode *vpa_types.ContainerScalingMode
ControlledResources *[]ResourceName
LastUpdateTime time.Time
PruningGracePeriod metav1.Duration
PruningGracePeriod *time.Duration
}

// GetLastRecommendation returns last recorded recommendation.
Expand Down Expand Up @@ -145,14 +162,12 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName {
return DefaultControlledResources
}

// GetLastUpdate returns the time of the last update of the VPA object controlling this aggregator.
func (a *AggregateContainerState) GetLastUpdate() time.Time {
return a.LastUpdateTime
}

// GetPruningGracePeriod returns the pruning grace period set in the VPA object controlling this aggregator.
func (a *AggregateContainerState) GetPruningGracePeriod() metav1.Duration {
return a.PruningGracePeriod
// IsAggregateStale returns true if the last update time is past its grace period and the aggregate should be pruned.
func (a *AggregateContainerState) IsAggregateStale(now time.Time) bool {
if a.PruningGracePeriod == nil {
return false
}
return now.After(a.LastUpdateTime.Add(*a.PruningGracePeriod))
}

// MarkNotAutoscaled registers that this container state is not controlled by
Expand Down Expand Up @@ -303,6 +318,15 @@ func (a *AggregateContainerState) UpdateFromPolicy(resourcePolicy *vpa_types.Con
}
}

// UpdatePruningGracePeriod updates the an aggregate state with a containerPruningGracePeriod or the global pruning duration if nil.
func (a *AggregateContainerState) UpdatePruningGracePeriod(containerPruningGracePeriod *metav1.Duration) {
if containerPruningGracePeriod != nil {
a.PruningGracePeriod = &containerPruningGracePeriod.Duration
} else {
a.PruningGracePeriod = parsedPruningGracePeriodDuration
}
}

// AggregateStateByContainerName takes a set of AggregateContainerStates and merge them
// grouping by the container name. The result is a map from the container name to the aggregation
// from all input containers with the given name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package model

import (
"flag"
"testing"
"time"

Expand All @@ -26,6 +27,7 @@ import (
labels "k8s.io/apimachinery/pkg/labels"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util"
"k8s.io/utils/ptr"
)

var (
Expand Down Expand Up @@ -294,3 +296,44 @@ func TestUpdateFromPolicyControlledResources(t *testing.T) {
})
}
}

func TestParsePruningGracePeriodDuration(t *testing.T) {

testCases := []struct {
name string
initialFlag string
policy *vpa_types.ContainerResourcePolicy
expected *time.Duration
}{
{
name: "Explicit PruningGracePeriod",
initialFlag: "10m",
policy: &vpa_types.ContainerResourcePolicy{
PruningGracePeriod: &metav1.Duration{Duration: 10 * time.Minute},
},
expected: ptr.To(10 * time.Minute),
}, {
name: "No PruningGracePeriod specified - default to nil",
initialFlag: "",
policy: &vpa_types.ContainerResourcePolicy{},
expected: nil,
}, {
name: "Invalid policy - exit with error",
initialFlag: "badDuration",
policy: nil,
expected: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
flag.Set("pruning-grace-period-duration", tc.initialFlag)
cs := NewAggregateContainerState()
cs.UpdateFromPolicy(tc.policy)
if tc.policy != nil {
assert.Equal(t, tc.expected, parsePruningGracePeriodDuration())
} else {
assert.Panics(t, func() { parsePruningGracePeriodDuration() })
}
})
}
}
24 changes: 17 additions & 7 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,26 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
}

vpa, vpaExists := cluster.Vpas[vpaID]
if vpaExists && (vpa.PodSelector.String() != selector.String()) {
// Pod selector was changed. Delete the VPA object and recreate
// it with the new selector.
if err := cluster.DeleteVpa(vpaID); err != nil {
return err
if vpaExists {
if vpa.PodSelector.String() != selector.String() {
// Pod selector was changed. Delete the VPA object and recreate
// it with the new selector.
if err := cluster.DeleteVpa(vpaID); err != nil {
return err
}

vpaExists = false
} else {
// Update the pruningGracePeriod to ensure a potential new grace period is applied.
// This prevents an old, excessively long grace period from persisting and
// potentially causing the VPA to keep stale aggregates with an outdated grace period.
for key, containerState := range vpa.aggregateContainerStates {
containerState.UpdatePruningGracePeriod(vpa_utils.GetContainerPruningGracePeriod(key.ContainerName(), apiObject.Spec.ResourcePolicy))
}
}
vpaExists = false
}
if !vpaExists {
vpa = NewVpa(vpaID, selector, apiObject.Spec.TargetRef, apiObject.CreationTimestamp.Time)
vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time)
cluster.Vpas[vpaID] = vpa
for aggregationKey, aggregation := range cluster.aggregateStateMap {
vpa.UseAggregationIfMatching(aggregationKey, aggregation)
Expand Down
5 changes: 2 additions & 3 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Vpa struct {

// NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the
// links to the matched aggregations.
func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVersionObjectReference, created time.Time) *Vpa {
func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa {
vpa := &Vpa{
ID: id,
PodSelector: selector,
Expand All @@ -128,7 +128,6 @@ func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVers
Created: created,
Annotations: make(vpaAnnotationsMap),
Conditions: make(vpaConditionsMap),
TargetRef: targetRef,
// APIVersion defaults to the version of the client used to read resources.
// If a new version is introduced that needs to be differentiated beyond the
// client conversion, this needs to be done based on the resource content.
Expand Down Expand Up @@ -161,7 +160,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre
vpa.aggregateContainerStates[aggregationKey] = aggregation
aggregation.IsUnderVPA = true
aggregation.UpdateMode = vpa.UpdateMode
aggregation.PruningGracePeriod = vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy, vpa.TargetRef)
aggregation.UpdatePruningGracePeriod(vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy))
aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy))
}
}
Expand Down
18 changes: 6 additions & 12 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

autoscaling "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -31,20 +30,15 @@ import (
)

var (
anyTime = time.Unix(0, 0)
regularTargetRef = &autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "test-deployment",
APIVersion: "apps/v1",
}
anyTime = time.Unix(0, 0)
// TODO(maxcao13): write tests for new container policy field
)

func TestMergeAggregateContainerState(t *testing.T) {

containersInitialAggregateState := ContainerNameToAggregateStateMap{}
containersInitialAggregateState["test"] = NewAggregateContainerState()
vpa := NewVpa(VpaID{}, nil, nil, anyTime)
vpa := NewVpa(VpaID{}, nil, anyTime)
vpa.ContainersInitialAggregateState = containersInitialAggregateState

containerNameToAggregateStateMap := ContainerNameToAggregateStateMap{}
Expand Down Expand Up @@ -126,7 +120,7 @@ func TestUpdateConditions(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
containerName := "container"
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), nil, time.Unix(0, 0))
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), time.Unix(0, 0))
if tc.hasRecommendation {
vpa.Recommendation = test.Recommendation().WithContainer(containerName).WithTarget("5", "200").Get()
}
Expand Down Expand Up @@ -196,7 +190,7 @@ func TestUpdateRecommendation(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
namespace := "test-namespace"
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), regularTargetRef, anyTime)
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), anyTime)
for container, rec := range tc.containers {
state := &AggregateContainerState{}
if rec != nil {
Expand Down Expand Up @@ -362,7 +356,7 @@ func TestUseAggregationIfMatching(t *testing.T) {
if !assert.NoError(t, err) {
t.FailNow()
}
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime)
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, anyTime)
vpa.UpdateMode = tc.updateMode
key := mockAggregateStateKey{
namespace: namespace,
Expand Down Expand Up @@ -549,7 +543,7 @@ func TestSetResourcePolicy(t *testing.T) {
if !assert.NoError(t, err) {
t.FailNow()
}
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime)
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, anyTime)
for _, container := range tc.containers {
containerKey, aggregation := testAggregation(vpa, container, labels.Set(testLabels).String())
vpa.aggregateContainerStates[containerKey] = aggregation
Expand Down
20 changes: 15 additions & 5 deletions vertical-pod-autoscaler/pkg/utils/test/test_vpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type VerticalPodAutoscalerBuilder interface {
WithMinAllowed(containerName, cpu, memory string) VerticalPodAutoscalerBuilder
WithMaxAllowed(containerName, cpu, memory string) VerticalPodAutoscalerBuilder
WithControlledValues(containerName string, mode vpa_types.ContainerControlledValues) VerticalPodAutoscalerBuilder
WithPruningGracePeriod(containerName string, pruningGracePeriodSeconds int32) VerticalPodAutoscalerBuilder
WithScalingMode(containerName string, scalingMode vpa_types.ContainerScalingMode) VerticalPodAutoscalerBuilder
WithTarget(cpu, memory string) VerticalPodAutoscalerBuilder
WithTargetResource(resource core.ResourceName, value string) VerticalPodAutoscalerBuilder
Expand Down Expand Up @@ -66,6 +67,7 @@ func VerticalPodAutoscaler() VerticalPodAutoscalerBuilder {
minAllowed: map[string]core.ResourceList{},
maxAllowed: map[string]core.ResourceList{},
controlledValues: map[string]*vpa_types.ContainerControlledValues{},
pruningGracePeriod: map[string]*meta.Duration{},
scalingMode: map[string]*vpa_types.ContainerScalingMode{},
}
}
Expand All @@ -80,6 +82,7 @@ type verticalPodAutoscalerBuilder struct {
minAllowed map[string]core.ResourceList
maxAllowed map[string]core.ResourceList
controlledValues map[string]*vpa_types.ContainerControlledValues
pruningGracePeriod map[string]*meta.Duration
scalingMode map[string]*vpa_types.ContainerScalingMode
recommendation RecommendationBuilder
conditions []vpa_types.VerticalPodAutoscalerCondition
Expand Down Expand Up @@ -140,6 +143,12 @@ func (b *verticalPodAutoscalerBuilder) WithControlledValues(containerName string
return &c
}

func (b *verticalPodAutoscalerBuilder) WithPruningGracePeriod(containerName string, pruningGracePeriodSeconds int32) VerticalPodAutoscalerBuilder {
c := *b
c.pruningGracePeriod[containerName] = &meta.Duration{Duration: time.Duration(pruningGracePeriodSeconds) * time.Second}
return &c
}

func (b *verticalPodAutoscalerBuilder) WithScalingMode(containerName string, scalingMode vpa_types.ContainerScalingMode) VerticalPodAutoscalerBuilder {
c := *b
c.scalingMode[containerName] = &scalingMode
Expand Down Expand Up @@ -245,11 +254,12 @@ func (b *verticalPodAutoscalerBuilder) Get() *vpa_types.VerticalPodAutoscaler {
scalingModeAuto := vpa_types.ContainerScalingModeAuto
for _, containerName := range b.containerNames {
containerResourcePolicy := vpa_types.ContainerResourcePolicy{
ContainerName: containerName,
MinAllowed: b.minAllowed[containerName],
MaxAllowed: b.maxAllowed[containerName],
ControlledValues: b.controlledValues[containerName],
Mode: &scalingModeAuto,
ContainerName: containerName,
Mode: &scalingModeAuto,
MinAllowed: b.minAllowed[containerName],
MaxAllowed: b.maxAllowed[containerName],
ControlledValues: b.controlledValues[containerName],
PruningGracePeriod: b.pruningGracePeriod[containerName],
}
if scalingMode, ok := b.scalingMode[containerName]; ok {
containerResourcePolicy.Mode = scalingMode
Expand Down
Loading

0 comments on commit 72ae5d7

Please sign in to comment.