Skip to content

Commit

Permalink
SVR-396: Cherry-pick: ValueNotIn Filter and Fix execution phase bug (#…
Browse files Browse the repository at this point in the history
…182)

## Overview
Cherry picks 
[b35246b](b35246b) and [920fd67](920fd67)

## Test Plan
-  [b35246b](b35246b)
The ValueNotIn filter works as expected in staging.
-  [920fd67](920fd67)
Originally this [execution](https://dogfood.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/amks92fslmngjr8lzxrt) is in `UNDEFINED` phase in DB. After deploying the changes, the [execution](https://dogfood.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/asnxrszrxk99nrzh97f9) is in `FAILED` phase now, which is what we expected.


## Rollout Plan (if applicable)
*TODO: Describe any deployment or compatibility considerations for rolling out this change.*

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/<project-number>

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
troychiu authored Apr 2, 2024
1 parent b313d2c commit db4b8b8
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 43 deletions.
7 changes: 4 additions & 3 deletions docs/concepts/admin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,15 @@ The fully supported set of filter functions are

- contains
- gt (greater than)
- gte (greter than or equal to)
- gte (greater than or equal to)
- lt (less than)
- lte (less than or equal to)
- eq (equal)
- ne (not equal)
- value_in (for repeated sets of values)
- value_in (value in repeated sets of values)
- value_not_in (value not in repeated sets of values)

"value_in" is a special case where multiple values are passed to the filter expression. For example::
"value_in" and "value_not_in" are special cases where multiple values are passed to the filter expression. For example::

value_in(phase, RUNNING;SUCCEEDED;FAILED)

Expand Down
26 changes: 22 additions & 4 deletions flyteadmin/pkg/common/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
Equal
NotEqual
ValueIn
ValueNotIn
)

// String formats for various filter expression queries
Expand All @@ -43,6 +44,7 @@ const (
equalQuery = "%s = ?"
notEqualQuery = "%s <> ?"
valueInQuery = "%s in (?)"
valueNotInQuery = "%s not in (?)"
)

