Skip to content

Commit

Permalink
Add possibility to kill tasks
Browse files Browse the repository at this point in the history
Use either the REST API or hermit to kill a task you no longer wish to
run.

POST /task/{taskID}/kill
hermit kill {taskID}

Add button in the GUI to kill a task
  • Loading branch information
Rickard Dybeck committed Sep 23, 2016
1 parent 1a48691 commit 4a118ca
Show file tree
Hide file tree
Showing 19 changed files with 524 additions and 144 deletions.
18 changes: 18 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,21 @@ func (c *Client) Version() (string, error) {

return string(b), nil
}

func (c *Client) Kill(taskID string) error {
u := fmt.Sprintf("%s/task/%s/kill", c.endpoint, taskID)
req, err := http.NewRequest("POST", u, nil)
if err != nil {
return err
}

resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("Unexpected status code `%s`", resp.Status)
}

return nil
}
18 changes: 18 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ func TestClient_Tasks(t *testing.T) {
t.Fail()
}
}

func TestClient_KillTask(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}))
defer ts.Close()

var httpClient http.Client

c, err := New(ts.URL, &httpClient)
if err != nil {
t.Fatal(err)
}

if err := c.Kill("1234"); err != nil {
t.Fatal(err)
}
}
38 changes: 37 additions & 1 deletion cmd/hermit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"crypto/tls"
"errors"
"flag"
"fmt"
"math"
Expand Down Expand Up @@ -44,11 +45,15 @@ func main() {
"ls": newListCommand(ec),
"logs": newLogsCommand(ec),
"version": newVersionCommand(ec),
"kill": newKillCommand(ec),
}

if len(os.Args) < 2 {
exitWithError(errors.New("No command given"))
}
cmd, ok := cmds[os.Args[1]]
if !ok {
os.Exit(1)
exitWithError(errors.New("Unknown command"))
}

cmd.Parse(os.Args[2:])
Expand Down Expand Up @@ -274,6 +279,37 @@ func (cmd *versionCommand) Run() {
fmt.Printf(" Version: %s", b)
}

type killCommand struct {
flags *flag.FlagSet
client *client.Client
}

func newKillCommand(c *client.Client) *killCommand {
return &killCommand{
flags: newFlagSet("kill", "hermit kill TASKID", "Kill a given task"),
client: c,
}
}

func (cmd *killCommand) Parse(args []string) {
cmd.flags.Parse(args)
}

func (cmd *killCommand) Run() {
taskID := cmd.flags.Arg(0)
if taskID == "" {
cmd.flags.Usage()
os.Exit(1)
}

err := cmd.client.Kill(taskID)
if err != nil {
exitWithError(err)
}

fmt.Printf("Killed task %s\n", taskID)
}

type ByLastUpdated []eremetic.Task

func (t ByLastUpdated) Len() int { return len(t) }
Expand Down
176 changes: 176 additions & 0 deletions mock/mesos_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package mock

import (
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/scheduler"
)

