Revert "Parallelize creating cluster snapshot" #7695

Closed
2 changes: 0 additions & 2 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -311,8 +311,6 @@ type AutoscalingOptions struct {
ForceDeleteLongUnregisteredNodes bool
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
DynamicResourceAllocationEnabled bool
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
ClusterSnapshotParallelism int
}

// KubeClientOptions specify options for kube client
@@ -254,7 +254,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewBasicSnapshotStore())
},
"delta": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore(16))
return testsnapshot.NewCustomTestSnapshotOrDie(b, store.NewDeltaSnapshotStore())
},
}
for snapshotName, snapshotFactory := range snapshots {
4 changes: 1 addition & 3 deletions cluster-autoscaler/main.go
@@ -283,7 +283,6 @@ var (
checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
)

func isFlagPassed(name string) bool {
@@ -464,7 +463,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
}
}

@@ -507,7 +505,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism)
var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore()
if autoscalingOptions.DynamicResourceAllocationEnabled {
// TODO(DRA): Remove this once DeltaSnapshotStore is integrated with DRA.
klog.Warningf("Using BasicSnapshotStore instead of DeltaSnapshotStore because DRA is enabled. Autoscaling performance/scalability might be decreased.")
@@ -114,7 +114,7 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore(16))
clusterSnapshot := testsnapshot.NewCustomTestSnapshotOrDie(t, store.NewDeltaSnapshotStore())
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
assert.NoError(t, err)
ctx := context.AutoscalingContext{
@@ -56,7 +56,7 @@ var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
if err != nil {
return nil, err
}
return NewPredicateSnapshot(store.NewDeltaSnapshotStore(16), fwHandle, true), nil
return NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, true), nil
},
}

104 changes: 15 additions & 89 deletions cluster-autoscaler/simulator/clustersnapshot/store/delta.go
@@ -17,13 +17,11 @@ limitations under the License.
package store

import (
"context"
"fmt"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
@@ -46,8 +44,7 @@ import (
// pod affinity - causes scheduler framework to list pods with non-empty selector,
// so basic caching doesn't help.
type DeltaSnapshotStore struct {
data *internalDeltaSnapshotData
parallelism int
data *internalDeltaSnapshotData
}

type deltaSnapshotStoreNodeLister DeltaSnapshotStore
@@ -140,14 +137,10 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework
return nodeInfoList
}

func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) (*schedulerframework.NodeInfo, error) {
func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
nodeInfo := schedulerframework.NewNodeInfo()
nodeInfo.SetNode(node)
err := data.addNodeInfo(nodeInfo)
if err != nil {
return nil, err
}
return nodeInfo, nil
return data.addNodeInfo(nodeInfo)
}

func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
@@ -248,24 +241,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
return nil
}

func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
ni.AddPod(pod)

// Maybe consider deleting from the list in the future. Maybe not.
data.clearCaches()
return nil
}

func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
for _, pod := range pods {
ni.AddPod(pod)
}

// Maybe consider deleting from the list in the future. Maybe not.
data.clearCaches()
Member: Is revert the best option? It looks like this data.clearCaches() is redundant when setting cluster state, and removing it would fix the race.

Member (Author): If you can quickly fix it, then I think we can skip the revert.

Member (Author): But in general it is safer to revert first and submit a fixed version later, so that tests are not broken at HEAD.

(A sketch of the suggested alternative follows the SetClusterState hunk below.)

return nil
}

func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
// This always clones node info, even if the pod is actually missing.
// Not sure if we mind, since removing non-existent pod
@@ -428,10 +403,8 @@ func (snapshot *DeltaSnapshotStore) DeviceClasses() schedulerframework.DeviceCla
}

// NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
func NewDeltaSnapshotStore(parallelism int) *DeltaSnapshotStore {
snapshot := &DeltaSnapshotStore{
parallelism: parallelism,
}
func NewDeltaSnapshotStore() *DeltaSnapshotStore {
snapshot := &DeltaSnapshotStore{}
snapshot.clear()
return snapshot
}
@@ -444,7 +417,7 @@ func (snapshot *DeltaSnapshotStore) DraSnapshot() drasnapshot.Snapshot {

// AddSchedulerNodeInfo adds a NodeInfo.
func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
if _, err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
if err := snapshot.data.addNode(nodeInfo.Node()); err != nil {
return err
}
for _, podInfo := range nodeInfo.Pods {
@@ -455,71 +428,24 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
return nil
}

// setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
for _, pod := range scheduledPods {
if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
return err
}
}
}
return nil
}

// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
for _, pod := range scheduledPods {
nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
if !ok {
continue
}
podsForNode[nodeIdx] = append(podsForNode[nodeIdx], pod)
}

ctx := context.Background()
ctx, cancel := context.WithCancelCause(ctx)

workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
if err != nil {
cancel(err)
}
})

return context.Cause(ctx)
}

// SetClusterState sets the cluster state.
func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, draSnapshot drasnapshot.Snapshot) error {
snapshot.clear()

nodeNameToIdx := make(map[string]int, len(nodes))
nodeInfos := make([]*schedulerframework.NodeInfo, len(nodes))
for i, node := range nodes {
nodeInfo, err := snapshot.data.addNode(node)
if err != nil {
knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := snapshot.data.addNode(node); err != nil {
return err
}
nodeNameToIdx[node.Name] = i
nodeInfos[i] = nodeInfo
knownNodes[node.Name] = true
}

if snapshot.parallelism > 1 {
err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
if err != nil {
return err
}
} else {
// TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
// after making sure the implementation is always correct in CA 1.33.
err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
if err != nil {
return err
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}

// TODO(DRA): Save DRA snapshot.
return nil
}
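
If the alternative discussed in the review thread on addPodsToNode were taken instead of a full revert, a minimal sketch could look like the code below. This is only an illustration, not code from this PR: it assumes the reviewer is right that the clearCaches() call is the only write to shared snapshot state in the parallel path, and that the rest of the reverted setClusterStatePodsParallelized would stay as shown above.

// Sketch: the reverted addPodsToNode with the cache invalidation dropped.
// Without clearCaches(), the method only mutates the NodeInfo it is given,
// so the workqueue.ParallelizeUntil workers in setClusterStatePodsParallelized
// each touch a distinct NodeInfo and no longer race on shared data.
func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
	for _, pod := range pods {
		ni.AddPod(pod)
	}
	// No data.clearCaches() here: SetClusterState starts from a freshly
	// cleared snapshot, so there is no cached node info list to invalidate.
	return nil
}

Whether dropping the invalidation is safe in every code path is exactly what would need verifying before re-landing the parallelized version, which is why the revert was merged first.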
@@ -48,7 +48,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
for _, tc := range testCases {
b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
nodes := clustersnapshot.CreateTestNodes(tc.nodeCount + 1000)
deltaStore := NewDeltaSnapshotStore(16)
deltaStore := NewDeltaSnapshotStore()
if err := deltaStore.SetClusterState(nodes[:tc.nodeCount], nil, drasnapshot.Snapshot{}); err != nil {
assert.NoError(b, err)
}
@@ -70,7 +70,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
for _, tc := range testCases {
b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
nodes := clustersnapshot.CreateTestNodes(tc.nodeCount)
deltaStore := NewDeltaSnapshotStore(16)
deltaStore := NewDeltaSnapshotStore()
if err := deltaStore.SetClusterState(nodes, nil, drasnapshot.Snapshot{}); err != nil {
assert.NoError(b, err)
}