// Set of available filters which exclusively accept a single argument value.
Expand All @@ -58,7 +60,8 @@ var singleValueFilters = map[FilterExpression]bool{

// Set of available filters which exclusively accept repeated argument values.
var repeatedValueFilters = map[FilterExpression]bool{
ValueIn: true,
ValueIn: true,
ValueNotIn: true,
}

const EqualExpression = "eq"
Expand All @@ -72,6 +75,19 @@ var filterNameMappings = map[string]FilterExpression{
EqualExpression: Equal,
"ne": NotEqual,
"value_in": ValueIn,
"value_not_in": ValueNotIn,
}

var filterQueryMappings = map[FilterExpression]string{
Contains: containsQuery,
GreaterThan: greaterThanQuery,
GreaterThanOrEqual: greaterThanOrEqualQuery,
LessThan: lessThanQuery,
LessThanOrEqual: lessThanOrEqualQuery,
Equal: equalQuery,
NotEqual: notEqualQuery,
ValueIn: valueInQuery,
ValueNotIn: valueNotInQuery,
}

var executionIdentifierFields = map[string]bool{
Expand Down Expand Up @@ -109,6 +125,8 @@ func getFilterExpressionName(expression FilterExpression) string {
return "not equal"
case ValueIn:
return "value in"
case ValueNotIn:
return "value not in"
default:
return ""
}
Expand Down Expand Up @@ -167,9 +185,9 @@ func (f *inlineFilterImpl) GetField() string {

func (f *inlineFilterImpl) getGormQueryExpr(formattedField string) (GormQueryExpr, error) {

// ValueIn is special because it uses repeating values.
if f.function == ValueIn {
queryStr := fmt.Sprintf(valueInQuery, formattedField)
// Filters that use repeated values
if _, ok := repeatedValueFilters[f.function]; ok {
queryStr := fmt.Sprintf(filterQueryMappings[f.function], formattedField)
return GormQueryExpr{
Query: queryStr,
Args: f.repeatedValue,
Expand Down
39 changes: 21 additions & 18 deletions flyteadmin/pkg/common/filters_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -83,6 +84,14 @@ func TestNewRepeatedValueFilter(t *testing.T) {
assert.Equal(t, "projects.project in (?)", expression.Query)
assert.Equal(t, vals, expression.Args)

filter, err = NewRepeatedValueFilter(Workflow, ValueNotIn, "project", vals)
assert.NoError(t, err)

expression, err = filter.GetGormJoinTableQueryExpr("projects")
assert.NoError(t, err)
assert.Equal(t, "projects.project not in (?)", expression.Query)
assert.Equal(t, vals, expression.Args)

_, err = NewRepeatedValueFilter(Workflow, Equal, "domain", []string{"production", "qa"})
assert.EqualError(t, err, "invalid repeated value filter expression: equal")
}
Expand All @@ -96,16 +105,6 @@ func TestGetGormJoinTableQueryExpr(t *testing.T) {
assert.Equal(t, "workflows.domain = ?", gormQueryExpr.Query)
}

var expectedQueriesForFilters = map[FilterExpression]string{
Contains: "field LIKE ?",
GreaterThan: "field > ?",
GreaterThanOrEqual: "field >= ?",
LessThan: "field < ?",
LessThanOrEqual: "field <= ?",
Equal: "field = ?",
NotEqual: "field <> ?",
}

var expectedArgsForFilters = map[FilterExpression]string{
Contains: "%value%",
GreaterThan: "value",
Expand All @@ -117,27 +116,31 @@ var expectedArgsForFilters = map[FilterExpression]string{
}

func TestQueryExpressions(t *testing.T) {
for expression, expectedQuery := range expectedQueriesForFilters {
for expression := range singleValueFilters {
filter, err := NewSingleValueFilter(Workflow, expression, "field", "value")
assert.NoError(t, err)

gormQueryExpr, err := filter.GetGormQueryExpr()
assert.NoError(t, err)
expectedQuery := fmt.Sprintf(filterQueryMappings[expression], "field")
assert.Equal(t, expectedQuery, gormQueryExpr.Query)

expectedArg, ok := expectedArgsForFilters[expression]
assert.True(t, ok, "Missing expected argument for expression %s", expression)
assert.Equal(t, expectedArg, gormQueryExpr.Args)
}

// Also test the one repeated value filter
filter, err := NewRepeatedValueFilter(Workflow, ValueIn, "field", []string{"value"})
assert.NoError(t, err)
// Also test the repeated value filters
for expression := range repeatedValueFilters {
filter, err := NewRepeatedValueFilter(Workflow, expression, "field", []string{"value"})
assert.NoError(t, err)

gormQueryExpr, err := filter.GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "field in (?)", gormQueryExpr.Query)
assert.EqualValues(t, []string{"value"}, gormQueryExpr.Args)
gormQueryExpr, err := filter.GetGormQueryExpr()
assert.NoError(t, err)
expectedQuery := fmt.Sprintf(filterQueryMappings[expression], "field")
assert.Equal(t, expectedQuery, gormQueryExpr.Query)
assert.EqualValues(t, []string{"value"}, gormQueryExpr.Args)
}
}

func TestMapFilter(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ func (m *ExecutionManager) launchSingleTaskExecution(
TaskID: taskModel.ID,
WorkflowID: workflowModel.ID,
// The execution is not considered running until the propeller sends a specific event saying so.
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: m._clock.Now(),
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
Expand Down Expand Up @@ -1202,7 +1201,6 @@ func (m *ExecutionManager) launchExecution(
LaunchPlanID: launchPlanModel.ID,
WorkflowID: launchPlanModel.WorkflowID,
// The execution is not considered running until the propeller sends a specific event saying so.
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: m._clock.Now(),
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
Expand Down
7 changes: 1 addition & 6 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type CreateExecutionModelInput struct {
LaunchPlanID uint
WorkflowID uint
TaskID uint
Phase core.WorkflowExecution_Phase
CreatedAt time.Time
Notifications []*admin.Notification
WorkflowIdentifier *core.Identifier
Expand Down Expand Up @@ -77,7 +76,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
}
createdAt := timestamppb.New(input.CreatedAt)
closure := admin.ExecutionClosure{
Phase: input.Phase,
CreatedAt: createdAt,
UpdatedAt: createdAt,
Notifications: input.Notifications,
Expand All @@ -88,9 +86,6 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
OccurredAt: createdAt,
},
}
if input.Phase == core.WorkflowExecution_RUNNING {
closure.StartedAt = createdAt
}
if input.Error != nil {
closure.Phase = core.WorkflowExecution_FAILED
execErr := &core.ExecutionError{
Expand Down Expand Up @@ -130,7 +125,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
Org: input.WorkflowExecutionID.GetOrg(),
},
Spec: spec,
Phase: input.Phase.String(),
Phase: closure.Phase.String(),
Closure: closureBytes,
WorkflowID: input.WorkflowID,
ExecutionCreatedAt: &input.CreatedAt,
Expand Down
16 changes: 6 additions & 10 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestCreateExecutionModel(t *testing.T) {
},
}
namespace := "ns"
t.Run("running", func(t *testing.T) {
t.Run("successful execution", func(t *testing.T) {
execution, err := CreateExecutionModel(CreateExecutionModelInput{
WorkflowExecutionID: core.WorkflowExecutionIdentifier{
Project: "project",
Expand All @@ -84,7 +84,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -107,6 +106,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, execution.Phase, core.WorkflowExecution_UNDEFINED.String())
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -120,9 +120,8 @@ func TestCreateExecutionModel(t *testing.T) {

expectedCreatedAt, _ := ptypes.TimestampProto(createdAt)
expectedClosure, _ := proto.Marshal(&admin.ExecutionClosure{
Phase: core.WorkflowExecution_RUNNING,
Phase: core.WorkflowExecution_UNDEFINED,
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand All @@ -145,7 +144,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -169,6 +167,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -191,7 +190,6 @@ func TestCreateExecutionModel(t *testing.T) {
},
},
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand All @@ -214,7 +212,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -238,6 +235,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -260,7 +258,6 @@ func TestCreateExecutionModel(t *testing.T) {
},
},
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand All @@ -283,7 +280,6 @@ func TestCreateExecutionModel(t *testing.T) {
RequestSpec: execRequest.Spec,
LaunchPlanID: lpID,
WorkflowID: wfID,
Phase: core.WorkflowExecution_RUNNING,
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
Expand All @@ -307,6 +303,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
assert.Equal(t, "launch_plan", execution.LaunchEntity)
assert.Equal(t, core.WorkflowExecution_FAILED.String(), execution.Phase)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand All @@ -329,7 +326,6 @@ func TestCreateExecutionModel(t *testing.T) {
},
},
CreatedAt: expectedCreatedAt,
StartedAt: expectedCreatedAt,
UpdatedAt: expectedCreatedAt,
WorkflowId: workflowIdentifier,
StateChangeDetails: &admin.ExecutionStateChangeDetails{
Expand Down

0 comments on commit db4b8b8

Please sign in to comment.