Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
Create executions in flytectl (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmahindrakar-oss authored Mar 22, 2021
1 parent 0a2f561 commit 0b5ef91
Show file tree
Hide file tree
Showing 40 changed files with 3,000 additions and 130 deletions.
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ package tools
import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/lyft/flytestdlib/cli/pflags"
_ "github.com/flyteorg/flytestdlib/cli/pflags"
_ "github.com/vektra/mockery/cmd/mockery"
)
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_test_targets/download_tooling.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ set -e
# In the format of "<cli>:<package>" or ":<package>" if no cli
tools=(
"github.com/vektra/mockery/cmd/mockery"
"github.com/lyft/flytestdlib/cli/pflags"
"github.com/flyteorg/flytestdlib/cli/pflags"
"github.com/golangci/golangci-lint/cmd/golangci-lint"
"github.com/alvaroloes/enumer"
)
Expand Down
2 changes: 2 additions & 0 deletions cmd/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func RemoteCreateCommand() *cobra.Command {
createResourcesFuncs := map[string]cmdcore.CommandEntry{
"project": {CmdFunc: createProjectsCommand, Aliases: []string{"projects"}, ProjectDomainNotRequired: true, PFlagProvider: projectConfig, Short: projectShort,
Long: projectLong},
"execution": {CmdFunc: createExecutionCommand, Aliases: []string{"executions"}, ProjectDomainNotRequired: false, PFlagProvider: executionConfig, Short: executionShort,
Long: executionLong},
}
cmdcore.AddCommands(createCmd, createResourcesFuncs)
return createCmd
Expand Down
28 changes: 24 additions & 4 deletions cmd/create/create_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,43 @@
package create

import (
"context"
"sort"
"testing"

cmdCore "github.com/flyteorg/flytectl/cmd/core"
"github.com/flyteorg/flytectl/cmd/testutils"
"github.com/flyteorg/flyteidl/clients/go/admin/mocks"

"github.com/stretchr/testify/assert"
)

const testDataFolder = "../testdata/"

var (
err error
ctx context.Context
mockClient *mocks.AdminServiceClient
args []string
cmdCtx cmdCore.CommandContext
)
var setup = testutils.Setup
var tearDownAndVerify = testutils.TearDownAndVerify

func TestCreateCommand(t *testing.T) {
createCommand := RemoteCreateCommand()
assert.Equal(t, createCommand.Use, "create")
assert.Equal(t, createCommand.Short, "Used for creating various flyte resources including tasks/workflows/launchplans/executions/project.")
assert.Equal(t, len(createCommand.Commands()), 1)
assert.Equal(t, len(createCommand.Commands()), 2)
cmdNouns := createCommand.Commands()
// Sort by Use value.
sort.Slice(cmdNouns, func(i, j int) bool {
return cmdNouns[i].Use < cmdNouns[j].Use
})
assert.Equal(t, cmdNouns[0].Use, "project")
assert.Equal(t, cmdNouns[0].Aliases, []string{"projects"})
assert.Equal(t, cmdNouns[0].Short, "Create project resources")
assert.Equal(t, cmdNouns[0].Use, "execution")
assert.Equal(t, cmdNouns[0].Aliases, []string{"executions"})
assert.Equal(t, cmdNouns[0].Short, executionShort)
assert.Equal(t, cmdNouns[1].Use, "project")
assert.Equal(t, cmdNouns[1].Aliases, []string{"projects"})
assert.Equal(t, cmdNouns[1].Short, "Create project resources")
}
129 changes: 129 additions & 0 deletions cmd/create/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package create

import (
"context"
"fmt"

"github.com/flyteorg/flytectl/cmd/config"
cmdCore "github.com/flyteorg/flytectl/cmd/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
)

const (
executionShort = "Create execution resources"
executionLong = `
Create the executions for given workflow/task in a project and domain.
There are three steps in generating an execution.
- Generate the execution spec file using the get command.
- Update the inputs for the execution if needed.
- Run the execution by passing in the generated yaml file.
The spec file should be generated first and then run the execution using the spec file.
You can reference the flytectl get task for more details
::
flytectl get tasks -d development -p flytectldemo core.advanced.run_merge_sort.merge --version v2 --execFile execution_spec.yaml
The generated file would look similar to this
.. code-block:: yaml
iamRoleARN: ""
inputs:
sorted_list1:
- 0
sorted_list2:
- 0
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: core.advanced.run_merge_sort.merge
version: "v2"
The generated file can be modified to change the input values.
.. code-block:: yaml
iamRoleARN: 'arn:aws:iam::12345678:role/defaultrole'
inputs:
sorted_list1:
- 2
- 4
- 6
sorted_list2:
- 1
- 3
- 5
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: core.advanced.run_merge_sort.merge
version: "v2"
And then can be passed through the command line.
Notice the source and target domain/projects can be different.
The root project and domain flags of -p and -d should point to task/launch plans project/domain.
::
flytectl create execution --execFile execution_spec.yaml -p flytectldemo -d development --targetProject flytesnacks
Usage
`
)

//go:generate pflags ExecutionConfig --default-var executionConfig

// ExecutionConfig hold configuration for create execution flags and configuration of the actual task or workflow to be launched.
type ExecutionConfig struct {
// pflag section
ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params.If not specified defaults to <<workflow/task>_name>.execution_spec.yaml"`
TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created.If not specified configured domain would be used."`
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created.If not specified configured project would be used."`
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
// Non plfag section is read from the execution config generated by get task/launchplan
Workflow string `json:"workflow,omitempty"`
Task string `json:"task,omitempty"`
Version string `json:"version"`
Inputs map[string]interface{} `json:"inputs"`
}

type ExecutionParams struct {
name string
isTask bool
}

var (
executionConfig = &ExecutionConfig{}
)

func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
var execParams ExecutionParams
var err error
sourceProject := config.GetConfig().Project
sourceDomain := config.GetConfig().Domain
if execParams, err = readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain); err != nil {
return err
}
var executionRequest *admin.ExecutionCreateRequest
if execParams.isTask {
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
return err
}
} else {
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
return err
}
}
exec, _err := cmdCtx.AdminClient().CreateExecution(ctx, executionRequest)
if _err != nil {
return _err
}
fmt.Printf("execution identifier %v\n", exec.Id)
return nil
}
166 changes: 166 additions & 0 deletions cmd/create/execution_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package create

import (
"testing"

"github.com/flyteorg/flytectl/cmd/config"
"github.com/flyteorg/flytectl/cmd/testutils"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/protobuf/types/known/timestamppb"
)

// This function needs to be called after testutils.Steup()
func createExecutionSetup() {
ctx = testutils.Ctx
cmdCtx = testutils.CmdCtx
mockClient = testutils.MockClient
sortedListLiteralType := core.Variable{
Type: &core.LiteralType{
Type: &core.LiteralType_CollectionType{
CollectionType: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_INTEGER,
},
},
},
},
}
variableMap := map[string]*core.Variable{
"sorted_list1": &sortedListLiteralType,
"sorted_list2": &sortedListLiteralType,
}

task1 := &admin.Task{
Id: &core.Identifier{
Name: "task1",
Version: "v2",
},
Closure: &admin.TaskClosure{
CreatedAt: &timestamppb.Timestamp{Seconds: 1, Nanos: 0},
CompiledTask: &core.CompiledTask{
Template: &core.TaskTemplate{
Interface: &core.TypedInterface{
Inputs: &core.VariableMap{
Variables: variableMap,
},
},
},
},
},
}
mockClient.OnGetTaskMatch(ctx, mock.Anything).Return(task1, nil)
parameterMap := map[string]*core.Parameter{
"numbers": {
Var: &core.Variable{
Type: &core.LiteralType{
Type: &core.LiteralType_CollectionType{
CollectionType: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_INTEGER,
},
},
},
},
},
},
"numbers_count": {
Var: &core.Variable{
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_INTEGER,
},
},
},
},
"run_local_at_count": {
Var: &core.Variable{
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_INTEGER,
},
},
},
Behavior: &core.Parameter_Default{
Default: &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_Integer{
Integer: 10,
},
},
},
},
},
},
},
},
}
launchPlan1 := &admin.LaunchPlan{
Id: &core.Identifier{
Name: "core.advanced.run_merge_sort.merge_sort",
Version: "v3",
},
Spec: &admin.LaunchPlanSpec{
DefaultInputs: &core.ParameterMap{
Parameters: parameterMap,
},
},
Closure: &admin.LaunchPlanClosure{
CreatedAt: &timestamppb.Timestamp{Seconds: 0, Nanos: 0},
ExpectedInputs: &core.ParameterMap{
Parameters: parameterMap,
},
},
}
objectGetRequest := &admin.ObjectGetRequest{
Id: &core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: "core.advanced.run_merge_sort.merge_sort",
Version: "v3",
},
}
mockClient.OnGetLaunchPlanMatch(ctx, objectGetRequest).Return(launchPlan1, nil)
}
func TestCreateLaunchPlanExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
executionCreateResponseLP := &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Name: "f652ea3596e7f4d80a0e",
},
}
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseLP, nil)
executionConfig.ExecFile = testDataFolder + "launchplan_execution_spec.yaml"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`)
}

func TestCreateTaskExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
executionCreateResponseTask := &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Name: "ff513c0e44b5b4a35aa5",
},
}
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseTask, nil)
executionConfig.ExecFile = testDataFolder + "task_execution_spec.yaml"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"ff513c0e44b5b4a35aa5" `)
}
Loading

0 comments on commit 0b5ef91

Please sign in to comment.