From 5c0a3d254b825d11083119daf42ecc386c42f26c Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 25 Mar 2016 16:46:48 -0700 Subject: [PATCH] Limit garbage collection of batch jobs --- nomad/core_sched.go | 11 ++++ nomad/core_sched_test.go | 105 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 944f852341a..1bff28f67c5 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -85,6 +85,8 @@ OUTER: for _, eval := range evals { gc, allocs, err := c.gcEval(eval, oldThreshold) if err != nil || !gc { + // We skip the job because it is not finished if it has + // non-terminal allocations. continue OUTER } @@ -155,11 +157,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error { var gcAlloc, gcEval []string for raw := iter.Next(); raw != nil; raw = iter.Next() { eval := raw.(*structs.Evaluation) + gc, allocs, err := c.gcEval(eval, oldThreshold) if err != nil { return err } + // If the eval is from a "batch" job we don't want to garbage collect + // its allocations. If there is a long running batch job and its + // terminal allocations get GC'd the scheduler would re-run the + // allocations. + if len(allocs) != 0 && eval.Type == structs.JobTypeBatch { + continue + } + if gc { gcEval = append(gcEval, eval.ID) gcAlloc = append(gcAlloc, allocs...) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index ba5b5cc4aa5..792c306482f 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -69,6 +69,111 @@ func TestCoreScheduler_EvalGC(t *testing.T) { } } +func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Insert "dead" eval + state := s1.fsm.State() + eval := mock.Eval() + eval.Type = structs.JobTypeBatch + eval.Status = structs.EvalStatusFailed + err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Update the time tables to make this work + tt := s1.fsm.TimeTable() + tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold)) + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.coreJobEval(structs.CoreJobEvalGC) + gc.ModifyIndex = 2000 + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should be gone because there is no alloc associated + out, err := state.EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("bad: %v", out) + } +} + +func TestCoreScheduler_EvalGC_Batch_Allocs(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Insert "dead" eval + state := s1.fsm.State() + eval := mock.Eval() + eval.Type = structs.JobTypeBatch + eval.Status = structs.EvalStatusFailed + err := state.UpsertEvals(1000, []*structs.Evaluation{eval}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Insert "dead" alloc + alloc := mock.Alloc() + alloc.EvalID = eval.ID + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + err = state.UpsertAllocs(1001, []*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Update the time tables to make this work + tt := s1.fsm.TimeTable() + tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold)) + + // Create a core scheduler + snap, err := state.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Attempt the GC + gc := s1.coreJobEval(structs.CoreJobEvalGC) + gc.ModifyIndex = 2000 + err = core.Process(gc) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Shouldn't be gone because there are associated allocs. + out, err := state.EvalByID(eval.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("bad: %v", out) + } + + outA, err := state.AllocByID(alloc.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if outA == nil { + t.Fatalf("bad: %v", outA) + } +} + func TestCoreScheduler_EvalGC_Force(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown()