Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split task scheduler #4

Merged
merged 3 commits into from
Oct 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions eremetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"os/signal"

"github.com/alde/eremetic/handler"
"github.com/alde/eremetic/routes"
Expand Down Expand Up @@ -46,6 +47,18 @@ func main() {
setupLogging()
bind := fmt.Sprintf("%s:%d", viper.GetString("address"), viper.GetInt("port"))

// Catch interrupt
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
s := <-c
if s != os.Interrupt {
return
}

log.Info("Eremetic is shutting down")
}()

router := routes.Create()
log.Infof("listening to %s", bind)
go handler.Run()
Expand Down
29 changes: 29 additions & 0 deletions handler/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package handler

import (
"net"

"github.com/alde/eremetic/zook"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
"github.com/spf13/viper"
)

// createDriver builds a Mesos scheduler driver for the given scheduler.
// It discovers the current master via ZooKeeper, binds on all interfaces,
// and publishes the externally reachable messenger address/port taken
// from configuration.
func createDriver(scheduler *eremeticScheduler) (*sched.MesosSchedulerDriver, error) {
	config := sched.DriverConfig{
		Master: zook.DiscoverMaster(viper.GetString("zookeeper")),
		Framework: &mesos.FrameworkInfo{
			Name: proto.String("Eremetic"),
			User: proto.String(""),
		},
		Scheduler:        scheduler,
		BindingAddress:   net.ParseIP("0.0.0.0"),
		PublishedAddress: net.ParseIP(viper.GetString("messenger_address")),
		BindingPort:      uint16(viper.GetInt("messenger_port")),
	}

	return sched.NewMesosSchedulerDriver(config)
}
64 changes: 15 additions & 49 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,11 @@ package handler

import (
"encoding/json"
"net"
"net/http"
"os"
"os/signal"

log "github.com/dmuth/google-go-log4go"

"github.com/alde/eremetic/types"
"github.com/alde/eremetic/zook"
"github.com/golang/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
sched "github.com/mesos/mesos-go/scheduler"
"github.com/spf13/viper"
)

