Commit e3ea516

Merge pull request #10247 from hashicorp/f-memory-oversubscription-2
Memory oversubscription
Mahmood Ali authored Mar 31, 2021
2 parents ee79587 + 6d89131 commit e3ea516
Showing 33 changed files with 1,079 additions and 296 deletions.
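
In short, the change threads a memory ceiling through the stack: the API and job spec gain a memory_max field next to memory, allocated task resources carry a matching MemoryMaxMB, the task runner, shared executor, and Docker driver use the larger of the reserve and the max as the hard memory limit while keeping the reserve as the soft limit, the alloc status output reports the max alongside the reserve, and an oversubscription end-to-end suite is registered.
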
3 changes: 2 additions & 1 deletion api/allocations.go
@@ -511,7 +511,8 @@ type AllocatedCpuResources struct {
}

type AllocatedMemoryResources struct {
- MemoryMB int64
+ MemoryMB int64
+ MemoryMaxMB int64
}

type AllocatedDeviceResource struct {
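
To make the new allocation field concrete, here is a rough sketch of reading it back through the Go API client. It assumes the api package's existing Allocations().Info call and AllocatedResources/Tasks layout; the allocation ID is a placeholder.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder ID; in practice this comes from a job or allocation listing.
	alloc, _, err := client.Allocations().Info("ALLOC-ID", nil)
	if err != nil {
		log.Fatal(err)
	}
	if alloc.AllocatedResources == nil {
		log.Fatal("allocation has no AllocatedResources")
	}

	for task, res := range alloc.AllocatedResources.Tasks {
		fmt.Printf("%s: reserved %d MB, max %d MB\n",
			task, res.Memory.MemoryMB, res.Memory.MemoryMaxMB)
	}
}
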
13 changes: 7 additions & 6 deletions api/resources.go
@@ -7,12 +7,13 @@ import (
// Resources encapsulates the required resources of
// a given task or task group.
type Resources struct {
- CPU *int `hcl:"cpu,optional"`
- Cores *int `hcl:"cores,optional"`
- MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"`
- DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
- Networks []*NetworkResource `hcl:"network,block"`
- Devices []*RequestedDevice `hcl:"device,block"`
+ CPU *int `hcl:"cpu,optional"`
+ Cores *int `hcl:"cores,optional"`
+ MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"`
+ MemoryMaxMB *int `mapstructure:"memory_max" hcl:"memory_max,optional"`
+ DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
+ Networks []*NetworkResource `hcl:"network,block"`
+ Devices []*RequestedDevice `hcl:"device,block"`

// COMPAT(0.10)
// XXX Deprecated. Please do not use. The field will be removed in Nomad
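
As a sketch of how the new field is meant to be populated from the Go API (mirroring the memory_max HCL tag above; the values are illustrative, and helper.IntToPtr is the same helper used in the endpoint tests later in this diff):

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/helper"
)

// oversubscribedResources reserves 256 MB for scheduling but lets the task
// burst up to 512 MB when memory oversubscription is enabled. Values are
// illustrative only.
func oversubscribedResources() *api.Resources {
	return &api.Resources{
		CPU:         helper.IntToPtr(500),
		MemoryMB:    helper.IntToPtr(256),
		MemoryMaxMB: helper.IntToPtr(512),
	}
}

func main() {
	res := oversubscribedResources()
	fmt.Println(*res.MemoryMB, *res.MemoryMaxMB)
}
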
7 changes: 6 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
@@ -976,6 +976,11 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
}
}

+ memoryLimit := taskResources.Memory.MemoryMB
+ if max := taskResources.Memory.MemoryMaxMB; max > memoryLimit {
+ memoryLimit = max
+ }

return &drivers.TaskConfig{
ID: fmt.Sprintf("%s/%s/%s", alloc.ID, task.Name, invocationid),
Name: task.Name,
@@ -988,7 +993,7 @@ func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
Resources: &drivers.Resources{
NomadResources: taskResources,
LinuxResources: &drivers.LinuxResources{
- MemoryLimitBytes: taskResources.Memory.MemoryMB * 1024 * 1024,
+ MemoryLimitBytes: memoryLimit * 1024 * 1024,
CPUShares: taskResources.Cpu.CpuShares,
PercentTicks: float64(taskResources.Cpu.CpuShares) / float64(tr.clientConfig.Node.NodeResources.Cpu.CpuShares),
},
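
The effect of the buildTaskConfig change above is easiest to see in isolation. A standalone sketch of the same selection rule (the function name and sample values are illustrative only):

// linuxMemoryLimitBytes mirrors the logic above: the max becomes the hard
// limit handed to the driver when it is set and larger than the reserve;
// otherwise the reserve itself remains the hard limit.
func linuxMemoryLimitBytes(memoryMB, memoryMaxMB int64) int64 {
	limit := memoryMB
	if memoryMaxMB > limit {
		limit = memoryMaxMB
	}
	return limit * 1024 * 1024
}

// linuxMemoryLimitBytes(256, 0)   == 268435456  (256 MiB: no max configured)
// linuxMemoryLimitBytes(256, 512) == 536870912  (512 MiB: the max wins)
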
66 changes: 66 additions & 0 deletions client/allocrunner/taskrunner/task_runner_test.go
@@ -136,6 +136,72 @@ func runTestTaskRunner(t *testing.T, alloc *structs.Allocation, taskName string)
}
}

func TestTaskRunner_BuildTaskConfig_CPU_Memory(t *testing.T) {
t.Parallel()

cases := []struct {
name string
cpu int64
memoryMB int64
memoryMaxMB int64
expectedLinuxMemoryMB int64
}{
{
name: "plain no max",
cpu: 100,
memoryMB: 100,
memoryMaxMB: 0,
expectedLinuxMemoryMB: 100,
},
{
name: "plain with max=reserve",
cpu: 100,
memoryMB: 100,
memoryMaxMB: 100,
expectedLinuxMemoryMB: 100,
},
{
name: "plain with max>reserve",
cpu: 100,
memoryMB: 100,
memoryMaxMB: 200,
expectedLinuxMemoryMB: 200,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
alloc := mock.BatchAlloc()
alloc.Job.TaskGroups[0].Count = 1
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "2s",
}
res := alloc.AllocatedResources.Tasks[task.Name]
res.Cpu.CpuShares = c.cpu
res.Memory.MemoryMB = c.memoryMB
res.Memory.MemoryMaxMB = c.memoryMaxMB

conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners
defer cleanup()

// Run the first TaskRunner
tr, err := NewTaskRunner(conf)
require.NoError(t, err)

tc := tr.buildTaskConfig()
require.Equal(t, c.cpu, tc.Resources.LinuxResources.CPUShares)
require.Equal(t, c.expectedLinuxMemoryMB*1024*1024, tc.Resources.LinuxResources.MemoryLimitBytes)

require.Equal(t, c.cpu, tc.Resources.NomadResources.Cpu.CpuShares)
require.Equal(t, c.memoryMB, tc.Resources.NomadResources.Memory.MemoryMB)
require.Equal(t, c.memoryMaxMB, tc.Resources.NomadResources.Memory.MemoryMaxMB)
})
}
}

// TestTaskRunner_Restore_Running asserts restoring a running task does not
// rerun the task.
func TestTaskRunner_Restore_Running(t *testing.T) {
3 changes: 2 additions & 1 deletion client/client_test.go
@@ -1487,7 +1487,8 @@ func TestClient_getAllocatedResources(t *testing.T) {
ReservedCores: []uint16{},
},
Memory: structs.AllocatedMemoryResources{
- MemoryMB: 768,
+ MemoryMB: 768,
+ MemoryMaxMB: 768,
},
Networks: nil,
},
4 changes: 4 additions & 0 deletions command/agent/job_endpoint.go
@@ -1193,6 +1193,10 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
out.Cores = *in.Cores
}

if in.MemoryMaxMB != nil {
out.MemoryMaxMB = *in.MemoryMaxMB
}

// COMPAT(0.10): Only being used to issue warnings
if in.IOPS != nil {
out.IOPS = *in.IOPS
47 changes: 47 additions & 0 deletions command/agent/job_endpoint_test.go
@@ -2930,6 +2930,53 @@ func TestConversion_apiLogConfigToStructs(t *testing.T) {
}))
}

func TestConversion_apiResourcesToStructs(t *testing.T) {
t.Parallel()

cases := []struct {
name string
input *api.Resources
expected *structs.Resources
}{
{
"nil",
nil,
nil,
},
{
"plain",
&api.Resources{
CPU: helper.IntToPtr(100),
MemoryMB: helper.IntToPtr(200),
},
&structs.Resources{
CPU: 100,
MemoryMB: 200,
},
},
{
"with memory max",
&api.Resources{
CPU: helper.IntToPtr(100),
MemoryMB: helper.IntToPtr(200),
MemoryMaxMB: helper.IntToPtr(300),
},
&structs.Resources{
CPU: 100,
MemoryMB: 200,
MemoryMaxMB: 300,
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
found := ApiResourcesToStructs(c.input)
require.Equal(t, c.expected, found)
})
}
}

func TestConversion_apiConnectSidecarTaskToStructs(t *testing.T) {
t.Parallel()
require.Nil(t, apiConnectSidecarTaskToStructs(nil))
13 changes: 12 additions & 1 deletion command/alloc_status.go
@@ -563,13 +563,21 @@ func (c *AllocStatusCommand) outputTaskResources(alloc *api.Allocation, task str
var resourcesOutput []string
resourcesOutput = append(resourcesOutput, "CPU|Memory|Disk|Addresses")
firstAddr := ""
+ secondAddr := ""
if len(addr) > 0 {
firstAddr = addr[0]
}
+ if len(addr) > 1 {
+ secondAddr = addr[1]
+ }

// Display the rolled up stats. If possible prefer the live statistics
cpuUsage := strconv.Itoa(*resource.CPU)
memUsage := humanize.IBytes(uint64(*resource.MemoryMB * bytesPerMegabyte))
memMax := ""
if max := resource.MemoryMaxMB; max != nil && *max != 0 && *max != *resource.MemoryMB {
memMax = "Max: " + humanize.IBytes(uint64(*resource.MemoryMaxMB*bytesPerMegabyte))
}
var deviceStats []*api.DeviceGroupStats

if stats != nil {
@@ -588,7 +596,10 @@ func (c *AllocStatusCommand) outputTaskResources(alloc *api.Allocation, task str
memUsage,
humanize.IBytes(uint64(*alloc.Resources.DiskMB*bytesPerMegabyte)),
firstAddr))
- for i := 1; i < len(addr); i++ {
+ if memMax != "" || secondAddr != "" {
+ resourcesOutput = append(resourcesOutput, fmt.Sprintf("|%v||%v", memMax, secondAddr))
+ }
+ for i := 2; i < len(addr); i++ {
resourcesOutput = append(resourcesOutput, fmt.Sprintf("|||%v", addr[i]))
}
c.Ui.Output(formatListWithSpaces(resourcesOutput))
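
With the extra row in place, the task resources table shows the ceiling under the Memory column whenever a max is set and differs from the reserve. A hypothetical rendering with made-up values, a single address, and no live stats:

CPU      Memory         Disk     Addresses
500 MHz  256 MiB        300 MiB  http: 10.0.0.10:8080
         Max: 512 MiB
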
21 changes: 14 additions & 7 deletions drivers/docker/driver.go
@@ -738,8 +738,8 @@ func parseSecurityOpts(securityOpts []string) ([]string, error) {
}

// memoryLimits computes the memory and memory_reservation values passed along to
- // the docker host config. These fields represent hard and soft memory limits from
- // docker's perspective, respectively.
+ // the docker host config. These fields represent hard and soft/reserved memory
+ // limits from docker's perspective, respectively.
//
// The memory field on the task configuration can be interpreted as a hard or soft
// limit. Before Nomad v0.11.3, it was always a hard limit. Now, it is interpreted
@@ -754,11 +754,18 @@ func parseSecurityOpts(securityOpts []string) ([]string, error) {
// unset.
//
// Returns (memory (hard), memory_reservation (soft)) values in bytes.
- func (_ *Driver) memoryLimits(driverHardLimitMB, taskMemoryLimitBytes int64) (int64, int64) {
- if driverHardLimitMB <= 0 {
- return taskMemoryLimitBytes, 0
+ func memoryLimits(driverHardLimitMB int64, taskMemory drivers.MemoryResources) (memory, reserve int64) {
+ softBytes := taskMemory.MemoryMB * 1024 * 1024
+
+ hard := driverHardLimitMB
+ if taskMemory.MemoryMaxMB > hard {
+ hard = taskMemory.MemoryMaxMB
+ }
+
+ if hard <= 0 {
+ return softBytes, 0
}
- return driverHardLimitMB * 1024 * 1024, taskMemoryLimitBytes
+ return hard * 1024 * 1024, softBytes
}

func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *TaskConfig,
@@ -809,7 +816,7 @@ func (d *Driver) createContainerConfig(task *drivers.TaskConfig, driverConfig *T
return c, fmt.Errorf("requested runtime %q is not allowed", containerRuntime)
}

- memory, memoryReservation := d.memoryLimits(driverConfig.MemoryHardLimit, task.Resources.LinuxResources.MemoryLimitBytes)
+ memory, memoryReservation := memoryLimits(driverConfig.MemoryHardLimit, task.Resources.NomadResources.Memory)

hostConfig := &docker.HostConfig{
Memory: memory, // hard limit
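
A worked example of the new hard/soft split, written as a self-contained sketch of the same arithmetic (the real memoryLimits takes a drivers.MemoryResources; the standalone signature here is for illustration):

// dockerMemoryLimits mirrors memoryLimits above: the reserve is always the
// soft limit (memory_reservation), and the hard limit (memory) is the larger
// of the driver's memory_hard_limit and the task's memory_max. With neither
// set, the reserve itself becomes the hard limit and no soft limit is sent.
func dockerMemoryLimits(driverHardLimitMB, memoryMB, memoryMaxMB int64) (hardBytes, softBytes int64) {
	soft := memoryMB * 1024 * 1024

	hard := driverHardLimitMB
	if memoryMaxMB > hard {
		hard = memoryMaxMB
	}
	if hard <= 0 {
		return soft, 0
	}
	return hard * 1024 * 1024, soft
}

// dockerMemoryLimits(0, 256, 0)      -> hard 256 MiB,  soft 0        (no ceiling anywhere)
// dockerMemoryLimits(0, 256, 512)    -> hard 512 MiB,  soft 256 MiB  (memory_max is the ceiling)
// dockerMemoryLimits(1024, 256, 512) -> hard 1024 MiB, soft 256 MiB  (larger driver hard limit wins)
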
60 changes: 50 additions & 10 deletions drivers/docker/driver_test.go
@@ -2855,17 +2855,57 @@ func TestDockerDriver_CreateContainerConfig_CPUHardLimit(t *testing.T) {
func TestDockerDriver_memoryLimits(t *testing.T) {
t.Parallel()

t.Run("driver hard limit not set", func(t *testing.T) {
memory, memoryReservation := new(Driver).memoryLimits(0, 256*1024*1024)
require.Equal(t, int64(256*1024*1024), memory)
require.Equal(t, int64(0), memoryReservation)
})
cases := []struct {
name string
driverMemoryMB int64
taskResources drivers.MemoryResources
expectedHard int64
expectedSoft int64
}{
{
"plain request",
0,
drivers.MemoryResources{MemoryMB: 10},
10 * 1024 * 1024,
0,
},
{
"with driver max",
20,
drivers.MemoryResources{MemoryMB: 10},
20 * 1024 * 1024,
10 * 1024 * 1024,
},
{
"with resources max",
20,
drivers.MemoryResources{MemoryMB: 10, MemoryMaxMB: 20},
20 * 1024 * 1024,
10 * 1024 * 1024,
},
{
"with driver and resources max: higher driver",
30,
drivers.MemoryResources{MemoryMB: 10, MemoryMaxMB: 20},
30 * 1024 * 1024,
10 * 1024 * 1024,
},
{
"with driver and resources max: higher resources",
20,
drivers.MemoryResources{MemoryMB: 10, MemoryMaxMB: 30},
30 * 1024 * 1024,
10 * 1024 * 1024,
},
}

t.Run("driver hard limit is set", func(t *testing.T) {
memory, memoryReservation := new(Driver).memoryLimits(512, 256*1024*1024)
require.Equal(t, int64(512*1024*1024), memory)
require.Equal(t, int64(256*1024*1024), memoryReservation)
})
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
hard, soft := memoryLimits(c.driverMemoryMB, c.taskResources)
require.Equal(t, c.expectedHard, hard)
require.Equal(t, c.expectedSoft, soft)
})
}
}

func TestDockerDriver_parseSignal(t *testing.T) {
17 changes: 13 additions & 4 deletions drivers/shared/executor/executor_linux.go
@@ -682,15 +682,24 @@ func configureCgroups(cfg *lconfigs.Config, command *ExecCommand) error {
return nil
}

- if mb := command.Resources.NomadResources.Memory.MemoryMB; mb > 0 {
- // Total amount of memory allowed to consume
- cfg.Cgroups.Resources.Memory = mb * 1024 * 1024
+ // Total amount of memory allowed to consume
+ res := command.Resources.NomadResources
+ memHard, memSoft := res.Memory.MemoryMaxMB, res.Memory.MemoryMB
+ if memHard <= 0 {
+ memHard = res.Memory.MemoryMB
+ memSoft = 0
+ }
+
+ if memHard > 0 {
+ cfg.Cgroups.Resources.Memory = memHard * 1024 * 1024
+ cfg.Cgroups.Resources.MemoryReservation = memSoft * 1024 * 1024
+
// Disable swap to avoid issues on the machine
var memSwappiness uint64
cfg.Cgroups.Resources.MemorySwappiness = &memSwappiness
}

- cpuShares := command.Resources.NomadResources.Cpu.CpuShares
+ cpuShares := res.Cpu.CpuShares
if cpuShares < 2 {
return fmt.Errorf("resources.Cpu.CpuShares must be equal to or greater than 2: %v", cpuShares)
}
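
The raw exec path applies the same split to cgroups. A sketch of the values configureCgroups now computes; under libcontainer's usual cgroup v1 mapping, Resources.Memory corresponds to memory.limit_in_bytes and Resources.MemoryReservation to memory.soft_limit_in_bytes, an assumption worth checking against the vendored libcontainer version.

// cgroupMemoryBytes mirrors configureCgroups above: a configured max becomes
// the hard limit and the reserve the soft limit; with no max, the reserve is
// the hard limit and the soft limit is left at zero. The real code only
// writes these values when the resulting hard limit is positive.
func cgroupMemoryBytes(memoryMB, memoryMaxMB int64) (hardBytes, softBytes int64) {
	hard, soft := memoryMaxMB, memoryMB
	if hard <= 0 {
		hard = memoryMB
		soft = 0
	}
	return hard * 1024 * 1024, soft * 1024 * 1024
}
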
1 change: 1 addition & 0 deletions e2e/e2e_test.go
@@ -27,6 +27,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/oversubscription"
_ "github.com/hashicorp/nomad/e2e/parameterized"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
(Diffs for the remaining changed files are not shown here.)

