Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory oversubscription #10247

Merged
merged 10 commits into from
Mar 31, 2021
3 changes: 2 additions & 1 deletion api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,8 @@ type AllocatedCpuResources struct {
}

type AllocatedMemoryResources struct {
MemoryMB int64
MemoryMB int64
MemoryMaxMB int64
}

type AllocatedDeviceResource struct {
Expand Down
13 changes: 7 additions & 6 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
// Resources encapsulates the required resources of
// a given task or task group.
type Resources struct {
CPU *int `hcl:"cpu,optional"`
Cores *int `hcl:"cores,optional"`
MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"`
DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
Networks []*NetworkResource `hcl:"network,block"`
Devices []*RequestedDevice `hcl:"device,block"`
CPU *int `hcl:"cpu,optional"`
Cores *int `hcl:"cores,optional"`
MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"`
MemoryMaxMB *int `mapstructure:"memory_max" hcl:"memory_max,optional"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this iteration, I've opted to simply add a memory_max field in the job spec, with memory remaining as the "reserve"/base memory requirement. Happy to reconsider this and use an alternative name for the "base", e.g. memory_reserve,memory_required?

I considered memory_min - but I find it ambiguous. min indicates the minimum memory a task uses rather than how much memory we should reserve/allocate for the task.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like that never really got resolved on the RFC, but I'm totally 👍 for this. It avoids any migration issues later, too.

DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
Networks []*NetworkResource `hcl:"network,block"`
Devices []*RequestedDevice `hcl:"device,block"`

// COMPAT(0.10)
// XXX Deprecated. Please do not use. The field will be removed in Nomad
Expand Down
7 changes: 6 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,11 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
}
}

memoryLimit := taskResources.Memory.MemoryMB
if max := taskResources.Memory.MemoryMaxMB; max > memoryLimit {
memoryLimit = max
}

return &drivers.TaskConfig{
ID: fmt.Sprintf("%s/%s/%s", alloc.ID, task.Name, invocationid),
Name: task.Name,
Expand All @@ -988,7 +993,7 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
Resources: &drivers.Resources{
NomadResources: taskResources,
LinuxResources: &drivers.LinuxResources{
MemoryLimitBytes: taskResources.Memory.MemoryMB * 1024 * 1024,
MemoryLimitBytes: memoryLimit * 1024 * 1024,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the linux resources is intended to ease drivers implementation and adoption of the features: drivers that use resources.LinuxResources.MemoryLimitBytes don't need to be updated.

Drivers that use NomadResources will need to updated to track the new field value. Given that tasks aren't guaranteed to use up the excess memory limit, this is a reasonable compromise.

I don't know the breakdown of how external 3rd party drivers check memory limit, but happy to change the default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drivers that use NomadResources will need to updated to track the new field value. Given that tasks aren't guaranteed to use up the excess memory limit, this is a reasonable compromise.

So if they don't get updated, they'll just end up setting their limit equal to the memory field value, just as they do today? They just end up ignoring memory_max?

From a customer/community impact standpoint, the two I'd worry the most about are containerd and podman. Also, do we want to update qemu to take whichever is greater?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the failure mode is ignoring memory_max and behaving like today. I'm researching soft vs hard limits a bit now, and will ensure containerd and podman are updated to the recommended pattern.

CPUShares: taskResources.Cpu.CpuShares,
PercentTicks: float64(taskResources.Cpu.CpuShares) / float64(tr.clientConfig.Node.NodeResources.Cpu.CpuShares),
},
Expand Down
66 changes: 66 additions & 0 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,72 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
}
}

func TestTaskRunner_BuildTaskConfig_CPU_Memory(t *testing.T) {
t.Parallel()

cases := []struct {
name string
cpu int64
memoryMB int64
memoryMaxMB int64
expectedLinuxMemoryMB int64
}{
{
name: "plain no max",
cpu: 100,
memoryMB: 100,
memoryMaxMB: 0,
expectedLinuxMemoryMB: 100,
},
{
name: "plain with max=reserve",
cpu: 100,
memoryMB: 100,
memoryMaxMB: 100,
expectedLinuxMemoryMB: 100,
},
{
name: "plain with max>reserve",
cpu: 100,
memoryMB: 100,
memoryMaxMB: 200,
expectedLinuxMemoryMB: 200,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
alloc := mock.BatchAlloc()
alloc.Job.TaskGroups[0].Count = 1
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "2s",
}
res := alloc.AllocatedResources.Tasks[task.Name]
res.Cpu.CpuShares = c.cpu
res.Memory.MemoryMB = c.memoryMB
res.Memory.MemoryMaxMB = c.memoryMaxMB

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners
defer cleanup()

// Run the first TaskRunner
tr, err := NewTaskRunner(conf)
require.NoError(t, err)

tc := tr.buildTaskConfig()
require.Equal(t, c.cpu, tc.Resources.LinuxResources.CPUShares)
require.Equal(t, c.expectedLinuxMemoryMB*1024*1024, tc.Resources.LinuxResources.MemoryLimitBytes)

require.Equal(t, c.cpu, tc.Resources.NomadResources.Cpu.CpuShares)
require.Equal(t, c.memoryMB, tc.Resources.NomadResources.Memory.MemoryMB)
require.Equal(t, c.memoryMaxMB, tc.Resources.NomadResources.Memory.MemoryMaxMB)
})
}
}

// TestTaskRunner_Restore_Running asserts restoring a running task does not
// rerun the task.
func TestTaskRunner_Restore_Running(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,8 @@ func TestClient_getAllocatedResources(t *testing.T) {
ReservedCores: []uint16{},
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 768,
MemoryMB: 768,
MemoryMaxMB: 768,
},
Networks: nil,
},
Expand Down
4 changes: 4 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,10 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
out.Cores = *in.Cores
}

if in.MemoryMaxMB != nil {
out.MemoryMaxMB = *in.MemoryMaxMB
}

// COMPAT(0.10): Only being used to issue warnings
if in.IOPS != nil {
out.IOPS = *in.IOPS
Expand Down
47 changes: 47 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2930,6 +2930,53 @@ func TestConversion_apiLogConfigToStructs(t *testing.T) {
}))
}

func TestConversion_apiResourcesToStructs(t *testing.T) {
t.Parallel()

cases := []struct {
name string
input *api.Resources
expected *structs.Resources
}{
{
"nil",
nil,
nil,
},
{
"plain",
&api.Resources{
CPU: helper.IntToPtr(100),
MemoryMB: helper.IntToPtr(200),
},
&structs.Resources{
CPU: 100,
MemoryMB: 200,
},
},
{
"with memory max",
&api.Resources{
CPU: helper.IntToPtr(100),
MemoryMB: helper.IntToPtr(200),
MemoryMaxMB: helper.IntToPtr(300),
},
&structs.Resources{
CPU: 100,
MemoryMB: 200,
MemoryMaxMB: 300,
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
found := ApiResourcesToStructs(c.input)
require.Equal(t, c.expected, found)
})
}
}

func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
t.Parallel()
require.Nil(t, apiConnectSidecarTaskToStructs(nil))
Expand Down
13 changes: 12 additions & 1 deletion command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,13 +563,21 @@ func (c *AllocStatusCommand) outputTaskResources(alloc *api.Allocation, task str
var resourcesOutput []string
resourcesOutput = append(resourcesOutput, "CPU|Memory|Disk|Addresses")
firstAddr := ""
secondAddr := ""
if len(addr) > 0 {
firstAddr = addr[0]
}
if len(addr) > 1 {
secondAddr = addr[1]
}

// Display the rolled up stats. If possible prefer the live statistics
cpuUsage := strconv.Itoa(*resource.CPU)
memUsage := humanize.IBytes(uint64(*resource.MemoryMB * bytesPerMegabyte))
memMax := ""
if max := resource.MemoryMaxMB; max != nil && *max != 0 && *max != *resource.MemoryMB {
memMax = "Max: " + humanize.IBytes(uint64(*resource.MemoryMaxMB*bytesPerMegabyte))
}
var deviceStats []*api.DeviceGroupStats

if stats != nil {
Expand All @@ -588,7 +596,10 @@ func (c *AllocStatusCommand) outputTaskResources(alloc *api.Allocation, task str
memUsage,
humanize.IBytes(uint64(*alloc.Resources.DiskMB*bytesPerMegabyte)),
firstAddr))
for i := 1; i < len(addr); i++ {
if memMax != "" || secondAddr != "" {
resourcesOutput = append(resourcesOutput, fmt.Sprintf("|%v||%v", memMax, secondAddr))
}
for i := 2; i < len(addr); i++ {
resourcesOutput = append(resourcesOutput, fmt.Sprintf("|||%v", addr[i]))
}
c.Ui.Output(formatListWithSpaces(resourcesOutput))
Expand Down
21 changes: 14 additions & 7 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ func parseSecurityOpts(securityOpts []string) ([]string, error) {
}

// memoryLimits computes the memory and memory_reservation values passed along to
// the docker host config. These fields represent hard and soft memory limits from
// docker's perspective, respectively.
// the docker host config. These fields represent hard and soft/reserved memory
// limits from docker's perspective, respectively.
//
// The memory field on the task configuration can be interpreted as a hard or soft
// limit. Before Nomad v0.11.3, it was always a hard limit. Now, it is interpreted
Expand All @@ -754,11 +754,18 @@ func parseSecurityOpts(securityOpts []string) ([]string, error) {
// unset.
//
// Returns (memory (hard), memory_reservation (soft)) values in bytes.
func (_ *Driver) memoryLimits(driverHardLimitMB, taskMemoryLimitBytes int64) (int64, int64) {
if driverHardLimitMB <= 0 {
return taskMemoryLimitBytes, 0
func memoryLimits(driverHardLimitMB int64, taskMemory drivers.MemoryResources) (memory, reserve int64) {
softBytes := taskMemory.MemoryMB * 1024 * 1024

hard := driverHardLimitMB
if taskMemory.MemoryMaxMB > hard {
hard = taskMemory.MemoryMaxMB
}

if hard <= 0 {
return softBytes, 0
}
return driverHardLimitMB * 1024 * 1024, taskMemoryLimitBytes
return hard * 1024 * 1024, softBytes
}

func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *TaskConfig,
Expand Down Expand Up @@ -809,7 +816,7 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T
return c, fmt.Errorf("requested runtime %q is not allowed", containerRuntime)
}

memory, memoryReservation := d.memoryLimits(driverConfig.MemoryHardLimit, task.Resources.LinuxResources.MemoryLimitBytes)
memory, memoryReservation := memoryLimits(driverConfig.MemoryHardLimit, task.Resources.NomadResources.Memory)

hostConfig := &docker.HostConfig{
Memory: memory, // hard limit
Expand Down
60 changes: 50 additions & 10 deletions drivers/docker/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2855,17 +2855,57 @@ func TestDockerDriver_CreateContainerConfig_CPUHardLimit(t *testing.T) {
func TestDockerDriver_memoryLimits(t *testing.T) {
t.Parallel()

t.Run("driver hard limit not set", func(t *testing.T) {
memory, memoryReservation := new(Driver).memoryLimits(0, 256*1024*1024)
require.Equal(t, int64(256*1024*1024), memory)
require.Equal(t, int64(0), memoryReservation)
})
cases := []struct {
name string
driverMemoryMB int64
taskResources drivers.MemoryResources
expectedHard int64
expectedSoft int64
}{
{
"plain request",
0,
drivers.MemoryResources{MemoryMB: 10},
10 * 1024 * 1024,
0,
},
{
"with driver max",
20,
drivers.MemoryResources{MemoryMB: 10},
20 * 1024 * 1024,
10 * 1024 * 1024,
},
{
"with resources max",
20,
drivers.MemoryResources{MemoryMB: 10, MemoryMaxMB: 20},
20 * 1024 * 1024,
10 * 1024 * 1024,
},
{
"with driver and resources max: higher driver",
30,
drivers.MemoryResources{MemoryMB: 10, MemoryMaxMB: 20},
30 * 1024 * 1024,
10 * 1024 * 1024,
},
{
"with driver and resources max: higher resources",
20,
drivers.MemoryResources{MemoryMB: 10, MemoryMaxMB: 30},
30 * 1024 * 1024,
10 * 1024 * 1024,
},
}

t.Run("driver hard limit is set", func(t *testing.T) {
memory, memoryReservation := new(Driver).memoryLimits(512, 256*1024*1024)
require.Equal(t, int64(512*1024*1024), memory)
require.Equal(t, int64(256*1024*1024), memoryReservation)
})
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
hard, soft := memoryLimits(c.driverMemoryMB, c.taskResources)
require.Equal(t, c.expectedHard, hard)
require.Equal(t, c.expectedSoft, soft)
})
}
}

func TestDockerDriver_parseSignal(t *testing.T) {
Expand Down
17 changes: 13 additions & 4 deletions drivers/shared/executor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,15 +682,24 @@ func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error {
return nil
}

if mb := command.Resources.NomadResources.Memory.MemoryMB; mb > 0 {
// Total amount of memory allowed to consume
cfg.Cgroups.Resources.Memory = mb * 1024 * 1024
// Total amount of memory allowed to consume
res := command.Resources.NomadResources
memHard, memSoft := res.Memory.MemoryMaxMB, res.Memory.MemoryMB
if memHard <= 0 {
memHard = res.Memory.MemoryMB
memSoft = 0
}

if memHard > 0 {
cfg.Cgroups.Resources.Memory = memHard * 1024 * 1024
cfg.Cgroups.Resources.MemoryReservation = memSoft * 1024 * 1024

// Disable swap to avoid issues on the machine
var memSwappiness uint64
cfg.Cgroups.Resources.MemorySwappiness = &memSwappiness
}

cpuShares := command.Resources.NomadResources.Cpu.CpuShares
cpuShares := res.Cpu.CpuShares
if cpuShares < 2 {
return fmt.Errorf("resources.Cpu.CpuShares must be equal to or greater than 2: %v", cpuShares)
}
Expand Down
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/oversubscription"
_ "github.com/hashicorp/nomad/e2e/parameterized"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
Expand Down
Loading