fix: Fix missed return statement in cluster.State Capacity #156

Merged
4 changes: 3 additions & 1 deletion pkg/controllers/provisioning/scheduling/scheduler.go
@@ -241,7 +241,9 @@ func (s *Scheduler) calculateExistingMachines(stateNodes []*state.Node, daemonSe
// We don't use the status field and instead recompute the remaining resources to ensure we have a consistent view
// of the cluster during scheduling. Depending on how node creation falls out, this will also work for cases where
// we don't create Node resources.
- s.remainingResources[node.Node.Name] = resources.Subtract(s.remainingResources[node.Node.Name], node.Capacity())
+ if _, ok := s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]]; ok {
+ s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]] = resources.Subtract(s.remainingResources[node.Node.Labels[v1alpha5.ProvisionerNameLabelKey]], node.Capacity())
+ }
}
}

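Note on the scheduler change above: the deleted line charged a launched node's capacity against a map entry keyed by node name, so a provisioner's `remainingResources` budget was never actually decremented. The replacement keys by the provisioner-name label and guards with an existence check, since only provisioners that declare limits have an entry. A minimal sketch of the pattern — `subtract` stands in for karpenter's `resources.Subtract`, and the single-entry map is an assumption for illustration:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// subtract stands in for karpenter's resources.Subtract helper.
func subtract(a, b v1.ResourceList) v1.ResourceList {
	out := v1.ResourceList{}
	for name, qty := range a {
		qty.Sub(b[name]) // qty is a copy from range, safe to mutate
		out[name] = qty
	}
	return out
}

func main() {
	// Only provisioners that declare limits get an entry (per this diff's guard).
	remaining := map[string]v1.ResourceList{
		"limited": {v1.ResourceCPU: resource.MustParse("2")},
	}
	nodeCapacity := v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")}

	for _, provisionerName := range []string{"limited", "unlimited"} {
		// The fixed pattern: charge the node to its provisioner's budget,
		// but only if that provisioner tracks limits at all.
		if _, ok := remaining[provisionerName]; ok {
			remaining[provisionerName] = subtract(remaining[provisionerName], nodeCapacity)
		}
	}
	cpu := remaining["limited"][v1.ResourceCPU]
	fmt.Println(cpu.String()) // "0" — the 2 CPU budget is used up
}
```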
24 changes: 24 additions & 0 deletions pkg/controllers/provisioning/suite_test.go
@@ -331,6 +331,30 @@ var _ = Describe("Provisioning", func() {
// only available instance type has 2 GPUs which would exceed the limit
ExpectNotScheduled(ctx, env.Client, pod)
})
It("should not schedule to a provisioner after a scheduling round if limits would be exceeded", func() {
ExpectApplied(ctx, env.Client, test.Provisioner(test.ProvisionerOptions{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
}))
pod := ExpectProvisioned(ctx, env.Client, cluster, recorder, provisioningController, prov, test.UnschedulablePod(
test.PodOptions{ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
// requires a 2 CPU node, but leaves room for overhead
v1.ResourceCPU: resource.MustParse("1.75"),
},
}}))[0]
// A 2 CPU node can be launched
ExpectScheduled(ctx, env.Client, pod)

// This pod requests over the existing limit (would add to 3.5 CPUs) so this should fail
pod = ExpectProvisioned(ctx, env.Client, cluster, recorder, provisioningController, prov, test.UnschedulablePod(
test.PodOptions{ResourceRequirements: v1.ResourceRequirements{
Requests: v1.ResourceList{
// requires a 2 CPU node, but leaves room for overhead
v1.ResourceCPU: resource.MustParse("1.75"),
},
}}))[0]
ExpectNotScheduled(ctx, env.Client, pod)
})
})
Context("Daemonsets and Node Overhead", func() {
It("should account for overhead", func() {
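The new test pins down the cross-round behavior this enables. Roughly, the bookkeeping works as below — a sketch of the accounting only, not the scheduler's actual code; that node capacity (not pod requests) is charged against the limit is inferred from the scheduler hunk above:

```go
package main

import "fmt"

func main() {
	remaining := 2.0 // the provisioner's 2 CPU limit

	// Round 1: a 1.75 CPU pod needs a 2 CPU node; the budget covers it.
	nodeCPU := 2.0
	if nodeCPU <= remaining {
		remaining -= nodeCPU
		fmt.Println("round 1: scheduled, remaining CPU =", remaining) // 0
	}

	// Round 2: the second 1.75 CPU pod would need another 2 CPU node,
	// but the budget is exhausted, so it must not schedule.
	if nodeCPU <= remaining {
		fmt.Println("round 2: scheduled")
	} else {
		fmt.Println("round 2: not scheduled") // this branch runs
	}
}
```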
1 change: 1 addition & 0 deletions pkg/controllers/state/node.go
@@ -90,6 +90,7 @@ func (in *Node) Capacity() v1.ResourceList {
ret[resourceName] = quantity
}
}
+ return ret
}
return in.Node.Status.Capacity
}
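This is the missed return from the PR title: `Capacity()` assembled `ret` for an in-flight node (instance-type capacity overlaid with whatever the real node has reported, per the tests below) and then fell through to `in.Node.Status.Capacity`, which is empty until the kubelet registers — so in-flight nodes looked like they had zero capacity. A self-contained sketch of the control flow; the type and field names here are simplified stand-ins, not the real karpenter-core types:

```go
package main

import "fmt"

type ResourceList map[string]string

type InstanceType struct{ Capacity ResourceList }

type Node struct {
	instanceType   *InstanceType // set while the node is in-flight (assumed field name)
	statusCapacity ResourceList  // stands in for Node.Status.Capacity
}

// Capacity mirrors the fixed flow: start from the instance type, overlay
// what the real node reports, and return the result. Dropping that return
// (the bug) meant ret was computed and then discarded every time.
func (in *Node) Capacity() ResourceList {
	if in.instanceType != nil {
		ret := ResourceList{}
		for name, qty := range in.instanceType.Capacity {
			ret[name] = qty
		}
		for name, qty := range in.statusCapacity {
			ret[name] = qty // real node values win where present
		}
		return ret // the return this PR adds
	}
	return in.statusCapacity
}

func main() {
	n := &Node{
		instanceType:   &InstanceType{Capacity: ResourceList{"cpu": "2", "memory": "8Gi"}},
		statusCapacity: ResourceList{}, // kubelet hasn't reported yet
	}
	fmt.Println(n.Capacity()) // map[cpu:2 memory:8Gi] instead of an empty list
}
```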
194 changes: 103 additions & 91 deletions pkg/controllers/state/suite_test.go
@@ -88,6 +88,49 @@ var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
})

var _ = Describe("In-flight Nodes", func() {
It("should consider the node capacity/allocatable based on the instance type", func() {
instanceType := cloudProvider.InstanceTypes[0]
node := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
v1.LabelInstanceTypeStable: instanceType.Name,
}},
})
ExpectApplied(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
ExpectResources(instanceType.Allocatable(), ExpectStateNodeExists(node).Allocatable())
ExpectResources(instanceType.Capacity, ExpectStateNodeExists(node).Capacity())
})
It("should consider the node capacity/allocatable as a combination of instance type and current node", func() {
instanceType := cloudProvider.InstanceTypes[0]
node := test.Node(test.NodeOptions{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
v1.LabelInstanceTypeStable: instanceType.Name,
}},
Allocatable: v1.ResourceList{
v1.ResourceMemory: resource.MustParse("100Mi"),
},
Capacity: v1.ResourceList{
v1.ResourceEphemeralStorage: resource.MustParse("100Gi"),
},
})
ExpectApplied(ctx, env.Client, node)
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
ExpectResources(v1.ResourceList{
v1.ResourceMemory: resource.MustParse("100Mi"), // pulled from the node's real allocatable
v1.ResourceCPU: *instanceType.Capacity.Cpu(),
v1.ResourceEphemeralStorage: *instanceType.Capacity.StorageEphemeral(),
}, ExpectStateNodeExists(node).Allocatable())
ExpectResources(v1.ResourceList{
v1.ResourceMemory: *instanceType.Capacity.Memory(),
v1.ResourceCPU: *instanceType.Capacity.Cpu(),
v1.ResourceEphemeralStorage: resource.MustParse("100Gi"), // pulled from the node's real capacity
}, ExpectStateNodeExists(node).Capacity())
})
})

var _ = Describe("Node Resource Level", func() {
It("should not count pods not bound to nodes", func() {
pod1 := test.UnschedulablePod(test.PodOptions{
@@ -118,7 +161,7 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))

// two pods, but neither is bound to the node so the node's CPU requests should be zero
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "0.0")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.0")}, ExpectStateNodeExists(node).PodRequests())
})
It("should count new pods bound to nodes", func() {
pod1 := test.UnschedulablePod(test.PodOptions{
@@ -152,11 +195,11 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))

- ExpectNodeResourceRequest(node, v1.ResourceCPU, "1.5")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("1.5")}, ExpectStateNodeExists(node).PodRequests())

ExpectManualBinding(ctx, env.Client, pod2, node)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "3.5")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("3.5")}, ExpectStateNodeExists(node).PodRequests())
})
It("should count existing pods bound to nodes", func() {
pod1 := test.UnschedulablePod(test.PodOptions{
@@ -188,7 +231,7 @@ var _ = Describe("Node Resource Level", func() {

// that we just noticed
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "3.5")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("3.5")}, ExpectStateNodeExists(node).PodRequests())
})
It("should subtract requests if the pod is deleted", func() {
pod1 := test.UnschedulablePod(test.PodOptions{
@@ -223,16 +266,16 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))

- ExpectNodeResourceRequest(node, v1.ResourceCPU, "3.5")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("3.5")}, ExpectStateNodeExists(node).PodRequests())

// delete the pods and the CPU usage should go down
ExpectDeleted(ctx, env.Client, pod2)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "1.5")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("1.5")}, ExpectStateNodeExists(node).PodRequests())

ExpectDeleted(ctx, env.Client, pod1)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "0")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("0")}, ExpectStateNodeExists(node).PodRequests())
})
It("should not add requests if the pod is terminal", func() {
pod1 := test.UnschedulablePod(test.PodOptions{
@@ -270,7 +313,7 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))

- ExpectNodeResourceRequest(node, v1.ResourceCPU, "0")
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("0")}, ExpectStateNodeExists(node).PodRequests())
})
It("should stop tracking nodes that are deleted", func() {
pod1 := test.UnschedulablePod(test.PodOptions{
@@ -297,10 +340,8 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))

cluster.ForEachNode(func(n *state.Node) bool {
- available := n.Available()
- requested := n.PodRequests()
- Expect(available.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 2.5))
- Expect(requested.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 1.5))
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("2.5")}, n.Available())
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("1.5")}, n.PodRequests())
return true
})

@@ -337,10 +378,8 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))

cluster.ForEachNode(func(n *state.Node) bool {
- available := n.Available()
- requested := n.PodRequests()
- Expect(available.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 2.5))
- Expect(requested.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 1.5))
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("2.5")}, n.Available())
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("1.5")}, n.PodRequests())
return true
})

@@ -373,15 +412,13 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod2))

cluster.ForEachNode(func(n *state.Node) bool {
- available := n.Available()
- requested := n.PodRequests()
if n.Node.Name == node1.Name {
// not on node1 any longer, so it should be fully free
- Expect(available.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 4))
- Expect(requested.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 0))
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("4")}, n.Available())
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("0")}, n.PodRequests())
} else {
- Expect(available.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 3.0))
- Expect(requested.Cpu().AsApproximateFloat64()).To(BeNumerically("~", 5.0))
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("3")}, n.Available())
+ ExpectResources(v1.ResourceList{v1.ResourceCPU: resource.MustParse("5")}, n.PodRequests())
}
return true
})
@@ -408,8 +445,11 @@
v1.ResourcePods: resource.MustParse("500"),
}})
ExpectApplied(ctx, env.Client, node)
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "0.0")
- ExpectNodeResourceRequest(node, v1.ResourcePods, "0")
+ ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse("0"),
+ v1.ResourcePods: resource.MustParse("0"),
+ }, ExpectStateNodeExists(node).PodRequests())
ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))

sum := 0.0
@@ -425,8 +465,10 @@
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod))
}
sum += pod.Spec.Containers[0].Resources.Requests.Cpu().AsApproximateFloat64()
- ExpectNodeResourceRequest(node, v1.ResourceCPU, fmt.Sprintf("%1.1f", sum))
- ExpectNodeResourceRequest(node, v1.ResourcePods, fmt.Sprintf("%d", podCount))
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%1.1f", sum)),
+ v1.ResourcePods: resource.MustParse(fmt.Sprintf("%d", podCount)),
+ }, ExpectStateNodeExists(node).PodRequests())
}

for _, pod := range pods {
@@ -438,11 +480,15 @@
}
sum -= pod.Spec.Containers[0].Resources.Requests.Cpu().AsApproximateFloat64()
podCount--
- ExpectNodeResourceRequest(node, v1.ResourceCPU, fmt.Sprintf("%1.1f", sum))
- ExpectNodeResourceRequest(node, v1.ResourcePods, fmt.Sprintf("%d", podCount))
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%1.1f", sum)),
+ v1.ResourcePods: resource.MustParse(fmt.Sprintf("%d", podCount)),
+ }, ExpectStateNodeExists(node).PodRequests())
}
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "0.0")
- ExpectNodeResourceRequest(node, v1.ResourcePods, "0")
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse("0"),
+ v1.ResourcePods: resource.MustParse("0"),
+ }, ExpectStateNodeExists(node).PodRequests())
})
It("should track daemonset requested resources separately", func() {
ds := test.DaemonSet(
@@ -495,21 +541,29 @@ var _ = Describe("Node Resource Level", func() {
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(pod1))

// daemonset pod isn't bound yet
- ExpectNodeDaemonSetRequested(node, v1.ResourceCPU, "0")
- ExpectNodeDaemonSetRequested(node, v1.ResourceMemory, "0")
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "1.5")
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse("0"),
+ v1.ResourceMemory: resource.MustParse("0"),
+ }, ExpectStateNodeExists(node).DaemonSetRequests())
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse("1.5"),
+ }, ExpectStateNodeExists(node).PodRequests())

ExpectApplied(ctx, env.Client, dsPod)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(dsPod))
ExpectManualBinding(ctx, env.Client, dsPod, node)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(dsPod))

// just the DS request portion
- ExpectNodeDaemonSetRequested(node, v1.ResourceCPU, "1")
- ExpectNodeDaemonSetRequested(node, v1.ResourceMemory, "2Gi")
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse("1"),
+ v1.ResourceMemory: resource.MustParse("2Gi"),
+ }, ExpectStateNodeExists(node).DaemonSetRequests())
// total request
- ExpectNodeResourceRequest(node, v1.ResourceCPU, "2.5")
- ExpectNodeResourceRequest(node, v1.ResourceMemory, "2Gi")
+ ExpectResources(v1.ResourceList{
+ v1.ResourceCPU: resource.MustParse("2.5"),
+ v1.ResourceMemory: resource.MustParse("2Gi"),
+ }, ExpectStateNodeExists(node).PodRequests())
})
It("should mark node for deletion when node is deleted", func() {
node := test.Node(test.NodeOptions{
@@ -531,7 +585,7 @@

ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node))
ExpectNodeExists(ctx, env.Client, node.Name)
- ExpectNodeDeletionMarked(node)
+ Expect(ExpectStateNodeExists(node).MarkedForDeletion()).To(BeTrue())
})
It("should nominate the node until the nomination time passes", func() {
node := test.Node(test.NodeOptions{
@@ -552,11 +606,11 @@
cluster.NominateNodeForPod(ctx, node.Name)

// Expect that the node is now nominated
- ExpectNodeNominated(node)
+ Expect(ExpectStateNodeExists(node).Nominated()).To(BeTrue())
time.Sleep(time.Second * 5) // nomination window is 10s so it should still be nominated
- ExpectNodeNominated(node)
+ Expect(ExpectStateNodeExists(node).Nominated()).To(BeTrue())
time.Sleep(time.Second * 6) // past 10s, node should no longer be nominated
- ExpectNodeNotNominated(node)
+ Expect(ExpectStateNodeExists(node).Nominated()).To(BeFalse())
})
})

@@ -748,61 +802,19 @@ var _ = Describe("Provisioner Spec Updates", func() {
})
})

- func ExpectNodeResourceRequest(node *v1.Node, resourceName v1.ResourceName, amount string) {
+ func ExpectStateNodeExistsWithOffset(offset int, node *v1.Node) *state.Node {
+ var ret *state.Node
cluster.ForEachNode(func(n *state.Node) bool {
if n.Node.Name != node.Name {
return true
}
- nodeRequest := n.PodRequests()[resourceName]
- expected := resource.MustParse(amount)
- ExpectWithOffset(1, nodeRequest.AsApproximateFloat64()).To(BeNumerically("~", expected.AsApproximateFloat64(), 0.001))
+ ret = n.DeepCopy()
return false
})
+ ExpectWithOffset(offset+1, ret).ToNot(BeNil())
+ return ret
}
- func ExpectNodeDaemonSetRequested(node *v1.Node, resourceName v1.ResourceName, amount string) {
- cluster.ForEachNode(func(n *state.Node) bool {
- if n.Node.Name != node.Name {
- return true
- }
- dsReq := n.DaemonSetRequests()[resourceName]
- expected := resource.MustParse(amount)
- Expect(dsReq.AsApproximateFloat64()).To(BeNumerically("~", expected.AsApproximateFloat64(), 0.001))
- return false
- })
- }
- func ExpectNodeDeletionMarked(node *v1.Node) {
- found := false
- cluster.ForEachNode(func(n *state.Node) bool {
- if n.Node.Name != node.Name {
- return true
- }
- found = true
- Expect(n.MarkedForDeletion()).To(BeTrue())
- return false
- })
- Expect(found).To(BeTrue())
- }
- func ExpectNodeNominated(node *v1.Node) {
- found := false
- cluster.ForEachNode(func(n *state.Node) bool {
- if n.Node.Name != node.Name {
- return true
- }
- found = true
- Expect(n.Nominated()).To(BeTrue())
- return false
- })
- Expect(found).To(BeTrue())
- }
- func ExpectNodeNotNominated(node *v1.Node) {
- found := false
- cluster.ForEachNode(func(n *state.Node) bool {
- if n.Node.Name != node.Name {
- return true
- }
- found = true
- Expect(n.Nominated()).To(BeFalse())
- return false
- })
- Expect(found).To(BeTrue())
- }

+ func ExpectStateNodeExists(node *v1.Node) *state.Node {
+ return ExpectStateNodeExistsWithOffset(1, node)
+ }
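The `offset` parameter exists so Gomega attributes a failure to the test that called the helper rather than to the helper body: each layer adds 1 to skip its own stack frame. A hypothetical helper layered on top, to show the threading (not part of this PR; assumes the suite's existing test scaffolding and imports):

```go
// ExpectStateNodeMarkedForDeletion is a hypothetical example of stacking
// another helper on ExpectStateNodeExistsWithOffset: each layer passes
// offset+1 so a failure is reported at the original test line.
func ExpectStateNodeMarkedForDeletion(offset int, node *v1.Node) {
	n := ExpectStateNodeExistsWithOffset(offset+1, node)
	ExpectWithOffset(offset+1, n.MarkedForDeletion()).To(BeTrue())
}
```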
8 changes: 8 additions & 0 deletions pkg/test/expectations/expectations.go
@@ -358,6 +358,14 @@ func ExpectSkew(ctx context.Context, c client.Client, namespace string, constrai
return ExpectWithOffset(1, skew)
}

+ // ExpectResources expects all the resources in expected to exist in real with the same values
+ func ExpectResources(expected, real v1.ResourceList) {
+ for k, v := range expected {
+ realV := real[k]
+ ExpectWithOffset(1, v.Value()).To(BeNumerically("~", realV.Value()))
+ }
+ }
+
// ExpectPanic is a function that should be deferred at the beginning of a test like "defer ExpectPanic()"
// It asserts that the test should panic
func ExpectPanic() {
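One caveat on the new `ExpectResources` helper: `Quantity.Value()` rounds up to the nearest whole unit, so the comparison is only exact at integer granularity — `resource.MustParse("1.5")` and `resource.MustParse("1.75")` both report `2`. That is fine for the checks in this PR (expected and actual always round identically), but `MilliValue()` would be the finer-grained choice if sub-unit differences ever matter. A quick demonstration:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	a := resource.MustParse("1.5")
	b := resource.MustParse("1.75")
	// Value() rounds up to the nearest integer, so these are indistinguishable...
	fmt.Println(a.Value(), b.Value()) // 2 2
	// ...while MilliValue() keeps sub-unit precision.
	fmt.Println(a.MilliValue(), b.MilliValue()) // 1500 1750
}
```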