var requests = make(chan *types.Request)
Expand All @@ -35,56 +27,30 @@ func WriteJSON(status int, data interface{}, w http.ResponseWriter) error {

// Run the RequestChannel Listener
func Run() {
log.Debug("Entering handler.Run loop")
for {
select {
case req := <-requests:
log.Debug("Found a request in the queue!")
handle(*req)
}
}
}

func handle(request types.Request) {
publishedAddr := net.ParseIP(viper.GetString("messenger_address"))
bindingPort := uint16(viper.GetInt("messenger_port"))
master := zook.DiscoverMaster(viper.GetString("zookeeper"))
scheduler := createEremeticScheduler(request)
defer close(scheduler.shutdown)
scheduler := createEremeticScheduler()
driver, err := createDriver(scheduler)

driver, err := sched.NewMesosSchedulerDriver(sched.DriverConfig{
Master: master,
Framework: &mesos.FrameworkInfo{
Name: proto.String("Eremetic"),
User: proto.String(""),
},
Scheduler: scheduler,
BindingAddress: net.ParseIP("0.0.0.0"),
PublishedAddress: publishedAddr,
BindingPort: bindingPort,
})
if err != nil {
log.Errorf("Unable to create scheduler driver: %s", err)
return
}

// Catch interrupt
defer close(scheduler.shutdown)
defer driver.Stop(false)

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
s := <-c
if s != os.Interrupt {
return
if status, err := driver.Run(); err != nil {
log.Errorf("Framework stopped with status %s and error: %s\n", status.String(), err.Error())
}

log.Info("Eremetic is shutting down")

// we have shut down
driver.Stop(false)
log.Info("Exiting...")
}()

if status, err := driver.Run(); err != nil {
log.Errorf("Framework stopped with status %s and error: %s\n", status.String(), err.Error())
log.Debug("Entering handler.Run loop")
for {
select {
case req := <-requests:
log.Debug("Found a request in the queue!")
scheduleTasks(scheduler, *req)
}
}
log.Info("Exiting...")
}
89 changes: 23 additions & 66 deletions handler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package handler

import (
"encoding/json"
"fmt"

log "github.com/dmuth/google-go-log4go"

"github.com/alde/eremetic/types"
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
sched "github.com/mesos/mesos-go/scheduler"
)

Expand All @@ -19,15 +16,12 @@ var (

// eremeticScheduler holds the structure of the Eremetic Scheduler
type eremeticScheduler struct {
taskCPUs float64
taskMem float64
dockerImage string
command string
tasksToLaunch int
tasksCreated int
tasksRunning int

eremeticExecutor *mesos.ExecutorInfo
// task to start
tasks chan eremeticTask

// This channel is closed when the program receives an interrupt,
// signalling that the program should shut down.
Expand All @@ -37,30 +31,10 @@ type eremeticScheduler struct {
done chan struct{}
}

func createTaskInfo(taskID int, offer *mesos.Offer, s *eremeticScheduler) *mesos.TaskInfo {
return &mesos.TaskInfo{
TaskId: &mesos.TaskID{
Value: proto.String(fmt.Sprintf("Eremetic-%d: Running '%s' on '%s'", taskID, s.command, s.dockerImage)),
},
SlaveId: offer.SlaveId,
Resources: []*mesos.Resource{
mesosutil.NewScalarResource("cpus", s.taskCPUs),
mesosutil.NewScalarResource("mem", s.taskMem),
},
}
}

func (s *eremeticScheduler) newTaskPrototype(offer *mesos.Offer) *mesos.TaskInfo {
func (s *eremeticScheduler) newTask(offer *mesos.Offer, spec *eremeticTask) *mesos.TaskInfo {
taskID := s.tasksCreated
s.tasksCreated++
return createTaskInfo(taskID, offer, s)
}

func (s *eremeticScheduler) newTask(offer *mesos.Offer) *mesos.TaskInfo {
task := s.newTaskPrototype(offer)
task.Name = proto.String("EREMETIC_" + *task.TaskId.Value)
task.Executor = s.eremeticExecutor
return task
return createTaskInfo(spec, taskID, offer)
}

// Registered is called when the Scheduler is Registered
Expand Down Expand Up @@ -93,23 +67,16 @@ func (s *eremeticScheduler) ResourceOffers(driver sched.SchedulerDriver, offers
close(s.done)
}
continue
case t := <-s.tasks:
log.Debug("Preparing to launch task")
task := s.newTask(offer, &t)
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, []*mesos.TaskInfo{task}, defaultFilter)
continue
default:
}

tasks := []*mesos.TaskInfo{}
for s.tasksToLaunch > 0 {
task := s.newTask(offer)
tasks = append(tasks, task)
s.tasksToLaunch--
}

if len(tasks) == 0 {
log.Debug("No tasks to launch. Declining offer.")
driver.DeclineOffer(offer.Id, defaultFilter)
} else {
log.Debugf("Launching %d tasks.", len(tasks))
driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, defaultFilter)
}
log.Debug("No tasks to launch. Declining offer.")
driver.DeclineOffer(offer.Id, defaultFilter)
}
}

