From 892f35eb8a0041969039e56b64a0467f17e6809c Mon Sep 17 00:00:00 2001 From: Jeev B Date: Tue, 10 Aug 2021 17:18:11 -0700 Subject: [PATCH] Fix resource overrides for sidecar/pod tasks (#194) Signed-off-by: Jeev B --- .../flytek8s/container_helper.go | 17 ++++ .../flytek8s/container_helper_test.go | 81 +++++++++++++++++++ go/tasks/plugins/k8s/sidecar/sidecar.go | 8 +- go/tasks/plugins/k8s/sidecar/sidecar_test.go | 39 ++++++++- 4 files changed, 141 insertions(+), 4 deletions(-) diff --git a/go/tasks/pluginmachinery/flytek8s/container_helper.go b/go/tasks/pluginmachinery/flytek8s/container_helper.go index c829789bf..28e2c4b47 100755 --- a/go/tasks/pluginmachinery/flytek8s/container_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/container_helper.go @@ -22,6 +22,23 @@ const resourceGPU = "gpu" // Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go const ResourceNvidiaGPU = "nvidia.com/gpu" +func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements) { + if out.Limits == nil { + out.Limits = in.Limits + } else if in.Limits != nil { + for key, val := range in.Limits { + out.Limits[key] = val + } + } + if out.Requests == nil { + out.Requests = in.Requests + } else if in.Requests != nil { + for key, val := range in.Requests { + out.Requests[key] = val + } + } +} + func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements { // set memory and cpu to default if not provided by user. if len(resources.Requests) == 0 { diff --git a/go/tasks/pluginmachinery/flytek8s/container_helper_test.go b/go/tasks/pluginmachinery/flytek8s/container_helper_test.go index 36c1c0b2a..6f57119ba 100755 --- a/go/tasks/pluginmachinery/flytek8s/container_helper_test.go +++ b/go/tasks/pluginmachinery/flytek8s/container_helper_test.go @@ -156,3 +156,84 @@ func TestApplyResourceOverrides_OverrideGpu(t *testing.T) { }) assert.EqualValues(t, gpuRequest, overrides.Limits[ResourceNvidiaGPU]) } + +func TestMergeResources_EmptyIn(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + Limits: v1.ResourceList{ + v1.ResourceStorage: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + } + outResources := expectedResources.DeepCopy() + MergeResources(v1.ResourceRequirements{}, outResources) + assert.EqualValues(t, *outResources, expectedResources) +} + +func TestMergeResources_EmptyOut(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + expectedResources := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + Limits: v1.ResourceList{ + v1.ResourceStorage: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + }, + } + outResources := v1.ResourceRequirements{} + MergeResources(expectedResources, &outResources) + assert.EqualValues(t, outResources, expectedResources) +} + +func TestMergeResources_PartialRequirements(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + resourceList := v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + } + inResources := v1.ResourceRequirements{Requests: resourceList} + outResources := v1.ResourceRequirements{Limits: resourceList} + MergeResources(inResources, &outResources) + assert.EqualValues(t, outResources, v1.ResourceRequirements{ + Requests: resourceList, + Limits: resourceList, + }) +} + +func TestMergeResources_PartialResourceKeys(t *testing.T) { + requestedResourceQuantity := resource.MustParse("1") + resourceList1 := v1.ResourceList{ + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + } + resourceList2 := v1.ResourceList{v1.ResourceCPU: requestedResourceQuantity} + expectedResourceList := v1.ResourceList{ + v1.ResourceCPU: requestedResourceQuantity, + v1.ResourceMemory: requestedResourceQuantity, + v1.ResourceEphemeralStorage: requestedResourceQuantity, + } + inResources := v1.ResourceRequirements{ + Requests: resourceList1, + Limits: resourceList2, + } + outResources := v1.ResourceRequirements{ + Requests: resourceList2, + Limits: resourceList1, + } + MergeResources(inResources, &outResources) + assert.EqualValues(t, outResources, v1.ResourceRequirements{ + Requests: expectedResourceList, + Limits: expectedResourceList, + }) +} diff --git a/go/tasks/plugins/k8s/sidecar/sidecar.go b/go/tasks/plugins/k8s/sidecar/sidecar.go index c33e814aa..2eb39f8ce 100755 --- a/go/tasks/plugins/k8s/sidecar/sidecar.go +++ b/go/tasks/plugins/k8s/sidecar/sidecar.go @@ -39,6 +39,11 @@ func validateAndFinalizePod( for index, container := range pod.Spec.Containers { if container.Name == primaryContainerName { hasPrimaryContainer = true + container.Resources = *flytek8s.ApplyResourceOverrides(ctx, container.Resources) + if taskCtx.TaskExecutionMetadata().GetOverrides() != nil && taskCtx.TaskExecutionMetadata().GetOverrides().GetResources() != nil { + resOverrides := taskCtx.TaskExecutionMetadata().GetOverrides().GetResources() + flytek8s.MergeResources(*resOverrides, &container.Resources) + } } modifiedCommand, err := template.Render(ctx, container.Command, template.Parameters{ TaskExecMetadata: taskCtx.TaskExecutionMetadata(), @@ -62,8 +67,7 @@ func validateAndFinalizePod( } container.Args = modifiedArgs container.Env = flytek8s.DecorateEnvVars(ctx, container.Env, taskCtx.TaskExecutionMetadata().GetTaskExecutionID()) - resources := flytek8s.ApplyResourceOverrides(ctx, container.Resources) - resReqs = append(resReqs, *resources) + resReqs = append(resReqs, container.Resources) finalizedContainers[index] = container } if !hasPrimaryContainer { diff --git a/go/tasks/plugins/k8s/sidecar/sidecar_test.go b/go/tasks/plugins/k8s/sidecar/sidecar_test.go index 1d68b52d0..61ef6ac18 100755 --- a/go/tasks/plugins/k8s/sidecar/sidecar_test.go +++ b/go/tasks/plugins/k8s/sidecar/sidecar_test.go @@ -35,8 +35,8 @@ const ResourceNvidiaGPU = "nvidia.com/gpu" var resourceRequirements = &v1.ResourceRequirements{ Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("1024m"), - v1.ResourceStorage: resource.MustParse("100M"), + v1.ResourceCPU: resource.MustParse("2048m"), + v1.ResourceEphemeralStorage: resource.MustParse("100M"), }, } @@ -243,6 +243,17 @@ func TestBuildSidecarResource_TaskType2(t *testing.T) { assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name) checkUserTolerations(t, res) + // Assert resource requirements are correctly set + expectedCPURequest := resource.MustParse("1") + assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value()) + expectedMemRequest := resource.MustParse("100Mi") + assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value()) + expectedCPULimit := resource.MustParse("2048m") + assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value()) + expectedMemLimit := resource.MustParse("200Mi") + assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value()) + expectedEphemeralStorageLimit := resource.MustParse("100M") + assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value()) } func TestBuildSidecarResource_TaskType2_Invalid_Spec(t *testing.T) { @@ -330,6 +341,18 @@ func TestBuildSidecarResource_TaskType1(t *testing.T) { assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name) checkUserTolerations(t, res) + + // Assert resource requirements are correctly set + expectedCPURequest := resource.MustParse("1") + assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value()) + expectedMemRequest := resource.MustParse("100Mi") + assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value()) + expectedCPULimit := resource.MustParse("2048m") + assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value()) + expectedMemLimit := resource.MustParse("200Mi") + assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value()) + expectedEphemeralStorageLimit := resource.MustParse("100M") + assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value()) } func TestBuildSideResource_TaskType1_InvalidSpec(t *testing.T) { @@ -453,6 +476,18 @@ func TestBuildSidecarResource(t *testing.T) { t.Fatalf("unexpected toleration [%+v]", tol) } } + + // Assert resource requirements are correctly set + expectedCPURequest := resource.MustParse("1024m") + assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value()) + expectedMemRequest := resource.MustParse("1024Mi") + assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value()) + expectedCPULimit := resource.MustParse("2048m") + assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value()) + expectedMemLimit := resource.MustParse("1024Mi") + assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value()) + expectedEphemeralStorageLimit := resource.MustParse("100M") + assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value()) } func TestBuildSidecarReosurceMissingAnnotationsAndLabels(t *testing.T) {