Skip to content

Commit

Permalink
Added execution config changes (flyteorg#378)
Browse files Browse the repository at this point in the history
* Added execution config changes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* lint fixes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* using executionConfig data during launch

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* resolve conflicts

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Removed defaults for labels and annotations

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* added more coverage

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* added more coverage

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Updating idl and lint fixes

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Adde missing go.sum

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* feedback changes to return immediately if any field is set while overriding

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* using released flyteidl

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* using released flyteidl

Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Mar 25, 2022
1 parent 76af9b2 commit f532007
Show file tree
Hide file tree
Showing 8 changed files with 513 additions and 40 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.24.2
github.com/flyteorg/flyteidl v0.24.6
github.com/flyteorg/flyteplugins v0.10.16
github.com/flyteorg/flytepropeller v0.16.36
github.com/flyteorg/flytestdlib v0.4.13
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.2 h1:RQzWmtVQR+NKAppjw7xTsIn6gosP0Q/j58tfF6Cr6h4=
github.com/flyteorg/flyteidl v0.24.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.6 h1:n2796X9Sw7mNDtXWwsJr84DLQpz8Cptvb7LptfJLxag=
github.com/flyteorg/flyteidl v0.24.6/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw=
github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos=
github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU=
Expand Down
116 changes: 84 additions & 32 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,41 +428,91 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
return parentNodeExecutionID, sourceExecutionID, nil
}

// WorkflowExecutionConfigInterface is used as common interface for capturing the common behavior catering to the needs
// of fetching the WorkflowExecutionConfig across LaunchPlanSpec, ExecutionCreateRequest
// MatchableResource_WORKFLOW_EXECUTION_CONFIG and ApplicationConfig
type WorkflowExecutionConfigInterface interface {
// GetMaxParallelism Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness.
GetMaxParallelism() int32
// GetRawOutputDataConfig Encapsulates user settings pertaining to offloaded data (i.e. Blobs, Schema, query data, etc.).
GetRawOutputDataConfig() *admin.RawOutputDataConfig
// GetSecurityContext Indicates security context permissions for executions triggered with this matchable attribute.
GetSecurityContext() *core.SecurityContext
// GetAnnotations Custom annotations to be applied to a triggered execution resource.
GetAnnotations() *admin.Annotations
// GetLabels Custom labels to be applied to a triggered execution resource.
GetLabels() *admin.Labels
}

// Merge into workflowExecConfig from spec and return true if any value has been changed
func mergeIntoExecConfig(workflowExecConfig *admin.WorkflowExecutionConfig, spec WorkflowExecutionConfigInterface) bool {
isChanged := false
if workflowExecConfig.GetMaxParallelism() == 0 && spec.GetMaxParallelism() > 0 {
workflowExecConfig.MaxParallelism = spec.GetMaxParallelism()
isChanged = true
}
if workflowExecConfig.GetSecurityContext() == nil && spec.GetSecurityContext() != nil {
workflowExecConfig.SecurityContext = spec.GetSecurityContext()
isChanged = true
}
// Launchplan spec has label, annotation and rawOutputDataConfig initialized with empty values.
// Hence we do a deep check in the following conditions before assignment
if (workflowExecConfig.GetRawOutputDataConfig() == nil ||
len(workflowExecConfig.GetRawOutputDataConfig().GetOutputLocationPrefix()) == 0) &&
(spec.GetRawOutputDataConfig() != nil && len(spec.GetRawOutputDataConfig().OutputLocationPrefix) > 0) {
workflowExecConfig.RawOutputDataConfig = spec.GetRawOutputDataConfig()
isChanged = true
}
if (workflowExecConfig.GetLabels() == nil || len(workflowExecConfig.GetLabels().Values) == 0) &&
(spec.GetLabels() != nil && len(spec.GetLabels().Values) > 0) {
workflowExecConfig.Labels = spec.GetLabels()
isChanged = true
}
if (workflowExecConfig.GetAnnotations() == nil || len(workflowExecConfig.GetAnnotations().Values) == 0) &&
(spec.GetAnnotations() != nil && len(spec.GetAnnotations().Values) > 0) {
workflowExecConfig.Annotations = spec.GetAnnotations()
isChanged = true
}
return isChanged
}