// MesosScheduler implements a mocked mesos scheduler iterface for testing
type MesosScheduler struct {
AbortFn func() (mesosproto.Status, error)
AbortFnInvoked bool
AcceptOffersFn func([]*mesosproto.OfferID, []*mesosproto.Offer_Operation, *mesosproto.Filters) (mesosproto.Status, error)
AcceptOffersFnInvoked bool
DeclineOfferFn func(*mesosproto.OfferID, *mesosproto.Filters) (mesosproto.Status, error)
DeclineOfferFnInvoked bool
JoinFn func() (mesosproto.Status, error)
JoinFnInvoked bool
KillTaskFn func(*mesosproto.TaskID) (mesosproto.Status, error)
KillTaskFnInvoked bool
ReconcileTasksFn func([]*mesosproto.TaskStatus) (mesosproto.Status, error)
ReconcileTasksFnInvoked bool
RequestResourcesFn func([]*mesosproto.Request) (mesosproto.Status, error)
RequestResourcesFnInvoked bool
ReviveOffersFn func() (mesosproto.Status, error)
ReviveOffersFnInvoked bool
RunFn func() (mesosproto.Status, error)
RunFnInvoked bool
StartFn func() (mesosproto.Status, error)
StartFnInvoked bool
StopFn func(bool) (mesosproto.Status, error)
StopFnInvoked bool
SendFrameworkMessageFn func(*mesosproto.ExecutorID, *mesosproto.SlaveID, string) (mesosproto.Status, error)
SendFrameworkMessageFnInvoked bool
LaunchTasksFn func([]*mesosproto.OfferID, []*mesosproto.TaskInfo, *mesosproto.Filters) (mesosproto.Status, error)
LaunchTasksFnInvoked bool
RegisteredFn func(scheduler.SchedulerDriver, *mesosproto.FrameworkID, *mesosproto.MasterInfo)
RegisteredFnInvoked bool
ReregisteredFn func(scheduler.SchedulerDriver, *mesosproto.MasterInfo)
ReregisteredFnInvoked bool
DisconnectedFn func(scheduler.SchedulerDriver)
DisconnectedFnInvoked bool
ResourceOffersFn func(scheduler.SchedulerDriver, []*mesosproto.Offer)
ResourceOffersFnInvoked bool
OfferRescindedFn func(scheduler.SchedulerDriver, *mesosproto.OfferID)
OfferRescindedFnInvoked bool
StatusUpdateFn func(scheduler.SchedulerDriver, *mesosproto.TaskStatus)
StatusUpdateFnInvoked bool
FrameworkMessageFn func(scheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, string)
FrameworkMessageFnInvoked bool
SlaveLostFn func(scheduler.SchedulerDriver, *mesosproto.SlaveID)
SlaveLostFnInvoked bool
ExecutorLostFn func(scheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, int)
ExecutorLostFnInvoked bool
ErrorFn func(scheduler.SchedulerDriver, string)
ErrorFnInvoked bool
}

// NewMesosScheduler returns a new mocked mesos scheduler
func NewMesosScheduler() *MesosScheduler {
return &MesosScheduler{}
}

func (m *MesosScheduler) Abort() (stat mesosproto.Status, err error) {
m.AbortFnInvoked = true
return m.AbortFn()
}

func (m *MesosScheduler) AcceptOffers(offerIds []*mesosproto.OfferID, operations []*mesosproto.Offer_Operation, filters *mesosproto.Filters) (mesosproto.Status, error) {
m.AcceptOffersFnInvoked = true
return m.AcceptOffersFn(offerIds, operations, filters)
}

func (m *MesosScheduler) DeclineOffer(offerID *mesosproto.OfferID, filters *mesosproto.Filters) (mesosproto.Status, error) {
m.DeclineOfferFnInvoked = true
return m.DeclineOfferFn(offerID, filters)
}

func (m *MesosScheduler) Join() (mesosproto.Status, error) {
m.JoinFnInvoked = true
return m.JoinFn()
}

func (m *MesosScheduler) KillTask(id *mesosproto.TaskID) (mesosproto.Status, error) {
m.KillTaskFnInvoked = true
return m.KillTaskFn(id)
}

func (m *MesosScheduler) ReconcileTasks(ts []*mesosproto.TaskStatus) (mesosproto.Status, error) {
m.ReconcileTasksFnInvoked = true
return m.ReconcileTasksFn(ts)
}

func (m *MesosScheduler) RequestResources(r []*mesosproto.Request) (mesosproto.Status, error) {
m.RequestResourcesFnInvoked = true
return m.RequestResourcesFn(r)
}

func (m *MesosScheduler) ReviveOffers() (mesosproto.Status, error) {
m.ReviveOffersFnInvoked = true
return m.ReviveOffersFn()
}

func (m *MesosScheduler) Run() (mesosproto.Status, error) {
m.RunFnInvoked = true
return m.RunFn()
}

func (m *MesosScheduler) Start() (mesosproto.Status, error) {
m.StartFnInvoked = true
return m.StartFn()
}

func (m *MesosScheduler) Stop(b bool) (mesosproto.Status, error) {
m.StopFnInvoked = true
return m.StopFn(b)
}

func (m *MesosScheduler) SendFrameworkMessage(eID *mesosproto.ExecutorID, sID *mesosproto.SlaveID, s string) (mesosproto.Status, error) {
m.SendFrameworkMessageFnInvoked = true
return m.SendFrameworkMessageFn(eID, sID, s)
}

