Skip to content

Commit

Permalink
wip: reserved capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal committed Jan 10, 2025
1 parent cd92f5b commit 1bd2ce9
Show file tree
Hide file tree
Showing 19 changed files with 513 additions and 198 deletions.
4 changes: 2 additions & 2 deletions kwok/cloudprovider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func setDefaultOptions(opts InstanceTypeOptions) InstanceTypeOptions {

// make sure all the instance types are available
for i := range opts.Offerings {
opts.Offerings[i].Available = true
opts.Offerings[i].OfferingAvailabilityResolver = cloudprovider.TrueStaticAvailabilityResolver
}

return opts
Expand Down Expand Up @@ -174,11 +174,11 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
Requirements: requirements,
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
return cloudprovider.Offering{
OfferingAvailabilityResolver: off.Offering.OfferingAvailabilityResolver,
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
})...),
Price: off.Offering.Price,
Available: off.Offering.Available,
}
}),
Capacity: options.Resources,
Expand Down
4 changes: 2 additions & 2 deletions kwok/tools/gen_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
},
Offering: cloudprovider.Offering{
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ArchitectureArm64 = "arm64"
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
CapacityTypeReserved = "reserved"
)

// Karpenter specific domains and labels
Expand Down
64 changes: 42 additions & 22 deletions pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,46 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
}
if len(options.Offerings) == 0 {
options.Offerings = []cloudprovider.Offering{
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}), Price: PriceFromResources(options.Resources), Available: true},
{
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}),
Price: PriceFromResources(options.Resources),
},
}
}
if len(options.Architecture) == 0 {
Expand Down Expand Up @@ -153,12 +173,12 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
price := PriceFromResources(opts.Resources)
opts.Offerings = []cloudprovider.Offering{
{
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: ct,
corev1.LabelTopologyZone: zone,
}),
Price: price,
Available: true,
Price: price,
},
}
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Expand Down
113 changes: 109 additions & 4 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import (
var (
SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand))

TrueStaticAvailabilityResolver OfferingAvailabilityResolver = staticAvailabilityResolver{available: true}
FalseStaticAvailabilityResolver OfferingAvailabilityResolver = staticAvailabilityResolver{available: false}
)

type DriftReason string
Expand Down Expand Up @@ -224,6 +227,15 @@ func (its InstanceTypes) Truncate(requirements scheduling.Requirements, maxItems
return truncatedInstanceTypes, nil
}

func (its InstanceTypes) Difference(other InstanceTypes) InstanceTypes {
names := sets.New(lo.Map(other, func(it *InstanceType, _ int) string {
return it.Name
})...)
return lo.Reject(its, func(it *InstanceType, _ int) bool {
return names.Has(it.Name)
})
}

type InstanceTypeOverhead struct {
// KubeReserved returns the default resources allocated to kubernetes system daemons by default
KubeReserved corev1.ResourceList
Expand All @@ -237,24 +249,78 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList {
return resources.Merge(i.KubeReserved, i.SystemReserved, i.EvictionThreshold)
}

// An OfferingAvailabilityResolver is used to determine if there is available capacity for a given offering. To ensure
// consistency between multiple controllers attempting to provision a NodeClaim with a given offering, offerings should
// be "reserved" by the controller. Once a launch decision has been made, all offerings which were reserved may be
// released, enabling their use once again.
type OfferingAvailabilityResolver interface {
Available() bool
Reserve(string) bool
GetReservation(string) OfferingReservation
}

type OfferingReservation interface {
Release()
Commit()
Matches(*v1.NodeClaim) bool
}

type OfferingReservations []OfferingReservation

func (r OfferingReservations) Commit() {
for _, reservation := range r {
reservation.Commit()
}
}

func (r OfferingReservations) Release() {
for _, reservation := range r {
reservation.Release()
}
}

func (r OfferingReservations) Matching(nc *v1.NodeClaim) OfferingReservations {
return lo.Filter(r, func(reservation OfferingReservation, _ int) bool {
return reservation.Matches(nc)
})
}


// An Offering describes where an InstanceType is available to be used, with the expectation that its properties
// may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
// these properties are captured with labels in Requirements.
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone
type Offering struct {
OfferingAvailabilityResolver

Requirements scheduling.Requirements
Price float64
// Available is added so that Offerings can return all offerings that have ever existed for an instance type,
// so we can get historical pricing data for calculating savings in consolidation
Available bool
}

type Offerings []Offering

// Reserve attempts to make a reservation for each offering, returning true if it was successful for any.
func (ofs Offerings) Reserve(id string) bool {
success := false
for i := range ofs {
success = success || ofs[i].Reserve(id)
}
return success
}

func (ofs Offerings) Reservations(id string) OfferingReservations {
return lo.FilterMap(ofs, func(o Offering, _ int) (OfferingReservation, bool) {
if reservation := o.GetReservation(id); reservation != nil {
return reservation, true
}
return nil, false
})
}

// Available filters the available offerings from the returned offerings
func (ofs Offerings) Available() Offerings {
return lo.Filter(ofs, func(o Offering, _ int) bool {
return o.Available
return o.Available()
})
}

Expand Down Expand Up @@ -397,3 +463,42 @@ func NewCreateError(err error, message string) *CreateError {
ConditionMessage: message,
}
}

