Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Fix resource overrides for sidecar/pod tasks (#194)
Browse files Browse the repository at this point in the history
Signed-off-by: Jeev B <[email protected]>
  • Loading branch information
jeevb authored Aug 11, 2021
1 parent 184d075 commit 892f35e
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 4 deletions.
17 changes: 17 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ const resourceGPU = "gpu"
// Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go
const ResourceNvidiaGPU = "nvidia.com/gpu"

func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements) {
if out.Limits == nil {
out.Limits = in.Limits
} else if in.Limits != nil {
for key, val := range in.Limits {
out.Limits[key] = val
}
}
if out.Requests == nil {
out.Requests = in.Requests
} else if in.Requests != nil {
for key, val := range in.Requests {
out.Requests[key] = val
}
}
}

func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequirements) *v1.ResourceRequirements {
// set memory and cpu to default if not provided by user.
if len(resources.Requests) == 0 {
Expand Down
81 changes: 81 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/container_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,84 @@ func TestApplyResourceOverrides_OverrideGpu(t *testing.T) {
})
assert.EqualValues(t, gpuRequest, overrides.Limits[ResourceNvidiaGPU])
}

func TestMergeResources_EmptyIn(t *testing.T) {
requestedResourceQuantity := resource.MustParse("1")
expectedResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceCPU: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
}
outResources := expectedResources.DeepCopy()
MergeResources(v1.ResourceRequirements{}, outResources)
assert.EqualValues(t, *outResources, expectedResources)
}

func TestMergeResources_EmptyOut(t *testing.T) {
requestedResourceQuantity := resource.MustParse("1")
expectedResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceCPU: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
}
outResources := v1.ResourceRequirements{}
MergeResources(expectedResources, &outResources)
assert.EqualValues(t, outResources, expectedResources)
}

func TestMergeResources_PartialRequirements(t *testing.T) {
requestedResourceQuantity := resource.MustParse("1")
resourceList := v1.ResourceList{
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceCPU: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
}
inResources := v1.ResourceRequirements{Requests: resourceList}
outResources := v1.ResourceRequirements{Limits: resourceList}
MergeResources(inResources, &outResources)
assert.EqualValues(t, outResources, v1.ResourceRequirements{
Requests: resourceList,
Limits: resourceList,
})
}

func TestMergeResources_PartialResourceKeys(t *testing.T) {
requestedResourceQuantity := resource.MustParse("1")
resourceList1 := v1.ResourceList{
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
}
resourceList2 := v1.ResourceList{v1.ResourceCPU: requestedResourceQuantity}
expectedResourceList := v1.ResourceList{
v1.ResourceCPU: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
}
inResources := v1.ResourceRequirements{
Requests: resourceList1,
Limits: resourceList2,
}
outResources := v1.ResourceRequirements{
Requests: resourceList2,
Limits: resourceList1,
}
MergeResources(inResources, &outResources)
assert.EqualValues(t, outResources, v1.ResourceRequirements{
Requests: expectedResourceList,
Limits: expectedResourceList,
})
}
8 changes: 6 additions & 2 deletions go/tasks/plugins/k8s/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ func validateAndFinalizePod(
for index, container := range pod.Spec.Containers {
if container.Name == primaryContainerName {
hasPrimaryContainer = true
container.Resources = *flytek8s.ApplyResourceOverrides(ctx, container.Resources)
if taskCtx.TaskExecutionMetadata().GetOverrides() != nil && taskCtx.TaskExecutionMetadata().GetOverrides().GetResources() != nil {
resOverrides := taskCtx.TaskExecutionMetadata().GetOverrides().GetResources()
flytek8s.MergeResources(*resOverrides, &container.Resources)
}
}
modifiedCommand, err := template.Render(ctx, container.Command, template.Parameters{
TaskExecMetadata: taskCtx.TaskExecutionMetadata(),
Expand All @@ -62,8 +67,7 @@ func validateAndFinalizePod(
}
container.Args = modifiedArgs
container.Env = flytek8s.DecorateEnvVars(ctx, container.Env, taskCtx.TaskExecutionMetadata().GetTaskExecutionID())
resources := flytek8s.ApplyResourceOverrides(ctx, container.Resources)
resReqs = append(resReqs, *resources)
resReqs = append(resReqs, container.Resources)
finalizedContainers[index] = container
}
if !hasPrimaryContainer {
Expand Down
39 changes: 37 additions & 2 deletions go/tasks/plugins/k8s/sidecar/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ const ResourceNvidiaGPU = "nvidia.com/gpu"

var resourceRequirements = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("2048m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
}

Expand Down Expand Up @@ -243,6 +243,17 @@ func TestBuildSidecarResource_TaskType2(t *testing.T) {
assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name)
checkUserTolerations(t, res)

// Assert resource requirements are correctly set
expectedCPURequest := resource.MustParse("1")
assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value())
expectedMemRequest := resource.MustParse("100Mi")
assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value())
expectedCPULimit := resource.MustParse("2048m")
assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value())
expectedMemLimit := resource.MustParse("200Mi")
assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value())
expectedEphemeralStorageLimit := resource.MustParse("100M")
assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value())
}

func TestBuildSidecarResource_TaskType2_Invalid_Spec(t *testing.T) {
Expand Down Expand Up @@ -330,6 +341,18 @@ func TestBuildSidecarResource_TaskType1(t *testing.T) {
assert.Equal(t, "volume mount", res.(*v1.Pod).Spec.Containers[0].VolumeMounts[0].Name)

checkUserTolerations(t, res)

// Assert resource requirements are correctly set
expectedCPURequest := resource.MustParse("1")
assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value())
expectedMemRequest := resource.MustParse("100Mi")
assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value())
expectedCPULimit := resource.MustParse("2048m")
assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value())
expectedMemLimit := resource.MustParse("200Mi")
assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value())
expectedEphemeralStorageLimit := resource.MustParse("100M")
assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value())
}

func TestBuildSideResource_TaskType1_InvalidSpec(t *testing.T) {
Expand Down Expand Up @@ -453,6 +476,18 @@ func TestBuildSidecarResource(t *testing.T) {
t.Fatalf("unexpected toleration [%+v]", tol)
}
}

// Assert resource requirements are correctly set
expectedCPURequest := resource.MustParse("1024m")
assert.Equal(t, expectedCPURequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Cpu().Value())
expectedMemRequest := resource.MustParse("1024Mi")
assert.Equal(t, expectedMemRequest.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Requests.Memory().Value())
expectedCPULimit := resource.MustParse("2048m")
assert.Equal(t, expectedCPULimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Cpu().Value())
expectedMemLimit := resource.MustParse("1024Mi")
assert.Equal(t, expectedMemLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.Memory().Value())
expectedEphemeralStorageLimit := resource.MustParse("100M")
assert.Equal(t, expectedEphemeralStorageLimit.Value(), res.(*v1.Pod).Spec.Containers[0].Resources.Limits.StorageEphemeral().Value())
}

func TestBuildSidecarReosurceMissingAnnotationsAndLabels(t *testing.T) {
Expand Down

0 comments on commit 892f35e

Please sign in to comment.