Skip to content

Commit

Permalink
Merge pull request #351 from CARV-ICS-FORTH/fedbed
Browse files Browse the repository at this point in the history
Add support for Scoped distribution
  • Loading branch information
fnikolai authored Oct 17, 2022
2 parents 9cad38a + 5f194c3 commit 16a9066
Show file tree
Hide file tree
Showing 21 changed files with 797 additions and 518 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,15 @@ help: ## Display this help.

# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
# CRD_OPTIONS ?= "crd:trivialVersions=true" # "crd:trivialVersions=true,preserveUnknownFields=false"
CRD_OPTIONS ?= crd
# crd:allowDangerousTypes=true is used to enable float64.
# They are considered dangerous because support for them varies across languages. However, since they are
# intended for use within the controllers, they are safe to use.
CRD_OPTIONS ?= crd:allowDangerousTypes=true

##@ Development
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./..." output:crd:artifacts:config=${CRD_DIR}
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./..." output:crd:artifacts:config=${CRD_DIR}
# $(CONTROLLER_GEN) webhook paths="./..." output:webhook:artifacts:config=${WEBHOOK_DIR}
$(CONTROLLER_GEN) rbac:roleName=frisbee paths="./..." output:rbac:artifacts:config=${RBAC_DIR}

Expand Down
3 changes: 3 additions & 0 deletions api/v1alpha1/admission_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (in *Cluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
func (in *Cluster) Default() {
clusterlog.Info("default", "name", in.Name)

/*
Make the inputs consistent with MaxInstances.
*/
if err := in.Spec.GenerateObjectFromTemplate.Prepare(true); err != nil {
clusterlog.Error(err, "template error")
}
Expand Down
13 changes: 8 additions & 5 deletions api/v1alpha1/admission_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,20 @@ func ValidateScheduler(instances int, sch *SchedulerSpec) error {
}

func ValidateDistribution(dist *DistributionSpec) error {
switch dist.Distribution {
case "constant":
switch dist.Name {
case DistributionConstant:
return nil

case "uniform":
case DistributionUniform:
return nil

case "zipfian":
case DistributionZipfian:
return nil

case "histogram":
case DistributionHistogram:
return nil

case DistributionDefault:
return nil
}
/* TODO: continue with the other distributions */
Expand Down
34 changes: 28 additions & 6 deletions api/v1alpha1/crd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,27 @@ type PlacementSpec struct {
type ClusterSpec struct {
GenerateObjectFromTemplate `json:",inline"`

// Resources defines how a set of resources will be distributed among the cluster's services.
// +optional
Resources *ResourceDistributionSpec `json:"resources,omitempty"`
/*
Preparation of Grouped Environment
*/

// TestData defines a volume that will be mounted across the Scenario's Services.
// +optional
TestData *TestdataVolume `json:"testData,omitempty"`

// Tolerate specifies the conditions under which the cluster will fail. If undefined, the cluster fails
// immediately when a service has failed.
// DefaultDistributionSpec pre-calculates a scoped distribution that can be accessed by other entities
// using "distribution.name : default". This default distribution allows us to describe complex relations
// across features managed by different entities (e.g., place the largest dataset on the largest node).
// +optional
Tolerate *TolerateSpec `json:"tolerate,omitempty"`
DefaultDistributionSpec *DistributionSpec `json:"defaultDistribution,omitempty"`

/*
Instance Creation Functions
*/

// Resources defines how a set of resources will be distributed among the cluster's services.
// +optional
Resources *ResourceDistributionSpec `json:"resources,omitempty"`

// Schedule defines the interval between the creation of services in the group.
// +optional
Expand All @@ -73,16 +82,29 @@ type ClusterSpec struct {
// +optional
Placement *PlacementSpec `json:"placement,omitempty"`

/*
Error Management
*/

// Suspend flag tells the controller to suspend subsequent executions, it does not apply to already started
// executions. Defaults to false.
// +optional
Suspend *bool `json:"suspend,omitempty"`

// Tolerate specifies the conditions under which the cluster will fail. If undefined, the cluster fails
// immediately when a service has failed.
// +optional
Tolerate *TolerateSpec `json:"tolerate,omitempty"`
}

// ClusterStatus defines the observed state of Cluster.
type ClusterStatus struct {
Lifecycle `json:",inline"`

// DefaultDistribution keeps the evaluated expression of GenerateObjectFromTemplate.DefaultDistributionSpec.
// +optional
DefaultDistribution []float64 `json:"defaultDistribution,omitempty"`

// QueuedJobs is a list of services scheduled for creation by the cluster.
// +optional
QueuedJobs []ServiceSpec `json:"queuedJobs,omitempty"`
Expand Down
47 changes: 0 additions & 47 deletions api/v1alpha1/crd_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/json"
)
Expand Down Expand Up @@ -68,52 +67,6 @@ func (in *Scenario) Table() (header []string, data [][]string) {
return header, data
}

/*
FindTimeline parses the scenario to find a timeline that makes sense. Both bounds are
returned as Unix timestamps in milliseconds.

For the starting time we adhere to these rules:
 1. If the ConditionCRInitialized condition is present, we use its last transition time.
 2. Otherwise, we use the creation time.

For the ending time we adhere to these rules:
 1. If the scenario is successful, we return the ConditionAllJobsAreCompleted time.
 2. If the scenario has failed, we return the time of the first failure condition found
    (ConditionJobUnexpectedTermination, then ConditionAssertionError).
 3. Otherwise, or when the expected condition is not recorded, we report time.Now().
*/
func (in *Scenario) FindTimeline() (from int64, to int64) {
	initialized := meta.FindStatusCondition(in.Status.Conditions, ConditionCRInitialized.String())
	if initialized != nil {
		from = initialized.LastTransitionTime.Time.UnixMilli()
	} else {
		from = in.GetCreationTimestamp().Time.UnixMilli()
	}

	// Default for scenarios that are still running; also the fallback when the
	// condition matching the current phase is missing.
	to = time.Now().UnixMilli()

	switch in.Status.Phase {
	case PhaseSuccess:
		// FindStatusCondition returns nil when the condition is absent. Guard against it
		// instead of dereferencing blindly, so that a scenario marked successful without
		// the completion condition does not panic.
		success := meta.FindStatusCondition(in.Status.Conditions, ConditionAllJobsAreCompleted.String())
		if success != nil {
			to = success.LastTransitionTime.Time.UnixMilli()
		}

	case PhaseFailed:
		// Failure may come from various reasons. Unfortunately we have to go through all of them.
		unexpected := meta.FindStatusCondition(in.Status.Conditions, ConditionJobUnexpectedTermination.String())
		if unexpected != nil {
			to = unexpected.LastTransitionTime.Time.UnixMilli()

			break
		}

		assert := meta.FindStatusCondition(in.Status.Conditions, ConditionAssertionError.String())
		if assert != nil {
			to = assert.LastTransitionTime.Time.UnixMilli()

			break
		}
	}

	return from, to
}

type ActionType string

const (
Expand Down
58 changes: 39 additions & 19 deletions api/v1alpha1/type_distributions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type ResourceDistributionSpec struct {
// DistributionSpec defines how the TotalResources will be assigned to resources.
*DistributionSpec `json:","`

// TotalResources defines the total resources that will be distributed among the cluster's services.
TotalResources corev1.ResourceList `json:"total"`
}

type TimelineDistributionSpec struct {
// DistributionSpec defines how the TotalDuration will be divided into time-based events.
*DistributionSpec `json:","`

// TotalDuration defines the total duration within which events will happen.
TotalDuration *metav1.Duration `json:"total"`
}
// Distribution names. These are the values that ValidateDistribution matches against
// the name of a DistributionSpec.
const (
DistributionConstant = "constant"
DistributionUniform = "uniform"
DistributionZipfian = "zipfian"
DistributionHistogram = "histogram"

// DistributionDefault instructs the controller to use an already evaluated distribution.
DistributionDefault = "default"
)

type DistributionSpec struct {
// +kubebuilder:validation:Enum=constant;uniform;zipfian;histogram
Distribution string `json:"distribution"`
// +kubebuilder:validation:Enum=constant;uniform;zipfian;histogram;default
Name string `json:"name"`

// +optional
*DistParamsConstant `json:"constant,omitempty"`

// +optional
*DistParamsUniform `json:"uniform,omitempty"`

// +optional
*DistParamsZipfian `json:"zipfian,omitempty"`

// +optional
*DistParamsHistogram `json:"histogram,omitempty"`
}

Expand Down Expand Up @@ -78,10 +76,18 @@ type DistParamsHistogram struct {

/*
Results of evaluated distributions
Timeline Distribution
*/

// TimelineDistributionSpec couples a distribution with the total duration that the
// distribution divides into time-based events.
type TimelineDistributionSpec struct {
// DistributionSpec defines how the TotalDuration will be divided into time-based events.
DistributionSpec *DistributionSpec `json:"distribution"`

// TotalDuration defines the total duration within which events will happen.
TotalDuration *metav1.Duration `json:"total"`
}

type Timeline []metav1.Time

// Next returns the next activation time, later than the given time.
Expand All @@ -100,7 +106,7 @@ func (in Timeline) Next(ref time.Time) time.Time {
func (in Timeline) String() string {
var out strings.Builder

out.WriteString(fmt.Sprint("\n=== Timeline ===\n"))
out.WriteString("\n=== Timeline ===\n")

for _, node := range in {
out.WriteString(fmt.Sprintf("\n * %s", node.Time.Format(time.StampMilli)))
Expand All @@ -109,6 +115,20 @@ func (in Timeline) String() string {
return out.String()
}

/*
Resource Distribution
*/

// ResourceDistributionSpec couples a distribution with the total amount of resources
// that the distribution splits among the cluster's services.
type ResourceDistributionSpec struct {
// DistributionSpec defines how the TotalResources will be assigned to resources.
DistributionSpec *DistributionSpec `json:"distribution"`

// TotalResources defines the total resources that will be distributed among the cluster's services.
TotalResources corev1.ResourceList `json:"total"`
}

type ResourceDistribution []corev1.ResourceList

func (in ResourceDistribution) Table() (header []string, data [][]string) {
Expand Down
28 changes: 19 additions & 9 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion charts/databases/cockroachdb/examples/1.baseline-single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ spec:
inputs:
- { server: master, workload: workloada, recordcount: "1000000", threads: "400", delay: "15" }
resources:
distribution:
name: constant
total: { cpu: "2", memory: "100Mi" }
distribution: constant

# Step 3. Run YCSB workload A
- action: Service
Expand Down
11 changes: 9 additions & 2 deletions charts/federated-learning/fedbed/examples/0.baseline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ spec:
depends: { running: [ server ], success: [ cifar10-download ] }
cluster:
templateRef: fedbed.client
defaultDistribution:
name: constant
inputs:
- { fl_server: server, dataset: fl.datasets.cifar10, node_id: 0, total_nodes: 3 }
- { fl_server: server, dataset: fl.datasets.cifar10, node_id: 1, total_nodes: 3 }
- { fl_server: server, dataset: fl.datasets.cifar10, node_id: 2, total_nodes: 3 }
resources:
distribution:
name: default
total: {mem: 500Mi}


# Teardown
- action: Delete
name: teardown
depends: { running: [ server ], success: [ clients ] }
depends: { success: [ clients, server ] }
delete:
jobs: [ server ]
jobs: []
Loading

0 comments on commit 16a9066

Please sign in to comment.