From 17d63f76dc15109cac9298da17b79181f68e8dcb Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 15 Feb 2022 14:42:02 -0800 Subject: [PATCH] Backwards compatible handling for IncompatibleClusterError (#348) --- flyteadmin/go.mod | 2 +- flyteadmin/go.sum | 2 ++ .../pkg/manager/impl/execution_manager.go | 2 +- .../impl/validation/shared_execution.go | 20 ++++++++++--------- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index c7ae230814..9cbbeb99de 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -11,7 +11,7 @@ require ( github.com/benbjohnson/clock v1.1.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.21.24 + github.com/flyteorg/flyteidl v0.22.1 github.com/flyteorg/flyteplugins v0.9.1 github.com/flyteorg/flytepropeller v0.16.14 github.com/flyteorg/flytestdlib v0.4.7 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index d29dcf47d6..7cd007ebfc 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -354,6 +354,8 @@ github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/ github.com/flyteorg/flyteidl v0.21.18/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.21.24 h1:e2wPBK4aiLE+fw2zmhUDNg39QoJk6Lf5lQRvj8XgtFk= github.com/flyteorg/flyteidl v0.21.24/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.22.1 h1:9OYtiUIDTKsnNRoVGFcvUrIRbD3dxUJYgRTDnNnMRbw= +github.com/flyteorg/flyteidl v0.22.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.9.1 h1:Z0gxSvG7LeI+COfEmuzkhz9RnJ4E5wWUcjj5qh1uKuw= github.com/flyteorg/flyteplugins v0.9.1/go.mod h1:OEGQztPFDJG4DV7tS9lDsRRd521iUINn5dcsBf6bW5k= github.com/flyteorg/flytepropeller v0.16.14 h1:zG+UnfZLPCQdwh7ORm3BNwXlb6Sp2Wwa7I7NnZYcPDw= diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index c5ca4ef8f2..b2cdd714c8 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1184,7 +1184,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "This phase %s was already recorded for workflow execution %v", wfExecPhase.String(), request.Event.ExecutionId) - } else if err := validation.ValidateCluster(ctx, request.Event.ProducerId, executionModel.Cluster); err != nil { + } else if err := validation.ValidateCluster(ctx, executionModel.Cluster, request.Event.ProducerId); err != nil { return nil, err } else if common.IsExecutionTerminal(wfExecPhase) { // Cannot go backwards in time from a terminal state to anything else diff --git a/flyteadmin/pkg/manager/impl/validation/shared_execution.go b/flyteadmin/pkg/manager/impl/validation/shared_execution.go index a5ef2a11bc..63d919559f 100644 --- a/flyteadmin/pkg/manager/impl/validation/shared_execution.go +++ b/flyteadmin/pkg/manager/impl/validation/shared_execution.go @@ -13,7 +13,7 @@ import ( ) // ValidateClusterForExecutionID validates that the execution denoted by executionId is recorded as executing on `cluster`. -func ValidateClusterForExecutionID(ctx context.Context, db repositories.RepositoryInterface, executionID *core.WorkflowExecutionIdentifier, cluster string) error { +func ValidateClusterForExecutionID(ctx context.Context, db repositories.RepositoryInterface, executionID *core.WorkflowExecutionIdentifier, clusterInEvent string) error { workflowExecution, err := db.ExecutionRepo().Get(ctx, repoInterfaces.Identifier{ Project: executionID.Project, Domain: executionID.Domain, @@ -23,20 +23,22 @@ func ValidateClusterForExecutionID(ctx context.Context, db repositories.Reposito logger.Debugf(ctx, "Failed to find existing execution with id [%+v] with err: %v", executionID, err) return err } - return ValidateCluster(ctx, workflowExecution.Cluster, cluster) + return ValidateCluster(ctx, workflowExecution.Cluster, clusterInEvent) } -// ValidateClusterForExecution validates that the execution is recorded as executing on `cluster`. -func ValidateCluster(ctx context.Context, recordedCluster, cluster string) error { +// ValidateCluster validates that the execution is recorded as executing on `cluster`. +// clusterInEvent represents the cluster name, or historically, producerID sent in an execution event. +// clusterInDB represents the cluster recorded as running the execution in the database. +func ValidateCluster(ctx context.Context, clusterInDB, clusterInEvent string) error { // DefaultProducerID is used in older versions of propeller which hard code this producer id. // See https://github.com/flyteorg/flytepropeller/blob/eaf084934de5d630cd4c11aae15ecae780cc787e/pkg/controller/nodes/task/transformer.go#L114 - if len(cluster) == 0 || cluster == common.DefaultProducerID { + if len(clusterInEvent) == 0 || clusterInEvent == common.DefaultProducerID { return nil } - if recordedCluster != cluster { - errorMsg := fmt.Sprintf("Cluster/producer from event [%s] does not match existing workflow execution cluster: [%s]", - recordedCluster, cluster) - return errors.NewIncompatibleClusterError(ctx, errorMsg, recordedCluster) + if clusterInEvent != clusterInDB { + errorMsg := fmt.Sprintf("Cluster/producer from event [%s] does not match existing workflow execution clusterInDB: [%s]", + clusterInEvent, clusterInDB) + return errors.NewIncompatibleClusterError(ctx, errorMsg, clusterInEvent) } return nil }