Skip to content

Commit

Permalink
Switch to custom task state enum
Browse files Browse the repository at this point in the history
  • Loading branch information
David Keijser committed Aug 17, 2016
1 parent 560ab7f commit 0713c06
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 51 deletions.
13 changes: 6 additions & 7 deletions database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/klarna/eremetic/types"
mesos "github.com/mesos/mesos-go/mesosproto"
. "github.com/smartystreets/goconvey/convey"
)

Expand All @@ -20,7 +19,7 @@ func setup() error {
func TestDatabase(t *testing.T) {
status := []types.Status{
types.Status{
Status: mesos.TaskState_TASK_RUNNING.String(),
Status: types.TaskState_TASK_RUNNING,
Time: time.Now().Unix(),
},
}
Expand Down Expand Up @@ -106,15 +105,15 @@ func TestDatabase(t *testing.T) {
ID: "1234",
Status: []types.Status{
types.Status{
Status: mesos.TaskState_TASK_STAGING.String(),
Status: types.TaskState_TASK_STAGING,
Time: time.Now().Unix(),
},
types.Status{
Status: mesos.TaskState_TASK_RUNNING.String(),
Status: types.TaskState_TASK_RUNNING,
Time: time.Now().Unix(),
},
types.Status{
Status: mesos.TaskState_TASK_FINISHED.String(),
Status: types.TaskState_TASK_FINISHED,
Time: time.Now().Unix(),
},
},
Expand All @@ -125,11 +124,11 @@ func TestDatabase(t *testing.T) {
ID: "2345",
Status: []types.Status{
types.Status{
Status: mesos.TaskState_TASK_STAGING.String(),
Status: types.TaskState_TASK_STAGING,
Time: time.Now().Unix(),
},
types.Status{
Status: mesos.TaskState_TASK_RUNNING.String(),
Status: types.TaskState_TASK_RUNNING,
Time: time.Now().Unix(),
},
},
Expand Down
2 changes: 1 addition & 1 deletion handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NotifyCallback(task *types.EremeticTask) {

cbData := callbackData{
Time: task.Status[len(task.Status)-1].Time,
Status: task.Status[len(task.Status)-1].Status,
Status: string(task.Status[len(task.Status)-1].Status),
TaskID: task.ID,
}

Expand Down
3 changes: 1 addition & 2 deletions handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/gorilla/mux"
"github.com/klarna/eremetic/database"
"github.com/klarna/eremetic/types"
mesos "github.com/mesos/mesos-go/mesosproto"
. "github.com/smartystreets/goconvey/convey"
)

Expand All @@ -36,7 +35,7 @@ func TestHandling(t *testing.T) {
scheduler := &mockScheduler{}
status := []types.Status{
types.Status{
Status: mesos.TaskState_TASK_RUNNING.String(),
Status: types.TaskState_TASK_RUNNING,
Time: time.Now().Unix(),
},
}
Expand Down
7 changes: 3 additions & 4 deletions scheduler/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/klarna/eremetic/database"
"github.com/klarna/eremetic/types"
mesos "github.com/mesos/mesos-go/mesosproto"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -41,7 +40,7 @@ func TestReconcile(t *testing.T) {
panic("mock error")
}
t.UpdateStatus(types.Status{
Status: mesos.TaskState_TASK_RUNNING.String(),
Status: types.TaskState_TASK_RUNNING,
Time: time.Now().Unix() + 1,
})
database.PutTask(&t)
Expand All @@ -51,7 +50,7 @@ func TestReconcile(t *testing.T) {
ID: "1234",
Status: []types.Status{
types.Status{
Status: mesos.TaskState_TASK_STAGING.String(),
Status: types.TaskState_TASK_STAGING,
Time: time.Now().Unix(),
},
},
Expand All @@ -73,7 +72,7 @@ func TestReconcile(t *testing.T) {
ID: "1234",
Status: []types.Status{
types.Status{
Status: mesos.TaskState_TASK_STAGING.String(),
Status: types.TaskState_TASK_STAGING,
Time: time.Now().Unix(),
},
},
Expand Down
15 changes: 8 additions & 7 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ loop:
// StatusUpdate takes care of updating the status
func (s *eremeticScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos.TaskStatus) {
id := status.TaskId.GetValue()
newState := types.TaskState(status.State.String())

log.Debugf("Received task status [%s] for task [%s]", status.State.String(), id)

Expand All @@ -138,29 +139,29 @@ func (s *eremeticScheduler) StatusUpdate(driver sched.SchedulerDriver, status *m
}
}

if !task.IsRunning() && *status.State == mesos.TaskState_TASK_RUNNING {
if !task.IsRunning() && newState == types.TaskState_TASK_RUNNING {
TasksRunning.Inc()
}

if types.IsTerminal(status.State.String()) {
TasksTerminated.With(prometheus.Labels{"status": status.State.String()}).Inc()
if types.IsTerminal(newState) {
TasksTerminated.With(prometheus.Labels{"status": string(newState)}).Inc()
if task.WasRunning() {
TasksRunning.Dec()
}
}

task.UpdateStatus(types.Status{
Status: status.State.String(),
Status: newState,
Time: time.Now().Unix(),
})

if *status.State == mesos.TaskState_TASK_FAILED && !task.WasRunning() {
if newState == types.TaskState_TASK_FAILED && !task.WasRunning() {
if task.Retry >= maxRetries {
log.Warnf("giving up on %s after %d retry attempts", id, task.Retry)
} else {
log.Infof("task %s was never running. re-scheduling", id)
task.UpdateStatus(types.Status{
Status: mesos.TaskState_TASK_STAGING.String(),
Status: types.TaskState_TASK_STAGING,
Time: time.Now().Unix(),
})
task.Retry++
Expand All @@ -171,7 +172,7 @@ func (s *eremeticScheduler) StatusUpdate(driver sched.SchedulerDriver, status *m
}
}

if types.IsTerminal(status.State.String()) {
if types.IsTerminal(newState) {
handler.NotifyCallback(&task)
}

Expand Down
12 changes: 6 additions & 6 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestScheduler(t *testing.T) {
})
task, _ := database.ReadTask(id)
So(len(task.Status), ShouldEqual, 1)
So(task.Status[0].Status, ShouldEqual, mesos.TaskState_TASK_RUNNING.String())
So(task.Status[0].Status, ShouldEqual, types.TaskState_TASK_RUNNING)

s.StatusUpdate(nil, &mesos.TaskStatus{
TaskId: &mesos.TaskID{
Expand All @@ -112,8 +112,8 @@ func TestScheduler(t *testing.T) {
task, _ = database.ReadTask(id)

So(len(task.Status), ShouldEqual, 2)
So(task.Status[0].Status, ShouldEqual, mesos.TaskState_TASK_RUNNING.String())
So(task.Status[1].Status, ShouldEqual, mesos.TaskState_TASK_FAILED.String())
So(task.Status[0].Status, ShouldEqual, types.TaskState_TASK_RUNNING)
So(task.Status[1].Status, ShouldEqual, types.TaskState_TASK_FAILED)
})

Convey("Failing immediatly", func() {
Expand All @@ -125,9 +125,9 @@ func TestScheduler(t *testing.T) {
State: mesos.TaskState_TASK_FAILED.Enum(),
})
task, _ := database.ReadTask(id)
So(len(task.Status), ShouldEqual, 2)
So(task.Status[0].Status, ShouldEqual, mesos.TaskState_TASK_FAILED.String())
So(task.Status[1].Status, ShouldEqual, mesos.TaskState_TASK_STAGING.String())
So(task.Status, ShouldHaveLength, 2)
So(task.Status[0].Status, ShouldEqual, types.TaskState_TASK_FAILED)
So(task.Status[1].Status, ShouldEqual, types.TaskState_TASK_STAGING)

select {
case c := <-s.tasks:
Expand Down
2 changes: 1 addition & 1 deletion scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func createEremeticTask(request types.Request) (types.EremeticTask, error) {

status := []types.Status{
types.Status{
Status: mesos.TaskState_TASK_STAGING.String(),
Status: types.TaskState_TASK_STAGING,
Time: time.Now().Unix(),
},
}
Expand Down
2 changes: 1 addition & 1 deletion scheduler/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestTask(t *testing.T) {

status := []types.Status{
types.Status{
Status: mesos.TaskState_TASK_RUNNING.String(),
Status: types.TaskState_TASK_RUNNING,
Time: time.Now().Unix(),
},
}
Expand Down
15 changes: 14 additions & 1 deletion types/states.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
package types

type TaskState string

const (
TaskState_TASK_STAGING TaskState = "TASK_STAGING"
TaskState_TASK_STARTING TaskState = "TASK_STARTING"
TaskState_TASK_RUNNING TaskState = "TASK_RUNNING"
TaskState_TASK_FINISHED TaskState = "TASK_FINISHED"
TaskState_TASK_FAILED TaskState = "TASK_FAILED"
TaskState_TASK_KILLED TaskState = "TASK_KILLED"
TaskState_TASK_LOST TaskState = "TASK_LOST"
TaskState_TASK_ERROR TaskState = "TASK_ERROR"
)

// IsTerminal takes a string representation of a state and returns whether it
// is terminal or not.
func IsTerminal(state string) bool {
func IsTerminal(state TaskState) bool {
switch state {
case "TASK_LOST", "TASK_KILLED", "TASK_FAILED", "TASK_FINISHED":
return true
Expand Down
10 changes: 4 additions & 6 deletions types/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package types

import (
"time"

mesos "github.com/mesos/mesos-go/mesosproto"
)

type Status struct {
Time int64 `json:"time"`
Status string `json:"status"`
Time int64 `json:"time"`
Status TaskState `json:"status"`
}

type EremeticTask struct {
Expand All @@ -32,7 +30,7 @@ type EremeticTask struct {

func (task *EremeticTask) WasRunning() bool {
for _, s := range task.Status {
if s.Status == mesos.TaskState_TASK_RUNNING.String() {
if s.Status == TaskState_TASK_RUNNING {
return true
}
}
Expand All @@ -52,7 +50,7 @@ func (task *EremeticTask) IsRunning() bool {
return false
}
st := task.Status[len(task.Status)-1]
return st.Status == mesos.TaskState_TASK_RUNNING.String()
return st.Status == TaskState_TASK_RUNNING
}

func (task *EremeticTask) LastUpdated() time.Time {
Expand Down
28 changes: 13 additions & 15 deletions types/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,36 @@ import (
"fmt"
"testing"

mesos "github.com/mesos/mesos-go/mesosproto"

. "github.com/smartystreets/goconvey/convey"
)

func TestTypes(t *testing.T) {
Convey("states", t, func() {
terminalStates := []mesos.TaskState{
mesos.TaskState_TASK_FINISHED,
mesos.TaskState_TASK_FAILED,
mesos.TaskState_TASK_KILLED,
mesos.TaskState_TASK_LOST,
terminalStates := []TaskState{
TaskState_TASK_FINISHED,
TaskState_TASK_FAILED,
TaskState_TASK_KILLED,
TaskState_TASK_LOST,
}

nonTerminalStates := []mesos.TaskState{
mesos.TaskState_TASK_RUNNING,
mesos.TaskState_TASK_STAGING,
mesos.TaskState_TASK_STARTING,
nonTerminalStates := []TaskState{
TaskState_TASK_RUNNING,
TaskState_TASK_STAGING,
TaskState_TASK_STARTING,
}

Convey("IsTerminal", func() {
for _, state := range terminalStates {
test := fmt.Sprintf("Should be true for %s", state.String())
test := fmt.Sprintf("Should be true for %s", state)
Convey(test, func() {
So(IsTerminal(state.String()), ShouldBeTrue)
So(IsTerminal(state), ShouldBeTrue)
})
}

for _, state := range nonTerminalStates {
test := fmt.Sprintf("Should be false for %s", state.String())
test := fmt.Sprintf("Should be false for %s", state)
Convey(test, func() {
So(IsTerminal(state.String()), ShouldBeFalse)
So(IsTerminal(state), ShouldBeFalse)
})
}
})
Expand Down

0 comments on commit 0713c06

Please sign in to comment.