Skip to content

Commit

Permalink
Merge pull request #24 from yohamta/feature/retry-policy
Browse files Browse the repository at this point in the history
Fix retry policy and process terminate behavior
  • Loading branch information
yohamta authored Apr 28, 2022
2 parents 5c8d546 + 5dc5393 commit 6e19286
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func stop(cfg *config.Config) error {
log.Printf("DAG is not running.")
return nil
}
syscall.Kill(int(status.Pid), syscall.SIGINT)
syscall.Kill(int(status.Pid), syscall.SIGTERM)
for {
time.Sleep(time.Second * 3)
s, err := controller.New(cfg).GetStatus()
Expand Down
2 changes: 1 addition & 1 deletion internal/admin/handlers/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func buildLog(logs []*models.StatusFile) *Log {
}

func getPathParameter(r *http.Request) (string, error) {
re := regexp.MustCompile("/dags/([^/\\?]+)/?$")
re := regexp.MustCompile(`/dags/([^/\?]+)/?$`)
m := re.FindStringSubmatch(r.URL.Path)
if len(m) < 2 {
return "", fmt.Errorf("invalid URL")
Expand Down
10 changes: 5 additions & 5 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (a *Agent) Status() *models.Status {
}

// Signal sends the signal to the processes running
// if processes do not terminate for 60 seconds,
// if processes do not terminate for 120 seconds,
// cancel all processes which will send signal -1 to the processes.
func (a *Agent) Signal(sig os.Signal) {
log.Printf("Sending %s signal to running child processes.", sig)
Expand All @@ -112,7 +112,7 @@ func (a *Agent) Signal(sig os.Signal) {
select {
case <-done:
log.Printf("All child processes have been terminated.")
case <-time.After(time.Second * 60):
case <-time.After(time.Second * 120):
a.Cancel(sig)
default:
log.Printf("Waiting for child processes to exit...")
Expand All @@ -121,7 +121,7 @@ func (a *Agent) Signal(sig os.Signal) {
}

// Cancel sends signal -1 to all child processes.
// then it waits another 20 seconds before therminating the
// then it waits another 60 seconds before therminating the
// parent process.
func (a *Agent) Cancel(sig os.Signal) {
log.Printf("Sending -1 signal to running child processes.")
Expand All @@ -132,7 +132,7 @@ func (a *Agent) Cancel(sig os.Signal) {
select {
case <-done:
log.Printf("All child processes have been terminated.")
case <-time.After(time.Second * 20):
case <-time.After(time.Second * 60):
log.Printf("Terminating the controller process.")
a.Kill(done)
default:
Expand All @@ -154,7 +154,7 @@ func (a *Agent) init() {
&scheduler.Config{
LogDir: path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_")),
MaxActiveRuns: a.DAG.MaxActiveRuns,
DelaySec: a.DAG.DelaySec,
Delay: a.DAG.Delay,
Dry: a.Dry,
OnExit: a.DAG.HandlerOn.Exit,
OnSuccess: a.DAG.HandlerOn.Success,
Expand Down
10 changes: 6 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
ErrorMail *MailConfig
InfoMail *MailConfig
Smtp *SmtpConfig
DelaySec time.Duration
Delay time.Duration
HistRetentionDays int
Preconditions []*Condition
MaxActiveRuns int
Expand Down Expand Up @@ -135,7 +135,7 @@ func buildFromDefinition(def *configDefinition, file string, globalConfig *Confi
c.Description = def.Description
c.MailOn.Failure = def.MailOn.Failure
c.MailOn.Success = def.MailOn.Success
c.DelaySec = time.Second * time.Duration(def.DelaySec)
c.Delay = time.Second * time.Duration(def.DelaySec)

if opts != nil && opts.headOnly {
return c, nil
Expand Down Expand Up @@ -308,9 +308,11 @@ func buildStep(variables []string, def *stepDef) (*Step, error) {
Limit: def.RetryPolicy.Limit,
}
}
if def.RepeatPolicy != nil {
step.RepeatPolicy.Repeat = def.RepeatPolicy.Repeat
step.RepeatPolicy.Interval = time.Second * time.Duration(def.RepeatPolicy.IntervalSec)
}
step.MailOnError = def.MailOnError
step.Repeat = def.Repeat
step.RepeatInterval = time.Second * time.Duration(def.RepeatIntervalSec)
step.Preconditions = loadPreCondition(def.Preconditions)
return step, nil
}
Expand Down
26 changes: 15 additions & 11 deletions internal/config/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,28 @@ type handerOnDef struct {
}

type stepDef struct {
Name string
Description string
Dir string
Command string
Depends []string
ContinueOn *continueOnDef
RetryPolicy *retryPolicyDef
MailOnError bool
Repeat bool
RepeatIntervalSec int
Preconditions []*conditionDef
Name string
Description string
Dir string
Command string
Depends []string
ContinueOn *continueOnDef
RetryPolicy *retryPolicyDef
RepeatPolicy *repeatPolicyDef
MailOnError bool
Preconditions []*conditionDef
}

type continueOnDef struct {
Failure bool
Skipped bool
}

type repeatPolicyDef struct {
Repeat bool
IntervalSec int
}

type retryPolicyDef struct {
Limit int
}
Expand Down
6 changes: 5 additions & 1 deletion internal/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func TestLoadConfig(t *testing.T) {
RetryPolicy: &config.RetryPolicy{
Limit: 2,
},
RepeatPolicy: config.RepeatPolicy{
Repeat: true,
Interval: time.Second * 10,
},
},
{
Name: "2",
Expand Down Expand Up @@ -90,7 +94,7 @@ func TestLoadConfig(t *testing.T) {
Failure: true,
Success: true,
},
DelaySec: time.Second * 1,
Delay: time.Second * 1,
MaxActiveRuns: 1,
Params: []string{"param1", "param2"},
DefaultParams: "param1 param2",
Expand Down
30 changes: 17 additions & 13 deletions internal/config/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,29 @@ import (
)

type Step struct {
Name string
Description string
Variables []string
Dir string
Command string
Args []string
Depends []string
ContinueOn ContinueOn
RetryPolicy *RetryPolicy
MailOnError bool
Repeat bool
RepeatInterval time.Duration
Preconditions []*Condition
Name string
Description string
Variables []string
Dir string
Command string
Args []string
Depends []string
ContinueOn ContinueOn
RetryPolicy *RetryPolicy
RepeatPolicy RepeatPolicy
MailOnError bool
Preconditions []*Condition
}

type RetryPolicy struct {
Limit int
}

type RepeatPolicy struct {
Repeat bool
Interval time.Duration
}

type ContinueOn struct {
Failure bool
Skipped bool
Expand Down
4 changes: 2 additions & 2 deletions internal/models/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func TestStatusSerialization(t *testing.T) {
Dir: "dir", Command: "echo 1", Args: []string{},
Depends: []string{}, ContinueOn: config.ContinueOn{},
RetryPolicy: &config.RetryPolicy{}, MailOnError: false,
Repeat: false, RepeatInterval: 0, Preconditions: []*config.Condition{},
RepeatPolicy: config.RepeatPolicy{}, Preconditions: []*config.Condition{},
},
},
MailOn: config.MailOn{},
ErrorMail: &config.MailConfig{},
InfoMail: &config.MailConfig{},
Smtp: &config.SmtpConfig{},
DelaySec: 0,
Delay: 0,
HistRetentionDays: 0,
Preconditions: []*config.Condition{},
MaxActiveRuns: 0,
Expand Down
4 changes: 1 addition & 3 deletions internal/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func (n *Node) Execute() error {
cmd := exec.CommandContext(ctx, n.Command, n.Args...)
n.cmd = cmd
cmd.Dir = n.Dir
for _, v := range n.Variables {
cmd.Env = append(cmd.Env, v)
}
cmd.Env = append(cmd.Env, n.Variables...)

if n.logWriter != nil {
cmd.Stdout = n.logWriter
Expand Down
26 changes: 15 additions & 11 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Scheduler struct {
type Config struct {
LogDir string
MaxActiveRuns int
DelaySec time.Duration
Delay time.Duration
Dry bool
OnExit *config.Step
OnSuccess *config.Step
Expand Down Expand Up @@ -134,18 +134,22 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error {
// nothing to do
case NodeStatus_Error:
sc.lastError = err
fallthrough
default:
if done != nil {
done <- node
}
}
return
}
if node.Repeat {
if node.ReadStatus() != NodeStatus_Cancel {
node.incDoneCount()
time.Sleep(node.RepeatInterval)
continue
}
if node.RepeatPolicy.Repeat {
if err == nil || node.ContinueOn.Failure {
time.Sleep(node.RepeatPolicy.Interval)
continue
}
}
if err != nil {
if done != nil {
done <- node
}
return
}
break
}
Expand All @@ -155,7 +159,7 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error {
}
}(node)

time.Sleep(sc.DelaySec)
time.Sleep(sc.Delay)
}

time.Sleep(sc.pause)
Expand Down
62 changes: 56 additions & 6 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,10 @@ func TestSchedulerRetrySuccess(t *testing.T) {
defer os.Remove(tmpDir)

go func() {
select {
case <-time.After(time.Millisecond * 300):
f, err := os.Create(tmpFile)
require.NoError(t, err)
f.Close()
}
<-time.After(time.Millisecond * 300)
f, err := os.Create(tmpFile)
require.NoError(t, err)
f.Close()
}()

g, sc, err := testSchedule(t,
Expand Down Expand Up @@ -402,6 +400,58 @@ func TestSchedulerOnFailure(t *testing.T) {
assert.Equal(t, sc.HanderNode(constants.OnCancel).ReadStatus(), scheduler.NodeStatus_None)
}

func TestRepeat(t *testing.T) {
g, _ := scheduler.NewExecutionGraph(
&config.Step{
Name: "1",
Command: "sleep",
Args: []string{"1"},
RepeatPolicy: config.RepeatPolicy{
Repeat: true,
Interval: time.Millisecond * 300,
},
},
)
sc := scheduler.New(&scheduler.Config{})

done := make(chan bool)
go func() {
<-time.After(time.Millisecond * 3000)
sc.Cancel(g, done)
}()

err := sc.Schedule(g, nil)
<-done // Wait for canceling finished
require.NoError(t, err)

nodes := g.Nodes()

assert.Equal(t, sc.Status(g), scheduler.SchedulerStatus_Cancel)
assert.Equal(t, nodes[0].Status, scheduler.NodeStatus_Cancel)
assert.Equal(t, nodes[0].DoneCount, 2)
}

func TestRepeatFail(t *testing.T) {
g, _ := scheduler.NewExecutionGraph(
&config.Step{
Name: "1",
Command: testCommandFail,
RepeatPolicy: config.RepeatPolicy{
Repeat: true,
Interval: time.Millisecond * 300,
},
},
)
sc := scheduler.New(&scheduler.Config{})
err := sc.Schedule(g, nil)
require.Error(t, err)

nodes := g.Nodes()
assert.Equal(t, sc.Status(g), scheduler.SchedulerStatus_Error)
assert.Equal(t, nodes[0].Status, scheduler.NodeStatus_Error)
assert.Equal(t, nodes[0].DoneCount, 1)
}

func testSchedule(t *testing.T, steps ...*config.Step) (
*scheduler.ExecutionGraph, *scheduler.Scheduler, error,
) {
Expand Down
3 changes: 3 additions & 0 deletions tests/testdata/config_load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ steps:
skipped: true
retryPolicy:
limit: 2
repeatPolicy:
repeat: true
intervalSec: 10
preconditions:
- condition: "`echo test`"
expected: test
Expand Down

0 comments on commit 6e19286

Please sign in to comment.