Skip to content

Commit

Permalink
Save raw output data from execution events (flyteorg#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Oct 13, 2021
1 parent e220298 commit e94ed6b
Show file tree
Hide file tree
Showing 20 changed files with 583 additions and 165 deletions.
53 changes: 21 additions & 32 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ func (m *ExecutionManager) launchSingleTaskExecution(
workflowExecutionID, err)
return nil, nil, err
}
m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(proto.Size(request.Inputs)))
return ctx, executionModel, nil

}

func resolvePermissions(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole {
Expand Down Expand Up @@ -1148,7 +1148,7 @@ func (m *ExecutionManager) emitOverallWorkflowExecutionTime(

func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admin.WorkflowExecutionEventRequest) (
*admin.WorkflowExecutionEventResponse, error) {
err := validation.ValidateCreateWorkflowEventRequest(request)
err := validation.ValidateCreateWorkflowEventRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes)
if err != nil {
logger.Debugf(ctx, "received invalid CreateWorkflowEventRequest [%s]: %v", request.RequestId, err)
return nil, err
Expand Down Expand Up @@ -1202,6 +1202,9 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
m.systemMetrics.ActiveExecutions.Dec()
m.systemMetrics.ExecutionsTerminated.Inc()
go m.emitOverallWorkflowExecutionTime(executionModel, request.Event.OccurredAt)
if request.Event.GetOutputData() != nil {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}

err = m.publishNotifications(ctx, request, *executionModel)
if err != nil {
Expand Down Expand Up @@ -1256,13 +1259,6 @@ func (m *ExecutionManager) GetExecutionData(
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err)
return nil, err
}
signedOutputsURLBlob := admin.UrlBlob{}
if execution.Closure.GetOutputs() != nil && execution.Closure.GetOutputs().GetUri() != "" {
signedOutputsURLBlob, err = m.urlData.Get(ctx, execution.Closure.GetOutputs().GetUri())
if err != nil {
return nil, err
}
}
// Prior to flyteidl v0.15.0, Inputs were held in ExecutionClosure and were not offloaded. Ensure we can return the inputs as expected.
if len(executionModel.InputsURI) == 0 {
closure := &admin.ExecutionClosure{}
Expand All @@ -1280,36 +1276,29 @@ func (m *ExecutionManager) GetExecutionData(
return nil, err
}
}
inputsURLBlob, err := m.urlData.Get(ctx, executionModel.InputsURI.String())
inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, executionModel.InputsURI.String())
if err != nil {
return nil, err
}
response := &admin.WorkflowExecutionGetDataResponse{
Outputs: &signedOutputsURLBlob,
Inputs: &inputsURLBlob,
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if util.ShouldFetchData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), inputsURLBlob) {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", executionModel.InputsURI, err)
}
response.FullInputs = &fullInputs
outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, util.ToExecutionClosureInterface(execution.Closure))
if err != nil {
return nil, err
}
if remoteDataScheme == common.Local || remoteDataScheme == common.None || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
var fullOutputs core.LiteralMap
outputsURI := execution.Closure.GetOutputs().GetUri()
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(outputsURI), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputsURI, err)
}
response.FullOutputs = &fullOutputs
response := &admin.WorkflowExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
}

m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
if response.Outputs.Bytes > 0 {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
} else if response.FullOutputs != nil {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs)))
}
return response, nil
}

Expand Down
56 changes: 22 additions & 34 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (m *NodeExecutionManager) uploadDynamicWorkflowClosure(

func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admin.NodeExecutionEventRequest) (
*admin.NodeExecutionEventResponse, error) {
if err := validation.ValidateNodeExecutionEventRequest(&request); err != nil {
if err := validation.ValidateNodeExecutionEventRequest(&request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil {
logger.Debugf(ctx, "CreateNodeEvent called with invalid identifier [%+v]: %v", request.Event.Id, err)
}
ctx = getNodeExecutionContext(ctx, request.Event.Id)
Expand Down Expand Up @@ -274,6 +274,9 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
} else if common.IsNodeExecutionTerminal(request.Event.Phase) {
m.metrics.ActiveNodeExecutions.Dec()
m.metrics.NodeExecutionsTerminated.Inc()
if request.Event.GetOutputData() != nil {
m.metrics.NodeExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}
}
m.metrics.NodeExecutionEventsCreated.Inc()

Expand Down Expand Up @@ -437,41 +440,22 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
logger.Debugf(ctx, "failed to transform node execution model [%+v] when fetching data: %v", request.Id, err)
return nil, err
}
signedInputsURLBlob := admin.UrlBlob{}
if len(nodeExecution.InputUri) != 0 {
signedInputsURLBlob, err = m.urlData.Get(ctx, nodeExecution.InputUri)
if err != nil {
return nil, err
}

inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.InputUri)
if err != nil {
return nil, err
}
signedOutputsURLBlob := admin.UrlBlob{}
if nodeExecution.Closure.GetOutputUri() != "" {
signedOutputsURLBlob, err = m.urlData.Get(ctx, nodeExecution.Closure.GetOutputUri())
if err != nil {
return nil, err
}
outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, nodeExecution.Closure)
if err != nil {
return nil, err
}
response := &admin.NodeExecutionGetDataResponse{
Inputs: &signedInputsURLBlob,
Outputs: &signedOutputsURLBlob,
}
if util.ShouldFetchData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedInputsURLBlob) {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", nodeExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if util.ShouldFetchOutputData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedOutputsURLBlob,
nodeExecution.Closure.GetOutputUri()) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v",
nodeExecution.Closure.GetOutputUri(), err)
}
response.FullOutputs = &fullOutputs
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
Expand All @@ -488,7 +472,11 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
}

