
Commit

feat: added expiration as node annotation for monitorability (#266)
* feat: added expiration as node annotation for monitorability

* add suite test

* fix tests

* reorder annotation logic
njtran authored Apr 4, 2023
1 parent 137105d commit 959b537
Showing 10 changed files with 223 additions and 44 deletions.
1 change: 1 addition & 0 deletions pkg/apis/v1alpha5/labels.go
@@ -51,6 +51,7 @@ const (

// Karpenter specific annotation values
VoluntaryDisruptionDriftedAnnotationValue = "drifted"
VoluntaryDisruptionExpiredAnnotationValue = "expired"
)

// Karpenter specific finalizers
1 change: 1 addition & 0 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -67,6 +67,7 @@ func (c *CloudProvider) Reset() {
c.AllowedCreateCalls = math.MaxInt
c.NextCreateErr = nil
c.DeleteCalls = []*v1alpha5.Machine{}
c.Drifted = false
}

func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (*v1alpha5.Machine, error) {
8 changes: 6 additions & 2 deletions pkg/controllers/deprovisioning/drift_test.go
@@ -45,6 +45,9 @@ var _ = Describe("Drift", func() {
prov = test.Provisioner()
machine, node = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
},
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
@@ -79,7 +82,7 @@ var _ = Describe("Drift", func() {
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
})
It("should ignore nodes with the drift label, but not the drifted value", func() {
It("should ignore nodes with the disrupted annotation key, but not the drifted value", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: "wrong-value",
})
@@ -99,7 +102,8 @@ var _ = Describe("Drift", func() {
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
})
It("should ignore nodes without the drift label", func() {
It("should ignore nodes without the disrupted annotation key", func() {
delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
22 changes: 9 additions & 13 deletions pkg/controllers/deprovisioning/expiration.go
@@ -23,7 +23,6 @@ import (

"k8s.io/utils/clock"

v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,6 +32,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/utils/node"
)

// Expiration is a subreconciler that deletes expired nodes.
@@ -57,7 +57,12 @@ func NewExpiration(clk clock.Clock, kubeClient client.Client, cluster *state.Clu

// ShouldDeprovision is a predicate used to filter deprovisionable nodes
func (e *Expiration) ShouldDeprovision(ctx context.Context, c *Candidate) bool {
return e.clock.Now().After(getExpirationTime(c.Node, c.provisioner))
// Filter out nodes whose provisioner doesn't define an expiration TTL, or that aren't annotated as expired.
if c.provisioner == nil || c.provisioner.Spec.TTLSecondsUntilExpired == nil {
return false
}

return c.Node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey] == v1alpha5.VoluntaryDisruptionExpiredAnnotationValue
}

// SortCandidates orders expired nodes by when they've expired
@@ -67,7 +72,7 @@ func (e *Expiration) filterAndSortCandidates(ctx context.Context, nodes []*Candi
return nil, fmt.Errorf("filtering candidates, %w", err)
}
sort.Slice(candidates, func(i int, j int) bool {
return getExpirationTime(candidates[i].Node, candidates[i].provisioner).Before(getExpirationTime(candidates[j].Node, candidates[j].provisioner))
return node.GetExpirationTime(candidates[i].Node, candidates[i].provisioner).Before(node.GetExpirationTime(candidates[j].Node, candidates[j].provisioner))
})
return candidates, nil
}
@@ -95,7 +100,7 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C
}

logging.FromContext(ctx).With("ttl", time.Duration(ptr.Int64Value(candidates[0].provisioner.Spec.TTLSecondsUntilExpired))*time.Second).
With("delay", time.Since(getExpirationTime(candidates[0].Node, candidates[0].provisioner))).Infof("triggering termination for expired node after TTL")
With("delay", time.Since(node.GetExpirationTime(candidates[0].Node, candidates[0].provisioner))).Infof("triggering termination for expired node after TTL")
return Command{
candidates: []*Candidate{candidate},
action: actionReplace,
@@ -109,12 +114,3 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C
func (e *Expiration) String() string {
return metrics.ExpirationReason
}

func getExpirationTime(node *v1.Node, provisioner *v1alpha5.Provisioner) time.Time {
if provisioner == nil || provisioner.Spec.TTLSecondsUntilExpired == nil {
// If not defined, return some much larger time.
return time.Date(5000, 0, 0, 0, 0, 0, 0, time.UTC)
}
expirationTTL := time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsUntilExpired)) * time.Second
return node.CreationTimestamp.Add(expirationTTL)
}
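
Note that getExpirationTime is deleted here in favor of a shared helper in pkg/utils/node (imported above as node). That package isn't expanded on this page; judging from the removed function, the relocated helper is presumably close to this sketch:

// Presumed shape of the relocated helper in pkg/utils/node (file not shown in
// this view); the body mirrors the getExpirationTime function removed above.
package node

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"knative.dev/pkg/ptr"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// GetExpirationTime returns when the node expires, or a far-future time when the
// provisioner doesn't define TTLSecondsUntilExpired.
func GetExpirationTime(node *v1.Node, provisioner *v1alpha5.Provisioner) time.Time {
	if provisioner == nil || provisioner.Spec.TTLSecondsUntilExpired == nil {
		// If not defined, return some much larger time.
		return time.Date(5000, 0, 0, 0, 0, 0, 0, time.UTC)
	}
	expirationTTL := time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsUntilExpired)) * time.Second
	return node.CreationTimestamp.Add(expirationTTL)
}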
52 changes: 30 additions & 22 deletions pkg/controllers/deprovisioning/expiration_test.go
@@ -16,10 +16,10 @@ package deprovisioning_test

import (
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -41,9 +41,14 @@ var _ = Describe("Expiration", func() {
var node *v1.Node

BeforeEach(func() {
prov = test.Provisioner()
prov = test.Provisioner(test.ProvisionerOptions{
TTLSecondsUntilExpired: ptr.Int64(30),
})
machine, node = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionExpiredAnnotationValue,
},
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
@@ -60,28 +65,39 @@ var _ = Describe("Expiration", func() {
},
})
})
It("should ignore nodes without TTLSecondsUntilExpired", func() {
It("should ignore nodes with the disruption annotation but different value", func() {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: "wrong-value",
})
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Expect to not create or delete more machines
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
})
It("can delete expired nodes", func() {
prov.Spec.TTLSecondsUntilExpired = ptr.Int64(60)
It("should ignore nodes without the disruption annotation", func() {
delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

// step forward past the expiration time
fakeClock.Step(10 * time.Minute)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Expect to not create or delete more machines
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, node)
})
It("can delete expired nodes", func() {
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
@@ -98,6 +114,9 @@ var _ = Describe("Expiration", func() {
})
machineToExpire, nodeToExpire := test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionExpiredAnnotationValue,
},
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: expireProv.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
@@ -110,7 +129,6 @@ var _ = Describe("Expiration", func() {
Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("32")},
},
})
prov.Spec.TTLSecondsUntilExpired = ptr.Int64(500)
machineNotExpire, nodeNotExpire := test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@@ -131,13 +149,7 @@ var _ = Describe("Expiration", func() {
// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{nodeToExpire, nodeNotExpire}, []*v1alpha5.Machine{machineToExpire, machineNotExpire})

// step forward past the expiration time
fakeClock.Step(10 * time.Minute)

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})
wg.Wait()

// Expect that one of the expired machines is gone
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
@@ -174,8 +186,6 @@ var _ = Describe("Expiration", func() {
// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)

// deprovisioning won't delete the old node until the new node is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
@@ -191,7 +201,6 @@ var _ = Describe("Expiration", func() {
})
It("should uncordon nodes when expiration replacement fails", func() {
cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to uncordon
prov.Spec.TTLSecondsUntilExpired = ptr.Int64(30)

labels := map[string]string{
"app": "test",
@@ -223,7 +232,6 @@ var _ = Describe("Expiration", func() {
// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
_, err := deprovisioningController.Reconcile(ctx, reconcile.Request{})
@@ -290,6 +298,9 @@ var _ = Describe("Expiration", func() {
})
machine, node := test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionExpiredAnnotationValue,
},
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: currentInstance.Name,
@@ -302,7 +313,6 @@ var _ = Describe("Expiration", func() {
Allocatable: map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: resource.MustParse("8")},
},
})
prov.Spec.TTLSecondsUntilExpired = ptr.Int64(200)
ExpectApplied(ctx, env.Client, rs, machine, node, prov, pods[0], pods[1], pods[2])

// bind pods to node
@@ -313,8 +323,6 @@ var _ = Describe("Expiration", func() {
// inform cluster state about nodes and machines
ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)

// deprovisioning won't delete the old machine until the new machine is ready
var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
8 changes: 4 additions & 4 deletions pkg/controllers/node/controller.go
@@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/aws/karpenter-core/pkg/apis/settings"
"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller"

@@ -55,6 +54,7 @@ type Controller struct {
emptiness *Emptiness
finalizer *Finalizer
drift *Drift
expiration *Expiration
}

// NewController constructs a nodeController instance
@@ -65,6 +65,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
initialization: &Initialization{kubeClient: kubeClient, cloudProvider: cloudProvider},
emptiness: &Emptiness{kubeClient: kubeClient, clock: clk, cluster: cluster},
drift: &Drift{kubeClient: kubeClient, cloudProvider: cloudProvider},
expiration: &Expiration{clock: clk},
})
}

@@ -95,9 +96,8 @@ func (c *Controller) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Re
c.initialization,
c.emptiness,
c.finalizer,
}
if settings.FromContext(ctx).DriftEnabled {
reconcilers = append(reconcilers, c.drift)
c.expiration,
c.drift,
}
for _, reconciler := range reconcilers {
res, err := reconciler.Reconcile(ctx, provisioner, node)
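
The Expiration reconciler wired into NewController above lives in a new pkg/controllers/node/expiration.go that isn't expanded on this page. Based on how the deprovisioner reads the annotation and on the Drift reconciler below, a minimal sketch of it might look like the following (helper names and requeue behavior are assumptions, not the committed code):

// Sketch of the node-controller Expiration reconciler; the real file is part of
// this commit but not shown in this view.
package node

import (
	"context"
	"time"

	"github.com/samber/lo"
	v1 "k8s.io/api/core/v1"
	"k8s.io/utils/clock"
	"knative.dev/pkg/logging"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
	nodeutils "github.com/aws/karpenter-core/pkg/utils/node"
)

// Expiration annotates nodes whose provisioner-defined TTL has elapsed so that the
// deprovisioner (and any external monitoring) can see the pending disruption.
type Expiration struct {
	clock clock.Clock
}

func (e *Expiration) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner, node *v1.Node) (reconcile.Result, error) {
	// If another controller already marked the node for a different voluntary disruption, do nothing.
	if val, ok := node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey]; ok && val != v1alpha5.VoluntaryDisruptionExpiredAnnotationValue {
		return reconcile.Result{}, nil
	}
	// Without a TTL there is nothing to expire; clear a stale expired annotation if one exists.
	if provisioner == nil || provisioner.Spec.TTLSecondsUntilExpired == nil {
		delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
		return reconcile.Result{}, nil
	}
	expirationTime := nodeutils.GetExpirationTime(node, provisioner)
	if e.clock.Now().Before(expirationTime) {
		// Not expired yet; requeue when it will be.
		return reconcile.Result{RequeueAfter: expirationTime.Sub(e.clock.Now())}, nil
	}
	node.Annotations = lo.Assign(node.Annotations, map[string]string{
		v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionExpiredAnnotationValue,
	})
	logging.FromContext(ctx).With("ttl", time.Duration(*provisioner.Spec.TTLSecondsUntilExpired)*time.Second).Infof("annotating node as expired")
	return reconcile.Result{}, nil
}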
27 changes: 24 additions & 3 deletions pkg/controllers/node/drift.go
@@ -20,11 +20,13 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/samber/lo"

"github.com/aws/karpenter-core/pkg/apis/settings"
"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/utils/machine"
@@ -36,20 +38,39 @@ type Drift struct {
}

func (d *Drift) Reconcile(ctx context.Context, provisioner *v1alpha5.Provisioner, node *v1.Node) (reconcile.Result, error) {
if _, ok := node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey]; ok {
// If the node is marked as voluntarily disrupted by another controller, do nothing.
val, hasAnnotation := node.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey]
if hasAnnotation && val != v1alpha5.VoluntaryDisruptionDriftedAnnotationValue {
return reconcile.Result{}, nil
}

// From here there are three scenarios to handle:
// 1. If drift is not enabled but the node is drifted, remove the annotation
// so another disruption controller can annotate the node.
if !settings.FromContext(ctx).DriftEnabled {
if val == v1alpha5.VoluntaryDisruptionDriftedAnnotationValue {
delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
logging.FromContext(ctx).Infof("removing drift annotation from node as drift has been disabled")
}
return reconcile.Result{}, nil
}

// TODO: Add Provisioner Drift
drifted, err := d.cloudProvider.IsMachineDrifted(ctx, machine.NewFromNode(node))
if err != nil {
return reconcile.Result{}, fmt.Errorf("getting drift for node, %w", err)
}
if drifted {
// 2. Otherwise, if the node isn't drifted, but has the annotation, remove it.
if !drifted && hasAnnotation {
delete(node.Annotations, v1alpha5.VoluntaryDisruptionAnnotationKey)
logging.FromContext(ctx).Infof("removing drift annotation from node")
// 3. Finally, if the node is drifted, but doesn't have the annotation, add it.
} else if drifted && !hasAnnotation {
node.Annotations = lo.Assign(node.Annotations, map[string]string{
v1alpha5.VoluntaryDisruptionAnnotationKey: v1alpha5.VoluntaryDisruptionDriftedAnnotationValue,
})
logging.FromContext(ctx).Infof("annotating node as drifted")
}

// Requeue after 5 minutes for the cache TTL
return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
}
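
Because drift and expiration now share the same node annotation, the pending disruption and its reason are visible to anything that can read node metadata, not just to Karpenter's own controllers. As an illustrative example (not part of this commit), an external monitor could surface the affected nodes like this:

// Illustrative monitoring helper, not part of this change; assumes a configured
// controller-runtime client for the cluster.
package monitoring

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
)

// ListPendingDisruptions prints every node annotated for voluntary disruption
// along with the reason ("drifted" or "expired").
func ListPendingDisruptions(ctx context.Context, c client.Client) error {
	nodes := &v1.NodeList{}
	if err := c.List(ctx, nodes); err != nil {
		return fmt.Errorf("listing nodes, %w", err)
	}
	for _, n := range nodes.Items {
		if reason, ok := n.Annotations[v1alpha5.VoluntaryDisruptionAnnotationKey]; ok {
			fmt.Printf("node %s pending voluntary disruption: %s\n", n.Name, reason)
		}
	}
	return nil
}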
