Skip to content

Commit

Permalink
VPA: track list of managed pods on VPA object
Browse files Browse the repository at this point in the history
Previously the VPA object didn't have any references to the pods it
was managing, only the count of those pods. This required us to do a lot
of looping over pods if we wanted to know any details about the
states of the pods being managed by a VPA.

This adds a ManagedPods map to the VPA object that contains references
to the cluster state for each of the pods the VPA is managing, which
lets us cheaply access them so we can do things like container cleanup.
  • Loading branch information
jkyros committed May 2, 2024
1 parent 761cb5d commit 6ddc208
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package metrics

import (
"context"
"time"

k8sapiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -28,7 +30,6 @@ import (
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
"k8s.io/metrics/pkg/client/external_metrics"
"time"
)

// PodMetricsLister wraps both metrics-client and External Metrics
Expand Down Expand Up @@ -82,7 +83,7 @@ func (s *externalMetricsClient) List(ctx context.Context, namespace string, opts
result := v1beta1.PodMetricsList{}

for _, vpa := range s.clusterState.Vpas {
if vpa.PodCount == 0 {
if vpa.PodCount() == 0 {
continue
}

Expand Down
33 changes: 7 additions & 26 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,34 +143,16 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p

cluster.addPodToItsVpa(pod)
}
// Tally the number of containers for later when we're averaging the recommendations
cluster.setVPAContainersPerPod(pod)
pod.Phase = phase
}

// setVPAContainersPerPod sets the number of containers per pod seen for pods connected to this VPA
// so that later when we're splitting the minimum recommendations over containers, we're splitting them over
// the correct number and not just the number of aggregates that have *ever* been present. (We don't want
// minimum resources to erroneously shrink, either)
func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState) {
for _, vpa := range cluster.Vpas {
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) {
// We want the "high water mark" of the most containers in the pod in the event
// that we're rolling out a pod that has an additional container
if len(pod.Containers) > vpa.ContainersPerPod {
vpa.ContainersPerPod = len(pod.Containers)
}
}
}

pod.Phase = phase
}

// addPodToItsVpa increases the count of Pods associated with a VPA object.
// Does a scan similar to findOrCreateAggregateContainerState so could be optimized if needed.
func (cluster *ClusterState) addPodToItsVpa(pod *PodState) {
for _, vpa := range cluster.Vpas {
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) {
vpa.PodCount++
vpa.ManagedPods[pod.ID] = pod
}
}
}
Expand All @@ -179,7 +161,7 @@ func (cluster *ClusterState) addPodToItsVpa(pod *PodState) {
func (cluster *ClusterState) removePodFromItsVpa(pod *PodState) {
for _, vpa := range cluster.Vpas {
if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) {
vpa.PodCount--
delete(vpa.ManagedPods, pod.ID)
}
}
}
Expand Down Expand Up @@ -293,13 +275,12 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
for aggregationKey, aggregation := range cluster.aggregateStateMap {
vpa.UseAggregationIfMatching(aggregationKey, aggregation)
}
vpa.PodCount = len(cluster.GetMatchingPods(vpa))
// Populate the VPA's managed pods from the cluster state
for _, podId := range cluster.GetMatchingPods(vpa) {
vpa.ManagedPods[podId] = cluster.Pods[podId]
}
}

// Default this to the minimum, we will tally the true number when we load the pods later
// TODO(jkyros): This is gross, it depends on the order I know it currently loads things in, but
// that might not be the case someday
vpa.ContainersPerPod = 1
vpa.TargetRef = apiObject.Spec.TargetRef
vpa.Annotations = annotationsMap
vpa.Conditions = conditionsMap
Expand Down
6 changes: 3 additions & 3 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"k8s.io/klog/v2"

vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
)

Expand Down Expand Up @@ -909,7 +909,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
containerID := ContainerID{testPodID, "foo"}
assert.NoError(t, cluster.AddOrUpdateContainer(containerID, testRequest))
}
assert.Equal(t, tc.expectedMatch, cluster.Vpas[vpa.ID].PodCount)
assert.Equal(t, tc.expectedMatch, cluster.Vpas[vpa.ID].PodCount())
})
}
// Run with adding Pods first
Expand All @@ -922,7 +922,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
assert.NoError(t, cluster.AddOrUpdateContainer(containerID, testRequest))
}
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector, testTargetRef)
assert.Equal(t, tc.expectedMatch, cluster.Vpas[vpa.ID].PodCount)
assert.Equal(t, tc.expectedMatch, cluster.Vpas[vpa.ID].PodCount())
})
}
}
35 changes: 25 additions & 10 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,8 @@ type Vpa struct {
APIVersion string
// TargetRef points to the controller managing the set of pods.
TargetRef *autoscaling.CrossVersionObjectReference
// PodCount contains number of live Pods matching a given VPA object.
PodCount int
// ContainersPerPod contains the "high water mark" of the number of containers
// per pod to average the recommendation across. Used to make sure we aren't
// "fractionalizing" minResources erroneously during a redeploy when when a pod's
// container is removed or renamed
ContainersPerPod int
// ManagedPods contains the list of live Pods matching a given VPA object.
ManagedPods map[PodID]*PodState
}

// NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the
Expand All @@ -133,8 +128,8 @@ func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa {
// client conversion, this needs to be done based on the resource content.
// The K8s client will not return the resource apiVersion as it's converted
// to the version requested by the client server side.
APIVersion: vpa_types.SchemeGroupVersion.Version,
PodCount: 0,
APIVersion: vpa_types.SchemeGroupVersion.Version,
ManagedPods: make(map[PodID]*PodState),
}
return vpa
}
Expand Down Expand Up @@ -170,7 +165,7 @@ func (vpa *Vpa) UpdateRecommendation(recommendation *vpa_types.RecommendedPodRes
for _, containerRecommendation := range recommendation.ContainerRecommendations {
for container, state := range vpa.aggregateContainerStates {
if container.ContainerName() == containerRecommendation.ContainerName {
metrics_quality.ObserveRecommendationChange(state.LastRecommendation, containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount)
metrics_quality.ObserveRecommendationChange(state.LastRecommendation, containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount())
state.LastRecommendation = containerRecommendation.UncappedTarget
}
}
Expand Down Expand Up @@ -302,3 +297,23 @@ func (vpa *Vpa) HasMatchedPods() bool {
}
return true
}

// PodCount returns how many live pods this VPA currently manages,
// i.e. the size of its ManagedPods map.
func (vpa *Vpa) PodCount() int {
	return len(vpa.ManagedPods)
}

// MaxContainersPerPod calculates the maximum number of containers across
// all pods managed by this VPA. Used when splitting resources across containers
// to prevent resources being erroneously spread too thin during rollouts when
// the pod's container list is changing.
//
// Returns at least 1 even when no pods are currently managed: the previous
// ContainersPerPod field was explicitly defaulted to 1 as a minimum, and
// callers divide per-pod recommendations by this value, so returning 0
// would risk a divide-by-zero before any pods have been loaded.
func (vpa *Vpa) MaxContainersPerPod() int {
	maxContainers := 1
	for _, pod := range vpa.ManagedPods {
		// Track the "high water mark" so a rolling update that adds a
		// container is accounted for as soon as one such pod appears.
		if len(pod.Containers) > maxContainers {
			maxContainers = len(pod.Containers)
		}
	}
	return maxContainers
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *recommender) UpdateVPAs() {
if !found {
continue
}
resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa), vpa.ContainersPerPod)
resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa), vpa.MaxContainersPerPod())
had := vpa.HasRecommendation()

listOfResourceRecommendation := logic.MapToListOfRecommendedContainerResources(resources)
Expand All @@ -104,18 +104,18 @@ func (r *recommender) UpdateVPAs() {
if vpa.HasRecommendation() && !had {
metrics_recommender.ObserveRecommendationLatency(vpa.Created)
}
hasMatchingPods := vpa.PodCount > 0
hasMatchingPods := vpa.PodCount() > 0
vpa.UpdateConditions(hasMatchingPods)
if err := r.clusterState.RecordRecommendation(vpa, time.Now()); err != nil {
klog.Warningf("%v", err)
if klog.V(4).Enabled() {
klog.Infof("VPA dump")
klog.Infof("%+v", vpa)
klog.Infof("HasMatchingPods: %v", hasMatchingPods)
klog.Infof("PodCount: %v", vpa.PodCount)
klog.Infof("PodCount: %v", vpa.PodCount())
pods := r.clusterState.GetMatchingPods(vpa)
klog.Infof("MatchingPods: %+v", pods)
if len(pods) != vpa.PodCount {
if len(pods) != vpa.PodCount() {
klog.Errorf("ClusterState pod count and matching pods disagree for vpa %v/%v", vpa.ID.Namespace, vpa.ID.VpaName)
}
}
Expand Down

0 comments on commit 6ddc208

Please sign in to comment.