snapshots and disruption integration
jmdeal committed Jan 29, 2025
1 parent afe2a47 commit e523238
Showing 10 changed files with 156 additions and 72 deletions.
kwok/cloudprovider/cloudprovider.go (3 changes: 2 additions & 1 deletion)
@@ -23,6 +23,7 @@ import (
"math/rand"
"strings"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/samber/lo"
@@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) {
}

// Return the hard-coded instance types.
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
return c.instanceTypes, nil
}

pkg/cloudprovider/fake/cloudprovider.go (3 changes: 2 additions & 1 deletion)
@@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
@@ -189,7 +190,7 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) {
}), nil
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
if np != nil {
if err, ok := c.ErrorsForNodePool[np.Name]; ok {
return nil, err
pkg/cloudprovider/metrics/cloudprovider.go (3 changes: 2 additions & 1 deletion)
@@ -20,6 +20,7 @@ import (
"context"

opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/awslabs/operatorpkg/option"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

@@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
return nodeClaims, err
}

func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
method := "GetInstanceTypes"
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)
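The decorator above now accepts the variadic GetInstanceTypes options. As a minimal, hedged sketch (not code from this commit; the wrapper name is hypothetical, and import paths follow the ones used in the diff), a decorator that forwards those options to the provider it wraps could look like this:

```go
package example

import (
	"context"

	"github.com/awslabs/operatorpkg/option"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// loggingDecorator is a hypothetical wrapper used only to illustrate how a decorator
// can pass the new variadic options straight through to the provider it wraps.
type loggingDecorator struct {
	cloudprovider.CloudProvider
}

func (d *loggingDecorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
	// Forward opts unchanged so snapshot-consistent availability still reaches the
	// wrapped provider.
	return d.CloudProvider.GetInstanceTypes(ctx, nodePool, opts...)
}
```
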
pkg/cloudprovider/types.go (16 changes: 15 additions & 1 deletion)
@@ -25,9 +25,11 @@ import (
"sync"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -67,7 +69,7 @@ type CloudProvider interface {
// Availability of types or zone may vary by nodepool or over time. Regardless of
// availability, the GetInstanceTypes method should always return all instance types,
// even those with no offerings available.
GetInstanceTypes(context.Context, *v1.NodePool) ([]*InstanceType, error)
GetInstanceTypes(context.Context, *v1.NodePool, ...option.Function[GetInstanceTypeOptions]) ([]*InstanceType, error)
// IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements
// it is tied to.
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
@@ -81,6 +83,18 @@ type CloudProvider interface {
GetSupportedNodeClasses() []status.Object
}

type GetInstanceTypeOptions struct {
AvailabilitySnapshotUUID types.UID
}

// GetInstanceTypes calls made with the same snapshot ID should have a consistent view of offering availability. This
// is crucial for offerings with capacity type "reserved" since cross-nodepool offerings may share availability.
func WithAvailabilitySnapshotUUID(uuid types.UID) option.Function[GetInstanceTypeOptions] {
return func(opts *GetInstanceTypeOptions) {
opts.AvailabilitySnapshotUUID = uuid
}
}

// InstanceType describes the properties of a potential node (either concrete attributes of an instance of this type
// or supported options in the case of arrays)
type InstanceType struct {
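The new GetInstanceTypeOptions struct and WithAvailabilitySnapshotUUID option above are plain functional options. A minimal sketch of how a provider implementation might fold them into a concrete struct and detect a snapshot request follows; the helper names are assumptions, and operatorpkg's option package may already ship an equivalent resolver:

```go
package example

import (
	"github.com/awslabs/operatorpkg/option"
	"k8s.io/apimachinery/pkg/util/uuid"

	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// resolveOptions folds the variadic functional options into a concrete struct so a
// provider implementation can inspect them.
func resolveOptions(opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) *cloudprovider.GetInstanceTypeOptions {
	resolved := &cloudprovider.GetInstanceTypeOptions{}
	for _, opt := range opts {
		opt(resolved)
	}
	return resolved
}

// snapshotRequested reports whether the caller asked for snapshot-consistent
// availability; how a provider memoizes that view per UUID is up to the provider.
func snapshotRequested(opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) bool {
	return resolveOptions(opts...).AvailabilitySnapshotUUID != ""
}

// Caller side: every GetInstanceTypes call in one scheduling pass shares one UUID.
var sharedSnapshot = cloudprovider.WithAvailabilitySnapshotUUID(uuid.NewUUID())
```
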
pkg/controllers/disruption/helpers.go (33 changes: 23 additions & 10 deletions)
@@ -33,7 +33,7 @@ import (
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
Expand All @@ -48,7 +48,7 @@ var errCandidateDeleting = fmt.Errorf("candidate is deleting")
//nolint:gocyclo
func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
candidates ...*Candidate,
) (pscheduling.Results, error) {
) (scheduling.Results, error) {
candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...)
nodes := cluster.Nodes()
deletingNodes := nodes.Deleting()
@@ -62,33 +62,45 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
if _, ok := lo.Find(deletingNodes, func(n *state.StateNode) bool {
return candidateNames.Has(n.Name())
}); ok {
return pscheduling.Results{}, errCandidateDeleting
return scheduling.Results{}, errCandidateDeleting
}

// We get the pods that are on nodes that are deleting
deletingNodePods, err := deletingNodes.ReschedulablePods(ctx, kubeClient)
if err != nil {
return pscheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
return scheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
}
// start by getting all pending pods
pods, err := provisioner.GetPendingPods(ctx)
if err != nil {
return pscheduling.Results{}, fmt.Errorf("determining pending pods, %w", err)
return scheduling.Results{}, fmt.Errorf("determining pending pods, %w", err)
}
for _, n := range candidates {
pods = append(pods, n.reschedulablePods...)
}
pods = append(pods, deletingNodePods...)
scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes)
scheduler, err := provisioner.NewScheduler(
log.IntoContext(ctx, operatorlogging.NopLogger),
pods,
stateNodes,
// ReservedOfferingModeFallback is used for the following reasons:
// - For consolidation, we're only going to accept a decision if it lowers the cost of the cluster, and if it only
// requires a single additional nodeclaim. It doesn't matter in this scenario if we fall back.
// - For drift, fallback is required to ensure progress. Strict mode only guarantees progress when multiple scheduling
// loops are allowed to proceed, but we need all pods on the drifted node to schedule within a single
// iteration. This may result in non-ideal instance choices, but the alternative is deadlock.
// See issue TODO for more details.
scheduling.ReservedOfferingModeFallback,
)
if err != nil {
return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
return scheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
}

deletingNodePodKeys := lo.SliceToMap(deletingNodePods, func(p *corev1.Pod) (client.ObjectKey, interface{}) {
return client.ObjectKeyFromObject(p), nil
})

results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(scheduling.MaxInstanceTypes)
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
// tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
@@ -100,6 +112,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
// If the pod is on a deleting node, we assume one of two things has already happened:
// 1. The node was manually terminated, at which the provisioning controller has scheduled or is scheduling a node
// for the pod.
// TODO: clarify this point, not clear to me
// 2. The node was chosen for a previous disruption command, we assume that the uninitialized node will come up
// for this command, and we assume it will be successful. If it is not successful, the node will become
// not terminating, and we will no longer need to consider these pods.
@@ -115,10 +128,10 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
// UninitializedNodeError tracks a special pod error for disruption where pods schedule to a node
// that hasn't been initialized yet, meaning that we can't be confident to make a disruption decision based off of it
type UninitializedNodeError struct {
*pscheduling.ExistingNode
*scheduling.ExistingNode
}

func NewUninitializedNodeError(node *pscheduling.ExistingNode) *UninitializedNodeError {
func NewUninitializedNodeError(node *scheduling.ExistingNode) *UninitializedNodeError {
return &UninitializedNodeError{ExistingNode: node}
}

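SimulateScheduling above selects ReservedOfferingModeFallback, while the provisioner below uses ReservedOfferingModeStrict; the declaration of the mode itself is not part of these hunks. A sketch of what such a mode type could look like, stated purely as an assumption for orientation:

```go
package scheduling

// ReservedOfferingMode is referenced above (ReservedOfferingModeFallback) and in the
// provisioner below (ReservedOfferingModeStrict) but not declared in these hunks.
// This is an assumed shape, not the commit's actual declaration.
type ReservedOfferingMode int

const (
	// Strict: fail a pod's scheduling simulation rather than fall back to on-demand or
	// spot while compatible reserved capacity might still be usable on a later attempt.
	ReservedOfferingModeStrict ReservedOfferingMode = iota
	// Fallback: proceed with non-reserved offerings so a single simulation can make
	// progress, as disruption's SimulateScheduling requires.
	ReservedOfferingModeFallback
)
```
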
pkg/controllers/provisioning/provisioner.go (16 changes: 12 additions & 4 deletions)
@@ -32,6 +32,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
@@ -212,7 +213,12 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, pods []*corev1.
var ErrNodePoolsNotFound = errors.New("no nodepools found")

//nolint:gocyclo
func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stateNodes []*state.StateNode) (*scheduler.Scheduler, error) {
func (p *Provisioner) NewScheduler(
ctx context.Context,
pods []*corev1.Pod,
stateNodes []*state.StateNode,
reservedOfferingMode scheduler.ReservedOfferingMode,
) (*scheduler.Scheduler, error) {
nodePools, err := nodepoolutils.ListManaged(ctx, p.kubeClient, p.cloudProvider)
if err != nil {
return nil, fmt.Errorf("listing nodepools, %w", err)
@@ -228,6 +234,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
return nil, ErrNodePoolsNotFound
}

schedulerID := uuid.NewUUID()

// nodeTemplates generated from NodePools are ordered by weight
// since they are stored within a slice and scheduling
// will always attempt to schedule on the first nodeTemplate
@@ -236,7 +244,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
instanceTypes := map[string][]*cloudprovider.InstanceType{}
domains := map[string]sets.Set[string]{}
for _, np := range nodePools {
its, err := p.cloudProvider.GetInstanceTypes(ctx, np)
its, err := p.cloudProvider.GetInstanceTypes(ctx, np, cloudprovider.WithAvailabilitySnapshotUUID(schedulerID))
if err != nil {
log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Error(err, "skipping, unable to resolve instance types")
continue
@@ -295,7 +303,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
if err != nil {
return nil, fmt.Errorf("getting daemon pods, %w", err)
}
return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil
return scheduler.NewScheduler(ctx, schedulerID, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, reservedOfferingMode), nil
}

func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
@@ -332,7 +340,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
if len(pods) == 0 {
return scheduler.Results{}, nil
}
s, err := p.NewScheduler(ctx, pods, nodes.Active())
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.ReservedOfferingModeStrict)
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
log.FromContext(ctx).Info("no nodepools found")
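With the change above, one scheduler ID is minted per scheduling run and passed to every GetInstanceTypes call via WithAvailabilitySnapshotUUID, so all nodepools are evaluated against the same availability view. On the provider side this suggests memoizing availability per snapshot UUID; a small sketch under assumed names:

```go
package example

import (
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

// reservedCapacitySnapshots sketches a provider-side cache keyed by the availability
// snapshot UUID that the scheduler now passes for a single run. Every nodepool queried
// with the same UUID then prices reserved offerings against one shared capacity view,
// which is what keeps cross-nodepool reserved offerings consistent. All names here are
// assumptions for illustration.
type reservedCapacitySnapshots struct {
	mu        sync.Mutex
	snapshots map[types.UID]map[string]int // snapshot UUID -> reservation ID -> remaining capacity
	lookup    func() map[string]int        // fetches live remaining capacity per reservation
}

// remaining returns the capacity view for a snapshot, computing it at most once and
// reusing it for every later call made with the same UUID.
func (c *reservedCapacitySnapshots) remaining(snapshot types.UID) map[string]int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.snapshots == nil {
		c.snapshots = map[types.UID]map[string]int{}
	}
	if view, ok := c.snapshots[snapshot]; ok {
		return view
	}
	view := c.lookup()
	c.snapshots[snapshot] = view
	return view
}
```

Keying the cache by the scheduler-supplied UUID, rather than by time, means the consistent view lasts exactly as long as one scheduling run without the provider having to guess when that run ends.
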
pkg/controllers/provisioning/scheduling/nodeclaim.go (47 changes: 24 additions & 23 deletions)
@@ -42,12 +42,13 @@ import (
type NodeClaim struct {
NodeClaimTemplate

Pods []*corev1.Pod
reservedOfferings map[string]cloudprovider.Offerings
topology *Topology
hostPortUsage *scheduling.HostPortUsage
daemonResources corev1.ResourceList
hostname string
Pods []*corev1.Pod
reservedOfferings map[string]cloudprovider.Offerings
reservedOfferingMode ReservedOfferingMode
topology *Topology
hostPortUsage *scheduling.HostPortUsage
daemonResources corev1.ResourceList
hostname string
}

type NodePoolLimitsExceededError struct {
@@ -58,11 +59,6 @@ func (e NodePoolLimitsExceededError) Error() string {
return fmt.Sprintf("all available instance types exceed limits for nodepool: %q", e.nodePool)
}

// type IncompatibleNodeClaimTemplateError struct {
// nodePool string
// daemonSetOverhead corev1.ResourceList
// }

// ReservedOfferingError indicates a NodeClaim couldn't be created or a pod couldn't be added to an existing NodeClaim
// due to
type ReservedOfferingError struct {
@@ -88,6 +84,7 @@ func NewNodeClaimForPod(
remainingResources corev1.ResourceList,
pod *corev1.Pod,
podRequests corev1.ResourceList,
reservedOfferingMode ReservedOfferingMode,
) (*NodeClaim, error) {
// Ensure we don't consider instance types which would exceed the limits of the NodePool
instanceTypes := filterByRemainingResources(nodeClaimTemplate.InstanceTypeOptions, remainingResources)
@@ -200,6 +197,7 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podRequests corev1.ResourceList) error
// offerings in a map from instance type name to offerings. Additionally, a slice of offerings to release is returned.
// This is based on the previously reserved offerings which are no longer compatible with the nodeclaim. These should
// not be released until we're ready to persist the changes to the nodeclaim.
// nolint:gocyclo
func (n *NodeClaim) reserveOfferings(
instanceTypes []*cloudprovider.InstanceType,
nodeClaimRequirements scheduling.Requirements,
@@ -225,18 +223,21 @@ func (n *NodeClaim) reserveOfferings(
reservedOfferings[it.Name] = reserved
}
}
// If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
// fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
// offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
// on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
// during this scheduling simulation, but with the possibility of success on subsequent simulations.
if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
}
// If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
// we should fail to add the pod to this nodeclaim.
if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying constraints would result "))
if n.reservedOfferingMode == ReservedOfferingModeStrict {
// If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
// fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
// offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
// on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
// during this scheduling simulation, but with the possibility of success on subsequent simulations.
// Note: while this can occur both on initial creation and on
if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
}
// If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
// we should fail to add the pod to this nodeclaim.
if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying constraints would result "))
}
}
// Ensure we release any offerings for instance types which are no longer compatible with nodeClaimRequirements
for instanceName, offerings := range n.reservedOfferings {