Expand Down Expand Up @@ -139,7 +106,7 @@ func (s *eremeticScheduler) FrameworkMessage(

log.Debug("Getting a framework message")
switch *executorID.Value {
case *s.eremeticExecutor.ExecutorId.Value:
case "eremetic-executor":
var result interface{}
err := json.Unmarshal([]byte(message), &result)
if err != nil {
Expand All @@ -166,28 +133,18 @@ func (s *eremeticScheduler) Error(_ sched.SchedulerDriver, err string) {
log.Debugf("Receiving an error: %s", err)
}

func createEremeticScheduler(request types.Request) *eremeticScheduler {
func createEremeticScheduler() *eremeticScheduler {
s := &eremeticScheduler{
taskCPUs: request.TaskCPUs,
taskMem: request.TaskMem,
dockerImage: request.DockerImage,
command: request.Command,
tasksToLaunch: request.TasksToLaunch,
shutdown: make(chan struct{}),
done: make(chan struct{}),
eremeticExecutor: &mesos.ExecutorInfo{
ExecutorId: &mesos.ExecutorID{Value: proto.String("eremetic-executor")},
Command: &mesos.CommandInfo{
Value: proto.String(request.Command),
},
Container: &mesos.ContainerInfo{
Type: mesos.ContainerInfo_DOCKER.Enum(),
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(request.DockerImage),
},
},
Name: proto.String("Eremetic"),
},
shutdown: make(chan struct{}),
done: make(chan struct{}),
tasks: make(chan eremeticTask, 100),
}
return s
}

// scheduleTasks enqueues one eremeticTask per requested launch onto the
// scheduler's task channel, to be picked up by ResourceOffers. Blocks if
// the channel buffer is full.
func scheduleTasks(s *eremeticScheduler, request types.Request) {
	count := request.TasksToLaunch
	for n := 0; n < count; n++ {
		log.Debug("Adding task to queue")
		s.tasks <- createEremeticTask(request)
	}
}
60 changes: 60 additions & 0 deletions handler/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package handler

import (
	"fmt"
	"os"
	"strconv"

	log "github.com/dmuth/google-go-log4go"

	"github.com/alde/eremetic/types"
	"github.com/gogo/protobuf/proto"
	"github.com/m4rw3r/uuid"
	mesos "github.com/mesos/mesos-go/mesosproto"
	"github.com/mesos/mesos-go/mesosutil"
)

// eremeticTask is the per-task launch specification derived from an
// incoming types.Request: the resources to claim from an offer plus the
// Mesos command and Docker container to run.
type eremeticTask struct {
taskCPUs float64 // CPU shares to request from an offer
taskMem float64 // memory (in MB) to request from an offer
command *mesos.CommandInfo // command executed inside the container
container *mesos.ContainerInfo // Docker container definition
}

// createEremeticTask translates an incoming request into the task
// specification that is later turned into a mesos.TaskInfo when an
// offer is matched.
func createEremeticTask(request types.Request) eremeticTask {
	command := &mesos.CommandInfo{
		Value: proto.String(request.Command),
		User:  proto.String("root"),
	}

	container := &mesos.ContainerInfo{
		Type: mesos.ContainerInfo_DOCKER.Enum(),
		Docker: &mesos.ContainerInfo_DockerInfo{
			Image: proto.String(request.DockerImage),
		},
	}

	return eremeticTask{
		taskCPUs:  request.TaskCPUs,
		taskMem:   request.TaskMem,
		command:   command,
		container: container,
	}
}

// createTaskInfo builds the mesos.TaskInfo used to launch one task from
// the given spec against a single offer. The task ID is a fresh UUID
// ("eremetic-task.<uuid>") so repeated runs of the same request never
// collide; taskID is only used in the human-readable task name.
func createTaskInfo(task *eremeticTask, taskID int, offer *mesos.Offer) *mesos.TaskInfo {
	randID, err := uuid.V4()
	if err != nil {
		// Without a working UUID source we cannot create unique task IDs
		// at all; treat it as fatal.
		log.Error("Could not create random Id")
		os.Exit(1)
	}
	id := fmt.Sprintf("eremetic-task.%s", randID.String())

	return &mesos.TaskInfo{
		TaskId: &mesos.TaskID{
			Value: proto.String(id),
		},
		SlaveId: offer.SlaveId,
		// BUG FIX: string(taskID) interpreted the int as a Unicode code
		// point (e.g. 65 -> "A"); render it as a decimal number instead.
		Name:      proto.String("Eremetic task " + strconv.Itoa(taskID)),
		Command:   task.command,
		Container: task.container,
		Resources: []*mesos.Resource{
			mesosutil.NewScalarResource("cpus", task.taskCPUs),
			mesosutil.NewScalarResource("mem", task.taskMem),
		},
	}
}