Create helper functions for node lists
jonathan-innis committed Jan 20, 2023
1 parent bc11036 commit 430b00e
Showing 8 changed files with 128 additions and 101 deletions.
42 changes: 15 additions & 27 deletions pkg/controllers/deprovisioning/helpers.go
@@ -28,7 +28,6 @@ import (
pscheduling "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/scheduling"
nodeutils "github.com/aws/karpenter-core/pkg/utils/node"
"github.com/aws/karpenter-core/pkg/utils/pod"

v1 "k8s.io/api/core/v1"
@@ -40,45 +39,35 @@ import (

//nolint:gocyclo
func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
nodesToDelete ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {
var stateNodes []*state.Node
var markedForDeletionNodes []*state.Node
candidateNodeIsDeleting := false
candidateNodeNames := sets.NewString(lo.Map(nodesToDelete, func(t CandidateNode, i int) string { return t.Name })...)
cluster.ForEachNode(func(n *state.Node) bool {
// not a candidate node
if _, ok := candidateNodeNames[n.Node.Name]; !ok {
if !n.MarkedForDeletion() {
stateNodes = append(stateNodes, n.DeepCopy())
} else {
markedForDeletionNodes = append(markedForDeletionNodes, n.DeepCopy())
}
} else if n.MarkedForDeletion() {
// candidate node and marked for deletion
candidateNodeIsDeleting = true
}
return true
candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {

candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...)
allNodes := cluster.Nodes()
stateNodes := lo.Filter(allNodes, func(n *state.Node, _ int) bool {
return !candidateNodeNames.Has(n.Name())
})

// We do one final check to ensure that the node that we are attempting to consolidate isn't
// already handled for deletion by some other controller. This could happen if the node was markedForDeletion
// between returning the candidateNodes and getting the stateNodes above
if candidateNodeIsDeleting {
if _, ok := lo.Find(allNodes.DeletingNodes(), func(n *state.Node) bool {
return candidateNodeNames.Has(n.Name())
}); ok {
return nil, false, errCandidateNodeDeleting
}

// We get the pods that are on nodes that are deleting
deletingNodePods, err := nodeutils.GetNodePods(ctx, kubeClient, lo.Map(markedForDeletionNodes, func(n *state.Node, _ int) *v1.Node { return n.Node })...)
deletingNodePods, err := allNodes.DeletingNodes().Pods(ctx, kubeClient)
if err != nil {
return nil, false, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
}

// start by getting all pending pods
pods, err := provisioner.GetPendingPods(ctx)
if err != nil {
return nil, false, fmt.Errorf("determining pending pods, %w", err)
}

for _, n := range nodesToDelete {
for _, n := range candidateNodes {
pods = append(pods, n.pods...)
}
pods = append(pods, deletingNodePods...)
@@ -107,7 +96,7 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
// to schedule since we want to assume that we can delete a node and its pods will immediately
// move to an existing node which won't occur if that node isn't ready.
for _, n := range ifn {
if n.Labels[v1alpha5.LabelNodeInitialized] != "true" {
if !n.Initialized() {
return nil, false, nil
}
}
@@ -208,6 +197,7 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient clie
}

// Skip nodes that aren't initialized
// This also means that the real Node doesn't exist for it
if !n.Initialized() {
return true
}
@@ -216,16 +206,14 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient clie
return true
}

pods, err := nodeutils.GetNodePods(ctx, kubeClient, n.Node)
pods, err := n.Pods(ctx, kubeClient)
if err != nil {
logging.FromContext(ctx).Errorf("Determining node pods, %s", err)
return true
}

if !shouldDeprovision(ctx, n, provisioner, pods) {
return true
}

cn := CandidateNode{
Node: n.Node,
instanceType: instanceType,
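Note: the list helpers used above (cluster.Nodes(), ActiveNodes(), DeletingNodes(), and Pods(...)) are introduced in the state package files that are not expanded in this view (two of the eight changed files). The sketch below is inferred from the call sites in this commit rather than copied from it, so treat the bodies as an approximation; the method names and the samber/lo usage follow the pattern visible in the diff.

```go
// Inferred sketch of the new state.Nodes helpers; not the literal code from this commit.
package state

import (
	"context"

	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Nodes is the deep-copied node list returned by Cluster.Nodes().
type Nodes []*Node

// ActiveNodes keeps only nodes that are not marked for deletion.
func (n Nodes) ActiveNodes() Nodes {
	return lo.Filter(n, func(node *Node, _ int) bool { return !node.MarkedForDeletion() })
}

// DeletingNodes keeps only nodes that are marked for deletion.
func (n Nodes) DeletingNodes() Nodes {
	return lo.Filter(n, func(node *Node, _ int) bool { return node.MarkedForDeletion() })
}

// Pods aggregates the pods bound to every node in the list.
func (n Nodes) Pods(ctx context.Context, c client.Client) ([]*v1.Pod, error) {
	var pods []*v1.Pod
	for _, node := range n {
		p, err := node.Pods(ctx, c)
		if err != nil {
			return nil, err
		}
		pods = append(pods, p...)
	}
	return pods, nil
}
```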
15 changes: 9 additions & 6 deletions pkg/controllers/metrics/state/scraper/node.go
@@ -157,13 +157,16 @@ func (ns *NodeScraper) Scrape(_ context.Context) {

// Populate metrics
ns.cluster.ForEachNode(func(n *state.Node) bool {
if n.Node == nil {
return true
}
for gaugeVec, resourceList := range map[*prometheus.GaugeVec]v1.ResourceList{
overheadGaugeVec: ns.getSystemOverhead(n),
overheadGaugeVec: ns.getSystemOverhead(n.Node),
podRequestsGaugeVec: resources.Subtract(n.PodRequests(), n.DaemonSetRequests()),
podLimitsGaugeVec: resources.Subtract(n.PodLimits(), n.DaemonSetLimits()),
daemonRequestsGaugeVec: n.DaemonSetRequests(),
daemonLimitsGaugeVec: n.DaemonSetLimits(),
allocatableGaugeVec: n.Allocatable(),
allocatableGaugeVec: n.Node.Status.Allocatable,
} {
for _, labels := range ns.set(gaugeVec, n.Node, resourceList) {
key := labelsToString(labels)
@@ -199,12 +202,12 @@ func (ns *NodeScraper) set(gaugeVec *prometheus.GaugeVec, node *v1.Node, resourc
return gaugeLabels
}

func (ns *NodeScraper) getSystemOverhead(n *state.Node) v1.ResourceList {
func (ns *NodeScraper) getSystemOverhead(node *v1.Node) v1.ResourceList {
systemOverhead := v1.ResourceList{}
if len(n.Allocatable()) > 0 {
if len(node.Status.Allocatable) > 0 {
// calculating system daemons overhead
for resourceName, quantity := range n.Allocatable() {
overhead := n.Capacity()[resourceName]
for resourceName, quantity := range node.Status.Allocatable {
overhead := node.Status.Capacity[resourceName]
overhead.Sub(quantity)
systemOverhead[resourceName] = overhead
}
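The reworked getSystemOverhead reads capacity and allocatable straight off the v1.Node status and reports the difference as system-daemon overhead. A small, self-contained illustration of that arithmetic with made-up quantities (not taken from the commit):

```go
// Illustrative only: mirrors the capacity-minus-allocatable loop in getSystemOverhead.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	capacity := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("4"),
		v1.ResourceMemory: resource.MustParse("8Gi"),
	}
	allocatable := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("3920m"),
		v1.ResourceMemory: resource.MustParse("7764Mi"),
	}

	overhead := v1.ResourceList{}
	for name, alloc := range allocatable {
		o := capacity[name] // copy of the capacity quantity
		o.Sub(alloc)        // overhead = capacity - allocatable
		overhead[name] = o
	}
	// Roughly 80m CPU and 428Mi memory reserved for system daemons in this example.
	fmt.Printf("cpu=%s memory=%s\n", overhead.Cpu(), overhead.Memory())
}
```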
27 changes: 7 additions & 20 deletions pkg/controllers/provisioning/provisioner.go
@@ -48,7 +48,6 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/utils/node"
"github.com/aws/karpenter-core/pkg/utils/pod"
)

@@ -114,19 +113,12 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul
// prevents over-provisioning at the cost of potentially under-provisioning which will self-heal during the next
// scheduling loop when we launch a new node. When this order is reversed, our node capacity may be reduced by pods
// that have bound which we then provision new un-needed capacity for.
var stateNodes []*state.Node
var markedForDeletionNodes []*state.Node
p.cluster.ForEachNode(func(node *state.Node) bool {
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
if !node.MarkedForDeletion() {
stateNodes = append(stateNodes, node.DeepCopy())
} else {
markedForDeletionNodes = append(markedForDeletionNodes, node.DeepCopy())
}
return true
})
// -------
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
allNodes := p.cluster.Nodes()
stateNodes := allNodes.ActiveNodes()

// Get pods, exit if nothing to do
pendingPods, err := p.GetPendingPods(ctx)
@@ -137,12 +129,7 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul
// We do this after getting the pending pods so that we undershoot if pods are
// actively migrating from a node that is being deleted
// NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
deletingNodePods, err := node.GetNodePods(ctx, p.kubeClient,
lo.Map(lo.Reject(markedForDeletionNodes, func(n *state.Node, _ int) bool {
return n.Node == nil
}), func(n *state.Node, _ int) *v1.Node {
return n.Node
})...)
deletingNodePods, err := allNodes.DeletingNodes().Pods(ctx, p.kubeClient)
if err != nil {
return reconcile.Result{}, err
}
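Taken together, the Reconcile changes above reduce to a short, order-sensitive pattern: snapshot the cluster, count only active nodes as persistent capacity, then schedule pending pods plus the pods being displaced from deleting nodes. The helper below is hypothetical (it is not part of this commit) and simply restates that flow; schedulableWork and getPendingPods are made-up names, with getPendingPods standing in for p.GetPendingPods.

```go
// Hypothetical restatement of the ordering used in Provisioner.Reconcile.
package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/controllers/state"
)

func schedulableWork(ctx context.Context, cluster *state.Cluster, kubeClient client.Client,
	getPendingPods func(context.Context) ([]*v1.Pod, error)) (state.Nodes, []*v1.Pod, error) {
	// Snapshot node state before listing pods: a pod that binds in between is then
	// never counted twice (as pending and as consumed node capacity), which is what
	// would cause over-provisioning if the order were reversed.
	allNodes := cluster.Nodes()
	stateNodes := allNodes.ActiveNodes() // nodes marked for deletion are not persistent capacity

	pending, err := getPendingPods(ctx)
	if err != nil {
		return nil, nil, err
	}
	// Pods on deleting nodes are assumed to be cordoned and migrating, so plan for them too.
	displaced, err := allNodes.DeletingNodes().Pods(ctx, kubeClient)
	if err != nil {
		return nil, nil, err
	}
	return stateNodes, append(pending, displaced...), nil
}
```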
50 changes: 19 additions & 31 deletions pkg/controllers/provisioning/scheduling/existingnode.go
@@ -26,18 +26,12 @@ import (
)

type ExistingNode struct {
*v1.Node
name string
initialized bool
Pods []*v1.Pod
requests v1.ResourceList
topology *Topology
requirements scheduling.Requirements
available v1.ResourceList
taints []v1.Taint
hostPortUsage *scheduling.HostPortUsage
volumeUsage *scheduling.VolumeUsage
volumeLimits scheduling.VolumeCount
*state.Node

Pods []*v1.Pod
topology *Topology
requests v1.ResourceList
requirements scheduling.Requirements
}

func NewExistingNode(n *state.Node, topology *Topology, daemonResources v1.ResourceList) *ExistingNode {
@@ -54,17 +48,11 @@ func NewExistingNode(n *state.Node, topology *Topology, daemonResources v1.Resou
}
}
node := &ExistingNode{
Node: n.Node, // The real node that
name: n.Name(),
initialized: n.Initialized(),
available: n.Available(),
taints: n.Taints(),
topology: topology,
requests: remainingDaemonResources,
requirements: scheduling.NewLabelRequirements(n.Labels()),
hostPortUsage: n.HostPortUsage(),
volumeUsage: n.VolumeUsage(),
volumeLimits: n.VolumeLimits(),
Node: n,

topology: topology,
requests: remainingDaemonResources,
requirements: scheduling.NewLabelRequirements(n.Labels()),
}
node.requirements.Add(scheduling.NewRequirement(v1.LabelHostname, v1.NodeSelectorOpIn, n.HostName()))
topology.Register(v1.LabelHostname, n.HostName())
@@ -73,35 +61,35 @@ func NewExistingNode(n *state.Node, topology *Topology, daemonResources v1.Resou

func (n *ExistingNode) Add(ctx context.Context, pod *v1.Pod) error {
// Check Taints
if err := scheduling.Taints(n.taints).Tolerates(pod); err != nil {
if err := scheduling.Taints(n.Taints()).Tolerates(pod); err != nil {
return err
}

if err := n.hostPortUsage.Validate(pod); err != nil {
if err := n.HostPortUsage().Validate(pod); err != nil {
return err
}

// determine the number of volumes that will be mounted if the pod schedules
mountedVolumeCount, err := n.volumeUsage.Validate(ctx, pod)
mountedVolumeCount, err := n.VolumeUsage().Validate(ctx, pod)
if err != nil {
return err
}
if mountedVolumeCount.Exceeds(n.volumeLimits) {
if mountedVolumeCount.Exceeds(n.VolumeLimits()) {
return fmt.Errorf("would exceed node volume limits")
}

// check resource requests first since that's a pretty likely reason the pod won't schedule on an in-flight
// node, which at this point can't be increased in size
requests := resources.Merge(n.requests, resources.RequestsForPods(pod))

if !resources.Fits(requests, n.available) {
if !resources.Fits(requests, n.Available()) {
return fmt.Errorf("exceeds node resources")
}

nodeRequirements := scheduling.NewRequirements(n.requirements.Values()...)
podRequirements := scheduling.NewPodRequirements(pod)
// Check Node Affinity Requirements
if err := nodeRequirements.Compatible(podRequirements); err != nil {
if err = nodeRequirements.Compatible(podRequirements); err != nil {
return err
}
nodeRequirements.Add(podRequirements.Values()...)
Expand All @@ -121,7 +109,7 @@ func (n *ExistingNode) Add(ctx context.Context, pod *v1.Pod) error {
n.requests = requests
n.requirements = nodeRequirements
n.topology.Record(pod, nodeRequirements)
n.hostPortUsage.Add(ctx, pod)
n.volumeUsage.Add(ctx, pod)
n.HostPortUsage().Add(ctx, pod)
n.VolumeUsage().Add(ctx, pod)
return nil
}
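With the copied fields gone, ExistingNode now leans on Go embedding: *state.Node is an anonymous field, so accessors such as Taints(), Available(), HostPortUsage(), VolumeUsage(), and VolumeLimits() are promoted and called directly on the ExistingNode. Because the embedded field itself is named Node, and state.Node in turn exposes the underlying kube object through its Node field, the scheduler diff below reaches the real node as node.Node.Node. A toy, self-contained illustration of that promotion (the types here are simplified stand-ins, not the real ones):

```go
// Toy example of field/method promotion through an embedded pointer, mirroring
// how ExistingNode embeds *state.Node.
package main

import "fmt"

// KubeNode stands in for *v1.Node.
type KubeNode struct{ Name string }

// Node stands in for state.Node; its Node field is the (possibly nil) kube object.
type Node struct{ Node *KubeNode }

func (n *Node) Initialized() bool { return n.Node != nil }

// ExistingNode embeds *Node, so Node's methods and fields are promoted.
type ExistingNode struct{ *Node }

func main() {
	en := &ExistingNode{Node: &Node{Node: &KubeNode{Name: "ip-10-0-0-1"}}}
	fmt.Println(en.Initialized())  // promoted method from the embedded *Node
	fmt.Println(en.Node.Node.Name) // embedded field, then the inner kube node
}
```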
10 changes: 5 additions & 5 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -141,12 +141,12 @@ func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod,

for _, node := range s.existingNodes {
if len(node.Pods) > 0 {
s.cluster.NominateNodeForPod(ctx, node.Name)
s.cluster.NominateNodeForPod(ctx, node.Name())
}
for _, pod := range node.Pods {
// If node is inflight, it won't have a real node to represent it
if node.Node != nil {
s.recorder.Publish(events.NominatePod(pod, node.Node))
if node.Node.Node != nil {
s.recorder.Publish(events.NominatePod(pod, node.Node.Node))
}
}
}
@@ -244,8 +244,8 @@ func (s *Scheduler) calculateExistingMachines(stateNodes []*state.Node, daemonSe
// We don't use the status field and instead recompute the remaining resources to ensure we have a consistent view
// of the cluster during scheduling. Depending on how node creation falls out, this will also work for cases where
// we don't create Node resources.
if _, ok := s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]]; ok {
s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]] = resources.Subtract(s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]], node.Capacity())
if _, ok := s.remainingResources[node.Labels()[v1alpha5.ProvisionerNameLabelKey]]; ok {
s.remainingResources[node.Labels()[v1alpha5.ProvisionerNameLabelKey]] = resources.Subtract(s.remainingResources[node.Labels()[v1alpha5.ProvisionerNameLabelKey]], node.Capacity())
}
}
}
15 changes: 12 additions & 3 deletions pkg/controllers/state/cluster.go
@@ -81,7 +81,7 @@ func (c *Cluster) ForPodsWithAntiAffinity(fn func(p *v1.Pod, n *v1.Node) bool) {
return true
}
node, ok := c.nodes[c.nameToProviderID[nodeName]]
if !ok {
if !ok || node.Node == nil {
// if we receive the node deletion event before the pod deletion event, this can happen
return true
}
@@ -102,6 +102,17 @@ func (c *Cluster) ForEachNode(f func(n *Node) bool) {
}
}

// Nodes creates a DeepCopy of all state nodes.
// NOTE: This is very inefficient so this should only be used when DeepCopying is absolutely necessary
func (c *Cluster) Nodes() Nodes {
c.mu.RLock()
defer c.mu.RUnlock()

return lo.Map(lo.Values(c.nodes), func(n *Node, _ int) *Node {
return n.DeepCopy()
})
}

// IsNodeNominated returns true if the given node was expected to have a pod bound to it during a recent scheduling
// batch
func (c *Cluster) IsNodeNominated(name string) bool {
Expand Down Expand Up @@ -254,7 +265,6 @@ func (c *Cluster) newStateFromMachine(machine *v1alpha5.Machine, oldNode *Node)
n := &Node{
Node: oldNode.Node,
Machine: machine,
ProviderID: machine.Status.ProviderID,
hostPortUsage: oldNode.hostPortUsage,
volumeUsage: oldNode.volumeUsage,
daemonSetRequests: oldNode.daemonSetRequests,
@@ -293,7 +303,6 @@ func (c *Cluster) newStateFromNode(ctx context.Context, node *v1.Node, oldNode *
n := &Node{
Node: node,
Machine: oldNode.Machine,
ProviderID: node.Spec.ProviderID,
hostPortUsage: scheduling.NewHostPortUsage(),
volumeUsage: scheduling.NewVolumeLimits(c.kubeClient),
volumeLimits: scheduling.VolumeCount{},
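The new Cluster.Nodes() accessor deliberately pays the DeepCopy cost (as its NOTE says) so callers receive a consistent snapshot they can filter and reason about without holding the cluster lock. A small self-contained sketch of why the copy matters; the types are simplified stand-ins for state.Node and Cluster, not the real ones:

```go
// Simplified demonstration of the snapshot-by-deep-copy pattern in Cluster.Nodes().
package main

import (
	"fmt"
	"sync"

	"github.com/samber/lo"
)

type Node struct {
	Name              string
	markedForDeletion bool
}

func (n *Node) DeepCopy() *Node { c := *n; return &c }

type Cluster struct {
	mu    sync.RWMutex
	nodes map[string]*Node
}

// Nodes locks only long enough to deep-copy the tracked nodes into a snapshot.
func (c *Cluster) Nodes() []*Node {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return lo.Map(lo.Values(c.nodes), func(n *Node, _ int) *Node { return n.DeepCopy() })
}

func main() {
	c := &Cluster{nodes: map[string]*Node{"node-a": {Name: "node-a"}}}
	snapshot := c.Nodes()

	// A concurrent controller marks the live node for deletion...
	c.mu.Lock()
	c.nodes["node-a"].markedForDeletion = true
	c.mu.Unlock()

	// ...but the snapshot the deprovisioner or scheduler is holding is unchanged.
	fmt.Println(c.nodes["node-a"].markedForDeletion, snapshot[0].markedForDeletion) // true false
}
```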