m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
if response.Outputs.Bytes > 0 {
m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
} else if response.FullOutputs != nil {
m.metrics.NodeExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs)))
}

return response, nil
}
Expand Down
52 changes: 24 additions & 28 deletions pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState(

func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) (
*admin.TaskExecutionEventResponse, error) {
if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil {
return nil, err
}

// Get the parent node execution, if none found a MissingEntityError will be returned
nodeExecutionID := request.Event.ParentNodeExecutionId
taskExecutionID := core.TaskExecutionIdentifier{
Expand Down Expand Up @@ -182,6 +186,9 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
} else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 {
m.metrics.ActiveTaskExecutions.Dec()
m.metrics.TaskExecutionsTerminated.Inc()
if request.Event.GetOutputData() != nil {
m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}
}

if err = m.notificationClient.Publish(ctx, proto.MessageName(&request), &request); err != nil {
Expand Down Expand Up @@ -287,42 +294,31 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
request.Id, err)
return nil, err
}
signedInputsURLBlob, err := m.urlData.Get(ctx, taskExecution.InputUri)

inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, taskExecution.InputUri)
if err != nil {
return nil, err
}
signedOutputsURLBlob := admin.UrlBlob{}
if taskExecution.Closure.GetOutputUri() != "" {
signedOutputsURLBlob, err = m.urlData.Get(ctx, taskExecution.Closure.GetOutputUri())
if err != nil {
return nil, err
}
outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(),
m.storageClient, taskExecution.Closure)
if err != nil {
return nil, err
}

response := &admin.TaskExecutionGetDataResponse{
Inputs: &signedInputsURLBlob,
Outputs: &signedOutputsURLBlob,
}
if util.ShouldFetchData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedInputsURLBlob) {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", taskExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if util.ShouldFetchOutputData(m.config.ApplicationConfiguration().GetRemoteDataConfig(), signedOutputsURLBlob,
taskExecution.Closure.GetOutputUri()) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v",
taskExecution.Closure.GetOutputUri(), err)
}
response.FullOutputs = &fullOutputs
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
}

m.metrics.TaskExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.metrics.TaskExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
if response.Outputs.Bytes > 0 {
m.metrics.TaskExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
} else if response.FullOutputs != nil {
m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs)))
}
return response, nil
}

Expand Down
111 changes: 106 additions & 5 deletions pkg/manager/impl/util/data.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,117 @@
package util

import (
"context"

"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
)

