From 430b00e0615f32c73a49a91b273e573a941239aa Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Thu, 19 Jan 2023 15:24:45 -0800 Subject: [PATCH] Create helper functions for node lists --- pkg/controllers/deprovisioning/helpers.go | 42 ++++++---------- pkg/controllers/metrics/state/scraper/node.go | 15 +++--- pkg/controllers/provisioning/provisioner.go | 27 +++------- .../provisioning/scheduling/existingnode.go | 50 +++++++------------ .../provisioning/scheduling/scheduler.go | 10 ++-- pkg/controllers/state/cluster.go | 15 ++++-- pkg/controllers/state/node.go | 41 ++++++++++++++- pkg/controllers/state/suite_test.go | 29 ++++++++--- 8 files changed, 128 insertions(+), 101 deletions(-) diff --git a/pkg/controllers/deprovisioning/helpers.go b/pkg/controllers/deprovisioning/helpers.go index a6d435bba2..6c604b622b 100644 --- a/pkg/controllers/deprovisioning/helpers.go +++ b/pkg/controllers/deprovisioning/helpers.go @@ -28,7 +28,6 @@ import ( pscheduling "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling" "github.com/aws/karpenter-core/pkg/controllers/state" "github.com/aws/karpenter-core/pkg/scheduling" - nodeutils "github.com/aws/karpenter-core/pkg/utils/node" "github.com/aws/karpenter-core/pkg/utils/pod" v1 "k8s.io/api/core/v1" @@ -40,45 +39,35 @@ import ( //nolint:gocyclo func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, - nodesToDelete ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) { - var stateNodes []*state.Node - var markedForDeletionNodes []*state.Node - candidateNodeIsDeleting := false - candidateNodeNames := sets.NewString(lo.Map(nodesToDelete, func(t CandidateNode, i int) string { return t.Name })...) - cluster.ForEachNode(func(n *state.Node) bool { - // not a candidate node - if _, ok := candidateNodeNames[n.Node.Name]; !ok { - if !n.MarkedForDeletion() { - stateNodes = append(stateNodes, n.DeepCopy()) - } else { - markedForDeletionNodes = append(markedForDeletionNodes, n.DeepCopy()) - } - } else if n.MarkedForDeletion() { - // candidate node and marked for deletion - candidateNodeIsDeleting = true - } - return true + candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) { + + candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...) + allNodes := cluster.Nodes() + stateNodes := lo.Filter(allNodes, func(n *state.Node, _ int) bool { + return !candidateNodeNames.Has(n.Name()) }) + // We do one final check to ensure that the node that we are attempting to consolidate isn't // already handled for deletion by some other controller. This could happen if the node was markedForDeletion // between returning the candidateNodes and getting the stateNodes above - if candidateNodeIsDeleting { + if _, ok := lo.Find(allNodes.DeletingNodes(), func(n *state.Node) bool { + return candidateNodeNames.Has(n.Name()) + }); ok { return nil, false, errCandidateNodeDeleting } // We get the pods that are on nodes that are deleting - deletingNodePods, err := nodeutils.GetNodePods(ctx, kubeClient, lo.Map(markedForDeletionNodes, func(n *state.Node, _ int) *v1.Node { return n.Node })...) 
+ deletingNodePods, err := allNodes.DeletingNodes().Pods(ctx, kubeClient) if err != nil { return nil, false, fmt.Errorf("failed to get pods from deleting nodes, %w", err) } - // start by getting all pending pods pods, err := provisioner.GetPendingPods(ctx) if err != nil { return nil, false, fmt.Errorf("determining pending pods, %w", err) } - for _, n := range nodesToDelete { + for _, n := range candidateNodes { pods = append(pods, n.pods...) } pods = append(pods, deletingNodePods...) @@ -107,7 +96,7 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster * // to schedule since we want to assume that we can delete a node and its pods will immediately // move to an existing node which won't occur if that node isn't ready. for _, n := range ifn { - if n.Labels[v1alpha5.LabelNodeInitialized] != "true" { + if !n.Initialized() { return nil, false, nil } } @@ -208,6 +197,7 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient clie } // Skip nodes that aren't initialized + // This also means that the real Node doesn't exist for it if !n.Initialized() { return true } @@ -216,16 +206,14 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient clie return true } - pods, err := nodeutils.GetNodePods(ctx, kubeClient, n.Node) + pods, err := n.Pods(ctx, kubeClient) if err != nil { logging.FromContext(ctx).Errorf("Determining node pods, %s", err) return true } - if !shouldDeprovision(ctx, n, provisioner, pods) { return true } - cn := CandidateNode{ Node: n.Node, instanceType: instanceType, diff --git a/pkg/controllers/metrics/state/scraper/node.go b/pkg/controllers/metrics/state/scraper/node.go index 270fb02849..17a5581256 100644 --- a/pkg/controllers/metrics/state/scraper/node.go +++ b/pkg/controllers/metrics/state/scraper/node.go @@ -157,13 +157,16 @@ func (ns *NodeScraper) Scrape(_ context.Context) { // Populate metrics ns.cluster.ForEachNode(func(n *state.Node) bool { + if n.Node == nil { + return true + } for gaugeVec, resourceList := range map[*prometheus.GaugeVec]v1.ResourceList{ - overheadGaugeVec: ns.getSystemOverhead(n), + overheadGaugeVec: ns.getSystemOverhead(n.Node), podRequestsGaugeVec: resources.Subtract(n.PodRequests(), n.DaemonSetRequests()), podLimitsGaugeVec: resources.Subtract(n.PodLimits(), n.DaemonSetLimits()), daemonRequestsGaugeVec: n.DaemonSetRequests(), daemonLimitsGaugeVec: n.DaemonSetLimits(), - allocatableGaugeVec: n.Allocatable(), + allocatableGaugeVec: n.Node.Status.Allocatable, } { for _, labels := range ns.set(gaugeVec, n.Node, resourceList) { key := labelsToString(labels) @@ -199,12 +202,12 @@ func (ns *NodeScraper) set(gaugeVec *prometheus.GaugeVec, node *v1.Node, resourc return gaugeLabels } -func (ns *NodeScraper) getSystemOverhead(n *state.Node) v1.ResourceList { +func (ns *NodeScraper) getSystemOverhead(node *v1.Node) v1.ResourceList { systemOverhead := v1.ResourceList{} - if len(n.Allocatable()) > 0 { + if len(node.Status.Allocatable) > 0 { // calculating system daemons overhead - for resourceName, quantity := range n.Allocatable() { - overhead := n.Capacity()[resourceName] + for resourceName, quantity := range node.Status.Allocatable { + overhead := node.Status.Capacity[resourceName] overhead.Sub(quantity) systemOverhead[resourceName] = overhead } diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 55e0a29f5a..2e0163675f 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -48,7 
+48,6 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/state" "github.com/aws/karpenter-core/pkg/events" "github.com/aws/karpenter-core/pkg/metrics" - "github.com/aws/karpenter-core/pkg/utils/node" "github.com/aws/karpenter-core/pkg/utils/pod" ) @@ -114,19 +113,12 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul // prevents over-provisioning at the cost of potentially under-provisioning which will self-heal during the next // scheduling loop when we launch a new node. When this order is reversed, our node capacity may be reduced by pods // that have bound which we then provision new un-needed capacity for. - var stateNodes []*state.Node - var markedForDeletionNodes []*state.Node - p.cluster.ForEachNode(func(node *state.Node) bool { - // We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered - // as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for - // the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered. - if !node.MarkedForDeletion() { - stateNodes = append(stateNodes, node.DeepCopy()) - } else { - markedForDeletionNodes = append(markedForDeletionNodes, node.DeepCopy()) - } - return true - }) + // ------- + // We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered + // as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for + // the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered. + allNodes := p.cluster.Nodes() + stateNodes := allNodes.ActiveNodes() // Get pods, exit if nothing to do pendingPods, err := p.GetPendingPods(ctx) @@ -137,12 +129,7 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul // We do this after getting the pending pods so that we undershoot if pods are // actively migrating from a node that is being deleted // NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them - deletingNodePods, err := node.GetNodePods(ctx, p.kubeClient, - lo.Map(lo.Reject(markedForDeletionNodes, func(n *state.Node, _ int) bool { - return n.Node == nil - }), func(n *state.Node, _ int) *v1.Node { - return n.Node - })...) 
+ deletingNodePods, err := allNodes.DeletingNodes().Pods(ctx, p.kubeClient) if err != nil { return reconcile.Result{}, err } diff --git a/pkg/controllers/provisioning/scheduling/existingnode.go b/pkg/controllers/provisioning/scheduling/existingnode.go index e72a51de5c..800e3f2f47 100644 --- a/pkg/controllers/provisioning/scheduling/existingnode.go +++ b/pkg/controllers/provisioning/scheduling/existingnode.go @@ -26,18 +26,12 @@ import ( ) type ExistingNode struct { - *v1.Node - name string - initialized bool - Pods []*v1.Pod - requests v1.ResourceList - topology *Topology - requirements scheduling.Requirements - available v1.ResourceList - taints []v1.Taint - hostPortUsage *scheduling.HostPortUsage - volumeUsage *scheduling.VolumeUsage - volumeLimits scheduling.VolumeCount + *state.Node + + Pods []*v1.Pod + topology *Topology + requests v1.ResourceList + requirements scheduling.Requirements } func NewExistingNode(n *state.Node, topology *Topology, daemonResources v1.ResourceList) *ExistingNode { @@ -54,17 +48,11 @@ func NewExistingNode(n *state.Node, topology *Topology, daemonResources v1.Resou } } node := &ExistingNode{ - Node: n.Node, // The real node that - name: n.Name(), - initialized: n.Initialized(), - available: n.Available(), - taints: n.Taints(), - topology: topology, - requests: remainingDaemonResources, - requirements: scheduling.NewLabelRequirements(n.Labels()), - hostPortUsage: n.HostPortUsage(), - volumeUsage: n.VolumeUsage(), - volumeLimits: n.VolumeLimits(), + Node: n, + + topology: topology, + requests: remainingDaemonResources, + requirements: scheduling.NewLabelRequirements(n.Labels()), } node.requirements.Add(scheduling.NewRequirement(v1.LabelHostname, v1.NodeSelectorOpIn, n.HostName())) topology.Register(v1.LabelHostname, n.HostName()) @@ -73,20 +61,20 @@ func NewExistingNode(n *state.Node, topology *Topology, daemonResources v1.Resou func (n *ExistingNode) Add(ctx context.Context, pod *v1.Pod) error { // Check Taints - if err := scheduling.Taints(n.taints).Tolerates(pod); err != nil { + if err := scheduling.Taints(n.Taints()).Tolerates(pod); err != nil { return err } - if err := n.hostPortUsage.Validate(pod); err != nil { + if err := n.HostPortUsage().Validate(pod); err != nil { return err } // determine the number of volumes that will be mounted if the pod schedules - mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod) + mountedVolumeCount, err := n.VolumeUsage().Validate(ctx, pod) if err != nil { return err } - if mountedVolumeCount.Exceeds(n.volumeLimits) { + if mountedVolumeCount.Exceeds(n.VolumeLimits()) { return fmt.Errorf("would exceed node volume limits") } @@ -94,14 +82,14 @@ func (n *ExistingNode) Add(ctx context.Context, pod *v1.Pod) error { // node, which at this point can't be increased in size requests := resources.Merge(n.requests, resources.RequestsForPods(pod)) - if !resources.Fits(requests, n.available) { + if !resources.Fits(requests, n.Available()) { return fmt.Errorf("exceeds node resources") } nodeRequirements := scheduling.NewRequirements(n.requirements.Values()...) podRequirements := scheduling.NewPodRequirements(pod) // Check Node Affinity Requirements - if err := nodeRequirements.Compatible(podRequirements); err != nil { + if err = nodeRequirements.Compatible(podRequirements); err != nil { return err } nodeRequirements.Add(podRequirements.Values()...) 
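
Aside, not part of the patch: ExistingNode now embeds *state.Node, so accessors such as Taints(), Available(), HostPortUsage(), VolumeUsage(), and VolumeLimits() are promoted from the embedded value, which is why the per-field copies were dropped above. A minimal, self-contained illustration of Go's method promotion, using hypothetical stand-in types rather than the real ones:

package main

import "fmt"

// stateNode stands in for state.Node; existingNode stands in for ExistingNode.
type stateNode struct{ taints []string }

func (s *stateNode) Taints() []string { return s.taints }

type existingNode struct {
	*stateNode // embedded pointer: (*stateNode)'s methods are promoted onto existingNode
}

func main() {
	n := existingNode{stateNode: &stateNode{taints: []string{"dedicated=infra:NoSchedule"}}}
	fmt.Println(n.Taints()) // resolves to (*stateNode).Taints via method promotion
}
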
@@ -121,7 +109,7 @@ func (n *ExistingNode) Add(ctx context.Context, pod *v1.Pod) error { n.requests = requests n.requirements = nodeRequirements n.topology.Record(pod, nodeRequirements) - n.hostPortUsage.Add(ctx, pod) - n.volumeUsage.Add(ctx, pod) + n.HostPortUsage().Add(ctx, pod) + n.VolumeUsage().Add(ctx, pod) return nil } diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index c980205a94..46413fb6f0 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -141,12 +141,12 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, for _, node := range s.existingNodes { if len(node.Pods) > 0 { - s.cluster.NominateNodeForPod(ctx, node.Name) + s.cluster.NominateNodeForPod(ctx, node.Name()) } for _, pod := range node.Pods { // If node is inflight, it won't have a real node to represent it - if node.Node != nil { - s.recorder.Publish(events.NominatePod(pod, node.Node)) + if node.Node.Node != nil { + s.recorder.Publish(events.NominatePod(pod, node.Node.Node)) } } } @@ -244,8 +244,8 @@ func (s *Scheduler) calculateExistingMachines(stateNodes []*state.Node, daemonSe // We don't use the status field and instead recompute the remaining resources to ensure we have a consistent view // of the cluster during scheduling. Depending on how node creation falls out, this will also work for cases where // we don't create Node resources. - if _, ok := s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]]; ok { - s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]] = resources.Subtract(s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]], node.Capacity()) + if _, ok := s.remainingResources[node.Labels()[v1alpha5.ProvisionerNameLabelKey]]; ok { + s.remainingResources[node.Labels()[v1alpha5.ProvisionerNameLabelKey]] = resources.Subtract(s.remainingResources[node.Labels()[v1alpha5.ProvisionerNameLabelKey]], node.Capacity()) } } } diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index aea8dcc971..8c5df7fd75 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -81,7 +81,7 @@ func (c *Cluster) ForPodsWithAntiAffinity(fn func(p *v1.Pod, n *v1.Node) bool) { return true } node, ok := c.nodes[c.nameToProviderID[nodeName]] - if !ok { + if !ok || node.Node == nil { // if we receive the node deletion event before the pod deletion event, this can happen return true } @@ -102,6 +102,17 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) { } } +// Nodes creates a DeepCopy of all state nodes. 
+// NOTE: This is very inefficient so this should only be used when DeepCopying is absolutely necessary +func (c *Cluster) Nodes() Nodes { + c.mu.RLock() + defer c.mu.RUnlock() + + return lo.Map(lo.Values(c.nodes), func(n *Node, _ int) *Node { + return n.DeepCopy() + }) +} + // IsNodeNominated returns true if the given node was expected to have a pod bound to it during a recent scheduling // batch func (c *Cluster) IsNodeNominated(name string) bool { @@ -254,7 +265,6 @@ func (c *Cluster) newStateFromMachine(machine *v1alpha5.Machine, oldNode *Node) n := &Node{ Node: oldNode.Node, Machine: machine, - ProviderID: machine.Status.ProviderID, hostPortUsage: oldNode.hostPortUsage, volumeUsage: oldNode.volumeUsage, daemonSetRequests: oldNode.daemonSetRequests, @@ -293,7 +303,6 @@ func (c *Cluster) newStateFromNode(ctx context.Context, node *v1.Node, oldNode * n := &Node{ Node: node, Machine: oldNode.Machine, - ProviderID: node.Spec.ProviderID, hostPortUsage: scheduling.NewHostPortUsage(), volumeUsage: scheduling.NewVolumeLimits(c.kubeClient), volumeLimits: scheduling.VolumeCount{}, diff --git a/pkg/controllers/state/node.go b/pkg/controllers/state/node.go index f3a1511a70..11d6c9d757 100644 --- a/pkg/controllers/state/node.go +++ b/pkg/controllers/state/node.go @@ -27,10 +27,41 @@ import ( "github.com/aws/karpenter-core/pkg/apis/config/settings" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/scheduling" + nodeutils "github.com/aws/karpenter-core/pkg/utils/node" podutils "github.com/aws/karpenter-core/pkg/utils/pod" "github.com/aws/karpenter-core/pkg/utils/resources" ) +// Nodes is a typed version of a list of *Node +type Nodes []*Node + +// ActiveNodes filters nodes that are not in a MarkedForDeletion state +func (n Nodes) ActiveNodes() Nodes { + return lo.Filter(n, func(node *Node, _ int) bool { + return !node.MarkedForDeletion() + }) +} + +// DeletingNodes filters nodes that are in a MarkedForDeletion state +func (n Nodes) DeletingNodes() Nodes { + return lo.Filter(n, func(node *Node, _ int) bool { + return node.MarkedForDeletion() + }) +} + +// Pods gets the pods assigned to all Nodes based on the kubernetes api-server bindings +func (n Nodes) Pods(ctx context.Context, c client.Client) ([]*v1.Pod, error) { + var pods []*v1.Pod + for _, node := range n { + p, err := node.Pods(ctx, c) + if err != nil { + return nil, err + } + pods = append(pods, p...) + } + return pods, nil +} + // Node is a cached version of a node in the cluster that maintains state which is expensive to compute every time it's // needed. This currently contains node utilization across all the allocatable resources, but will soon be used to // compute topology information. 
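
For orientation, not part of the patch: the new state.Nodes helpers above let callers replace hand-rolled ForEachNode loops with a filtered, deep-copied snapshot. A minimal sketch of how they compose, mirroring the call pattern in provisioner.go and deprovisioning/helpers.go; the function name and wiring are illustrative only, and the import path simply follows the one used elsewhere in this patch:

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/controllers/state"
)

// schedulingSnapshot returns the nodes whose capacity still counts (not marked
// for deletion) plus the pods that must be rescheduled off deleting nodes.
func schedulingSnapshot(ctx context.Context, c client.Client, cluster *state.Cluster) (state.Nodes, []*v1.Pod, error) {
	allNodes := cluster.Nodes()      // deep-copied snapshot of cluster state
	active := allNodes.ActiveNodes() // persistent capacity to schedule against
	evicting, err := allNodes.DeletingNodes().Pods(ctx, c)
	if err != nil {
		return nil, nil, fmt.Errorf("getting pods from deleting nodes, %w", err)
	}
	return active, evicting, nil
}
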
@@ -39,8 +70,6 @@ type Node struct { Node *v1.Node Machine *v1alpha5.Machine - ProviderID string - inflightAllocatable v1.ResourceList // TODO @joinnis: This can be removed when machine is added inflightCapacity v1.ResourceList // TODO @joinnis: This can be removed when machine is added startupTaints []v1.Taint // TODO: @joinnis: This can be removed when machine is added @@ -83,6 +112,14 @@ func (in *Node) Name() string { return in.Node.Name } +// Pods gets the pods assigned to the Node based on the kubernetes api-server bindings +func (in *Node) Pods(ctx context.Context, c client.Client) ([]*v1.Pod, error) { + if in.Node == nil { + return nil, nil + } + return nodeutils.GetNodePods(ctx, c, in.Node) +} + func (in *Node) HostName() string { if in.Labels()[v1.LabelHostname] == "" { return in.Name() diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 536d877284..c26714cfb7 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -215,6 +215,9 @@ var _ = Describe("Inflight Nodes", func() { Name: "default", }, }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.ProviderID(), + }, }) ExpectApplied(ctx, env.Client, machine) ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) @@ -474,26 +477,38 @@ var _ = Describe("Inflight Nodes", func() { }, ExpectStateNodeExistsForMachine(machine).Allocatable()) }) It("should continue node nomination when an inflight node becomes a real node", func() { - machine := test.Machine() + machine := test.Machine(v1alpha5.Machine{ + Status: v1alpha5.MachineStatus{ + ProviderID: test.ProviderID(), + }, + }) ExpectApplied(ctx, env.Client, machine) ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) cluster.NominateNodeForPod(ctx, machine.Name) Expect(ExpectStateNodeExistsForMachine(machine).Nominated()).To(BeTrue()) - node := test.Node() + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + }) node.Spec.ProviderID = machine.Status.ProviderID ExpectApplied(ctx, env.Client, node) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) Expect(ExpectStateNodeExists(node).Nominated()).To(BeTrue()) }) It("should continue MarkedForDeletion when an inflight node becomes a real node", func() { - machine := test.Machine() + machine := test.Machine(v1alpha5.Machine{ + Status: v1alpha5.MachineStatus{ + ProviderID: test.ProviderID(), + }, + }) ExpectApplied(ctx, env.Client, machine) ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) cluster.MarkForDeletion(machine.Name) Expect(ExpectStateNodeExistsForMachine(machine).MarkedForDeletion()).To(BeTrue()) - node := test.Node() + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + }) node.Spec.ProviderID = machine.Status.ProviderID ExpectApplied(ctx, env.Client, node) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) @@ -1201,7 +1216,7 @@ func ExpectStateNodeCount(comparator string, count int) int { func ExpectStateNodeExistsWithOffset(offset int, node *v1.Node) *state.Node { var ret *state.Node cluster.ForEachNode(func(n *state.Node) bool { - if n.ProviderID != node.Spec.ProviderID { + if n.Node.Name != node.Name { return true } ret = n.DeepCopy() @@ -1218,7 +1233,7 @@ func ExpectStateNodeExists(node *v1.Node) *state.Node { func ExpectStateNodeExistsForMachine(machine *v1alpha5.Machine) *state.Node { var ret *state.Node cluster.ForEachNode(func(n 
*state.Node) bool { - if n.ProviderID != machine.Status.ProviderID { + if n.Machine.Name != machine.Name { return true } ret = n.DeepCopy() @@ -1231,7 +1246,7 @@ func ExpectStateNodeExistsForMachine(machine *v1alpha5.Machine) *state.Node { func ExpectStateNodeNotFoundForMachine(machine *v1alpha5.Machine) *state.Node { var ret *state.Node cluster.ForEachNode(func(n *state.Node) bool { - if n.ProviderID != machine.Status.ProviderID { + if n.Machine.Name != machine.Name { return true } ret = n.DeepCopy()
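
Aside, not part of the patch: several call sites above guard on n.Node == nil because an inflight machine has cluster state but no registered v1.Node yet; Node.Pods returns nil in that case, and the scraper now skips such entries before reading Status.Allocatable. A hedged sketch of the same guard in a hypothetical consumer that reads the real node's status directly (the function name is illustrative):

package example

import (
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/aws/karpenter-core/pkg/controllers/state"
)

// totalAllocatableCPU sums allocatable CPU across registered nodes only,
// skipping inflight machines that have no v1.Node (and therefore no Status) yet.
func totalAllocatableCPU(cluster *state.Cluster) resource.Quantity {
	var total resource.Quantity
	for _, n := range cluster.Nodes() {
		if n.Node == nil { // inflight machine: nothing registered to read from
			continue
		}
		total.Add(*n.Node.Status.Allocatable.Cpu())
	}
	return total
}
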