Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1544 from IzabellaRaulin/task_starts_immediately_…
Browse files Browse the repository at this point in the history
…rebased

Schedule workflow execution immediately
  • Loading branch information
IzabellaRaulin authored Mar 7, 2017
2 parents 58fb94c + 9677fd5 commit 0b2c031
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 49 deletions.
2 changes: 1 addition & 1 deletion control/strategy/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/intelsdi-x/snap/core"
)

// lru provides a stragey that selects the least recently used available plugin.
// lru provides a strategy that selects the least recently used available plugin.
type lru struct {
*cache
logger *log.Entry
Expand Down
2 changes: 1 addition & 1 deletion control/strategy/sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
ErrCacheDoesNotExist = errors.New("cache does not exist")
)

// sticky provides a stragey that ... concurrency count is 1
// sticky provides a strategy that ... concurrency count is 1
type sticky struct {
plugins map[string]AvailablePlugin
metricCache map[string]*cache
Expand Down
45 changes: 34 additions & 11 deletions mgmt/rest/client/client_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,25 @@ func TestSnapClient(t *testing.T) {
})
})

Convey("unload one of collector plugin", func() {
p := c.GetPlugins(false)
So(p.Err, ShouldBeNil)
So(len(p.LoadedPlugins), ShouldEqual, 3)

p2 := c.UnloadPlugin("collector", "mock", 2)
So(p2.Err, ShouldBeNil)
So(p2.Name, ShouldEqual, "mock")
So(p2.Version, ShouldEqual, 2)
So(p2.Type, ShouldEqual, "collector")

Convey("there should be two loaded plugins", func() {
p = c.GetPlugins(false)
So(p.Err, ShouldBeNil)
So(len(p.LoadedPlugins), ShouldEqual, 2)
So(p.AvailablePlugins, ShouldBeEmpty)
})
})

Convey("Tasks", func() {
Convey("Passing a bad task manifest", func() {
wfb := getWMFromSample("bad.json")
Expand Down Expand Up @@ -374,6 +393,7 @@ func TestSnapClient(t *testing.T) {
correctSchedule := &Schedule{Type: "simple", Interval: "1s"}
tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0)
So(tt.Err, ShouldBeNil)
So(tt.State, ShouldEqual, "Running")
})

Convey("Creating a task with correct configuration for windowed schedule", func() {
Expand All @@ -384,17 +404,19 @@ func TestSnapClient(t *testing.T) {
StopTimestamp: &stopTime}
tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0)
So(tt.Err, ShouldBeNil)
So(tt.State, ShouldEqual, "Running")
})

Convey("Creating a task with correct configuration for cron schedule", func() {
correctSchedule := &Schedule{Type: "cron", Interval: "1 1 1 1 1 1"}
tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0)
So(tt.Err, ShouldBeNil)
So(tt.State, ShouldEqual, "Running")
})
})

tf := c.CreateTask(sch, wf, "baron", "", false, 0)
Convey("valid task not started on creation", func() {
tf := c.CreateTask(sch, wf, "baron", "", false, 0)
So(tf.Err, ShouldBeNil)
So(tf.Name, ShouldEqual, "baron")
So(tf.State, ShouldEqual, "Stopped")
Expand Down Expand Up @@ -436,8 +458,8 @@ func TestSnapClient(t *testing.T) {
})
})