func ShouldFetchData(config *interfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool {
func shouldFetchData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool {
return config.Scheme == common.Local || config.Scheme == common.None || config.MaxSizeInBytes == 0 ||
(len(urlBlob.Url) > 0 && urlBlob.Bytes < config.MaxSizeInBytes)
urlBlob.Bytes < config.MaxSizeInBytes
}

// shouldFetchOutputData reports whether output data should be read inline from storage:
// an output URI must be present and the general shouldFetchData policy must allow the fetch.
func shouldFetchOutputData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool {
	// Without a URI there is nothing to read, regardless of the config.
	if len(outputURI) == 0 {
		return false
	}
	return shouldFetchData(config, urlBlob)
}

func ShouldFetchOutputData(config *interfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool {
return ShouldFetchData(config, urlBlob) && len(outputURI) > 0
// GetInputs returns a signed URL blob for an execution's inputs and, if config settings permit,
// the inputs data itself read inline from storage.
//
// On success both returned pointers are always non-nil. When inputURI is empty they point at
// zero values, so callers can read fields such as Bytes without a nil check.
// A non-nil error is returned only when the signed URL lookup fails; both pointers are nil then.
func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
	remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, inputURI string) (
	*core.LiteralMap, *admin.UrlBlob, error) {
	var (
		fullInputs    core.LiteralMap
		inputsURLBlob admin.UrlBlob
	)
	if len(inputURI) == 0 {
		// No inputs URI was recorded. Hand back empty, non-nil values rather than nil pointers:
		// callers dereference the blob (e.g. to observe its size) and would otherwise panic.
		return &fullInputs, &inputsURLBlob, nil
	}

	var err error
	inputsURLBlob, err = urlData.Get(ctx, inputURI)
	if err != nil {
		return nil, nil, err
	}

	if shouldFetchData(remoteDataConfig, inputsURLBlob) {
		if err := storageClient.ReadProtobuf(ctx, storage.DataReference(inputURI), &fullInputs); err != nil {
			// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
			// Instead we return the signed URL blob so that the client can use that to fetch the input data.
			logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err)
		}
	}
	return &fullInputs, &inputsURLBlob, nil
}

// ExecutionClosure defines the methods common to NodeExecutionClosure and TaskExecutionClosure
// that are used to return output data.
type ExecutionClosure interface {
// GetOutputUri returns the URI from which output data can be signed and read;
// an empty string means no output URI is available.
GetOutputUri() string //nolint
// GetOutputData returns output data held inline in the closure, or nil when there is none.
GetOutputData() *core.LiteralMap
}

// workflowExecutionClosure wraps an admin.ExecutionClosure so that it conforms to the output
// interface used by admin.NodeExecutionClosure and admin.TaskExecutionClosure.
// Due to historical reasons, the workflow execution closure message is slightly different.
type workflowExecutionClosure struct {
// Embedded workflow execution closure whose output accessors are adapted by the methods on this type.
*admin.ExecutionClosure
}

// GetOutputUri returns the URI stored in the wrapped closure's outputs, or an empty string
// when the wrapped closure or its outputs are nil.
func (c workflowExecutionClosure) GetOutputUri() string { //nolint
	if c.ExecutionClosure == nil {
		return ""
	}
	outputs := c.ExecutionClosure.GetOutputs()
	if outputs == nil {
		return ""
	}
	return outputs.GetUri()
}

// GetOutputData returns the inline output values of the wrapped closure. Values carried in the
// closure's outputs take precedence; otherwise the closure's own output data is returned.
func (c workflowExecutionClosure) GetOutputData() *core.LiteralMap {
	if outputs := c.ExecutionClosure.GetOutputs(); outputs != nil {
		if values := outputs.GetValues(); values != nil {
			return values
		}
	}
	return c.ExecutionClosure.GetOutputData()
}

// ToExecutionClosureInterface adapts a workflow execution closure to the ExecutionClosure
// interface so it can be used when producing execution output data.
func ToExecutionClosureInterface(closure *admin.ExecutionClosure) ExecutionClosure {
	wrapped := workflowExecutionClosure{ExecutionClosure: closure}
	return &wrapped
}

// GetOutputs returns a signed URL blob for an execution's outputs and, if config settings permit,
// the outputs data itself.
//
// Output data held inline in the closure takes precedence and is returned only when its
// serialized size is below remoteDataConfig.MaxSizeInBytes. Otherwise, when shouldFetchOutputData
// allows it, the outputs are read from storage at the closure's output URI.
//
// On success both returned pointers are always non-nil (pointing at zero values when there is
// nothing to return), so callers can read fields such as Bytes without a nil check.
// A non-nil error is returned only when the signed URL lookup fails; both pointers are nil then.
func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
	remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, closure ExecutionClosure) (
	*core.LiteralMap, *admin.UrlBlob, error) {
	var outputsURLBlob admin.UrlBlob
	fullOutputs := &core.LiteralMap{}
	if closure == nil {
		// Return empty, non-nil values rather than nil pointers: callers dereference the blob
		// (e.g. to observe its size) and would otherwise panic.
		// Note this only matches a nil interface value; a nil concrete pointer wrapped in the
		// interface falls through to the getter calls below.
		return fullOutputs, &outputsURLBlob, nil
	}

	outputURI := closure.GetOutputUri()
	if len(outputURI) > 0 {
		var err error
		outputsURLBlob, err = urlData.Get(ctx, outputURI)
		if err != nil {
			return nil, nil, err
		}
	}

	if outputData := closure.GetOutputData(); outputData != nil {
		// NOTE(review): shouldFetchData treats MaxSizeInBytes == 0 as permitting a fetch, whereas
		// this comparison rejects all inline data when the limit is 0 — confirm this is intended.
		if int64(proto.Size(outputData)) < remoteDataConfig.MaxSizeInBytes {
			fullOutputs = outputData
		} else {
			logger.Debugf(ctx, "execution closure contains output data that exceeds max data size for responses")
		}
	} else if shouldFetchOutputData(remoteDataConfig, outputsURLBlob, outputURI) {
		if err := storageClient.ReadProtobuf(ctx, storage.DataReference(outputURI), fullOutputs); err != nil {
			// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
			// Instead we return the signed URL blob so that the client can use that to fetch the output data.
			logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputURI, err)
		}
	}
	return fullOutputs, &outputsURLBlob, nil
}
Loading

0 comments on commit e94ed6b

Please sign in to comment.