Skip to content

Commit

Permalink
Added support for --replicas-max-per-node switch
Browse files Browse the repository at this point in the history
Signed-off-by: Olli Janatuinen <[email protected]>
  • Loading branch information
olljanat committed Oct 14, 2018
1 parent 7d5d33b commit 6553bbe
Show file tree
Hide file tree
Showing 8 changed files with 654 additions and 317 deletions.
7 changes: 7 additions & 0 deletions api/api.pb.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3555,6 +3555,13 @@ file {
type_name: ".docker.swarmkit.v1.Platform"
json_name: "platforms"
}
field {
name: "max_replicas"
number: 4
label: LABEL_OPTIONAL
type: TYPE_UINT64
json_name: "maxReplicas"
}
}
message_type {
name: "JoinTokens"
Expand Down
665 changes: 348 additions & 317 deletions api/types.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,9 @@ message Placement {
// This field is used in the platform filter for scheduling. If empty,
// then the platform filter is off, meaning there are no scheduling restrictions.
repeated Platform platforms = 3;

// MaxReplicas specifies the limit for maximum number of replicas running on one node.
uint64 max_replicas = 4;
}

// JoinToken contains the join tokens for workers and managers.
Expand Down
1 change: 1 addition & 0 deletions cmd/swarmctl/service/flagparser/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func AddServiceFlags(flags *pflag.FlagSet) {
flags.StringSlice("label", nil, "service label (key=value)")

flags.Uint64("replicas", 1, "number of replicas for the service (only works in replicated service mode)")
flags.Uint64("replicas-max-per-node", 0, "maximum number of replicas for per node (only works in replicated service mode) (default 0 = unlimited)")

flags.String("image", "", "container image")
flags.String("hostname", "", "container hostname")
Expand Down
16 changes: 16 additions & 0 deletions cmd/swarmctl/service/flagparser/placement.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package flagparser

import (
"fmt"

"github.com/docker/swarmkit/api"
"github.com/spf13/pflag"
)
Expand All @@ -17,5 +19,19 @@ func parsePlacement(flags *pflag.FlagSet, spec *api.ServiceSpec) error {
spec.Task.Placement.Constraints = constraints
}

if flags.Changed("replicas-max-per-node") {
if spec.GetReplicated() == nil {
return fmt.Errorf("--replicas-max-per-node can only be specified in --mode replicated")
}
maxReplicas, err := flags.GetUint64("replicas-max-per-node")
if err != nil {
return err
}
if spec.Task.Placement == nil {
spec.Task.Placement = &api.Placement{}
}
spec.Task.Placement.MaxReplicas = maxReplicas
}

return nil
}
25 changes: 25 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,28 @@ func (f *HostPortFilter) Explain(nodes int) string {
}
return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
}

// MaxReplicasFilter selects only nodes that do not yet exceed the
// max-replicas-per-node limit for the task's service.
type MaxReplicasFilter struct {
	// t is the task being scheduled; set by SetTask only when the task's
	// placement spec declares MaxReplicas > 0.
	t *api.Task
}

// SetTask returns true when the max-replicas-per-node filter applies to
// the given task, i.e. its placement spec sets MaxReplicas > 0. The task
// is retained for use by subsequent Check calls.
func (f *MaxReplicasFilter) SetTask(t *api.Task) bool {
	placement := t.Spec.Placement
	if placement == nil || placement.MaxReplicas == 0 {
		return false
	}
	f.t = t
	return true
}

// Check returns true if the node currently runs fewer active (assigned or
// pre-assigned) tasks of this task's service than the MaxReplicas limit.
func (f *MaxReplicasFilter) Check(n *NodeInfo) bool {
	limit := f.t.Spec.Placement.MaxReplicas
	active := uint64(n.ActiveTasksCountByService[f.t.ServiceID])
	return active < limit
}

// Explain returns an explanation of a failure. Unlike other filters, the
// failing node count is not included in the message.
func (f *MaxReplicasFilter) Explain(nodes int) string {
	// NOTE(review): "exceed" reads as a typo for "exceeded", but
	// scheduler_test asserts this exact string — change both together.
	const msg = "max replicas per node limit exceed"
	return msg
}
1 change: 1 addition & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
&MaxReplicasFilter{},
}
)

Expand Down
253 changes: 253 additions & 0 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3262,3 +3262,256 @@ func TestSchedulerHostPort(t *testing.T) {
failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (host-mode port already in use on 2 nodes)", failure.Status.Err)
}

