chore: Keep Track of v1alpha5.Machine in Cluster State (#170)
* Add handling for machine reconciliation in state

* Add Node() and Annotations() helpers

* Add testing for machine tracking

* Create helper functions for node lists

* Add cluster state syncing
jonathan-innis authored Jan 25, 2023
1 parent a2d4c9b commit d04b91e
Showing 16 changed files with 1,027 additions and 166 deletions.
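Several call sites below rely on new accessor helpers on state.Node (Node(), Annotations(), Labels()) whose definitions aren't part of this excerpt. A rough sketch of the shape such helpers could take, assuming state.Node now tracks a v1alpha5.Machine alongside its (possibly not-yet-registered) v1.Node — the field layout and fallback logic here are illustrative, not the committed implementation:

package state

import (
	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// Sketch: a state node that tracks the Machine from creation time and the
// Node once it registers (hypothetical field layout).
type Node struct {
	Node    *v1.Node
	Machine *v1alpha5.Machine
}

// Annotations returns the Node's annotations once the Node exists, falling
// back to the Machine's annotations while the Node is still pending.
func (n *Node) Annotations() map[string]string {
	if n.Node == nil {
		return n.Machine.Annotations
	}
	return n.Node.Annotations
}

// Labels applies the same fallback for labels.
func (n *Node) Labels() map[string]string {
	if n.Node == nil {
		return n.Machine.Labels
	}
	return n.Node.Labels
}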
63 changes: 11 additions & 52 deletions pkg/controllers/counter/controller.go
@@ -16,14 +16,14 @@ package counter

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/controllers/state"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/utils/functional"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
@@ -61,14 +61,6 @@ func (c *Controller) Name() string {
// Reconcile a control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner) (reconcile.Result, error) {
stored := provisioner.DeepCopy()
nodes := v1.NodeList{}
if err := c.kubeClient.List(ctx, &nodes, client.MatchingLabels{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}); err != nil {
return reconcile.Result{}, err
}
// Nodes aren't synced yet, so requeue with a short backoff and retry.
if !c.nodesSynced(nodes.Items, provisioner.Name) {
return reconcile.Result{RequeueAfter: 250 * time.Millisecond}, nil
}
// Determine resource usage and update provisioner.status.resources
provisioner.Status.Resources = c.resourceCountsFor(provisioner.Name)
if !equality.Semantic.DeepEqual(stored, provisioner) {
@@ -80,26 +72,22 @@ func (c *Controller) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner) (reconcile.Result, error) {
}

func (c *Controller) resourceCountsFor(provisionerName string) v1.ResourceList {
var provisioned []v1.ResourceList
var res v1.ResourceList
// Record all resources provisioned by the provisioners. We look at the cluster state nodes because their capacity
// is accurately reported even for nodes that haven't fully started yet. This allows us to update the provisioner
// status immediately upon node creation, instead of waiting for the node to become ready.
c.cluster.ForEachNode(func(n *state.Node) bool {
if n.Node.Labels[v1alpha5.ProvisionerNameLabelKey] == provisionerName {
provisioned = append(provisioned, n.Capacity())
// Don't count nodes that we are planning to delete; this keeps us consistent throughout
// our provisioning and deprovisioning loops.
if n.MarkedForDeletion() {
return true
}
if n.Labels()[v1alpha5.ProvisionerNameLabelKey] == provisionerName {
res = resources.Merge(res, n.Capacity())
}
return true
})

result := v1.ResourceList{}
// only report the non-zero resources
for key, value := range resources.Merge(provisioned...) {
if value.IsZero() {
continue
}
result[key] = value
}
return result
return functional.FilterMap(res, func(_ v1.ResourceName, v resource.Quantity) bool { return !v.IsZero() })
}
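functional.FilterMap is new in this change; its implementation isn't shown in the diff, but from the call site above it is presumably a small generic map filter along these lines (sketch, assuming Go 1.18+ generics; the real signature may differ):

package functional

// FilterMap returns a new map containing only the entries for which
// f returns true.
func FilterMap[K comparable, V any](m map[K]V, f func(K, V) bool) map[K]V {
	ret := map[K]V{}
	for k, v := range m {
		if f(k, v) {
			ret[k] = v
		}
	}
	return ret
}

At the call site above, this drops zero-valued quantities so the provisioner status only reports non-zero resources.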

func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder {
@@ -117,32 +105,3 @@ func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder {
).
WithOptions(controller.Options{MaxConcurrentReconciles: 10}))
}

// nodesSynced returns true if the cluster state is synced with the current list cache state with respect to the nodes
// created by the specified provisioner. Since updates may occur for the counting controller at a different time than
// the cluster state controller, we don't update the counter state until the states are synced. An alternative solution
// would be to add event support to cluster state and listen for those node events instead.
func (c *Controller) nodesSynced(nodes []v1.Node, provisionerName string) bool {
extraNodes := sets.String{}
for _, n := range nodes {
extraNodes.Insert(n.Name)
}
missingNode := false
c.cluster.ForEachNode(func(n *state.Node) bool {
// skip any nodes not created by this provisioner
if n.Node.Labels[v1alpha5.ProvisionerNameLabelKey] != provisionerName {
return true
}
if !extraNodes.Has(n.Node.Name) {
missingNode = true
return false
}
extraNodes.Delete(n.Node.Name)
return true
})

if !missingNode && len(extraNodes) == 0 {
return true
}
return false
}
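The per-controller nodesSynced check above is removed in favor of syncing at the cluster-state level (the "Add cluster state syncing" bullet in the commit message). A hypothetical sketch of what a cluster-wide gate could look like in this controller — the Synced method and its wiring are assumptions, not the committed API:

// Hypothetical: requeue until cluster state agrees with the informer cache,
// instead of each controller re-deriving sync on its own.
func (c *Controller) reconcileIfSynced(ctx context.Context, provisioner *v1alpha5.Provisioner) (reconcile.Result, error) {
	if !c.cluster.Synced(ctx) { // assumed method; see "Add cluster state syncing"
		return reconcile.Result{RequeueAfter: 250 * time.Millisecond}, nil
	}
	return c.Reconcile(ctx, provisioner)
}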
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/consolidation.go
@@ -105,7 +105,7 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []CandidateNode

// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (c *consolidation) ShouldDeprovision(ctx context.Context, n *state.Node, provisioner *v1alpha5.Provisioner, _ []*v1.Pod) bool {
if val, ok := n.Node.Annotations[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
if val, ok := n.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
c.reporter.RecordUnconsolidatableReason(ctx, n.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))
return val != "true"
}
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/drift.go
@@ -52,7 +52,7 @@ func (d *Drift) ShouldDeprovision(ctx context.Context, n *state.Node, provisioner
if !settings.FromContext(ctx).DriftEnabled {
return false
}
return n.Node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey] == v1alpha5.VoluntaryDisruptionDriftedAnnotationValue
return n.Annotations()[v1alpha5.VoluntaryDisruptionAnnotationKey] == v1alpha5.VoluntaryDisruptionDriftedAnnotationValue
}

// ComputeCommand generates a deprovisioning command given deprovisionable nodes
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/emptiness.go
@@ -54,7 +54,7 @@ func (e *Emptiness) ShouldDeprovision(ctx context.Context, n *state.Node, provisioner
return false
}

emptinessTimestamp, hasEmptinessTimestamp := n.Node.Annotations[v1alpha5.EmptinessTimestampAnnotationKey]
emptinessTimestamp, hasEmptinessTimestamp := n.Annotations()[v1alpha5.EmptinessTimestampAnnotationKey]
if !hasEmptinessTimestamp {
return false
}
53 changes: 21 additions & 32 deletions pkg/controllers/deprovisioning/helpers.go
@@ -28,7 +28,6 @@ import (
pscheduling "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/scheduling"
nodeutils "github.com/aws/karpenter-core/pkg/utils/node"
"github.com/aws/karpenter-core/pkg/utils/pod"

v1 "k8s.io/api/core/v1"
@@ -40,45 +39,36 @@

//nolint:gocyclo
func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
nodesToDelete ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {
var stateNodes []*state.Node
var markedForDeletionNodes []*state.Node
candidateNodeIsDeleting := false
candidateNodeNames := sets.NewString(lo.Map(nodesToDelete, func(t CandidateNode, i int) string { return t.Name })...)
cluster.ForEachNode(func(n *state.Node) bool {
// not a candidate node
if _, ok := candidateNodeNames[n.Node.Name]; !ok {
if !n.MarkedForDeletion() {
stateNodes = append(stateNodes, n.DeepCopy())
} else {
markedForDeletionNodes = append(markedForDeletionNodes, n.DeepCopy())
}
} else if n.MarkedForDeletion() {
// candidate node and marked for deletion
candidateNodeIsDeleting = true
}
return true
candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {

candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...)
allNodes := cluster.Nodes()
deletingNodes := allNodes.DeletingNodes()
stateNodes := lo.Filter(allNodes, func(n *state.Node, _ int) bool {
return !candidateNodeNames.Has(n.Name())
})

// We do one final check to ensure that the node that we are attempting to consolidate isn't
// already being handled for deletion by some other controller. This could happen if the node was marked
// for deletion between returning the candidateNodes and getting the stateNodes above.
if candidateNodeIsDeleting {
if _, ok := lo.Find(deletingNodes, func(n *state.Node) bool {
return candidateNodeNames.Has(n.Name())
}); ok {
return nil, false, errCandidateNodeDeleting
}

// We get the pods that are on nodes that are deleting
deletingNodePods, err := nodeutils.GetNodePods(ctx, kubeClient, lo.Map(markedForDeletionNodes, func(n *state.Node, _ int) *v1.Node { return n.Node })...)
deletingNodePods, err := deletingNodes.Pods(ctx, kubeClient)
if err != nil {
return nil, false, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
}

// start by getting all pending pods
pods, err := provisioner.GetPendingPods(ctx)
if err != nil {
return nil, false, fmt.Errorf("determining pending pods, %w", err)
}

for _, n := range nodesToDelete {
for _, n := range candidateNodes {
pods = append(pods, n.pods...)
}
pods = append(pods, deletingNodePods...)
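cluster.Nodes(), DeletingNodes(), and the list-level Pods() helper come from the "helper functions for node lists" bullet in the commit message; their definitions aren't in this excerpt. A plausible sketch, assuming a slice type over *state.Node (names illustrative, not the committed code):

package state

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// StateNodes is a sketch of the node-list type returned by cluster.Nodes().
type StateNodes []*Node

// ActiveNodes filters out nodes that are marked for deletion.
func (n StateNodes) ActiveNodes() StateNodes {
	ret := StateNodes{}
	for _, node := range n {
		if !node.MarkedForDeletion() {
			ret = append(ret, node)
		}
	}
	return ret
}

// DeletingNodes keeps only nodes that are marked for deletion.
func (n StateNodes) DeletingNodes() StateNodes {
	ret := StateNodes{}
	for _, node := range n {
		if node.MarkedForDeletion() {
			ret = append(ret, node)
		}
	}
	return ret
}

// Pods aggregates the pods bound to every node in the list, using the
// per-node Pods helper seen elsewhere in this diff.
func (n StateNodes) Pods(ctx context.Context, c client.Client) ([]*v1.Pod, error) {
	var pods []*v1.Pod
	for _, node := range n {
		p, err := node.Pods(ctx, c)
		if err != nil {
			return nil, err
		}
		pods = append(pods, p...)
	}
	return pods, nil
}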
@@ -107,7 +97,7 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster,
// to schedule since we want to assume that we can delete a node and its pods will immediately
// move to an existing node which won't occur if that node isn't ready.
for _, n := range ifn {
if n.Node.Labels[v1alpha5.LabelNodeInitialized] != "true" {
if !n.Initialized() {
return nil, false, nil
}
}
@@ -178,7 +168,7 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient client.Client
cluster.ForEachNode(func(n *state.Node) bool {
var provisioner *v1alpha5.Provisioner
var instanceTypeMap map[string]*cloudprovider.InstanceType
if provName, ok := n.Node.Labels[v1alpha5.ProvisionerNameLabelKey]; ok {
if provName, ok := n.Labels()[v1alpha5.ProvisionerNameLabelKey]; ok {
provisioner = provisioners[provName]
instanceTypeMap = instanceTypesByProvisioner[provName]
}
@@ -191,23 +181,24 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient client.Client
return true
}

instanceType, ok := instanceTypeMap[n.Node.Labels[v1.LabelInstanceTypeStable]]
instanceType, ok := instanceTypeMap[n.Labels()[v1.LabelInstanceTypeStable]]
// skip any nodes whose instance type we can't determine
if !ok {
return true
}

// skip any nodes that we can't determine the capacity type or the topology zone for
ct, ok := n.Node.Labels[v1alpha5.LabelCapacityType]
ct, ok := n.Labels()[v1alpha5.LabelCapacityType]
if !ok {
return true
}
az, ok := n.Node.Labels[v1.LabelTopologyZone]
az, ok := n.Labels()[v1.LabelTopologyZone]
if !ok {
return true
}

// Skip nodes that aren't initialized. This also covers state nodes
// for which the real Node doesn't exist yet.
if !n.Initialized() {
return true
}
@@ -216,18 +207,16 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient client.Client
return true
}

pods, err := nodeutils.GetNodePods(ctx, kubeClient, n.Node)
pods, err := n.Pods(ctx, kubeClient)
if err != nil {
logging.FromContext(ctx).Errorf("Determining node pods, %s", err)
return true
}

if !shouldDeprovision(ctx, n, provisioner, pods) {
return true
}

cn := CandidateNode{
Node: n.Node,
Node: n.Node.DeepCopy(),
instanceType: instanceType,
capacityType: ct,
zone: az,
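Note the switch from Node: n.Node to Node: n.Node.DeepCopy() in the CandidateNode above: candidate nodes outlive the ForEachNode callback, so taking a copy prevents later mutations of shared cluster state from leaking into (or racing with) the deprovisioning decision. A minimal standalone illustration of the aliasing hazard (hypothetical example, not part of the commit):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	live := &v1.Node{}
	live.Labels = map[string]string{"zone": "a"}

	alias := live               // shares the underlying maps with cluster state
	snapshot := live.DeepCopy() // independent copy, safe to hold onto

	live.Labels["zone"] = "b"
	fmt.Println(alias.Labels["zone"])    // "b": mutated underneath us
	fmt.Println(snapshot.Labels["zone"]) // still "a"
}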
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/validation.go
@@ -100,7 +100,7 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {

// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (v *Validation) ShouldDeprovision(_ context.Context, n *state.Node, provisioner *v1alpha5.Provisioner, _ []*v1.Pod) bool {
if val, ok := n.Node.Annotations[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
if val, ok := n.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
return val != "true"
}
return provisioner != nil && provisioner.Spec.Consolidation != nil && ptr.BoolValue(provisioner.Spec.Consolidation.Enabled)
5 changes: 4 additions & 1 deletion pkg/controllers/metrics/state/scraper/node.go
@@ -157,13 +157,16 @@ func (ns *NodeScraper) Scrape(_ context.Context) {

// Populate metrics
ns.cluster.ForEachNode(func(n *state.Node) bool {
if n.Node == nil {
return true
}
for gaugeVec, resourceList := range map[*prometheus.GaugeVec]v1.ResourceList{
overheadGaugeVec: ns.getSystemOverhead(n.Node),
podRequestsGaugeVec: resources.Subtract(n.PodRequests(), n.DaemonSetRequests()),
podLimitsGaugeVec: resources.Subtract(n.PodLimits(), n.DaemonSetLimits()),
daemonRequestsGaugeVec: n.DaemonSetRequests(),
daemonLimitsGaugeVec: n.DaemonSetLimits(),
allocatableGaugeVec: n.Allocatable(),
allocatableGaugeVec: n.Node.Status.Allocatable,
} {
for _, labels := range ns.set(gaugeVec, n.Node, resourceList) {
key := labelsToString(labels)
22 changes: 7 additions & 15 deletions pkg/controllers/provisioning/provisioner.go
@@ -48,7 +48,6 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/utils/node"
"github.com/aws/karpenter-core/pkg/utils/pod"
)

@@ -114,19 +113,12 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (result
// prevents over-provisioning at the cost of potentially under-provisioning, which will self-heal during the next
// scheduling loop when we launch a new node. When this order is reversed, our node capacity may be reduced by pods
// that have already bound, and we would then provision new, un-needed capacity for them.
var stateNodes []*state.Node
var markedForDeletionNodes []*state.Node
p.cluster.ForEachNode(func(node *state.Node) bool {
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
if !node.MarkedForDeletion() {
stateNodes = append(stateNodes, node.DeepCopy())
} else {
markedForDeletionNodes = append(markedForDeletionNodes, node.DeepCopy())
}
return true
})
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
allNodes := p.cluster.Nodes()
stateNodes := allNodes.ActiveNodes()

// Get pods, exit if nothing to do
pendingPods, err := p.GetPendingPods(ctx)
@@ -137,7 +129,7 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (result
// We do this after getting the pending pods so that we undershoot if pods are
// actively migrating from a node that is being deleted
// NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
deletingNodePods, err := node.GetNodePods(ctx, p.kubeClient, lo.Map(markedForDeletionNodes, func(n *state.Node, _ int) *v1.Node { return n.Node })...)
deletingNodePods, err := allNodes.DeletingNodes().Pods(ctx, p.kubeClient)
if err != nil {
return reconcile.Result{}, err
}
