Skip to content

Commit

Permalink
VPA: immediately prune stale vpa aggregates
Browse files Browse the repository at this point in the history
Previously we were letting the rate limited garbage collector clean up
the aggregate states, and that works really well in most cases, but when
the list of containers in a pod changes, either due to the removal or
rename of a container, the aggregates for the old containers stick
around forever and cause problems.

To get around this, this change marks all existing aggregates/initial
aggregates in the list for each VPA as "not under a VPA" every time
before we call LoadPods(). LoadPods() then re-marks the aggregates that
are still present as "under a VPA", which lets us easily prune the stale
container aggregates: they are the ones still marked "not under a VPA"
but still wrongly in the VPA's list.

This does leave the ultimate garbage collection to the rate-limited
garbage collector, which should be fine; we just needed the stale
entries to get removed from the per-VPA lists so they didn't affect VPA
behavior.
  • Loading branch information
jkyros committed Apr 20, 2024
1 parent a5ed910 commit c34b2ba
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 2 deletions.
55 changes: 55 additions & 0 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type ClusterStateFeeder interface {

// GarbageCollectCheckpoints removes historical checkpoints that don't have a matching VPA.
GarbageCollectCheckpoints()

// MarkAggregates marks all aggregates in all VPAs as not under VPAs
MarkAggregates()

// SweepAggregates garbage collects all aggregates in all VPAs' aggregate lists that are no longer under VPAs
SweepAggregates()
}

// ClusterStateFeederFactory makes instances of ClusterStateFeeder.
Expand Down Expand Up @@ -400,6 +406,55 @@ func (feeder *clusterStateFeeder) LoadVPAs() {
feeder.clusterState.ObservedVpas = vpaCRDs
}

// MarkAggregates is the "mark" half of a mark-and-sweep pass over the VPAs'
// aggregate lists. It resets IsUnderVPA to false on every aggregate and every
// initial aggregate of every VPA in the cluster state. A following LoadPods()
// is expected to flip the flag back to true for the aggregates that are still
// valid, so whatever remains false afterwards can be removed from the VPAs'
// aggregate lists.
func (feeder *clusterStateFeeder) MarkAggregates() {
	for _, v := range feeder.clusterState.Vpas {
		// Regular aggregates first, then the initial aggregates.
		for _, aggregate := range v.AggregateContainerStates() {
			aggregate.IsUnderVPA = false
		}
		for _, initialAggregate := range v.ContainersInitialAggregateState {
			initialAggregate.IsUnderVPA = false
		}
	}
}

// SweepAggregates is the "sweep" half of the mark-and-sweep pass: for every VPA
// it removes each aggregate and each initial aggregate whose IsUnderVPA flag is
// still false, i.e. whose container no longer exists. A summary line is logged
// only when at least one entry was removed.
func (feeder *clusterStateFeeder) SweepAggregates() {
	prunedAggregates, prunedInitialAggregates := 0, 0

	// TODO(jkyros): Only the VPA's own aggregate lists are cleaned here. The
	// same states are still referenced from feeder.clusterState.aggregateStateMap
	// and are left for the rate limited aggregate garbage collector to reclaim
	// later. Since we already know which ones are stale, maybe they should be
	// cleaned up here as well?
	for _, v := range feeder.clusterState.Vpas {
		for key, aggregate := range v.AggregateContainerStates() {
			if aggregate.IsUnderVPA {
				continue
			}
			klog.V(4).Infof("Deleting Aggregate for VPA %s/%s: container %s no longer present", v.ID.Namespace, v.ID.VpaName, key.ContainerName())
			v.DeleteAggregation(key)
			prunedAggregates++
		}

		for name, initialAggregate := range v.ContainersInitialAggregateState {
			if initialAggregate.IsUnderVPA {
				continue
			}
			klog.V(4).Infof("Deleting Initial Aggregate for VPA %s/%s: container %s no longer present", v.ID.Namespace, v.ID.VpaName, name)
			delete(v.ContainersInitialAggregateState, name)
			prunedInitialAggregates++
		}
	}

	// Stay quiet when nothing was removed.
	if prunedAggregates == 0 && prunedInitialAggregates == 0 {
		return
	}
	klog.Infof("Pruned %d aggregate and %d initial aggregate containers", prunedAggregates, prunedInitialAggregates)
}

// LoadPods loads pod into the cluster state.
func (feeder *clusterStateFeeder) LoadPods() {
podSpecs, err := feeder.specClient.GetPodSpecs()
Expand Down
4 changes: 3 additions & 1 deletion vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ func (cluster *ClusterState) AddOrUpdateContainer(containerID ContainerID, reque
if !podExists {
return NewKeyError(containerID.PodID)
}
aggregateState := cluster.findOrCreateAggregateContainerState(containerID)
if container, containerExists := pod.Containers[containerID.ContainerName]; !containerExists {
cluster.findOrCreateAggregateContainerState(containerID)
pod.Containers[containerID.ContainerName] = NewContainerState(request, NewContainerStateAggregatorProxy(cluster, containerID))
} else {
// Container already exists. Possibly update the request.
container.Request = request
// Mark this container as still managed so the aggregates don't get garbage collected
aggregateState.IsUnderVPA = true
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/logic"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
metrics_recommender "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/recommender"
vpa_utils "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
)
Expand Down Expand Up @@ -157,9 +157,15 @@ func (r *recommender) RunOnce() {
r.clusterStateFeeder.LoadVPAs()
timer.ObserveStep("LoadVPAs")

r.clusterStateFeeder.MarkAggregates()
timer.ObserveStep("MarkAggregates")

r.clusterStateFeeder.LoadPods()
timer.ObserveStep("LoadPods")

r.clusterStateFeeder.SweepAggregates()
timer.ObserveStep("SweepAggregates")

r.clusterStateFeeder.LoadRealTimeMetrics()
timer.ObserveStep("LoadMetrics")
klog.V(3).Infof("ClusterState is tracking %v PodStates and %v VPAs", len(r.clusterState.Pods), len(r.clusterState.Vpas))
Expand Down

0 comments on commit c34b2ba

Please sign in to comment.