tt := c.CreateTask(sch, wf, "baron", "", true, 0)
Convey("valid task started on creation", func() {
tt := c.CreateTask(sch, wf, "baron", "", true, 0)
So(tt.Err, ShouldBeNil)
So(tt.Name, ShouldEqual, "baron")
So(tt.State, ShouldEqual, "Running")
Expand Down Expand Up @@ -533,6 +555,7 @@ func TestSnapClient(t *testing.T) {

a := new(ea)
r := c.WatchTask(tf.ID)
So(r.Err, ShouldBeNil)
wait := make(chan struct{})
go func() {
for {
Expand All @@ -554,10 +577,11 @@ func TestSnapClient(t *testing.T) {
So(startResp.Err, ShouldBeNil)
<-wait
a.Lock()
defer a.Unlock()

So(len(a.events), ShouldEqual, 5)
a.Unlock()
So(a.events[0], ShouldEqual, "task-started")
for x := 2; x <= 4; x++ {
for x := 1; x < 5; x++ {
So(a.events[x], ShouldEqual, "metric-event")
}
})
Expand All @@ -570,16 +594,15 @@ func TestSnapClient(t *testing.T) {
So(p.Err, ShouldNotBeNil)
So(p.Err.Error(), ShouldEqual, "plugin not found")
})
Convey("unload already unloaded plugin", func() {
p := c.UnloadPlugin("collector", "mock", 2)
So(p.Err, ShouldNotBeNil)
So(p.Err.Error(), ShouldEqual, "plugin not found")
})
Convey("unload one of multiple", func() {
p1 := c.GetPlugins(false)
So(p1.Err, ShouldBeNil)
So(len(p1.LoadedPlugins), ShouldEqual, 3)

p2 := c.UnloadPlugin("collector", "mock", 2)
So(p2.Err, ShouldBeNil)
So(p2.Name, ShouldEqual, "mock")
So(p2.Version, ShouldEqual, 2)
So(p2.Type, ShouldEqual, "collector")
So(len(p1.LoadedPlugins), ShouldEqual, 2)

p3 := c.UnloadPlugin("publisher", "mock-file", 3)
So(p3.Err, ShouldBeNil)
Expand Down
4 changes: 3 additions & 1 deletion pkg/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ type Response interface {
}

func waitOnInterval(last time.Time, i time.Duration) (uint, time.Time) {
// first run
if (last == time.Time{}) {
time.Sleep(i)
// for the first run, do not wait on interval
// and schedule workflow execution immediately
return uint(0), time.Now()
}
// Get the difference in time.Duration since last in nanoseconds (int64)
Expand Down
74 changes: 71 additions & 3 deletions pkg/schedule/simple_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,88 @@ func TestSimpleSchedule(t *testing.T) {
r := s.Wait(last)
after := time.Since(before)

So(r.State(), ShouldEqual, Active)
So(r.Missed(), ShouldResemble, uint(4))
So(r.Error(), ShouldEqual, nil)
So(r.State(), ShouldEqual, Active)
So(r.Missed(), ShouldEqual, 4)
// We are ok at this precision with being within 10% over or under (10ms)
afterMS := after.Nanoseconds() / 1000 / 1000
So(afterMS, ShouldBeGreaterThan, shouldWait-10)
So(afterMS, ShouldBeLessThan, shouldWait+10)
})

Convey("invalid schedule", func() {
s := NewSimpleSchedule(0)
err := s.Validate()
So(err, ShouldResemble, ErrInvalidInterval)
})
})
Convey("Simple schedule with no misses", t, func() {
interval := time.Millisecond * 10
s := NewSimpleSchedule(interval)

err := s.Validate()
So(err, ShouldBeNil)

var r []Response
last := *new(time.Time)

before := time.Now()
for len(r) <= 10 {
r1 := s.Wait(last)
last = time.Now()
r = append(r, r1)
}

var missed uint
for _, x := range r {
missed += x.Missed()
}
So(missed, ShouldEqual, 0)

// the task should start immediately
So(
r[0].LastTime().Sub(before).Seconds(),
ShouldBeBetweenOrEqual,
0,
(interval).Seconds(),
)
})
Convey("Simple schedule with a few misses", t, func() {
interval := time.Millisecond * 10
s := NewSimpleSchedule(interval)

err := s.Validate()
So(err, ShouldBeNil)

var r []Response
last := *new(time.Time)

before := time.Now()
for len(r) <= 10 {
r1 := s.Wait(last)
last = time.Now()
r = append(r, r1)
// make it miss some
if len(r) == 3 || len(r) == 7 {
time.Sleep(s.Interval)
}
if len(r) == 9 {
// Miss two
time.Sleep(s.Interval * 2)
}
}

var missed uint
for _, x := range r {
missed += x.Missed()
}
So(missed, ShouldEqual, 4)

// the task should fire immediately
So(
r[0].LastTime().Sub(before).Seconds(),
ShouldBeBetweenOrEqual,
0,
(interval).Seconds(),
)
})
}
17 changes: 5 additions & 12 deletions pkg/schedule/windowed_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,12 @@ func (w *WindowedSchedule) Wait(last time.Time) Response {
}).Debug("Waiting for window to start")
time.Sleep(wait)
}
if (last == time.Time{}) {
logger.WithFields(log.Fields{
"_block": "windowed-wait",
}).Debug("Last was unset using start time")
last = *w.StartTime
}
} else {
if (last == time.Time{}) {
logger.WithFields(log.Fields{
"_block": "windowed-wait",
}).Debug("Last was unset using start time")
last = time.Now()
}
// This has no start like a simple schedule, so execution starts immediately
logger.WithFields(log.Fields{
"_block": "windowed-wait",
"sleep-duration": 0,
}).Debug("Window start time not defined, start execution immediately")
}

// If within the window we wait our interval and return
Expand Down
Loading

0 comments on commit 0b2c031

Please sign in to comment.