Skip to content

Commit

Permalink
sending all subtask info on map task start (flyteorg#252)
Browse files Browse the repository at this point in the history
* added SubTaskMetadata type

Signed-off-by: Daniel Rammer <[email protected]>

* implemented SubTaskMetadata and integrated into k8s-array plugin

Signed-off-by: Daniel Rammer <[email protected]>

* using SubTaskMetadata in aws_batch plugin

Signed-off-by: Daniel Rammer <[email protected]>

* populating CacheStatus and Logs on ExternalResource

Signed-off-by: Daniel Rammer <[email protected]>

* removed SubTaskMetadata and instead directly using ExternalResource

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl version

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* documentation

Signed-off-by: Daniel Rammer <[email protected]>

* documentation

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>

* added unit test for InitializeExternalResources

Signed-off-by: Daniel Rammer <[email protected]>

* maintaining phase version on WaitingForResources phase change

Signed-off-by: Daniel Rammer <[email protected]>

* removing phase from log name

Signed-off-by: Daniel Rammer <[email protected]>

* fixed tests

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 7, 2022
1 parent 6de98f5 commit 8d33e2a
Show file tree
Hide file tree
Showing 15 changed files with 309 additions and 179 deletions.
4 changes: 2 additions & 2 deletions flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v0.23.0
github.com/flyteorg/flyteidl v0.24.7
github.com/flyteorg/flytestdlib v0.4.13
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.4.3
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/imdario/mergo v0.3.11 // indirect
github.com/imdario/mergo v0.3.11
github.com/kubeflow/common v0.4.0
github.com/kubeflow/mpi-operator/v2 v2.0.0-20210920181600-c5c0c3ef99ec
github.com/kubeflow/pytorch-operator v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.23.0 h1:Pjl9Tq1pJfIK0au5PiqPVpl25xTYosN6BruZl+PgWAk=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.7 h1:J9Wnk+JHpbRxOVs5moehQ7BRsZefsdABYuRYinXih8c=
github.com/flyteorg/flyteidl v0.24.7/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.4.13 h1:TzgqhECRGfOHYH1A7rUwcKEEH2rTtPxGy+oYcif7iBw=
github.com/flyteorg/flytestdlib v0.4.13/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=
Expand Down
6 changes: 5 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ func (p Phase) IsWaitingForResources() bool {
type ExternalResource struct {
// A unique identifier for the external resource
ExternalID string
// Captures the status of caching for this external resource
CacheStatus core.CatalogCacheStatus
// A unique index for the external resource. Although the ID may change, this will remain the same
// throughout task event reports and retries.
Index uint32
// The nubmer of times this external resource has been attempted
// Log information for the external resource
Logs []*core.TaskLog
// The number of times this external resource has been attempted
RetryAttempt uint32
// Phase (if exists) associated with the external resource
Phase Phase
Expand Down
20 changes: 18 additions & 2 deletions flyteplugins/go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
)

const (
Expand Down Expand Up @@ -104,15 +106,29 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
}

// Always attempt to augment phase with task logs.
subTaskDetails, err := GetTaskLinks(ctx, tCtx.TaskExecutionMetadata(), e.jobStore, pluginState)
var logLinks []*idlCore.TaskLog
var externalResources []*core.ExternalResource
switch p {
case arrayCore.PhaseStart:
externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, pluginState.State,
func(tCtx core.TaskExecutionContext, childIndex int) string {
// subTaskIDs for the aws_batch are generated based on the job ID, therefore
// to initialize we default to an empty string which will be updated later.
return ""
},
)
default:
logLinks, externalResources, err = GetTaskLinks(ctx, tCtx.TaskExecutionMetadata(), e.jobStore, pluginState)
}

if err != nil {
return core.UnknownTransition, err
}

logger.Infof(ctx, "Exiting handle with phase [%v]", pluginState.State.CurrentPhase)

// Determine transition information from the state
phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, subTaskDetails.LogLinks, subTaskDetails.SubTaskIDs)
phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, logLinks, externalResources)
if err != nil {
return core.UnknownTransition, err
}
Expand Down
44 changes: 19 additions & 25 deletions flyteplugins/go/tasks/plugins/array/awsbatch/task_links.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,14 @@ func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore
}
}

type SubTaskDetails struct {
LogLinks []*idlCore.TaskLog
SubTaskIDs []*string
}

func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata, jobStore *JobStore, state *State) (
SubTaskDetails, error) {
[]*idlCore.TaskLog, []*pluginCore.ExternalResource, error) {

logLinks := make([]*idlCore.TaskLog, 0, 4)
subTaskIDs := make([]*string, 0)
externalResources := make([]*pluginCore.ExternalResource, 0)

if state.GetExternalJobID() == nil {
return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, nil
return logLinks, externalResources, nil
}

// TODO: Add tasktemplate container config to job config
Expand All @@ -67,20 +59,14 @@ func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata
})

