Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Scoped distribution #351

Merged
merged 2 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,15 @@ help: ## Display this help.

# Produce CRDs that work back to Kubernetes 1.11 (no version conversion)
# CRD_OPTIONS ?= "crd:trivialVersions=true" # "crd:trivialVersions=true,preserveUnknownFields=false"
CRD_OPTIONS ?= crd
# crd:allowDangerousTypes=true is used to enable float64 fields.
# Floats are considered dangerous because support for them varies across languages. However, since they are
# intended for use only within the controllers, they are safe to use here.
CRD_OPTIONS ?= crd:allowDangerousTypes=true

##@ Development
generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./..." output:crd:artifacts:config=${CRD_DIR}
$(CONTROLLER_GEN) $(CRD_OPTIONS) paths="./..." output:crd:artifacts:config=${CRD_DIR}
# $(CONTROLLER_GEN) webhook paths="./..." output:webhook:artifacts:config=${WEBHOOK_DIR}
$(CONTROLLER_GEN) rbac:roleName=frisbee paths="./..." output:rbac:artifacts:config=${RBAC_DIR}

Expand Down
3 changes: 3 additions & 0 deletions api/v1alpha1/admission_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (in *Cluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
func (in *Cluster) Default() {
clusterlog.Info("default", "name", in.Name)

/*
Make the inputs consistent with MaxInstances.
*/
if err := in.Spec.GenerateObjectFromTemplate.Prepare(true); err != nil {
clusterlog.Error(err, "template error")
}
Expand Down
13 changes: 8 additions & 5 deletions api/v1alpha1/admission_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,20 @@ func ValidateScheduler(instances int, sch *SchedulerSpec) error {
}

func ValidateDistribution(dist *DistributionSpec) error {
switch dist.Distribution {
case "constant":
switch dist.Name {
case DistributionConstant:
return nil

case "uniform":
case DistributionUniform:
return nil

case "zipfian":
case DistributionZipfian:
return nil

case "histogram":
case DistributionHistogram:
return nil

case DistributionDefault:
return nil
}
/* TODO: continue with the other distributions */
Expand Down
34 changes: 28 additions & 6 deletions api/v1alpha1/crd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,27 @@ type PlacementSpec struct {
type ClusterSpec struct {
GenerateObjectFromTemplate `json:",inline"`

// Resources defines how a set of resources will be distributed among the cluster's services.
// +optional
Resources *ResourceDistributionSpec `json:"resources,omitempty"`
/*
Preparation of Grouped Environment
*/

// TestData defines a volume that will be mounted across the Scenario's Services.
// +optional
TestData *TestdataVolume `json:"testData,omitempty"`

// Tolerate specifies the conditions under which the cluster will fail. If undefined, the cluster fails
// immediately when a service has failed.
// DefaultDistributionSpec pre-calculates a scoped distribution that can be accessed by other entities
// using "distribution.name : default". This default distribution allows us to describe complex relations
// across features managed by different entities (e.g., place the largest dataset on the largest node).
// +optional
Tolerate *TolerateSpec `json:"tolerate,omitempty"`
DefaultDistributionSpec *DistributionSpec `json:"defaultDistribution,omitempty"`

/*
Instance Creation Functions
*/

// Resources defines how a set of resources will be distributed among the cluster's services.
// +optional
Resources *ResourceDistributionSpec `json:"resources,omitempty"`

// Schedule defines the interval between the creation of services in the group.
// +optional
Expand All @@ -73,16 +82,29 @@ type ClusterSpec struct {
// +optional
Placement *PlacementSpec `json:"placement,omitempty"`

/*
Error Management
*/

// Suspend tells the controller to suspend subsequent executions. It does not apply to executions that
// have already started. Defaults to false.
// +optional
Suspend *bool `json:"suspend,omitempty"`

// Tolerate specifies the conditions under which the cluster will fail. If undefined, the cluster fails
// immediately when a service has failed.
// +optional
Tolerate *TolerateSpec `json:"tolerate,omitempty"`
}

// ClusterStatus defines the observed state of Cluster.
type ClusterStatus struct {
Lifecycle `json:",inline"`

// DefaultDistribution keeps the evaluated result of ClusterSpec.DefaultDistributionSpec.
// +optional
DefaultDistribution []float64 `json:"defaultDistribution,omitempty"`

// QueuedJobs is a list of services scheduled for creation by the cluster.
// +optional
QueuedJobs []ServiceSpec `json:"queuedJobs,omitempty"`
Expand Down
47 changes: 0 additions & 47 deletions api/v1alpha1/crd_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/json"
)
Expand Down Expand Up @@ -68,52 +67,6 @@ func (in *Scenario) Table() (header []string, data [][]string) {
return header, data
}

/*
FindTimeline parses the scenario to find a timeline that makes sense. Both ends of the
timeline are expressed in Unix milliseconds (see time.Time.UnixMilli).

For the starting time we adhere to these rules:
 1. If the ConditionCRInitialized condition is present, we use its last transition time.
 2. Otherwise, we use the Creation time.

For the ending time we adhere to these rules:
 1. If the scenario is successful, we return the ConditionAllJobsAreCompleted time.
 2. If the scenario has failed, we return the time of the first failure condition found
    (ConditionJobUnexpectedTermination first, then ConditionAssertionError).
 3. Otherwise, including when the expected condition is not recorded, we report time.Now().
*/
func (in *Scenario) FindTimeline() (from int64, to int64) {
	if initialized := meta.FindStatusCondition(in.Status.Conditions, ConditionCRInitialized.String()); initialized != nil {
		from = initialized.LastTransitionTime.Time.UnixMilli()
	} else {
		from = in.GetCreationTimestamp().Time.UnixMilli()
	}

	// Default ending time; overridden below only when a matching condition exists.
	to = time.Now().UnixMilli()

	switch in.Status.Phase {
	case PhaseSuccess:
		// FindStatusCondition returns nil when the condition is absent. Keep the
		// time.Now() fallback in that case instead of dereferencing a nil pointer.
		if success := meta.FindStatusCondition(in.Status.Conditions, ConditionAllJobsAreCompleted.String()); success != nil {
			to = success.LastTransitionTime.Time.UnixMilli()
		}

	case PhaseFailed:
		// Failure may come from various reasons. Unfortunately we have to go through all of them,
		// in priority order, and stop at the first one that is recorded.
		failureReasons := []string{
			ConditionJobUnexpectedTermination.String(),
			ConditionAssertionError.String(),
		}

		for _, reason := range failureReasons {
			if failure := meta.FindStatusCondition(in.Status.Conditions, reason); failure != nil {
				to = failure.LastTransitionTime.Time.UnixMilli()

				break
			}
		}
	}

	return from, to
}

type ActionType string

const (
Expand Down
58 changes: 39 additions & 19 deletions api/v1alpha1/type_distributions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type ResourceDistributionSpec struct {
// DistributionSpec defines how the TotalResources will be assigned to resources.
*DistributionSpec `json:","`

// TotalResources defines the total resources that will be distributed among the cluster's services.
TotalResources corev1.ResourceList `json:"total"`
}

type TimelineDistributionSpec struct {
// DistributionSpec defines how the TotalDuration will be divided into time-based events.
*DistributionSpec `json:","`

// TotalDuration defines the total duration within which events will happen.
TotalDuration *metav1.Duration `json:"total"`
}
// Names of the distributions that can be set in DistributionSpec.Name.
const (
// DistributionConstant is the name of the "constant" distribution.
DistributionConstant = "constant"
// DistributionUniform is the name of the "uniform" distribution.
DistributionUniform = "uniform"
// DistributionZipfian is the name of the "zipfian" distribution.
DistributionZipfian = "zipfian"
// DistributionHistogram is the name of the "histogram" distribution.
DistributionHistogram = "histogram"

// DistributionDefault instructs the controller to use an already evaluated distribution.
DistributionDefault = "default"
)

type DistributionSpec struct {
// +kubebuilder:validation:Enum=constant;uniform;zipfian;histogram
Distribution string `json:"distribution"`
// +kubebuilder:validation:Enum=constant;uniform;zipfian;histogram;default
Name string `json:"name"`

// +optional
*DistParamsConstant `json:"constant,omitempty"`

// +optional
*DistParamsUniform `json:"uniform,omitempty"`

// +optional
*DistParamsZipfian `json:"zipfian,omitempty"`

// +optional
*DistParamsHistogram `json:"histogram,omitempty"`
}

Expand Down Expand Up @@ -78,10 +76,18 @@ type DistParamsHistogram struct {

/*

Results of evaluated distributions
Timeline Distribution

*/

// TimelineDistributionSpec pairs a total duration with the distribution used to divide that
// duration into time-based events.
type TimelineDistributionSpec struct {
// DistributionSpec defines how the TotalDuration will be divided into time-based events.
// It is serialized under the "distribution" key.
DistributionSpec *DistributionSpec `json:"distribution"`

// TotalDuration defines the total duration within which events will happen.
// It is serialized under the "total" key.
TotalDuration *metav1.Duration `json:"total"`
}

type Timeline []metav1.Time

// Next returns the next activation time, later than the given time.
Expand All @@ -100,7 +106,7 @@ func (in Timeline) Next(ref time.Time) time.Time {
func (in Timeline) String() string {
var out strings.Builder

out.WriteString(fmt.Sprint("\n=== Timeline ===\n"))
out.WriteString("\n=== Timeline ===\n")

for _, node := range in {
out.WriteString(fmt.Sprintf("\n * %s", node.Time.Format(time.StampMilli)))
Expand All @@ -109,6 +115,20 @@ func (in Timeline) String() string {
return out.String()
}

/*

Resource Distribution

*/

// ResourceDistributionSpec pairs a total amount of resources with the distribution used to
// split that amount among the cluster's services.
type ResourceDistributionSpec struct {
// DistributionSpec defines how the TotalResources will be divided among the cluster's services.
// It is serialized under the "distribution" key.
DistributionSpec *DistributionSpec `json:"distribution"`

// TotalResources defines the total resources that will be distributed among the cluster's services.
// It is serialized under the "total" key.
TotalResources corev1.ResourceList `json:"total"`
}

type ResourceDistribution []corev1.ResourceList

func (in ResourceDistribution) Table() (header []string, data [][]string) {
Expand Down
28 changes: 19 additions & 9 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion charts/databases/cockroachdb/examples/1.baseline-single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ spec:
inputs:
- { server: master, workload: workloada, recordcount: "1000000", threads: "400", delay: "15" }
resources:
distribution:
name: constant
total: { cpu: "2", memory: "100Mi" }
distribution: constant

# Step 3. Run YCSB workload A
- action: Service
Expand Down
11 changes: 9 additions & 2 deletions charts/federated-learning/fedbed/examples/0.baseline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ spec:
depends: { running: [ server ], success: [ cifar10-download ] }
cluster:
templateRef: fedbed.client
defaultDistribution:
name: constant
inputs:
- { fl_server: server, dataset: fl.datasets.cifar10, node_id: 0, total_nodes: 3 }
- { fl_server: server, dataset: fl.datasets.cifar10, node_id: 1, total_nodes: 3 }
- { fl_server: server, dataset: fl.datasets.cifar10, node_id: 2, total_nodes: 3 }
resources:
distribution:
name: default
total: {mem: 500Mi}


# Teardown
- action: Delete
name: teardown
depends: { running: [ server ], success: [ clients ] }
depends: { success: [ clients, server ] }
delete:
jobs: [ server ]
jobs: []
Loading