Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Handle insufficient capacity self-delete #208

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
type CloudProvider struct {
InstanceTypes []*cloudprovider.InstanceType

mu sync.RWMutex
// CreateCalls contains the arguments for every create call that was made since it was cleared
mu sync.RWMutex
CreateCalls []*v1alpha5.Machine
AllowedCreateCalls int
NextCreateErr error
CreatedMachines map[string]*v1alpha5.Machine
Drifted bool
}
Expand All @@ -63,12 +64,19 @@ func (c *CloudProvider) Reset() {
c.CreateCalls = []*v1alpha5.Machine{}
c.CreatedMachines = map[string]*v1alpha5.Machine{}
c.AllowedCreateCalls = math.MaxInt
c.NextCreateErr = nil
}

func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (*v1alpha5.Machine, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.NextCreateErr != nil {
temp := c.NextCreateErr
c.NextCreateErr = nil
return nil, temp
}

c.CreateCalls = append(c.CreateCalls, machine)
if len(c.CreateCalls) > c.AllowedCreateCalls {
return &v1alpha5.Machine{}, fmt.Errorf("erroring as number of AllowedCreateCalls has been exceeded")
Expand Down Expand Up @@ -188,6 +196,9 @@ func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error {
}

func (c *CloudProvider) IsMachineDrifted(context.Context, *v1alpha5.Machine) (bool, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.Drifted, nil
}

Expand Down
36 changes: 33 additions & 3 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,17 @@ func (ofs Offerings) Cheapest() Offering {

// MachineNotFoundError is an error type returned by CloudProviders when the reason for failure is NotFound
type MachineNotFoundError struct {
Err error
error
}

func NewMachineNotFoundError(err error) *MachineNotFoundError {
return &MachineNotFoundError{
Err: err,
error: err,
}
}

func (e *MachineNotFoundError) Error() string {
return fmt.Sprintf("machine not found, %s", e.Err)
return fmt.Sprintf("machine not found, %s", e.error)
}

func IsMachineNotFoundError(err error) bool {
Expand All @@ -173,3 +173,33 @@ func IgnoreMachineNotFoundError(err error) error {
}
return err
}

// InsufficientCapacityError is an error type returned by CloudProviders when a launch fails due to a lack of capacity from machine requirements
type InsufficientCapacityError struct {
error
}

func NewInsufficientCapacityError(err error) *InsufficientCapacityError {
return &InsufficientCapacityError{
error: err,
}
}

func (e *InsufficientCapacityError) Error() string {
return fmt.Sprintf("insufficient capacity, %s", e.error)
}

func IsInsufficientCapacityError(err error) bool {
if err == nil {
return false
}
var icErr *InsufficientCapacityError
return errors.As(err, &icErr)
}

func IgnoreInsufficientCapacityError(err error) error {
if IsInsufficientCapacityError(err) {
return nil
}
return err
}
4 changes: 4 additions & 0 deletions pkg/controllers/machine/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (l *Launch) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reco
logging.FromContext(ctx).Debugf("creating machine")
retrieved, err = l.cloudProvider.Create(ctx, machine)
if err != nil {
if cloudprovider.IsInsufficientCapacityError(err) {
logging.FromContext(ctx).Error(err)
return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine))
}
return reconcile.Result{}, fmt.Errorf("creating machine, %w", err)
}
} else {
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/machine/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package machine_test

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"github.com/aws/karpenter-core/pkg/apis"
"github.com/aws/karpenter-core/pkg/apis/settings"
"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/cloudprovider/fake"
"github.com/aws/karpenter-core/pkg/controllers/machine"
"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
Expand Down Expand Up @@ -158,6 +160,15 @@ var _ = Describe("Controller", func() {
machine = ExpectExists(ctx, env.Client, machine)
Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue))
})
It("should delete the machine if InsufficientCapacity is returned from the cloudprovider", func() {
cloudProvider.NextCreateErr = cloudprovider.NewInsufficientCapacityError(fmt.Errorf("all instance types were unavailable"))
machine := test.Machine()
ExpectApplied(ctx, env.Client, machine)
ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine))
ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) // Reconcile again to handle termination flow

ExpectNotFound(ctx, env.Client, machine)
})
})
Context("Registration", func() {
It("should match the Machine to the Node when the Node comes online", func() {
Expand Down