if err != nil {
return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, errors.Wrapf(errors2.DownstreamSystemError, err, "Failed to retrieve a job from job store.")
return logLinks, externalResources, errors.Wrapf(errors2.DownstreamSystemError, err, "Failed to retrieve a job from job store.")
}

if job == nil {
logger.Debugf(ctx, "Job [%v] not found in jobs store. It might have been evicted. If reasonable, bump the max "+
"size of the LRU cache.", *state.GetExternalJobID())

return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, nil
return logLinks, externalResources, nil
}

detailedArrayStatus := state.GetArrayStatus().Detailed
Expand All @@ -90,19 +76,27 @@ func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata
finalPhase := pluginCore.Phases[finalPhaseIdx]

// The caveat here is that we will mark all attempts with the final phase we are tracking in the state.
subTaskLogLinks := make([]*idlCore.TaskLog, 0)
retryAttempt := 0
for attemptIdx, attempt := range subJob.Attempts {
if len(attempt.LogStream) > 0 {
logLinks = append(logLinks, &idlCore.TaskLog{
subTaskLogLinks = append(subTaskLogLinks, &idlCore.TaskLog{
Name: fmt.Sprintf("AWS Batch #%v-%v (%v)", originalIndex, attemptIdx, finalPhase),
Uri: fmt.Sprintf(LogStreamFormatter, jobStore.GetRegion(), attempt.LogStream),
})
}

retryAttempt = attemptIdx
}
subTaskIDs = append(subTaskIDs, &subJob.ID)

externalResources = append(externalResources, &pluginCore.ExternalResource{
ExternalID: subJob.ID,
Index: uint32(originalIndex),
Logs: subTaskLogLinks,
RetryAttempt: uint32(retryAttempt),
Phase: finalPhase,
})
}

return SubTaskDetails{
LogLinks: logLinks,
SubTaskIDs: subTaskIDs,
}, nil
return logLinks, externalResources, nil
}
66 changes: 66 additions & 0 deletions flyteplugins/go/tasks/plugins/array/core/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package core

import (
"context"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
)

// InitializeExternalResources constructs an ExternalResource slice where each element describes the
// initial state of one subtask, positioned by the subtask's original index in the array.
//
// Subtasks whose bit is set in state.IndexesToCache are reported as PhaseUndefined, with
// CACHE_MISS when the task template is marked Discoverable and CACHE_DISABLED otherwise
// (including when the template carries no Metadata). Every other subtask is reported as
// PhaseSuccess with CACHE_HIT.
//
// generateSubTaskID is invoked once per subtask to produce its ExternalID. Subtasks that are not
// cached receive consecutive child indexes starting at 0; cached subtasks receive pseudo-indexes
// starting at state.GetExecutionArraySize() so that their generated IDs do not overlap.
//
// A nil slice is returned together with the error when the task template cannot be read or is nil.
func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State,
	generateSubTaskID func(core.TaskExecutionContext, int) string) ([]*core.ExternalResource, error) {

	// Validate the template before allocating anything so failures never hand back a slice of nils.
	taskTemplate, err := tCtx.TaskReader().Read(ctx)
	if err != nil {
		return nil, err
	}
	if taskTemplate == nil {
		return nil, errors.Errorf(errors.BadTaskSpecification, "Required value not set, taskTemplate is nil")
	}

	// The cache status of a subtask that is not cached depends only on the template, so it is
	// computed once rather than on every iteration.
	uncachedStatus := idlCore.CatalogCacheStatus_CACHE_MISS
	if taskTemplate.Metadata == nil || !taskTemplate.Metadata.Discoverable {
		uncachedStatus = idlCore.CatalogCacheStatus_CACHE_DISABLED
	}

	originalArraySize := int(state.GetOriginalArraySize())
	externalResources := make([]*core.ExternalResource, originalArraySize)

	executeSubTaskCount := 0
	cachedSubTaskCount := 0
	for i := 0; i < originalArraySize; i++ {
		var (
			cacheStatus idlCore.CatalogCacheStatus
			childIndex  int
			phase       core.Phase
		)

		if state.IndexesToCache.IsSet(uint(i)) {
			// Not cached: the subtask has no phase yet and takes the next execution index.
			phase = core.PhaseUndefined
			cacheStatus = uncachedStatus

			childIndex = executeSubTaskCount
			executeSubTaskCount++
		} else {
			// Cached: the subtask is already complete and is marked as a cache hit.
			phase = core.PhaseSuccess
			cacheStatus = idlCore.CatalogCacheStatus_CACHE_HIT

			// The child index is a pseudo-value placed after the execution indexes to ensure
			// non-overlapping subTaskIDs.
			childIndex = state.GetExecutionArraySize() + cachedSubTaskCount
			cachedSubTaskCount++
		}

		externalResources[i] = &core.ExternalResource{
			ExternalID:   generateSubTaskID(tCtx, childIndex),
			CacheStatus:  cacheStatus,
			Index:        uint32(i),
			Logs:         nil,
			RetryAttempt: 0,
			Phase:        phase,
		}
	}

	return externalResources, nil
}
70 changes: 70 additions & 0 deletions flyteplugins/go/tasks/plugins/array/core/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package core