// Produces execution-time attributes for workflow execution.
// Defaults to overridable execution values set in the execution create request, then looks at the launch plan values
// (if any) before defaulting to values set in the matchable resource db and further if matchable resources don't
// exist then defaults to one set in application configuration
func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest,
launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) {
if request.Spec.MaxParallelism > 0 {
return &admin.WorkflowExecutionConfig{
MaxParallelism: request.Spec.MaxParallelism,
}, nil

workflowExecConfig := &admin.WorkflowExecutionConfig{}
// merge the request spec into workflowExecConfig
if isChanged := mergeIntoExecConfig(workflowExecConfig, request.Spec); isChanged {
return workflowExecConfig, nil
}
if launchPlan != nil && launchPlan.Spec.MaxParallelism > 0 {
return &admin.WorkflowExecutionConfig{
MaxParallelism: launchPlan.Spec.MaxParallelism,
}, nil

if launchPlan != nil && launchPlan.Spec != nil {
// merge the launch plan spec into workflowExecConfig
if isChanged := mergeIntoExecConfig(workflowExecConfig, launchPlan.Spec); isChanged {
return workflowExecConfig, nil
}
}

resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: request.Project,
Domain: request.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
matchableResource, err := util.GetMatchableResource(ctx, m.resourceManager,
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain)
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get workflow execution config overrides with error: %v", err)
return nil, err
}
return nil, err
}
if resource != nil && resource.Attributes.GetWorkflowExecutionConfig() != nil {
return resource.Attributes.GetWorkflowExecutionConfig(), nil

if matchableResource != nil && matchableResource.Attributes.GetWorkflowExecutionConfig() != nil {
// merge the matchable resource workflow execution config into workflowExecConfig
if isChanged := mergeIntoExecConfig(workflowExecConfig,
matchableResource.Attributes.GetWorkflowExecutionConfig()); isChanged {
return workflowExecConfig, nil
}
}
// merge the application config into workflowExecConfig
mergeIntoExecConfig(workflowExecConfig, m.config.ApplicationConfiguration().GetTopLevelConfig())
// Defaults to one from the application config
return &admin.WorkflowExecutionConfig{
MaxParallelism: m.config.ApplicationConfiguration().GetTopLevelConfig().GetMaxParallelism(),
}, nil
return workflowExecConfig, nil
}

func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *admin.ExecutionCreateRequest) (
Expand Down Expand Up @@ -579,21 +629,23 @@ func (m *ExecutionManager) launchSingleTaskExecution(
}

var labels map[string]string
if requestSpec.Labels != nil {
labels = requestSpec.Labels.Values
if executionConfig.Labels != nil {
labels = executionConfig.Labels.Values
}

labels, err = m.addProjectLabels(ctx, request.Project, labels)
if err != nil {
return nil, nil, err
}

var annotations map[string]string
if requestSpec.Annotations != nil {
annotations = requestSpec.Annotations.Values
if executionConfig.Annotations != nil {
annotations = executionConfig.Annotations.Values
}

rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig
if requestSpec.RawOutputDataConfig != nil {
rawOutputDataConfig = requestSpec.RawOutputDataConfig
var rawOutputDataConfig *admin.RawOutputDataConfig
if executionConfig.RawOutputDataConfig != nil {
rawOutputDataConfig = executionConfig.RawOutputDataConfig
}

clusterAssignment, err := m.getClusterAssignment(ctx, &request)
Expand Down Expand Up @@ -817,19 +869,19 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
namespace := common.GetNamespaceName(
m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain)

labels, err := resolveStringMap(requestSpec.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries())
if err != nil {
return nil, nil, err
}
labels, err = m.addProjectLabels(ctx, request.Project, labels)
if err != nil {
return nil, nil, err
}
annotations, err := resolveStringMap(requestSpec.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries())
if err != nil {
return nil, nil, err
}
rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig
var rawOutputDataConfig *admin.RawOutputDataConfig
if requestSpec.RawOutputDataConfig != nil {
rawOutputDataConfig = requestSpec.RawOutputDataConfig
}
Expand Down
Loading

0 comments on commit f532007

Please sign in to comment.