Skip to content

Commit

Permalink
Update provisioning in testing across all files
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Feb 1, 2023
1 parent b33010a commit d7bddc6
Show file tree
Hide file tree
Showing 16 changed files with 5,210 additions and 5,087 deletions.
23 changes: 10 additions & 13 deletions pkg/controllers/deprovisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
Expand All @@ -47,6 +48,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/controllers/state/informer"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/scheme"
"github.com/aws/karpenter-core/pkg/test"
Expand All @@ -60,7 +62,6 @@ var deprovisioningController *deprovisioning.Controller
var provisioningController controller.Controller
var provisioner *provisioning.Provisioner
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var nodeStateController controller.Controller
var fakeClock *clock.FakeClock
var onDemandInstances []*cloudprovider.InstanceType
Expand All @@ -82,9 +83,8 @@ var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
nodeStateController = informer.NewNodeController(env.Client, cluster)
recorder = test.NewEventRecorder()
provisioner = provisioning.NewProvisioner(ctx, env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
provisioningController = provisioning.NewController(env.Client, provisioner, recorder)
provisioner = provisioning.NewProvisioner(ctx, env.Client, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
provisioningController = provisioning.NewController(env.Client, provisioner, events.NewRecorder(&record.FakeRecorder{}))
})

var _ = AfterSuite(func() {
Expand Down Expand Up @@ -122,13 +122,12 @@ var _ = BeforeEach(func() {
mostExpensiveInstance = onDemandInstances[len(onDemandInstances)-1]
mostExpensiveOffering = mostExpensiveInstance.Offerings[0]

recorder.Reset()
// ensure any waiters on our clock are allowed to proceed before resetting our clock time
for fakeClock.HasWaiters() {
fakeClock.Step(1 * time.Minute)
}
fakeClock.SetTime(time.Now())
deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, provisioner, cloudProvider, recorder, cluster)
deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, provisioner, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
// Reset Feature Flags to test defaults
ctx = settings.ToContext(ctx, test.Settings(test.SettingsOptions{DriftEnabled: true}))
})
Expand Down Expand Up @@ -1793,11 +1792,12 @@ var _ = Describe("Parallelization", func() {
}, time.Second*10).Should(Succeed())
wg.Wait()
// Add a new pending pod that should schedule while node is not yet deleted
pods := ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, provisioner, test.UnschedulablePod())
pod = test.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, provisioner, pod)
nodes := &v1.NodeList{}
Expect(env.Client.List(ctx, nodes)).To(Succeed())
Expect(len(nodes.Items)).To(Equal(2))
Expect(pods[0].Spec.NodeName).NotTo(Equal(node.Name))
ExpectScheduled(ctx, env.Client, pod)
})
It("should not consolidate a node that is launched for pods on a deleting node", func() {
labels := map[string]string{
Expand Down Expand Up @@ -1838,7 +1838,7 @@ var _ = Describe("Parallelization", func() {
pods = append(pods, pod)
}
ExpectApplied(ctx, env.Client, rs, prov)
ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)
ExpectProvisionedNoBinding(ctx, env.Client, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)

nodeList := &v1.NodeList{}
Expect(env.Client.List(ctx, nodeList)).To(Succeed())
Expand All @@ -1847,13 +1847,10 @@ var _ = Describe("Parallelization", func() {
// Update cluster state with new node
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(&nodeList.Items[0]))

// Reset the bindings so we can re-record bindings
recorder.ResetBindings()

// Mark the node for deletion and re-trigger reconciliation
oldNodeName := nodeList.Items[0].Name
cluster.MarkForDeletion(nodeList.Items[0].Name)
ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)
ExpectProvisionedNoBinding(ctx, env.Client, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)

// Make sure that the cluster state is aware of the current node state
Expect(env.Client.List(ctx, nodeList)).To(Succeed())
Expand Down
48 changes: 46 additions & 2 deletions pkg/controllers/inflightchecks/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package inflightchecks_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -47,7 +48,7 @@ var inflightController controller.Controller
var env *test.Environment
var fakeClock *clock.FakeClock
var cp *fake.CloudProvider
var recorder *test.EventRecorder
var recorder *FakeEventRecorder

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
Expand All @@ -60,7 +61,7 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings())
cp = &fake.CloudProvider{}
recorder = test.NewEventRecorder()
recorder = NewFakeEventRecorder()
inflightController = inflightchecks.NewController(fakeClock, env.Client, recorder, cp)
})

Expand Down Expand Up @@ -183,6 +184,49 @@ var _ = Describe("Controller", func() {
})
})

var _ events.Recorder = (*FakeEventRecorder)(nil)

// FakeEventRecorder is a mock event recorder that is used to facilitate testing.
type FakeEventRecorder struct {
mu sync.RWMutex
calls map[string]int
events []events.Event
}

func NewFakeEventRecorder() *FakeEventRecorder {
return &FakeEventRecorder{
calls: map[string]int{},
}
}

func (e *FakeEventRecorder) Publish(evt events.Event) {
e.mu.Lock()
defer e.mu.Unlock()
e.events = append(e.events, evt)
e.calls[evt.Reason]++
}

func (e *FakeEventRecorder) Calls(reason string) int {
e.mu.RLock()
defer e.mu.RUnlock()
return e.calls[reason]
}

func (e *FakeEventRecorder) Reset() {
e.mu.Lock()
defer e.mu.Unlock()
e.events = nil
e.calls = map[string]int{}
}

func (e *FakeEventRecorder) ForEachEvent(f func(evt events.Event)) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, e := range e.events {
f(e)
}
}

func ExpectDetectedEvent(msg string) {
foundEvent := false
recorder.ForEachEvent(func(evt events.Event) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/machine/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
. "knative.dev/pkg/logging/testing"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/aws/karpenter-core/pkg/cloudprovider/fake"
"github.com/aws/karpenter-core/pkg/controllers/machine"
"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/scheme"
. "github.com/aws/karpenter-core/pkg/test/expectations"
Expand Down Expand Up @@ -65,9 +67,8 @@ var _ = BeforeSuite(func() {
ctx = settings.ToContext(ctx, test.Settings())

cloudProvider = fake.NewCloudProvider()
recorder := test.NewEventRecorder()
terminator := terminator.NewTerminator(fakeClock, env.Client, cloudProvider, terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), recorder))
machineController = machine.NewController(fakeClock, env.Client, cloudProvider, terminator, recorder)
terminator := terminator.NewTerminator(fakeClock, env.Client, cloudProvider, terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{})))
machineController = machine.NewController(fakeClock, env.Client, cloudProvider, terminator, events.NewRecorder(&record.FakeRecorder{}))
})

var _ = AfterSuite(func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, po v1.Pod) {
}
}

//nolint:gocyclo
func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNodes []*state.Node, opts scheduler.SchedulerOptions) (*scheduler.Scheduler, error) {
// Build node templates
var machines []*scheduler.MachineTemplate
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/existingnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (n *ExistingNode) Add(ctx context.Context, pod *v1.Pod) error {

nodeRequirements := scheduling.NewRequirements(n.requirements.Values()...)
podRequirements := scheduling.NewPodRequirements(pod)
// Check Machine Affinity Requirements
// Check Node Affinity Requirements
if err = nodeRequirements.Compatible(podRequirements); err != nil {
return err
}
Expand Down
Loading

0 comments on commit d7bddc6

Please sign in to comment.