import (
"context"
"testing"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"

"github.com/flyteorg/flytestdlib/bitarray"

"github.com/stretchr/testify/assert"
)

// TestInitializeExternalResources checks the initial ExternalResource built for every subtask of a
// ten element array whose first four indexes are cleared in IndexesToCache.
func TestInitializeExternalResources(t *testing.T) {
	const (
		totalSubTasks  = 10
		cachedSubTasks = 4
	)
	ctx := context.TODO()

	// Invert a fresh bit set, then clear the leading bits; the assertions below expect exactly
	// those cleared indexes to be reported as cache hits.
	toCache := InvertBitSet(bitarray.NewBitSet(uint(totalSubTasks)), uint(totalSubTasks))
	for idx := 0; idx < cachedSubTasks; idx++ {
		toCache.Clear(uint(idx))
	}

	// Task reader mock returning a discoverable template.
	template := &idlCore.TaskTemplate{
		Metadata: &idlCore.TaskMetadata{
			Discoverable: true,
		},
	}
	taskReader := &mocks.TaskReader{}
	taskReader.OnRead(ctx).Return(template, nil)

	// Execution metadata mock exposing a task execution ID.
	executionID := &mocks.TaskExecutionID{}
	executionID.OnGetGeneratedName().Return("notfound")

	executionMeta := &mocks.TaskExecutionMetadata{}
	executionMeta.OnGetTaskExecutionID().Return(executionID)

	taskCtx := &mocks.TaskExecutionContext{}
	taskCtx.OnTaskReader().Return(taskReader)
	taskCtx.OnTaskExecutionMetadata().Return(executionMeta)

	arrayState := &State{
		OriginalArraySize:  int64(totalSubTasks),
		ExecutionArraySize: totalSubTasks - cachedSubTasks,
		IndexesToCache:     toCache,
	}

	// Every subtask gets an empty external ID in this test.
	emptySubTaskID := func(core.TaskExecutionContext, int) string {
		return ""
	}

	resources, err := InitializeExternalResources(ctx, taskCtx, arrayState, emptySubTaskID)

	assert.Nil(t, err)
	assert.Equal(t, totalSubTasks, len(resources))
	for idx, resource := range resources {
		// Indexes that were not cleared are expected as undefined cache misses; the cleared
		// leading indexes are expected as successful cache hits.
		wantPhase, wantCacheStatus := core.PhaseUndefined, idlCore.CatalogCacheStatus_CACHE_MISS
		if idx < cachedSubTasks {
			wantPhase, wantCacheStatus = core.PhaseSuccess, idlCore.CatalogCacheStatus_CACHE_HIT
		}

		assert.Equal(t, uint32(idx), resource.Index)
		assert.Equal(t, 0, len(resource.Logs))
		assert.Equal(t, uint32(0), resource.RetryAttempt)
		assert.Equal(t, wantPhase, resource.Phase)
		assert.Equal(t, wantCacheStatus, resource.CacheStatus)
	}
}
17 changes: 3 additions & 14 deletions flyteplugins/go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,30 +175,19 @@ func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 {
// Info fields will always be nil, because we're going to send log links individually. This simplifies our state
// handling as we don't have to keep an ever growing list of log links (our batch jobs can be 5000 sub-tasks, keeping
// all the log links takes up a lot of space).
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string) (core.PhaseInfo, error) {
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, externalResources []*core.ExternalResource) (core.PhaseInfo, error) {
phaseInfo := core.PhaseInfoUndefined
t := time.Now()

nowTaskInfo := &core.TaskInfo{
OccurredAt: &t,
Logs: logLinks,
ExternalResources: make([]*core.ExternalResource, len(subTaskIDs)),
}

for childIndex, subTaskID := range subTaskIDs {
originalIndex := CalculateOriginalIndex(childIndex, state.GetIndexesToCache())

nowTaskInfo.ExternalResources[childIndex] = &core.ExternalResource{
ExternalID: *subTaskID,
Index: uint32(originalIndex),
RetryAttempt: uint32(state.RetryAttempts.GetItem(childIndex)),
Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIndex)],
}
ExternalResources: externalResources,
}

switch p, version := state.GetPhase(); p {
case PhaseStart:
phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason(), &core.TaskInfo{OccurredAt: &t})
phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason(), nowTaskInfo)

case PhasePreLaunch:
version := GetPhaseVersionOffset(p, 1) + version
Expand Down
Loading

0 comments on commit 8d33e2a

Please sign in to comment.