Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added support for --replicas-max-per-node switch
Browse files Browse the repository at this point in the history
Signed-off-by: Olli Janatuinen <[email protected]>
olljanat committed Sep 29, 2018
1 parent e2aafdd commit e6a2a25
Showing 8 changed files with 529 additions and 319 deletions.
21 changes: 21 additions & 0 deletions api/api.pb.txt
Original file line number Diff line number Diff line change
@@ -787,6 +787,20 @@ file {
type: TYPE_STRING
json_name: "phpNamespace"
}
field {
name: "php_metadata_namespace"
number: 44
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "phpMetadataNamespace"
}
field {
name: "ruby_package"
number: 45
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "rubyPackage"
}
field {
name: "uninterpreted_option"
number: 999
@@ -3555,6 +3569,13 @@ file {
type_name: ".docker.swarmkit.v1.Platform"
json_name: "platforms"
}
field {
name: "maxreplicas"
number: 4
label: LABEL_OPTIONAL
type: TYPE_UINT64
json_name: "maxreplicas"
}
}
message_type {
name: "JoinTokens"
669 changes: 350 additions & 319 deletions api/types.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/types.proto
Original file line number Diff line number Diff line change
@@ -862,6 +862,9 @@ message Placement {
// This field is used in the platform filter for scheduling. If empty,
// then the platform filter is off, meaning there are no scheduling restrictions.
repeated Platform platforms = 3;

// Max replicas specifies limit for maximum number of replicas running on one node.
uint64 maxreplicas = 4;
}

// JoinToken contains the join tokens for workers and managers.
1 change: 1 addition & 0 deletions cmd/swarmctl/service/flagparser/flags.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ func AddServiceFlags(flags *pflag.FlagSet) {
flags.StringSlice("label", nil, "service label (key=value)")

flags.Uint64("replicas", 1, "number of replicas for the service (only works in replicated service mode)")
flags.Uint64("replicas-max-per-node", 0, "maximum number of replicas for per node (only works in replicated service mode) (default 0 = unlimited)")

flags.String("image", "", "container image")
flags.String("hostname", "", "container hostname")
15 changes: 15 additions & 0 deletions cmd/swarmctl/service/flagparser/placement.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flagparser

import (
"fmt"
"github.com/docker/swarmkit/api"
"github.com/spf13/pflag"
)
@@ -17,5 +18,19 @@ func parsePlacement(flags *pflag.FlagSet, spec *api.ServiceSpec) error {
spec.Task.Placement.Constraints = constraints
}

if flags.Changed("replicas-max-per-node") {
if spec.GetReplicated() == nil {
return fmt.Errorf("--replicas-max-per-node can only be specified in --mode replicated")
}
maxreplicas, err := flags.GetUint64("replicas-max-per-node")
if err != nil {
return err
}
if spec.Task.Placement == nil {
spec.Task.Placement = &api.Placement{}
}
spec.Task.Placement.Maxreplicas = maxreplicas
}

return nil
}
31 changes: 31 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
@@ -359,3 +359,34 @@ func (f *HostPortFilter) Explain(nodes int) string {
}
return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
}

// MaxReplicasFilter selects only nodes that does not exceed max replicas per node.
type MaxReplicasFilter struct {
t *api.Task
}

// SetTask returns true when max replicas per node filter > 0 for a given task.
func (f *MaxReplicasFilter) SetTask(t *api.Task) bool {
if t.Spec.Placement != nil {
if t.Spec.Placement.Maxreplicas > 0 {
f.t = t
return true
}
}

return false
}

// Check returns true if there is free slots for task in a given node.
func (f *MaxReplicasFilter) Check(n *NodeInfo) bool {
if uint64(n.ActiveTasksCountByService[f.t.ServiceID]) < f.t.Spec.Placement.Maxreplicas {
return true
}

return false
}

// Explain returns an explanation of a failure.
func (f *MaxReplicasFilter) Explain(nodes int) string {
return "max replicas per node limit exceed"
}
1 change: 1 addition & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ var (
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
&MaxReplicasFilter{},
}
)

107 changes: 107 additions & 0 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -3262,3 +3262,110 @@ func TestSchedulerHostPort(t *testing.T) {
failure := watchAssignmentFailure(t, watch)
assert.Equal(t, "no suitable node (host-mode port already in use on 2 nodes)", failure.Status.Err)
}

func testMaxreplicas(t *testing.T, useSpecVersion bool) {
ctx := context.Background()
initialNodeSet := []*api.Node{
{
ID: "id1",
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Spec: api.NodeSpec{
Annotations: api.Annotations{
Labels: map[string]string{
"datacenter": "1",
},
},
},
},
{
ID: "id2",
Status: api.NodeStatus{
State: api.NodeStatus_READY,
},
Spec: api.NodeSpec{
Annotations: api.Annotations{
Labels: map[string]string{
"datacenter": "2",
},
},
},
},
}

taskTemplate1 := &api.Task{
DesiredState: api.TaskStateRunning,
ServiceID: "service1",
Spec: api.TaskSpec{
Runtime: &api.TaskSpec_Container{
Container: &api.ContainerSpec{
Image: "v:1",
},
},
Placement: &api.Placement{
Preferences: []*api.PlacementPreference{
{
Preference: &api.PlacementPreference_Spread{
Spread: &api.SpreadOver{
SpreadDescriptor: "node.labels.datacenter",
},
},
},
},
Maxreplicas: 2,
},
},
Status: api.TaskStatus{
State: api.TaskStatePending,
},
}

if useSpecVersion {
taskTemplate1.SpecVersion = &api.Version{Index: 1}
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

t1Instances := 8

err := s.Update(func(tx store.Tx) error {
// Prepoulate nodes
for _, n := range initialNodeSet {
assert.NoError(t, store.CreateNode(tx, n))
}

// Prepopulate tasks from template 1
for i := 0; i != t1Instances; i++ {
taskTemplate1.ID = fmt.Sprintf("t1id%d", i)
assert.NoError(t, store.CreateTask(tx, taskTemplate1))
}
return nil
})
assert.NoError(t, err)

scheduler := New(s)

watch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancel()

go func() {
assert.NoError(t, scheduler.Run(ctx))
}()
defer scheduler.Stop()

t1Assignments := make(map[string]int)
for i := 0; i != t1Instances; i++ {
assignment := watchAssignment(t, watch)
if !strings.HasPrefix(assignment.ID, "t1") {
t.Fatal("max replicas per node limit exceed")
}
t1Assignments[assignment.NodeID]++
}

assert.Len(t, t1Assignments, 2)
assert.Equal(t, 2, t1Assignments["id1"])
assert.Equal(t, 2, t1Assignments["id2"])
}

0 comments on commit e6a2a25

Please sign in to comment.