Skip to content

Commit

Permalink
Backwards compatible handling for IncompatibleClusterError (flyteorg#348
Browse files Browse the repository at this point in the history
)
  • Loading branch information
katrogan authored Feb 15, 2022
1 parent 5f876f8 commit 68475a7
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.21.24
github.com/flyteorg/flyteidl v0.22.1
github.com/flyteorg/flyteplugins v0.9.1
github.com/flyteorg/flytepropeller v0.16.14
github.com/flyteorg/flytestdlib v0.4.7
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/
github.com/flyteorg/flyteidl v0.21.18/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.24 h1:e2wPBK4aiLE+fw2zmhUDNg39QoJk6Lf5lQRvj8XgtFk=
github.com/flyteorg/flyteidl v0.21.24/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.22.1 h1:9OYtiUIDTKsnNRoVGFcvUrIRbD3dxUJYgRTDnNnMRbw=
github.com/flyteorg/flyteidl v0.22.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.9.1 h1:Z0gxSvG7LeI+COfEmuzkhz9RnJ4E5wWUcjj5qh1uKuw=
github.com/flyteorg/flyteplugins v0.9.1/go.mod h1:OEGQztPFDJG4DV7tS9lDsRRd521iUINn5dcsBf6bW5k=
github.com/flyteorg/flytepropeller v0.16.14 h1:zG+UnfZLPCQdwh7ORm3BNwXlb6Sp2Wwa7I7NnZYcPDw=
Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists,
"This phase %s was already recorded for workflow execution %v",
wfExecPhase.String(), request.Event.ExecutionId)
} else if err := validation.ValidateCluster(ctx, request.Event.ProducerId, executionModel.Cluster); err != nil {
} else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil {
return nil, err
} else if common.IsExecutionTerminal(wfExecPhase) {
// Cannot go backwards in time from a terminal state to anything else
Expand Down
20 changes: 11 additions & 9 deletions pkg/manager/impl/validation/shared_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// ValidateClusterForExecutionID validates that the execution denoted by executionId is recorded as executing on `cluster`.
func ValidateClusterForExecutionID(ctx context.Context, db repositories.RepositoryInterface, executionID *core.WorkflowExecutionIdentifier, cluster string) error {
func ValidateClusterForExecutionID(ctx context.Context, db repositories.RepositoryInterface, executionID *core.WorkflowExecutionIdentifier, clusterInEvent string) error {
workflowExecution, err := db.ExecutionRepo().Get(ctx, repoInterfaces.Identifier{
Project: executionID.Project,
Domain: executionID.Domain,
Expand All @@ -23,20 +23,22 @@ func ValidateClusterForExecutionID(ctx context.Context, db repositories.Reposito
logger.Debugf(ctx, "Failed to find existing execution with id [%+v] with err: %v", executionID, err)
return err
}
return ValidateCluster(ctx, workflowExecution.Cluster, cluster)
return ValidateCluster(ctx, workflowExecution.Cluster, clusterInEvent)
}

// ValidateClusterForExecution validates that the execution is recorded as executing on `cluster`.
func ValidateCluster(ctx context.Context, recordedCluster, cluster string) error {
// ValidateCluster validates that the execution is recorded as executing on `cluster`.
// clusterInEvent represents the cluster name, or historically, producerID sent in an execution event.
// clusterInDB represents the cluster recorded as running the execution in the database.
func ValidateCluster(ctx context.Context, clusterInDB, clusterInEvent string) error {
// DefaultProducerID is used in older versions of propeller which hard code this producer id.
// See https://github.com/flyteorg/flytepropeller/blob/eaf084934de5d630cd4c11aae15ecae780cc787e/pkg/controller/nodes/task/transformer.go#L114
if len(cluster) == 0 || cluster == common.DefaultProducerID {
if len(clusterInEvent) == 0 || clusterInEvent == common.DefaultProducerID {
return nil
}
if recordedCluster != cluster {
errorMsg := fmt.Sprintf("Cluster/producer from event [%s] does not match existing workflow execution cluster: [%s]",
recordedCluster, cluster)
return errors.NewIncompatibleClusterError(ctx, errorMsg, recordedCluster)
if clusterInEvent != clusterInDB {
errorMsg := fmt.Sprintf("Cluster/producer from event [%s] does not match existing workflow execution clusterInDB: [%s]",
clusterInEvent, clusterInDB)
return errors.NewIncompatibleClusterError(ctx, errorMsg, clusterInEvent)
}
return nil
}

0 comments on commit 68475a7

Please sign in to comment.