Commit

Merge branch 'main' into existence-sub-reconciler
jonathan-innis committed Feb 10, 2023
2 parents 176bad1 + f3b4934 commit eae389d
Showing 21 changed files with 280 additions and 154 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -9,6 +9,7 @@ test: ## Run tests
go test ./... \
-race \
--ginkgo.focus="${FOCUS}" \
--ginkgo.v \
-cover -coverprofile=coverage.out -outputdir=. -coverpkg=./...

deflake: ## Run randomized, racing tests until the test fails to catch flakes
1 change: 1 addition & 0 deletions hack/toolchain.sh
@@ -20,6 +20,7 @@ tools() {
go install github.com/sigstore/cosign/cmd/[email protected]
go install github.com/gohugoio/[email protected]+extended
go install golang.org/x/vuln/cmd/[email protected]
go install github.com/onsi/ginkgo/v2/ginkgo@latest

if ! echo "$PATH" | grep -q "${GOPATH:-undefined}/bin\|$HOME/go/bin"; then
echo "Go workspace's \"bin\" directory is not in PATH. Run 'export PATH=\"\$PATH:\${GOPATH:-\$HOME/go}/bin\"'."
13 changes: 12 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
@@ -41,10 +41,11 @@ var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)
type CloudProvider struct {
InstanceTypes []*cloudprovider.InstanceType

mu sync.RWMutex
// CreateCalls contains the arguments for every create call that was made since it was cleared
mu sync.RWMutex
CreateCalls []*v1alpha5.Machine
AllowedCreateCalls int
NextCreateErr error
CreatedMachines map[string]*v1alpha5.Machine
Drifted bool
}
@@ -63,12 +64,19 @@ func (c *CloudProvider) Reset() {
c.CreateCalls = []*v1alpha5.Machine{}
c.CreatedMachines = map[string]*v1alpha5.Machine{}
c.AllowedCreateCalls = math.MaxInt
c.NextCreateErr = nil
}

func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (*v1alpha5.Machine, error) {
c.mu.Lock()
defer c.mu.Unlock()

if c.NextCreateErr != nil {
temp := c.NextCreateErr
c.NextCreateErr = nil
return nil, temp
}

c.CreateCalls = append(c.CreateCalls, machine)
if len(c.CreateCalls) > c.AllowedCreateCalls {
return &v1alpha5.Machine{}, fmt.Errorf("erroring as number of AllowedCreateCalls has been exceeded")
@@ -188,6 +196,9 @@ func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error {
}

func (c *CloudProvider) IsMachineDrifted(context.Context, *v1alpha5.Machine) (bool, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.Drifted, nil
}

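The new NextCreateErr field lets a test inject a one-shot failure from the fake cloud provider: Create returns the injected error once and clears the field before doing anything else. A minimal standalone sketch of that usage follows; the plain go-test style and the error text are illustrative, not the repository's Ginkgo suite conventions.

package fake_test

import (
	"context"
	"errors"
	"testing"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
	"github.com/aws/karpenter-core/pkg/cloudprovider/fake"
)

// TestNextCreateErrIsReturnedOnce verifies that an injected error surfaces exactly once
// and is cleared afterwards, matching the behavior added to Create above.
func TestNextCreateErrIsReturnedOnce(t *testing.T) {
	cp := &fake.CloudProvider{}
	cp.Reset() // initializes CreateCalls, CreatedMachines, AllowedCreateCalls, and NextCreateErr

	cp.NextCreateErr = errors.New("injected create failure")
	if _, err := cp.Create(context.Background(), &v1alpha5.Machine{}); err == nil {
		t.Fatal("expected the injected error on the first Create call")
	}
	if cp.NextCreateErr != nil {
		t.Fatal("expected NextCreateErr to be cleared after it was returned")
	}
}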
36 changes: 33 additions & 3 deletions pkg/cloudprovider/types.go
@@ -146,17 +146,17 @@ func (ofs Offerings) Cheapest() Offering {

// MachineNotFoundError is an error type returned by CloudProviders when the reason for failure is NotFound
type MachineNotFoundError struct {
Err error
error
}

func NewMachineNotFoundError(err error) *MachineNotFoundError {
return &MachineNotFoundError{
Err: err,
error: err,
}
}

func (e *MachineNotFoundError) Error() string {
return fmt.Sprintf("machine not found, %s", e.Err)
return fmt.Sprintf("machine not found, %s", e.error)
}

func IsMachineNotFoundError(err error) bool {
@@ -173,3 +173,33 @@ func IgnoreMachineNotFoundError(err error) error {
}
return err
}

// InsufficientCapacityError is an error type returned by CloudProviders when a launch fails due to a lack of capacity from machine requirements
type InsufficientCapacityError struct {
error
}

func NewInsufficientCapacityError(err error) *InsufficientCapacityError {
return &InsufficientCapacityError{
error: err,
}
}

func (e *InsufficientCapacityError) Error() string {
return fmt.Sprintf("insufficient capacity, %s", e.error)
}

func IsInsufficientCapacityError(err error) bool {
if err == nil {
return false
}
var icErr *InsufficientCapacityError
return errors.As(err, &icErr)
}

func IgnoreInsufficientCapacityError(err error) error {
if IsInsufficientCapacityError(err) {
return nil
}
return err
}
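Together with NewInsufficientCapacityError, the Is/Ignore helpers give callers a provider-agnostic way to classify capacity shortfalls, mirroring the existing MachineNotFoundError pattern above. A small self-contained sketch of the intended call pattern; the wrapped error text and the retry remark are illustrative assumptions, not behavior defined by this commit.

package main

import (
	"errors"
	"fmt"

	"github.com/aws/karpenter-core/pkg/cloudprovider"
)

func main() {
	// A cloud provider wraps its raw failure so callers never parse provider-specific strings.
	raw := errors.New("no capacity available for the requested instance types")
	err := cloudprovider.NewInsufficientCapacityError(raw)

	fmt.Println(err)                                            // insufficient capacity, no capacity available for the requested instance types
	fmt.Println(cloudprovider.IsInsufficientCapacityError(err)) // true

	// IgnoreInsufficientCapacityError lets a caller treat a capacity shortfall as non-fatal
	// while still propagating every other error unchanged.
	if cloudprovider.IgnoreInsufficientCapacityError(err) == nil {
		fmt.Println("capacity error suppressed; the caller may retry with relaxed requirements")
	}
}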
24 changes: 13 additions & 11 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -29,8 +29,10 @@ import (

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/scheduling"
)
@@ -43,19 +45,19 @@ type consolidation struct {
kubeClient client.Client
provisioner *provisioning.Provisioner
cloudProvider cloudprovider.CloudProvider
reporter *Reporter
recorder events.Recorder
lastConsolidationState int64
}

func makeConsolidation(clock clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
cloudProvider cloudprovider.CloudProvider, reporter *Reporter) consolidation {
cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) consolidation {
return consolidation{
clock: clock,
cluster: cluster,
kubeClient: kubeClient,
provisioner: provisioner,
cloudProvider: cloudProvider,
reporter: reporter,
recorder: recorder,
lastConsolidationState: 0,
}
}
@@ -79,7 +81,7 @@ func (c *consolidation) sortAndFilterCandidates(ctx context.Context, nodes []Can
// filter out nodes that can't be terminated
nodes = lo.Filter(nodes, func(cn CandidateNode, _ int) bool {
if reason, canTerminate := canBeTerminated(cn, pdbs); !canTerminate {
c.reporter.RecordUnconsolidatableReason(ctx, cn.Node, reason)
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(cn.Node, reason))
return false
}
return true
@@ -94,15 +96,15 @@
// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (c *consolidation) ShouldDeprovision(ctx context.Context, n *state.Node, provisioner *v1alpha5.Provisioner, _ []*v1.Pod) bool {
if val, ok := n.Annotations()[v1alpha5.DoNotConsolidateNodeAnnotationKey]; ok {
c.reporter.RecordUnconsolidatableReason(ctx, n.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey))
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(n.Node, fmt.Sprintf("%s annotation exists", v1alpha5.DoNotConsolidateNodeAnnotationKey)))
return val != "true"
}
if provisioner == nil {
c.reporter.RecordUnconsolidatableReason(ctx, n.Node, "provisioner is unknown")
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(n.Node, "provisioner is unknown"))
return false
}
if provisioner.Spec.Consolidation == nil || !ptr.BoolValue(provisioner.Spec.Consolidation.Enabled) {
c.reporter.RecordUnconsolidatableReason(ctx, n.Node, fmt.Sprintf("provisioner %s has consolidation disabled", provisioner.Name))
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(n.Node, fmt.Sprintf("provisioner %s has consolidation disabled", provisioner.Name)))
return false
}
return true
@@ -191,7 +193,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi
if !allPodsScheduled {
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(nodes) == 1 {
c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, "not all pods would schedule")
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, "not all pods would schedule"))
}
return Command{action: actionDoNothing}, nil
}
@@ -207,7 +209,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi
// we're not going to turn a single node into multiple nodes
if len(newNodes) != 1 {
if len(nodes) == 1 {
c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, fmt.Sprintf("can't remove without creating %d nodes", len(newNodes)))
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, fmt.Sprintf("can't remove without creating %d nodes", len(newNodes))))
}
return Command{action: actionDoNothing}, nil
}
@@ -221,7 +223,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi
newNodes[0].InstanceTypeOptions = filterByPrice(newNodes[0].InstanceTypeOptions, newNodes[0].Requirements, nodesPrice)
if len(newNodes[0].InstanceTypeOptions) == 0 {
if len(nodes) == 1 {
c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, "can't replace with a cheaper node")
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, "can't replace with a cheaper node"))
}
// no instance types remain after filtering by price
return Command{action: actionDoNothing}, nil
@@ -240,7 +242,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, nodes ...Candi
if allExistingAreSpot &&
newNodes[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) {
if len(nodes) == 1 {
c.reporter.RecordUnconsolidatableReason(ctx, nodes[0].Node, "can't replace a spot node with a spot node")
c.recorder.Publish(deprovisioningevents.UnconsolidatableReason(nodes[0].Node, "can't replace a spot node with a spot node"))
}
return Command{action: actionDoNothing}, nil
}
9 changes: 3 additions & 6 deletions pkg/controllers/deprovisioning/controller.go
@@ -52,7 +52,6 @@ type Controller struct {
clock clock.Clock
cloudProvider cloudprovider.CloudProvider
deprovisioners []Deprovisioner
reporter *Reporter
}

// pollingPeriod that we inspect cluster to look for opportunities to deprovision
@@ -73,14 +72,12 @@ var waitRetryOptions = []retry.Option{
func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provisioning.Provisioner,
cp cloudprovider.CloudProvider, recorder events.Recorder, cluster *state.Cluster) *Controller {

reporter := NewReporter(recorder)
return &Controller{
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
recorder: recorder,
reporter: reporter,
cloudProvider: cp,
deprovisioners: []Deprovisioner{
// Expire any nodes that must be deleted, allowing their pods to potentially land on currently
@@ -90,11 +87,11 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi
// Delete any remaining empty nodes as there is zero cost in terms of disruption. Emptiness and
// emptyNodeConsolidation are mutually exclusive, only one of these will operate
NewEmptiness(clk),
NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
NewEmptyNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder),
// Attempt to identify multiple nodes that we can consolidate simultaneously to reduce pod churn
NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
NewMultiNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder),
// And finally fall back our single node consolidation to further reduce cluster cost.
NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter),
NewSingleNodeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder),
},
}
}
31 changes: 23 additions & 8 deletions pkg/controllers/deprovisioning/drift_test.go
@@ -15,6 +15,7 @@ limitations under the License.
package deprovisioning_test

import (
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
@@ -61,9 +62,11 @@ var _ = Describe("Drift", func() {
// inform cluster state about the nodes
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node))
fakeClock.Step(10 * time.Minute)
go triggerVerifyAction()
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())
wg.Wait()

Expect(cloudProvider.CreateCalls).To(HaveLen(0))
ExpectExists(ctx, env.Client, node)
@@ -93,9 +96,12 @@ var _ = Describe("Drift", func() {
// inform cluster state about the nodes
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node))
fakeClock.Step(10 * time.Minute)
go triggerVerifyAction()

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())
wg.Wait()

Expect(cloudProvider.CreateCalls).To(HaveLen(0))
ExpectExists(ctx, env.Client, node)
@@ -154,9 +160,11 @@ var _ = Describe("Drift", func() {
// inform cluster state about the nodes
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node))
fakeClock.Step(10 * time.Minute)
go triggerVerifyAction()
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())
wg.Wait()

// we don't need a new node, but we should evict everything off one of node2 which only has a single pod
Expect(cloudProvider.CreateCalls).To(HaveLen(0))
@@ -207,9 +215,11 @@ var _ = Describe("Drift", func() {
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed())

// deprovisioning won't delete the old node until the new node is ready
wg := ExpectMakeNewNodesReady(ctx, env.Client, 1, node)
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewNodesReady(ctx, env.Client, &wg, 1, node)

fakeClock.Step(10 * time.Minute)
go triggerVerifyAction()
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())
wg.Wait()
@@ -299,9 +309,11 @@ var _ = Describe("Drift", func() {
Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(node), node)).To(Succeed())

// deprovisioning won't delete the old node until the new node is ready
wg := ExpectMakeNewNodesReady(ctx, env.Client, 3, node)
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectMakeNewNodesReady(ctx, env.Client, &wg, 3, node)

fakeClock.Step(10 * time.Minute)
go triggerVerifyAction()
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())
wg.Wait()
@@ -344,10 +356,13 @@ var _ = Describe("Drift", func() {
// inform cluster state about the nodes
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node1))
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(node2))

fakeClock.Step(10 * time.Minute)
go triggerVerifyAction()
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
Expect(err).ToNot(HaveOccurred())
wg.Wait()

// we don't need a new node, but we should evict everything off one of node2 which only has a single pod
Expect(cloudProvider.CreateCalls).To(HaveLen(0))
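These tests replace the old go triggerVerifyAction() goroutine with an ExpectTriggerVerifyAction helper that registers itself on a sync.WaitGroup, so each test can wait for the clock-advancing goroutine to finish before asserting. The helper itself is not visible in this commit view; a plausible shape, inferred from how the tests call it, is sketched below. The polling loop, interval, and step duration are assumptions, as is the package-level fakeClock these tests already use.

// Hypothetical sketch of the suite helper. It waits until the deprovisioning controller
// begins sleeping on the fake clock during consolidation validation, then steps the clock
// past that wait; the WaitGroup lets the test block until the goroutine has finished.
func ExpectTriggerVerifyAction(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Poll briefly until the controller is actually waiting on the fake clock.
		for i := 0; i < 10; i++ {
			time.Sleep(250 * time.Millisecond)
			if fakeClock.HasWaiters() {
				break
			}
		}
		// Step far enough to satisfy the consolidation validation period.
		fakeClock.Step(45 * time.Second)
	}()
}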
5 changes: 3 additions & 2 deletions pkg/controllers/deprovisioning/emptynodeconsolidation.go
@@ -28,6 +28,7 @@ import (
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
)

// EmptyNodeConsolidation is the consolidation controller that performs multi-node consolidation of entirely empty nodes
@@ -36,8 +37,8 @@ type EmptyNodeConsolidation struct {
}

func NewEmptyNodeConsolidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client,
provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, reporter *Reporter) *EmptyNodeConsolidation {
return &EmptyNodeConsolidation{consolidation: makeConsolidation(clk, cluster, kubeClient, provisioner, cp, reporter)}
provisioner *provisioning.Provisioner, cp cloudprovider.CloudProvider, recorder events.Recorder) *EmptyNodeConsolidation {
return &EmptyNodeConsolidation{consolidation: makeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder)}
}

// ComputeCommand generates a deprovisioning command given deprovisionable nodes
2 changes: 2 additions & 0 deletions pkg/controllers/deprovisioning/events/events.go
@@ -16,6 +16,7 @@ package events

import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"

@@ -69,5 +70,6 @@ func UnconsolidatableReason(node *v1.Node, reason string) events.Event {
Reason: "Unconsolidatable",
Message: reason,
DedupeValues: []string{node.Name},
DedupeTimeout: time.Minute * 15,
}
}
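The added DedupeTimeout means repeated Unconsolidatable events for the same node are coalesced for 15 minutes instead of being re-emitted on every deprovisioning poll. A short sketch of what the helper produces; the node name and reason string are illustrative.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	deprovisioningevents "github.com/aws/karpenter-core/pkg/controllers/deprovisioning/events"
)

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "ip-192-168-1-1.us-west-2.compute.internal"}}
	evt := deprovisioningevents.UnconsolidatableReason(node, "provisioner default has consolidation disabled")

	// Events sharing the same dedupe values are suppressed for the DedupeTimeout window,
	// so a node that stays unconsolidatable produces one event roughly every 15 minutes.
	fmt.Println(evt.Reason, evt.DedupeValues, evt.DedupeTimeout)
}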

0 comments on commit eae389d