// TestSchedulerMaxReplicas exercises MaxReplicasFilter end to end: tasks
// carrying Placement.MaxReplicas must be spread so no node runs more tasks
// of the same service than the limit allows, and an unplaceable task must
// surface the filter's failure message.
func TestSchedulerMaxReplicas(t *testing.T) {
	ctx := context.Background()
	// Two ready nodes. They are created up front but not inserted into the
	// store until after the first tasks exist, so assignment failures can
	// be observed first.
	node1 := &api.Node{
		ID: "nodeid1",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node1",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	node2 := &api.Node{
		ID: "nodeid2",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node2",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	// Three tasks of the same service, each limited to 1 replica per node,
	// so only as many of them can run as there are nodes.
	task1 := &api.Task{
		ID:           "id1",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name1",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task2 := &api.Task{
		ID:           "id2",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name2",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task3 := &api.Task{
		ID:           "id3",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				MaxReplicas: 1,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name3",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	service1 := &api.Service{
		ID: "serviceID1",
	}

	s := store.NewMemoryStore(nil)
	assert.NotNil(t, s)
	defer s.Close()

	err := s.Update(func(tx store.Tx) error {
		// Add the initial service and two of its tasks; no nodes yet, so
		// both tasks are expected to fail assignment below.
		assert.NoError(t, store.CreateService(tx, service1))
		assert.NoError(t, store.CreateTask(tx, task1))
		assert.NoError(t, store.CreateTask(tx, task2))
		return nil
	})
	assert.NoError(t, err)

	scheduler := New(s)

	watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
	defer cancel()

	go func() {
		assert.NoError(t, scheduler.Run(ctx))
	}()
	defer scheduler.Stop()

	// Tasks shouldn't be scheduled because there are no nodes.
	watchAssignmentFailure(t, watch)
	watchAssignmentFailure(t, watch)

	err = s.Update(func(tx store.Tx) error {
		// Add the two nodes so the pending tasks can be placed.
		assert.NoError(t, store.CreateNode(tx, node1))
		assert.NoError(t, store.CreateNode(tx, node2))
		return nil
	})
	assert.NoError(t, err)

	// Tasks 1 and 2 should be assigned to different nodes
	// (MaxReplicas: 1 forbids co-locating them).
	assignment1 := watchAssignment(t, watch)
	assignment2 := watchAssignment(t, watch)
	assert.True(t, assignment1 != assignment2)

	// Task 3 should not be schedulable: both nodes already run one
	// replica of serviceID1.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task3))
		return nil
	})
	assert.NoError(t, err)

	failure := watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (max replicas per node limit exceed)", failure.Status.Err)

	// Add third node to get task 3 scheduled
	node3 := &api.Node{
		ID: "nodeid3",
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Name: "node3",
			},
		},
		Status: api.NodeStatus{
			State: api.NodeStatus_READY,
		},
	}
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateNode(tx, node3))
		return nil
	})
	assert.NoError(t, err)

	// Create four more tasks (4-7) pinned to node 1 via a constraint,
	// this time with a per-node limit of 3 replicas.
	task4 := &api.Task{
		ID:           "id4",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name4",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task5 := &api.Task{
		ID:           "id5",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name5",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task6 := &api.Task{
		ID:           "id6",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name6",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	task7 := &api.Task{
		ID:           "id7",
		ServiceID:    "serviceID1",
		DesiredState: api.TaskStateRunning,
		Spec: api.TaskSpec{
			Runtime: &api.TaskSpec_Container{
				Container: &api.ContainerSpec{},
			},
			Placement: &api.Placement{
				Constraints: []string{"node.hostname==node1"},
				MaxReplicas: 3,
			},
		},
		ServiceAnnotations: api.Annotations{
			Name: "name7",
		},
		Status: api.TaskStatus{
			State: api.TaskStatePending,
		},
	}
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task4))
		assert.NoError(t, store.CreateTask(tx, task5))
		assert.NoError(t, store.CreateTask(tx, task6))
		return nil
	})
	assert.NoError(t, err)

	// Task 7 should not be schedulable.
	err = s.Update(func(tx store.Tx) error {
		assert.NoError(t, store.CreateTask(tx, task7))
		return nil
	})
	assert.NoError(t, err)

	// NOTE(review): the failure is reported as a constraint failure across
	// all 3 nodes (not a max-replicas failure) — presumably the scheduler
	// attributes node1's rejection to the constraint/limit combination;
	// confirm against the pipeline's Explain selection logic.
	failure = watchAssignmentFailure(t, watch)
	assert.Equal(t, "no suitable node (scheduling constraints not satisfied on 3 nodes)", failure.Status.Err)
}

0 comments on commit 6553bbe

Please sign in to comment.