Skip to content

Commit

Permalink
Implemented portMapping for eremetic tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mrares authored and Rares Mirica committed Aug 31, 2016
1 parent 6ae9e18 commit 043bee4
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 0 deletions.
11 changes: 11 additions & 0 deletions scheduler/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,22 @@ func AttributeMatch(slaveConstraints []types.SlaveConstraint) ogle.Matcher {
return ogle.AllOf(submatchers...)
}

/*
func PortRangeMatch(taskPorts []types.Port) ogle.Matcher {
var submatchers []ogle.Matcher
for _, constraint := range slaveConstraints {
submatchers = append(submatchers, &attributeMatcher{constraint})
}
return ogle.AllOf(submatchers...)
}
*/

func createMatcher(task types.EremeticTask) ogle.Matcher {
return ogle.AllOf(
CPUAvailable(task.TaskCPUs),
MemoryAvailable(task.TaskMem),
AttributeMatch(task.SlaveConstraints),
// PortRangeMatch(task.Ports),
)
}

Expand Down
1 change: 1 addition & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (s *eremeticScheduler) ScheduleTask(request types.Request) (string, error)
"docker_image": request.DockerImage,
"command": request.Command,
"slave_constraints": request.SlaveConstraints,
"ports": request.Ports,
}).Debug("Adding task to queue")

task, err := types.NewEremeticTask(request, fmt.Sprintf("Eremetic task %s", nextID(s)))
Expand Down
54 changes: 54 additions & 0 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,57 @@ func createTaskInfo(task types.EremeticTask, offer *mesos.Offer) (types.Eremetic
})
}

var portResources []*mesos.Value_Range
var portMapping []*mesos.ContainerInfo_DockerInfo_PortMapping

if(len(task.Ports) > 0) {
lastIndex := len(task.Ports)

for _, v := range offer.Resources {
if(lastIndex == 0) {
break
}

if(*v.Name != "ports") {
continue
}

for _, p_v := range v.Ranges.Range {
if(lastIndex == 0) {
break
}

startPort, endPort := *p_v.Begin, int(*p_v.Begin)
for portnumber := int(*p_v.Begin); portnumber <= int(*p_v.End); portnumber++ {
if(lastIndex == 0) {
break
}

lastIndex--
ask_port := &task.Ports[lastIndex]

if(ask_port.ContainerPort == 0) {
continue
}

endPort = portnumber + 1

ask_port.HostPort = uint32(portnumber)

portMapping = append(portMapping, &mesos.ContainerInfo_DockerInfo_PortMapping{
ContainerPort: proto.Uint32(ask_port.ContainerPort),
HostPort: proto.Uint32(ask_port.HostPort),
Protocol: proto.String(ask_port.Protocol),
})

}
if(int(startPort) != endPort) {
portResources = append(portResources, mesosutil.NewValueRange(startPort,uint64(endPort)))
}
}
}
}

var uris []*mesos.CommandInfo_URI
for _, v := range task.FetchURIs {
uris = append(uris, &mesos.CommandInfo_URI{
Expand Down Expand Up @@ -78,12 +129,15 @@ func createTaskInfo(task types.EremeticTask, offer *mesos.Offer) (types.Eremetic
Docker: &mesos.ContainerInfo_DockerInfo{
Image: proto.String(task.Image),
ForcePullImage: proto.Bool(task.ForcePullImage),
PortMappings: portMapping,
Network: mesos.ContainerInfo_DockerInfo_BRIDGE.Enum(),
},
Volumes: volumes,
},
Resources: []*mesos.Resource{
mesosutil.NewScalarResource("cpus", task.TaskCPUs),
mesosutil.NewScalarResource("mem", task.TaskMem),
mesosutil.NewRangesResource("ports", portResources),
},
}
return task, taskInfo
Expand Down
1 change: 1 addition & 0 deletions types/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Request struct {
DockerImage string `json:"docker_image"`
Command string `json:"command"`
Volumes []Volume `json:"volumes"`
Ports []Port `json:"ports"`
Environment map[string]string `json:"env"`
MaskedEnvironment map[string]string `json:"masked_env"`
SlaveConstraints []SlaveConstraint `json:"slave_constraints"`
Expand Down
8 changes: 8 additions & 0 deletions types/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ type Volume struct {
HostPath string `json:"host_path"`
}

type Port struct {
ContainerPort uint32 `json:"containerPort"`
HostPort uint32 `json:"hostPort"`
Protocol string `json:"protocol"`
}

type SlaveConstraint struct {
AttributeName string `json:"attribute_name"`
AttributeValue string `json:"attribute_value"`
Expand All @@ -41,6 +47,7 @@ type EremeticTask struct {
MaskedEnvironment map[string]string `json:"masked_env"`
Image string `json:"image"`
Volumes []Volume `json:"volumes"`
Ports []Port `json:"ports"`
Status []Status `json:"status"`
ID string `json:"id"`
Name string `json:"name"`
Expand Down Expand Up @@ -116,6 +123,7 @@ func NewEremeticTask(request Request, name string) (EremeticTask, error) {
SlaveConstraints: request.SlaveConstraints,
Image: request.DockerImage,
Volumes: request.Volumes,
Ports: request.Ports,
CallbackURI: request.CallbackURI,
ForcePullImage: request.ForcePullImage,
FetchURIs: mergeURIs(request),
Expand Down

0 comments on commit 043bee4

Please sign in to comment.