diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 75f2b816fc..5b0cc4c2ba 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -23,6 +23,7 @@ import ( "math/rand" "strings" + "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" "github.com/docker/docker/pkg/namesgenerator" "github.com/samber/lo" @@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) { } // Return the hard-coded instance types. -func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) { +func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) { return c.instanceTypes, nil } diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 946a56ac3f..3ae827f679 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" @@ -189,7 +190,7 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) { }), nil } -func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) { +func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) { if np != nil { if err, ok := c.ErrorsForNodePool[np.Name]; ok { return nil, err diff --git a/pkg/cloudprovider/metrics/cloudprovider.go b/pkg/cloudprovider/metrics/cloudprovider.go index 79ae60b028..8abcd2f2da 100644 --- a/pkg/cloudprovider/metrics/cloudprovider.go +++ b/pkg/cloudprovider/metrics/cloudprovider.go @@ -20,6 +20,7 @@ import ( "context" opmetrics "github.com/awslabs/operatorpkg/metrics" + "github.com/awslabs/operatorpkg/option" "github.com/prometheus/client_golang/prometheus" crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) { return nodeClaims, err } -func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) { +func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) { method := "GetInstanceTypes" defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))() - instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool) + instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool, opts...) diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 14a7451b97..ebd2cfd1ef 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -25,9 +25,11 @@ import ( "sync" "time" + "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -67,7 +69,7 @@ type CloudProvider interface { // Availability of types or zone may vary by nodepool or over time. 
Regardless of // availability, the GetInstanceTypes method should always return all instance types, // even those with no offerings available. - GetInstanceTypes(context.Context, *v1.NodePool) ([]*InstanceType, error) + GetInstanceTypes(context.Context, *v1.NodePool, ...option.Function[GetInstanceTypeOptions]) ([]*InstanceType, error) // IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements // it is tied to. IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error) @@ -81,6 +83,18 @@ type CloudProvider interface { GetSupportedNodeClasses() []status.Object } +type GetInstanceTypeOptions struct { + AvailabilitySnapshotUUID types.UID +} + +// GetInstanceTypes calls made with the same snapshot ID should have a consistent view of offering availability. This +// is crucial for offerings with capacity type "reserved" since cross-nodepool offerings may share availability. +func WithAvailabilitySnapshotUUID(uuid types.UID) option.Function[GetInstanceTypeOptions] { + return func(opts *GetInstanceTypeOptions) { + opts.AvailabilitySnapshotUUID = uuid + } +} + // InstanceType describes the properties of a potential node (either concrete attributes of an instance of this type // or supported options in the case of arrays) type InstanceType struct { diff --git a/pkg/controllers/disruption/helpers.go b/pkg/controllers/disruption/helpers.go index 85654fe597..2170922a60 100644 --- a/pkg/controllers/disruption/helpers.go +++ b/pkg/controllers/disruption/helpers.go @@ -33,7 +33,7 @@ import ( disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" - pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" + "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" @@ -48,7 +48,7 @@ var errCandidateDeleting = fmt.Errorf("candidate is deleting") //nolint:gocyclo func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, candidates ...*Candidate, -) (pscheduling.Results, error) { +) (scheduling.Results, error) { candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...) nodes := cluster.Nodes() deletingNodes := nodes.Deleting() @@ -62,33 +62,45 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * if _, ok := lo.Find(deletingNodes, func(n *state.StateNode) bool { return candidateNames.Has(n.Name()) }); ok { - return pscheduling.Results{}, errCandidateDeleting + return scheduling.Results{}, errCandidateDeleting } // We get the pods that are on nodes that are deleting deletingNodePods, err := deletingNodes.ReschedulablePods(ctx, kubeClient) if err != nil { - return pscheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err) + return scheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err) } // start by getting all pending pods pods, err := provisioner.GetPendingPods(ctx) if err != nil { - return pscheduling.Results{}, fmt.Errorf("determining pending pods, %w", err) + return scheduling.Results{}, fmt.Errorf("determining pending pods, %w", err) } for _, n := range candidates { pods = append(pods, n.reschedulablePods...) } pods = append(pods, deletingNodePods...) 
- scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes) + scheduler, err := provisioner.NewScheduler( + log.IntoContext(ctx, operatorlogging.NopLogger), + pods, + stateNodes, + // ReservedOfferingModeFallback is used for the following reasons: + // - For consolidation, we're only going to accept a decision if it lowers the cost of the cluster, and if it only + // requires a single additional nodeclaim. It doesn't matter in this scenario if we fall back. + // - For drift, fallback is required to ensure progress. Strict mode only guarantees progress when multiple scheduling + // loops are allowed to run, but all pods on the drifted node must be scheduled within a single + // iteration. This may result in non-ideal instance choices, but the alternative is deadlock. + // See issue TODO for more details. + scheduling.ReservedOfferingModeFallback, + ) if err != nil { - return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err) + return scheduling.Results{}, fmt.Errorf("creating scheduler, %w", err) } deletingNodePodKeys := lo.SliceToMap(deletingNodePods, func(p *corev1.Pod) (client.ObjectKey, interface{}) { return client.ObjectKeyFromObject(p), nil }) - results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes) + results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(scheduling.MaxInstanceTypes) for _, n := range results.ExistingNodes { // We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should // tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them @@ -100,6 +112,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * // If the pod is on a deleting node, we assume one of two things has already happened: // 1. The node was manually terminated, at which point the provisioning controller has scheduled or is scheduling a node // for the pod. + // TODO: clarify this point, not clear to me // 2. The node was chosen for a previous disruption command, we assume that the uninitialized node will come up // for this command, and we assume it will be successful. If it is not successful, the node will become // not terminating, and we will no longer need to consider these pods. 
@@ -115,10 +128,10 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * // UninitializedNodeError tracks a special pod error for disruption where pods schedule to a node // that hasn't been initialized yet, meaning that we can't be confident to make a disruption decision based off of it type UninitializedNodeError struct { - *pscheduling.ExistingNode + *scheduling.ExistingNode } -func NewUninitializedNodeError(node *pscheduling.ExistingNode) *UninitializedNodeError { +func NewUninitializedNodeError(node *scheduling.ExistingNode) *UninitializedNodeError { return &UninitializedNodeError{ExistingNode: node} } diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 83e59b18cb..c214c82fd9 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -32,6 +32,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -212,7 +213,12 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, pods []*corev1. var ErrNodePoolsNotFound = errors.New("no nodepools found") //nolint:gocyclo -func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stateNodes []*state.StateNode) (*scheduler.Scheduler, error) { +func (p *Provisioner) NewScheduler( + ctx context.Context, + pods []*corev1.Pod, + stateNodes []*state.StateNode, + reservedOfferingMode scheduler.ReservedOfferingMode, +) (*scheduler.Scheduler, error) { nodePools, err := nodepoolutils.ListManaged(ctx, p.kubeClient, p.cloudProvider) if err != nil { return nil, fmt.Errorf("listing nodepools, %w", err) @@ -228,6 +234,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat return nil, ErrNodePoolsNotFound } + schedulerID := uuid.NewUUID() + // nodeTemplates generated from NodePools are ordered by weight // since they are stored within a slice and scheduling // will always attempt to schedule on the first nodeTemplate @@ -236,7 +244,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat instanceTypes := map[string][]*cloudprovider.InstanceType{} domains := map[string]sets.Set[string]{} for _, np := range nodePools { - its, err := p.cloudProvider.GetInstanceTypes(ctx, np) + its, err := p.cloudProvider.GetInstanceTypes(ctx, np, cloudprovider.WithAvailabilitySnapshotUUID(schedulerID)) if err != nil { log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Error(err, "skipping, unable to resolve instance types") continue @@ -295,7 +303,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat if err != nil { return nil, fmt.Errorf("getting daemon pods, %w", err) } - return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil + return scheduler.NewScheduler(ctx, schedulerID, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, reservedOfferingMode), nil } func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { @@ -332,7 +340,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { if len(pods) == 0 { return scheduler.Results{}, nil } - s, err := p.NewScheduler(ctx, pods, nodes.Active()) + s, err := p.NewScheduler(ctx, pods, nodes.Active(), 
scheduler.ReservedOfferingModeStrict) if err != nil { if errors.Is(err, ErrNodePoolsNotFound) { log.FromContext(ctx).Info("no nodepools found") diff --git a/pkg/controllers/provisioning/scheduling/nodeclaim.go b/pkg/controllers/provisioning/scheduling/nodeclaim.go index 04149c518c..e30a4148a3 100644 --- a/pkg/controllers/provisioning/scheduling/nodeclaim.go +++ b/pkg/controllers/provisioning/scheduling/nodeclaim.go @@ -42,12 +42,13 @@ import ( type NodeClaim struct { NodeClaimTemplate - Pods []*corev1.Pod - reservedOfferings map[string]cloudprovider.Offerings - topology *Topology - hostPortUsage *scheduling.HostPortUsage - daemonResources corev1.ResourceList - hostname string + Pods []*corev1.Pod + reservedOfferings map[string]cloudprovider.Offerings + reservedOfferingMode ReservedOfferingMode + topology *Topology + hostPortUsage *scheduling.HostPortUsage + daemonResources corev1.ResourceList + hostname string } type NodePoolLimitsExceededError struct { @@ -58,11 +59,6 @@ func (e NodePoolLimitsExceededError) Error() string { return fmt.Sprintf("all available instance types exceed limits for nodepool: %q", e.nodePool) } -// type IncompatibleNodeClaimTemplateError struct { -// nodePool string -// daemonSetOverhead corev1.ResourceList -// } - // ReservedOfferingError indicates a NodeClaim couldn't be created or a pod couldn't be added to an existing NodeClaim // due to reserved offering constraints. type ReservedOfferingError struct { @@ -88,6 +84,7 @@ func NewNodeClaimForPod( remainingResources corev1.ResourceList, pod *corev1.Pod, podRequests corev1.ResourceList, + reservedOfferingMode ReservedOfferingMode, ) (*NodeClaim, error) { // Ensure we don't consider instance types which would exceed the limits of the NodePool instanceTypes := filterByRemainingResources(nodeClaimTemplate.InstanceTypeOptions, remainingResources) @@ -200,6 +197,7 @@ func (n *NodeClaim) Add(pod *corev1.Pod, podRequests corev1.ResourceList) error // offerings in a map from instance type name to offerings. Additionally, a slice of offerings to release is returned. // This is based on the previously reserved offerings which are no longer compatible with the nodeclaim. These should // not be released until we're ready to persist the changes to the nodeclaim. +// nolint:gocyclo func (n *NodeClaim) reserveOfferings( instanceTypes []*cloudprovider.InstanceType, nodeClaimRequirements scheduling.Requirements, @@ -225,18 +223,21 @@ func (n *NodeClaim) reserveOfferings( reservedOfferings[it.Name] = reserved } } - // If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should - // fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential - // offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to - // on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod - // during this scheduling simulation, but with the possibility of success on subsequent simulations. - if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 { - return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved")) - } - // If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out, - // we should fail to add the pod to this nodeclaim. 
- if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 { - return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying constraints would result ")) + if n.reservedOfferingMode == ReservedOfferingModeStrict { + // If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should + // fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential + // offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to + // on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod + // during this scheduling simulation, but with the possibility of success on subsequent simulations. + // Note: this can occur both on initial NodeClaim creation and when adding pods to an existing NodeClaim. + if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 { + return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved")) + } + // If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out, + // we should fail to add the pod to this nodeclaim. + if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 { + return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying constraints would result in the loss of all compatible reserved offerings for the nodeclaim")) + } } // Ensure we release any offerings for instance types which are no longer compatible with nodeClaimRequirements for instanceName, offerings := range n.reservedOfferings { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index d3498a420f..1a717a68bc 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -27,7 +27,6 @@ import ( "go.uber.org/multierr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/klog/v2" "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" @@ -44,10 +43,41 @@ import ( "sigs.k8s.io/karpenter/pkg/utils/resources" ) -func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1.NodePool, - cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology, - instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*corev1.Pod, - recorder events.Recorder, clock clock.Clock) *Scheduler { +type ReservedOfferingMode int +
+// TODO: Evaluate if another mode should be created for drift. The problem with strict is that it assumes we can run +// multiple scheduling loops to make progress, but if scheduling all pods from the drifted node in a single iteration +// requires fallback, we're at a stalemate. This makes strict a non-starter for drift IMO. +// On the other hand, fallback will result in non-ideal launches when there's constrained capacity. This should be +// rectified by consolidation, but if we can be "right" at initial launch that would be preferable. +// One potential improvement is a "preferences" type strategy, where we attempt to schedule the pod without fallback +// first. This is an improvement over the current fallback strategy since it ensures all new nodeclaims are attempted, +// before then attempting all nodepools, but it still doesn't address the case when offerings are reserved pessimistically. 
+// I don't believe there's a solution to this short of the max-flow based instance selection algorithm, which has its own +// drawbacks. +const ( + // ReservedOfferingModeStrict indicates that the scheduler should fail to add a pod to a nodeclaim if doing so would + // prevent the pod from scheduling to reserved capacity when it otherwise could have. + ReservedOfferingModeStrict ReservedOfferingMode = iota + // ReservedOfferingModeFallback indicates to the scheduler that the addition of a pod to a nodeclaim which + // results in all potential reserved offerings being filtered out is allowed (e.g. on-demand / spot fallback). + ReservedOfferingModeFallback +) + +func NewScheduler( + ctx context.Context, + uuid types.UID, + kubeClient client.Client, + nodePools []*v1.NodePool, + cluster *state.Cluster, + stateNodes []*state.StateNode, + topology *Topology, + instanceTypes map[string][]*cloudprovider.InstanceType, + daemonSetPods []*corev1.Pod, + recorder events.Recorder, + clock clock.Clock, + reservedOfferingMode ReservedOfferingMode, +) *Scheduler { // if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint // during preference relaxation @@ -71,7 +101,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1 return nct, true }) s := &Scheduler{ - id: uuid.NewUUID(), + uuid: uuid, kubeClient: kubeClient, nodeClaimTemplates: templates, topology: topology, @@ -83,26 +113,28 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1 remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) { return np.Name, corev1.ResourceList(np.Spec.Limits) }), - clock: clock, + clock: clock, + reservedOfferingMode: reservedOfferingMode, } s.calculateExistingNodeClaims(stateNodes, daemonSetPods) return s } type Scheduler struct { - id types.UID // Unique UUID attached to this scheduling loop - newNodeClaims []*NodeClaim - existingNodes []*ExistingNode - nodeClaimTemplates []*NodeClaimTemplate - remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool - daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList - cachedPodRequests map[types.UID]corev1.ResourceList // (Pod Namespace/Name) -> calculated resource requests for the pod - preferences *Preferences - topology *Topology - cluster *state.Cluster - recorder events.Recorder - kubeClient client.Client - clock clock.Clock + uuid types.UID // Unique UUID attached to this scheduling loop + newNodeClaims []*NodeClaim + existingNodes []*ExistingNode + nodeClaimTemplates []*NodeClaimTemplate + remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool + daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList + cachedPodRequests map[types.UID]corev1.ResourceList // (Pod UID) -> calculated resource requests for the pod + preferences *Preferences + topology *Topology + cluster *state.Cluster + recorder events.Recorder + kubeClient client.Client + clock clock.Clock + reservedOfferingMode ReservedOfferingMode } // Results contains the results of the scheduling operation @@ -225,11 +257,11 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { lastLogTime := s.clock.Now() batchSize := len(q.pods) for { - UnfinishedWorkSeconds.Set(s.clock.Since(startTime).Seconds(), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}) - 
QueueDepth.Set(float64(len(q.pods)), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}) + UnfinishedWorkSeconds.Set(s.clock.Since(startTime).Seconds(), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.uuid)}) + QueueDepth.Set(float64(len(q.pods)), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.uuid)}) if s.clock.Since(lastLogTime) > time.Minute { - log.FromContext(ctx).WithValues("pods-scheduled", batchSize-len(q.pods), "pods-remaining", len(q.pods), "duration", s.clock.Since(startTime).Truncate(time.Second), "scheduling-id", string(s.id)).Info("computing pod scheduling...") + log.FromContext(ctx).WithValues("pods-scheduled", batchSize-len(q.pods), "pods-remaining", len(q.pods), "duration", s.clock.Since(startTime).Truncate(time.Second), "scheduling-id", string(s.uuid)).Info("computing pod scheduling...") lastLogTime = s.clock.Now() } // Try the next pod @@ -253,7 +285,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { } } } - UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}) + UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.uuid)}) for _, m := range s.newNodeClaims { m.FinalizeScheduling() } @@ -261,7 +293,9 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { return Results{ NewNodeClaims: s.newNodeClaims, ExistingNodes: s.existingNodes, - PodErrors: errors, + PodErrors: lo.OmitBy(errors, func(_ *corev1.Pod, err error) bool { + return IsReservedOfferingError(err) + }), } } @@ -294,6 +328,7 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { s.remainingResources[nodeClaimTemplate.NodePoolName], pod, s.cachedPodRequests[pod.UID], + s.reservedOfferingMode, ) if err != nil { if nodeClaim != nil { diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index b08f105646..eb9b4d52e8 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -162,10 +162,20 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { b.Fatalf("creating topology, %s", err) } - scheduler := scheduling.NewScheduler(ctx, client, []*v1.NodePool{nodePool}, - cluster, nil, topology, - map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil, - events.NewRecorder(&record.FakeRecorder{}), clock) + scheduler := scheduling.NewScheduler( + ctx, + uuid.NewUUID(), + client, + []*v1.NodePool{nodePool}, + cluster, + nil, + topology, + map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, + nil, + events.NewRecorder(&record.FakeRecorder{}), + clock, + scheduling.ReservedOfferingModeStrict, + ) b.ResetTimer() // Pack benchmark diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index b93987a655..12f0f40cdb 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -3662,7 +3662,7 @@ var _ = Context("Scheduling", func() { }, }, }) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value - s, err := 
prov.NewScheduler(ctx, pods, nil) + s, err := prov.NewScheduler(ctx, pods, nil, scheduling.ReservedOfferingModeStrict) Expect(err).To(BeNil()) var wg sync.WaitGroup @@ -3734,7 +3734,7 @@ var _ = Context("Scheduling", func() { }, }, }) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value - s, err := prov.NewScheduler(ctx, pods, nil) + s, err := prov.NewScheduler(ctx, pods, nil, scheduling.ReservedOfferingModeStrict) Expect(err).To(BeNil()) s.Solve(injection.WithControllerName(ctx, "provisioner"), pods)
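
Note on the new `WithAvailabilitySnapshotUUID` option: none of the providers touched in this diff consume it yet (kwok and the fake provider discard the variadic argument), so the sketch below is only an illustration of the intended contract, not part of the change. It assumes a hypothetical provider with `snapshots`, `liveAvailability`, and `buildInstanceTypes` hooks; the idea is that the first `GetInstanceTypes` call for a given snapshot UUID captures the reserved-capacity view and later calls with the same UUID replay it, giving one scheduling loop a consistent picture across nodepools.

```go
package example

import (
	"context"
	"sync"

	"github.com/awslabs/operatorpkg/option"
	"k8s.io/apimachinery/pkg/types"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// availability maps a reservation identifier to its remaining instance count.
type availability map[string]int

// SnapshottingProvider is a hypothetical provider; only GetInstanceTypes is sketched here.
type SnapshottingProvider struct {
	mu        sync.Mutex
	snapshots map[types.UID]availability

	// Hooks standing in for real provider logic (assumed, not part of this diff).
	liveAvailability   func(ctx context.Context) (availability, error)
	buildInstanceTypes func(ctx context.Context, np *v1.NodePool, avail availability) ([]*cloudprovider.InstanceType, error)
}

func (p *SnapshottingProvider) GetInstanceTypes(ctx context.Context, np *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
	// Resolve the variadic options; each option.Function mutates the options struct.
	resolved := &cloudprovider.GetInstanceTypeOptions{}
	for _, opt := range opts {
		opt(resolved)
	}
	avail, err := p.availabilityFor(ctx, resolved.AvailabilitySnapshotUUID)
	if err != nil {
		return nil, err
	}
	return p.buildInstanceTypes(ctx, np, avail)
}

// availabilityFor captures the reserved-capacity view the first time a snapshot UUID is
// seen and replays it on subsequent calls, so every nodepool resolved within a single
// scheduling loop observes the same offering availability.
func (p *SnapshottingProvider) availabilityFor(ctx context.Context, uid types.UID) (availability, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if uid == "" {
		// No snapshot requested; serve the live view.
		return p.liveAvailability(ctx)
	}
	if cached, ok := p.snapshots[uid]; ok {
		return cached, nil
	}
	live, err := p.liveAvailability(ctx)
	if err != nil {
		return nil, err
	}
	if p.snapshots == nil {
		p.snapshots = map[types.UID]availability{}
	}
	// A real implementation would also evict entries once the scheduling loop completes.
	p.snapshots[uid] = live
	return live, nil
}
```

Since the provisioner generates one `schedulerID` per `NewScheduler` call and passes it both to every `GetInstanceTypes` call and to the scheduler itself, a per-UUID cache like this naturally scopes the snapshot to a single scheduling loop.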
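
The scheduler now strips reserved-offering errors out of `Results.PodErrors` via `IsReservedOfferingError`, whose definition isn't included in this diff (only the opening of the `ReservedOfferingError` type is visible as context). A minimal sketch consistent with how `NewReservedOfferingError` and `IsReservedOfferingError` are used here might look like the following; the real type on the branch may carry additional fields.

```go
package scheduling

import "errors"

// ReservedOfferingError wraps failures that stem purely from reserved-offering bookkeeping
// (e.g. compatible reserved capacity exists but couldn't be reserved in this simulation),
// letting the scheduler distinguish them from genuine scheduling failures.
type ReservedOfferingError struct {
	error
}

// NewReservedOfferingError wraps err as a ReservedOfferingError.
func NewReservedOfferingError(err error) ReservedOfferingError {
	return ReservedOfferingError{error: err}
}

// IsReservedOfferingError reports whether err is (or wraps) a ReservedOfferingError.
func IsReservedOfferingError(err error) bool {
	var reservedOfferingErr ReservedOfferingError
	return errors.As(err, &reservedOfferingErr)
}
```

Filtering these errors out of `PodErrors` means a pod blocked only by strict-mode reservation bookkeeping isn't surfaced as a scheduling failure; per the comments in `reserveOfferings`, it is expected to succeed on a subsequent scheduling simulation.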