type staticAvailabilityResolver struct {
requirements scheduling.Requirements
available bool
}

type noopReservation struct {
requirements scheduling.Requirements
}

func (r staticAvailabilityResolver) Available() bool {
return r.available
}

func (r staticAvailabilityResolver) Reserve(_ string) bool {
return r.available
}

func (r staticAvailabilityResolver) GetReservation(_ string) OfferingReservation {
return noopReservation{
requirements: r.requirements,
}
}

func (r noopReservation) Commit() {}

func (r noopReservation) Release() {}

func (r noopReservation) Matches(nc *v1.NodeClaim) bool {
reqs := scheduling.NewLabelRequirements(nc.Labels)
return reqs.IsCompatible(r.requirements, scheduling.AllowUndefinedWellKnownLabels)
}

func NewStaticAvailabilityResolver(available bool, requirements scheduling.Requirements) OfferingAvailabilityResolver {
return staticAvailabilityResolver{
available: available,
requirements: requirements,
}
}
54 changes: 27 additions & 27 deletions pkg/controllers/disruption/consolidation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2050,29 +2050,29 @@ var _ = Describe("Consolidation", func() {
Name: "current-on-demand",
Offerings: []cloudprovider.Offering{
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.5,
Available: false,
OfferingAvailabilityResolver: cloudprovider.FalseStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.5,
},
},
})
replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
Name: "potential-spot-replacement",
Offerings: []cloudprovider.Offering{
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 1.0,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 1.0,
},
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}),
Price: 0.2,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}),
Price: 0.2,
},
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}),
Price: 0.4,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}),
Price: 0.4,
},
},
})
Expand Down Expand Up @@ -2134,34 +2134,34 @@ var _ = Describe("Consolidation", func() {
Name: "current-on-demand",
Offerings: []cloudprovider.Offering{
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.5,
Available: false,
OfferingAvailabilityResolver: cloudprovider.FalseStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.5,
},
},
})
replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
Name: "on-demand-replacement",
Offerings: []cloudprovider.Offering{
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.6,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.6,
},
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1b"}),
Price: 0.6,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1b"}),
Price: 0.6,
},
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}),
Price: 0.2,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}),
Price: 0.2,
},
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}),
Price: 0.3,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}),
Price: 0.3,
},
},
})
Expand Down
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/drift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,19 +775,19 @@ var _ = Describe("Drift", func() {
Name: "current-on-demand",
Offerings: []cloudprovider.Offering{
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.5,
Available: false,
OfferingAvailabilityResolver: cloudprovider.FalseStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.5,
},
},
})
replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
Name: "replacement-on-demand",
Offerings: []cloudprovider.Offering{
{
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.3,
Available: true,
OfferingAvailabilityResolver: cloudprovider.TrueStaticAvailabilityResolver,
Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}),
Price: 0.3,
},
},
Resources: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("3")},
Expand Down
Loading

0 comments on commit 1bd2ce9

Please sign in to comment.