func (m *MesosScheduler) LaunchTasks(o []*mesosproto.OfferID, t []*mesosproto.TaskInfo, f *mesosproto.Filters) (mesosproto.Status, error) {
m.LaunchTasksFnInvoked = true
return m.LaunchTasksFn(o, t, f)
}

func (m *MesosScheduler) Registered(s scheduler.SchedulerDriver, f *mesosproto.FrameworkID, minfo *mesosproto.MasterInfo) {
m.RegisteredFnInvoked = true
m.RegisteredFn(s, f, minfo)
}

func (m *MesosScheduler) Reregistered(s scheduler.SchedulerDriver, info *mesosproto.MasterInfo) {
m.ReregisteredFnInvoked = true
m.ReregisteredFn(s, info)
}

func (m *MesosScheduler) Disconnected(s scheduler.SchedulerDriver) {
m.DisconnectedFnInvoked = true
m.DisconnectedFn(s)
}

func (m *MesosScheduler) ResourceOffers(s scheduler.SchedulerDriver, o []*mesosproto.Offer) {
m.ResourceOffersFnInvoked = true
m.ResourceOffersFn(s, o)
}

func (m *MesosScheduler) OfferRescinded(s scheduler.SchedulerDriver, o *mesosproto.OfferID) {
m.OfferRescindedFnInvoked = true
m.OfferRescindedFn(s, o)
}

func (m *MesosScheduler) StatusUpdate(s scheduler.SchedulerDriver, ts *mesosproto.TaskStatus) {
m.StatusUpdateFnInvoked = true
m.StatusUpdateFn(s, ts)
}

func (m *MesosScheduler) FrameworkMessage(sd scheduler.SchedulerDriver, eID *mesosproto.ExecutorID, sID *mesosproto.SlaveID, s string) {
m.FrameworkMessageFnInvoked = true
m.FrameworkMessageFn(sd, eID, sID, s)
}

func (m *MesosScheduler) SlaveLost(s scheduler.SchedulerDriver, sID *mesosproto.SlaveID) {
m.SlaveLostFnInvoked = true
m.SlaveLostFn(s, sID)
}

func (m *MesosScheduler) ExecutorLost(sd scheduler.SchedulerDriver, eID *mesosproto.ExecutorID, sID *mesosproto.SlaveID, i int) {
m.ExecutorLostFnInvoked = true
m.ExecutorLostFn(sd, eID, sID, i)
}

func (m *MesosScheduler) Error(d scheduler.SchedulerDriver, msg string) {
m.ErrorFnInvoked = true
m.ErrorFn(d, msg)
}
10 changes: 10 additions & 0 deletions mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ import (
type Scheduler struct {
ScheduleTaskFn func(req eremetic.Request) (string, error)
ScheduleTaskInvoked bool
KillFn func(id string) error
KillInvoked bool
}

func (s *Scheduler) ScheduleTask(req eremetic.Request) (string, error) {
s.ScheduleTaskInvoked = true
return s.ScheduleTaskFn(req)
}

func (s *Scheduler) Kill(id string) error {
s.KillInvoked = true
return s.KillFn(id)
}

type TaskDB struct {
CleanFn func() error
CloseFn func()
Expand Down Expand Up @@ -60,7 +67,10 @@ func (s *ErrScheduler) ScheduleTask(request eremetic.Request) (string, error) {

}
return "eremetic-task.mock", nil
}

func (s *ErrScheduler) Kill(_id string) error {
return nil
}

type ErrorReader struct{}
Expand Down
1 change: 1 addition & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package eremetic

type Scheduler interface {
ScheduleTask(request Request) (string, error)
Kill(taskID string) error
}
1 change: 1 addition & 0 deletions scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
// Run the eremetic scheduler
func Run(s *eremeticScheduler, settings *Settings) {
driver, err := createDriver(s, settings)
s.driver = driver

if err != nil {
logrus.WithError(err).Error("Unable to create scheduler driver")
Expand Down
Loading

0 comments on commit 4a118ca

Please sign in to comment.