Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle potential panic in cron parsing #4224

Merged
merged 6 commits into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/gorhill/cronexpr"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

const (
Expand Down Expand Up @@ -537,14 +538,14 @@ func (p *PeriodicConfig) Canonicalize() {
// passed time. If no matching instance exists, the zero value of time.Time is
// returned. The `time.Location` of the returned value matches that of the
// passed time.
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) {
if *p.SpecType == PeriodicSpecCron {
if e, err := cronexpr.Parse(*p.Spec); err == nil {
return e.Next(fromTime)
return structs.CronParseNext(e, fromTime, *p.Spec)
}
}

return time.Time{}
return time.Time{}, nil
}

func (p *PeriodicConfig) GetLocation() (*time.Location, error) {
Expand Down
10 changes: 7 additions & 3 deletions command/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,13 @@ func (c *JobRunCommand) Run(args []string) int {
loc, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(loc)
next := job.Periodic.Next(now)
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second)))
next, err := job.Periodic.Next(now)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error determining next launch time: %v", err))
} else {
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second)))
}
}
} else if !paramjob {
c.Ui.Output("Evaluation ID: " + evalID)
Expand Down
10 changes: 6 additions & 4 deletions command/job_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ func (c *JobStatusCommand) Run(args []string) int {
location, err := job.Periodic.GetLocation()
if err == nil {
now := time.Now().In(location)
next := job.Periodic.Next(now)
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
next, err := job.Periodic.Next(now)
if err == nil {
basic = append(basic, fmt.Sprintf("Next Periodic Launch|%s",
fmt.Sprintf("%s (%s from now)",
formatTime(next), formatTimeDifference(now, next, time.Second))))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
return fmt.Errorf("failed adding job to periodic dispatcher: %v", err)
}

// Create a watch set
Expand Down
5 changes: 4 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,10 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)

// If it is a periodic job calculate the next launch
if args.Job.IsPeriodic() && args.Job.Periodic.Enabled {
reply.NextPeriodicLaunch = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
reply.NextPeriodicLaunch, err = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
if err != nil {
return fmt.Errorf("Failed to parse cron expression: %v", err)
}
}

reply.FailedTGAllocs = updatedEval.FailedTGAllocs
Expand Down
9 changes: 7 additions & 2 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ func (s *Server) restorePeriodicDispatcher() error {
}

if err := s.periodicDispatcher.Add(job); err != nil {
return err
s.logger.Printf("[ERR] nomad.periodic: %v", err)
continue
}

// We do not need to force run the job since it isn't active.
Expand All @@ -400,7 +401,11 @@ func (s *Server) restorePeriodicDispatcher() error {
}

// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
nextLaunch, err := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
if err != nil {
s.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic launch for job %s: %v", job.NamespacedID(), err)
continue
}

// We skip force launching the job if there should be no next launch
// (the zero case) or if the next launch time is in the future. If it is
Expand Down
13 changes: 9 additions & 4 deletions nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {

// Add or update the job.
p.tracked[tuple] = job
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
next, err := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if err != nil {
return fmt.Errorf("failed adding job %s: %v", job.NamespacedID(), err)
}
if tracked {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
Expand Down Expand Up @@ -344,9 +347,11 @@ func (p *PeriodicDispatch) run(ctx context.Context) {
func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) {
p.l.Lock()

nextLaunch := job.Periodic.Next(launchTime)
if err := p.heap.Update(job, nextLaunch); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q (%s): %v", job.ID, job.Namespace, err)
nextLaunch, err := job.Periodic.Next(launchTime)
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to parse next periodic launch for job %s: %v", job.NamespacedID(), err)
} else if err := p.heap.Update(job, nextLaunch); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %s: %v", job.NamespacedID(), err)
}

// If the job prohibits overlapping and there are running children, we skip
Expand Down
33 changes: 27 additions & 6 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,14 @@ type Job struct {
JobModifyIndex uint64
}

// NamespacedID returns the namespaced id useful for logging
func (j *Job) NamespacedID() *NamespacedID {
return &NamespacedID{
ID: j.ID,
Namespace: j.Namespace,
}
}

// Canonicalize is used to canonicalize fields in the Job. This should be called
// when registering a Job. A set of warnings are returned if the job was changed
// in anyway that the user should be made aware of.
Expand Down Expand Up @@ -2649,28 +2657,41 @@ func (p *PeriodicConfig) Canonicalize() {
p.location = l
}

// CronParseNext is a helper that parses the next time for the given expression
// but captures any panic that may occur in the underlying library.
func CronParseNext(e *cronexpr.Expression, fromTime time.Time, spec string) (t time.Time, err error) {
defer func() {
if recover() != nil {
t = time.Time{}
err = fmt.Errorf("failed parsing cron expression: %q", spec)
}
}()

return e.Next(fromTime), nil
}

// Next returns the closest time instant matching the spec that is after the
// passed time. If no matching instance exists, the zero value of time.Time is
// returned. The `time.Location` of the returned value matches that of the
// passed time.
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) {
switch p.SpecType {
case PeriodicSpecCron:
if e, err := cronexpr.Parse(p.Spec); err == nil {
return e.Next(fromTime)
return CronParseNext(e, fromTime, p.Spec)
}
case PeriodicSpecTest:
split := strings.Split(p.Spec, ",")
if len(split) == 1 && split[0] == "" {
return time.Time{}
return time.Time{}, nil
}

// Parse the times
times := make([]time.Time, len(split))
for i, s := range split {
unix, err := strconv.Atoi(s)
if err != nil {
return time.Time{}
return time.Time{}, nil
}

times[i] = time.Unix(int64(unix), 0)
Expand All @@ -2679,12 +2700,12 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
// Find the next match
for _, next := range times {
if fromTime.Before(next) {
return next
return next, nil
}
}
}

return time.Time{}
return time.Time{}, nil
}

// GetLocation returns the location to use for determining the time zone to run
Expand Down
56 changes: 43 additions & 13 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,15 +2009,44 @@ func TestPeriodicConfig_ValidCron(t *testing.T) {
}

func TestPeriodicConfig_NextCron(t *testing.T) {
require := require.New(t)

type testExpectation struct {
Time time.Time
HasError bool
ErrorMsg string
}

from := time.Date(2009, time.November, 10, 23, 22, 30, 0, time.UTC)
specs := []string{"0 0 29 2 * 1980", "*/5 * * * *"}
expected := []time.Time{{}, time.Date(2009, time.November, 10, 23, 25, 0, 0, time.UTC)}
specs := []string{"0 0 29 2 * 1980",
"*/5 * * * *",
"1 15-0 * * 1-5"}
expected := []*testExpectation{
{
Time: time.Time{},
HasError: false,
},
{
Time: time.Date(2009, time.November, 10, 23, 25, 0, 0, time.UTC),
HasError: false,
},
{
Time: time.Time{},
HasError: true,
ErrorMsg: "failed parsing cron expression",
},
}

for i, spec := range specs {
p := &PeriodicConfig{Enabled: true, SpecType: PeriodicSpecCron, Spec: spec}
p.Canonicalize()
n := p.Next(from)
if expected[i] != n {
t.Fatalf("Next(%v) returned %v; want %v", from, n, expected[i])
n, err := p.Next(from)
nextExpected := expected[i]

require.Equal(nextExpected.Time, n)
require.Equal(err != nil, nextExpected.HasError)
if err != nil {
require.True(strings.Contains(err.Error(), nextExpected.ErrorMsg))
}
}
}
Expand All @@ -2034,6 +2063,8 @@ func TestPeriodicConfig_ValidTimeZone(t *testing.T) {
}

func TestPeriodicConfig_DST(t *testing.T) {
require := require.New(t)

// On Sun, Mar 12, 2:00 am 2017: +1 hour UTC
p := &PeriodicConfig{
Enabled: true,
Expand All @@ -2050,15 +2081,14 @@ func TestPeriodicConfig_DST(t *testing.T) {
e1 := time.Date(2017, time.March, 11, 10, 0, 0, 0, time.UTC)
e2 := time.Date(2017, time.March, 12, 9, 0, 0, 0, time.UTC)

n1 := p.Next(t1).UTC()
n2 := p.Next(t2).UTC()
n1, err := p.Next(t1)
require.Nil(err)

if !reflect.DeepEqual(e1, n1) {
t.Fatalf("Got %v; want %v", n1, e1)
}
if !reflect.DeepEqual(e2, n2) {
t.Fatalf("Got %v; want %v", n1, e1)
}
n2, err := p.Next(t2)
require.Nil(err)

require.Equal(e1, n1.UTC())
require.Equal(e2, n2.UTC())
}

func TestRestartPolicy_Validate(t *testing.T) {
Expand Down