From db4b8b83b98caec261ad994f58b8b326d3de393b Mon Sep 17 00:00:00 2001 From: Troy Chiu <114708546+troychiu@users.noreply.github.com> Date: Tue, 2 Apr 2024 10:27:33 -0700 Subject: [PATCH] SVR-396: Cherry-pick: ValueNotIn Filter and Fix execution phase bug (#182) ## Overview Cherry picks [b35246b87996c0829edff850a480fb5a698883df](https://github.com/flyteorg/flyte/commit/b35246b87996c0829edff850a480fb5a698883df) and [920fd6763a0ca9b399114071dea657ad25e37020](https://github.com/flyteorg/flyte/commit/920fd6763a0ca9b399114071dea657ad25e37020) ## Test Plan - [b35246b87996c0829edff850a480fb5a698883df](https://github.com/flyteorg/flyte/commit/b35246b87996c0829edff850a480fb5a698883df) The ValueNotIn filter works as expected in staging. - [920fd6763a0ca9b399114071dea657ad25e37020](https://github.com/flyteorg/flyte/commit/920fd6763a0ca9b399114071dea657ad25e37020) Originally this [execution](https://dogfood.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/amks92fslmngjr8lzxrt) is in `UNDEFINED` phase in DB. After deploying the changes, the [execution](https://dogfood.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/asnxrszrxk99nrzh97f9) is in `FAILED` phase now, which is what we expected. ## Rollout Plan (if applicable) *TODO: Describe any deployment or compatibility considerations for rolling out this change.* ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [ ] To be upstreamed ## Jira Issue https://unionai.atlassian.net/browse/ ## Checklist * [ ] Added tests * [ ] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation --- docs/concepts/admin.rst | 7 ++-- flyteadmin/pkg/common/filters.go | 26 +++++++++++-- flyteadmin/pkg/common/filters_test.go | 39 ++++++++++--------- .../pkg/manager/impl/execution_manager.go | 2 - .../repositories/transformers/execution.go | 7 +--- .../transformers/execution_test.go | 16 +++----- 6 files changed, 54 insertions(+), 43 deletions(-) diff --git a/docs/concepts/admin.rst b/docs/concepts/admin.rst index 4e6ee67a8e..9daf60b13e 100644 --- a/docs/concepts/admin.rst +++ b/docs/concepts/admin.rst @@ -298,14 +298,15 @@ The fully supported set of filter functions are - contains - gt (greater than) -- gte (greter than or equal to) +- gte (greater than or equal to) - lt (less than) - lte (less than or equal to) - eq (equal) - ne (not equal) -- value_in (for repeated sets of values) +- value_in (value in repeated sets of values) +- value_not_in (value not in repeated sets of values) -"value_in" is a special case where multiple values are passed to the filter expression. For example:: +"value_in" and "value_not_in" are special cases where multiple values are passed to the filter expression. For example:: value_in(phase, RUNNING;SUCCEEDED;FAILED) diff --git a/flyteadmin/pkg/common/filters.go b/flyteadmin/pkg/common/filters.go index e95aa98878..db3b75292c 100644 --- a/flyteadmin/pkg/common/filters.go +++ b/flyteadmin/pkg/common/filters.go @@ -29,6 +29,7 @@ const ( Equal NotEqual ValueIn + ValueNotIn ) // String formats for various filter expression queries @@ -43,6 +44,7 @@ const ( equalQuery = "%s = ?" notEqualQuery = "%s <> ?" valueInQuery = "%s in (?)" + valueNotInQuery = "%s not in (?)" ) // Set of available filters which exclusively accept a single argument value. @@ -58,7 +60,8 @@ var singleValueFilters = map[FilterExpression]bool{ // Set of available filters which exclusively accept repeated argument values. var repeatedValueFilters = map[FilterExpression]bool{ - ValueIn: true, + ValueIn: true, + ValueNotIn: true, } const EqualExpression = "eq" @@ -72,6 +75,19 @@ var filterNameMappings = map[string]FilterExpression{ EqualExpression: Equal, "ne": NotEqual, "value_in": ValueIn, + "value_not_in": ValueNotIn, +} + +var filterQueryMappings = map[FilterExpression]string{ + Contains: containsQuery, + GreaterThan: greaterThanQuery, + GreaterThanOrEqual: greaterThanOrEqualQuery, + LessThan: lessThanQuery, + LessThanOrEqual: lessThanOrEqualQuery, + Equal: equalQuery, + NotEqual: notEqualQuery, + ValueIn: valueInQuery, + ValueNotIn: valueNotInQuery, } var executionIdentifierFields = map[string]bool{ @@ -109,6 +125,8 @@ func getFilterExpressionName(expression FilterExpression) string { return "not equal" case ValueIn: return "value in" + case ValueNotIn: + return "value not in" default: return "" } @@ -167,9 +185,9 @@ func (f *inlineFilterImpl) GetField() string { func (f *inlineFilterImpl) getGormQueryExpr(formattedField string) (GormQueryExpr, error) { - // ValueIn is special because it uses repeating values. - if f.function == ValueIn { - queryStr := fmt.Sprintf(valueInQuery, formattedField) + // Filters that use repeated values + if _, ok := repeatedValueFilters[f.function]; ok { + queryStr := fmt.Sprintf(filterQueryMappings[f.function], formattedField) return GormQueryExpr{ Query: queryStr, Args: f.repeatedValue, diff --git a/flyteadmin/pkg/common/filters_test.go b/flyteadmin/pkg/common/filters_test.go index e49084557f..87ba5ac2ac 100644 --- a/flyteadmin/pkg/common/filters_test.go +++ b/flyteadmin/pkg/common/filters_test.go @@ -1,6 +1,7 @@ package common import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -83,6 +84,14 @@ func TestNewRepeatedValueFilter(t *testing.T) { assert.Equal(t, "projects.project in (?)", expression.Query) assert.Equal(t, vals, expression.Args) + filter, err = NewRepeatedValueFilter(Workflow, ValueNotIn, "project", vals) + assert.NoError(t, err) + + expression, err = filter.GetGormJoinTableQueryExpr("projects") + assert.NoError(t, err) + assert.Equal(t, "projects.project not in (?)", expression.Query) + assert.Equal(t, vals, expression.Args) + _, err = NewRepeatedValueFilter(Workflow, Equal, "domain", []string{"production", "qa"}) assert.EqualError(t, err, "invalid repeated value filter expression: equal") } @@ -96,16 +105,6 @@ func TestGetGormJoinTableQueryExpr(t *testing.T) { assert.Equal(t, "workflows.domain = ?", gormQueryExpr.Query) } -var expectedQueriesForFilters = map[FilterExpression]string{ - Contains: "field LIKE ?", - GreaterThan: "field > ?", - GreaterThanOrEqual: "field >= ?", - LessThan: "field < ?", - LessThanOrEqual: "field <= ?", - Equal: "field = ?", - NotEqual: "field <> ?", -} - var expectedArgsForFilters = map[FilterExpression]string{ Contains: "%value%", GreaterThan: "value", @@ -117,12 +116,13 @@ var expectedArgsForFilters = map[FilterExpression]string{ } func TestQueryExpressions(t *testing.T) { - for expression, expectedQuery := range expectedQueriesForFilters { + for expression := range singleValueFilters { filter, err := NewSingleValueFilter(Workflow, expression, "field", "value") assert.NoError(t, err) gormQueryExpr, err := filter.GetGormQueryExpr() assert.NoError(t, err) + expectedQuery := fmt.Sprintf(filterQueryMappings[expression], "field") assert.Equal(t, expectedQuery, gormQueryExpr.Query) expectedArg, ok := expectedArgsForFilters[expression] @@ -130,14 +130,17 @@ func TestQueryExpressions(t *testing.T) { assert.Equal(t, expectedArg, gormQueryExpr.Args) } - // Also test the one repeated value filter - filter, err := NewRepeatedValueFilter(Workflow, ValueIn, "field", []string{"value"}) - assert.NoError(t, err) + // Also test the repeated value filters + for expression := range repeatedValueFilters { + filter, err := NewRepeatedValueFilter(Workflow, expression, "field", []string{"value"}) + assert.NoError(t, err) - gormQueryExpr, err := filter.GetGormQueryExpr() - assert.NoError(t, err) - assert.Equal(t, "field in (?)", gormQueryExpr.Query) - assert.EqualValues(t, []string{"value"}, gormQueryExpr.Args) + gormQueryExpr, err := filter.GetGormQueryExpr() + assert.NoError(t, err) + expectedQuery := fmt.Sprintf(filterQueryMappings[expression], "field") + assert.Equal(t, expectedQuery, gormQueryExpr.Query) + assert.EqualValues(t, []string{"value"}, gormQueryExpr.Args) + } } func TestMapFilter(t *testing.T) { diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index fac2b02523..6093ac8bc3 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -646,7 +646,6 @@ func (m *ExecutionManager) launchSingleTaskExecution( TaskID: taskModel.ID, WorkflowID: workflowModel.ID, // The execution is not considered running until the propeller sends a specific event saying so. - Phase: core.WorkflowExecution_UNDEFINED, CreatedAt: m._clock.Now(), Notifications: notificationsSettings, WorkflowIdentifier: workflow.Id, @@ -1202,7 +1201,6 @@ func (m *ExecutionManager) launchExecution( LaunchPlanID: launchPlanModel.ID, WorkflowID: launchPlanModel.WorkflowID, // The execution is not considered running until the propeller sends a specific event saying so. - Phase: core.WorkflowExecution_UNDEFINED, CreatedAt: m._clock.Now(), Notifications: notificationsSettings, WorkflowIdentifier: workflow.Id, diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go index 1795c9f4d9..45e8e041a7 100644 --- a/flyteadmin/pkg/repositories/transformers/execution.go +++ b/flyteadmin/pkg/repositories/transformers/execution.go @@ -35,7 +35,6 @@ type CreateExecutionModelInput struct { LaunchPlanID uint WorkflowID uint TaskID uint - Phase core.WorkflowExecution_Phase CreatedAt time.Time Notifications []*admin.Notification WorkflowIdentifier *core.Identifier @@ -77,7 +76,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e } createdAt := timestamppb.New(input.CreatedAt) closure := admin.ExecutionClosure{ - Phase: input.Phase, CreatedAt: createdAt, UpdatedAt: createdAt, Notifications: input.Notifications, @@ -88,9 +86,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e OccurredAt: createdAt, }, } - if input.Phase == core.WorkflowExecution_RUNNING { - closure.StartedAt = createdAt - } if input.Error != nil { closure.Phase = core.WorkflowExecution_FAILED execErr := &core.ExecutionError{ @@ -130,7 +125,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e Org: input.WorkflowExecutionID.GetOrg(), }, Spec: spec, - Phase: input.Phase.String(), + Phase: closure.Phase.String(), Closure: closureBytes, WorkflowID: input.WorkflowID, ExecutionCreatedAt: &input.CreatedAt, diff --git a/flyteadmin/pkg/repositories/transformers/execution_test.go b/flyteadmin/pkg/repositories/transformers/execution_test.go index 9f900b8bd8..1582852707 100644 --- a/flyteadmin/pkg/repositories/transformers/execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/execution_test.go @@ -73,7 +73,7 @@ func TestCreateExecutionModel(t *testing.T) { }, } namespace := "ns" - t.Run("running", func(t *testing.T) { + t.Run("successful execution", func(t *testing.T) { execution, err := CreateExecutionModel(CreateExecutionModelInput{ WorkflowExecutionID: core.WorkflowExecutionIdentifier{ Project: "project", @@ -84,7 +84,6 @@ func TestCreateExecutionModel(t *testing.T) { RequestSpec: execRequest.Spec, LaunchPlanID: lpID, WorkflowID: wfID, - Phase: core.WorkflowExecution_RUNNING, CreatedAt: createdAt, WorkflowIdentifier: workflowIdentifier, ParentNodeExecutionID: nodeID, @@ -107,6 +106,7 @@ func TestCreateExecutionModel(t *testing.T) { assert.Equal(t, nodeID, execution.ParentNodeExecutionID) assert.Equal(t, sourceID, execution.SourceExecutionID) assert.Equal(t, "launch_plan", execution.LaunchEntity) + assert.Equal(t, execution.Phase, core.WorkflowExecution_UNDEFINED.String()) expectedSpec := execRequest.Spec expectedSpec.Metadata.Principal = principal expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ @@ -120,9 +120,8 @@ func TestCreateExecutionModel(t *testing.T) { expectedCreatedAt, _ := ptypes.TimestampProto(createdAt) expectedClosure, _ := proto.Marshal(&admin.ExecutionClosure{ - Phase: core.WorkflowExecution_RUNNING, + Phase: core.WorkflowExecution_UNDEFINED, CreatedAt: expectedCreatedAt, - StartedAt: expectedCreatedAt, UpdatedAt: expectedCreatedAt, WorkflowId: workflowIdentifier, StateChangeDetails: &admin.ExecutionStateChangeDetails{ @@ -145,7 +144,6 @@ func TestCreateExecutionModel(t *testing.T) { RequestSpec: execRequest.Spec, LaunchPlanID: lpID, WorkflowID: wfID, - Phase: core.WorkflowExecution_RUNNING, CreatedAt: createdAt, WorkflowIdentifier: workflowIdentifier, ParentNodeExecutionID: nodeID, @@ -169,6 +167,7 @@ func TestCreateExecutionModel(t *testing.T) { assert.Equal(t, nodeID, execution.ParentNodeExecutionID) assert.Equal(t, sourceID, execution.SourceExecutionID) assert.Equal(t, "launch_plan", execution.LaunchEntity) + assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase) expectedSpec := execRequest.Spec expectedSpec.Metadata.Principal = principal expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ @@ -191,7 +190,6 @@ func TestCreateExecutionModel(t *testing.T) { }, }, CreatedAt: expectedCreatedAt, - StartedAt: expectedCreatedAt, UpdatedAt: expectedCreatedAt, WorkflowId: workflowIdentifier, StateChangeDetails: &admin.ExecutionStateChangeDetails{ @@ -214,7 +212,6 @@ func TestCreateExecutionModel(t *testing.T) { RequestSpec: execRequest.Spec, LaunchPlanID: lpID, WorkflowID: wfID, - Phase: core.WorkflowExecution_RUNNING, CreatedAt: createdAt, WorkflowIdentifier: workflowIdentifier, ParentNodeExecutionID: nodeID, @@ -238,6 +235,7 @@ func TestCreateExecutionModel(t *testing.T) { assert.Equal(t, nodeID, execution.ParentNodeExecutionID) assert.Equal(t, sourceID, execution.SourceExecutionID) assert.Equal(t, "launch_plan", execution.LaunchEntity) + assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase) expectedSpec := execRequest.Spec expectedSpec.Metadata.Principal = principal expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ @@ -260,7 +258,6 @@ func TestCreateExecutionModel(t *testing.T) { }, }, CreatedAt: expectedCreatedAt, - StartedAt: expectedCreatedAt, UpdatedAt: expectedCreatedAt, WorkflowId: workflowIdentifier, StateChangeDetails: &admin.ExecutionStateChangeDetails{ @@ -283,7 +280,6 @@ func TestCreateExecutionModel(t *testing.T) { RequestSpec: execRequest.Spec, LaunchPlanID: lpID, WorkflowID: wfID, - Phase: core.WorkflowExecution_RUNNING, CreatedAt: createdAt, WorkflowIdentifier: workflowIdentifier, ParentNodeExecutionID: nodeID, @@ -307,6 +303,7 @@ func TestCreateExecutionModel(t *testing.T) { assert.Equal(t, nodeID, execution.ParentNodeExecutionID) assert.Equal(t, sourceID, execution.SourceExecutionID) assert.Equal(t, "launch_plan", execution.LaunchEntity) + assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase) expectedSpec := execRequest.Spec expectedSpec.Metadata.Principal = principal expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{ @@ -329,7 +326,6 @@ func TestCreateExecutionModel(t *testing.T) { }, }, CreatedAt: expectedCreatedAt, - StartedAt: expectedCreatedAt, UpdatedAt: expectedCreatedAt, WorkflowId: workflowIdentifier, StateChangeDetails: &admin.ExecutionStateChangeDetails{