diff --git a/.changelog/24229.txt b/.changelog/24229.txt new file mode 100644 index 00000000000..c4ff1256abc --- /dev/null +++ b/.changelog/24229.txt @@ -0,0 +1,3 @@ +```release-note:bug +docker: Fixed a bug where task CPU stats were reported incorrectly +``` diff --git a/drivers/docker/driver_test.go b/drivers/docker/driver_test.go index 86b766893f4..4a77346fa6d 100644 --- a/drivers/docker/driver_test.go +++ b/drivers/docker/driver_test.go @@ -30,6 +30,7 @@ import ( "github.com/docker/docker/errdefs" "github.com/docker/go-connections/nat" hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-set/v3" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/lib/numalib" "github.com/hashicorp/nomad/client/taskenv" @@ -3223,3 +3224,66 @@ func TestDockerDriver_GroupAdd(t *testing.T) { must.Eq(t, cfg.GroupAdd, container.HostConfig.GroupAdd) } + +// TestDockerDriver_CollectStats verifies that the TaskStats API collects stats +// periodically and that these values are non-zero as expected +func TestDockerDriver_CollectStats(t *testing.T) { + ci.Parallel(t) + testutil.RequireLinux(t) // stats outputs are different on Windows + testutil.DockerCompatible(t) + + // we want to generate at least some CPU usage + args := []string{"/bin/sh", "-c", "cat /dev/urandom | base64 > /dev/null"} + taskCfg := newTaskConfig("", args) + task := &drivers.TaskConfig{ + ID: uuid.Generate(), + Name: "nc-demo", + AllocID: uuid.Generate(), + Resources: basicResources, + } + must.NoError(t, task.EncodeConcreteDriverConfig(&taskCfg)) + + d := dockerDriverHarness(t, nil) + plugin, ok := d.Impl().(*Driver) + must.True(t, ok) + plugin.compute.TotalCompute = 1000 + plugin.compute.NumCores = 1 + + cleanup := d.MkAllocDir(task, true) + defer cleanup() + copyImage(t, task.TaskDir(), "busybox.tar") + + _, _, err := d.StartTask(task) + must.NoError(t, err) + + defer d.DestroyTask(task.ID, true) + + // this test has to run for a while because the minimum stats interval we + // can get from Docker is 1s + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + recv, err := d.TaskStats(ctx, task.ID, time.Second) + must.NoError(t, err) + + statsReceived := 0 + tickValues := set.From([]float64{}) + +DONE: + for { + select { + case stats := <-recv: + statsReceived++ + ticks := stats.ResourceUsage.CpuStats.TotalTicks + must.Greater(t, 0, ticks) + tickValues.Insert(ticks) + if statsReceived >= 3 { + cancel() // 3 is plenty + } + case <-ctx.Done(): + break DONE + } + } + + // CPU stats should be changed with every interval + must.Len(t, statsReceived, tickValues.Slice()) +} diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go index cd64e966ee9..8ef12add73b 100644 --- a/drivers/docker/stats.go +++ b/drivers/docker/stats.go @@ -66,7 +66,6 @@ func (u *usageSender) send(tru *cstructs.TaskResourceUsage) { func (u *usageSender) close() { u.mu.Lock() defer u.mu.Unlock() - if u.closed { // already closed return @@ -97,22 +96,29 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte timer, cancel := helper.NewSafeTimer(interval) defer cancel() + // we need to use the streaming stats API here because our calculation for + // CPU usage depends on having the values from the previous read, which are + // not available in one-shot + statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true) + if err != nil && err != io.EOF { + h.logger.Debug("error collecting stats from container", "error", err) + return + } + defer statsReader.Body.Close() + collectOnce := func() { defer timer.Reset(interval) - statsReader, err := h.dockerClient.ContainerStatsOneShot(ctx, h.containerID) + var stats *containerapi.Stats + err := json.NewDecoder(statsReader.Body).Decode(&stats) if err != nil && err != io.EOF { - h.logger.Debug("error collecting stats from container", "error", err) + h.logger.Debug("error decoding stats data from container", "error", err) return } - defer statsReader.Body.Close() - - var stats containerapi.Stats - if err := json.NewDecoder(statsReader.Body).Decode(&stats); err != nil { - h.logger.Error("error decoding stats data for container", "error", err) + if stats == nil { + h.logger.Debug("error decoding stats data: stats were nil") return } - - resourceUsage := util.DockerStatsToTaskResourceUsage(&stats, compute) + resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute) destCh.send(resourceUsage) }