Skip to content

Commit

Permalink
feat(pipeline): return the error from a component inside an iterator (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
donch1989 authored Jan 16, 2025
1 parent 6054251 commit bebe57f
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 42 deletions.
92 changes: 67 additions & 25 deletions pkg/worker/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,32 @@ import (
)

type setupReader struct {
memoryStore memory.MemoryStore
workflowID string
compID string
wfm memory.WorkflowMemory
conditionMap map[int]int
}

func NewSetupReader(wfm memory.WorkflowMemory, compID string, conditionMap map[int]int) *setupReader {
func NewSetupReader(memoryStore memory.MemoryStore, workflowID string, compID string, conditionMap map[int]int) *setupReader {
return &setupReader{
memoryStore: memoryStore,
workflowID: workflowID,
compID: compID,
wfm: wfm,
conditionMap: conditionMap,
}
}

func (i *setupReader) Read(ctx context.Context) (setups []*structpb.Struct, err error) {
wfm, err := i.memoryStore.GetWorkflowMemory(ctx, i.workflowID)
if err != nil {
return nil, err
}
for idx := range len(i.conditionMap) {
setupTemplate, err := i.wfm.GetComponentData(ctx, i.conditionMap[idx], i.compID, memory.ComponentDataSetupTemplate)
setupTemplate, err := wfm.GetComponentData(ctx, i.conditionMap[idx], i.compID, memory.ComponentDataSetupTemplate)
if err != nil {
return nil, err
}
setupVal, err := recipe.Render(ctx, setupTemplate, i.conditionMap[idx], i.wfm, false)
setupVal, err := recipe.Render(ctx, setupTemplate, i.conditionMap[idx], wfm, false)
if err != nil {
return nil, err
}
Expand All @@ -48,16 +54,18 @@ func (i *setupReader) Read(ctx context.Context) (setups []*structpb.Struct, err
}

type inputReader struct {
memoryStore memory.MemoryStore
workflowID string
compID string
wfm memory.WorkflowMemory
originalIdx int
binaryFetcher external.BinaryFetcher
}

func NewInputReader(wfm memory.WorkflowMemory, compID string, originalIdx int, binaryFetcher external.BinaryFetcher) *inputReader {
func NewInputReader(memoryStore memory.MemoryStore, workflowID string, compID string, originalIdx int, binaryFetcher external.BinaryFetcher) *inputReader {
return &inputReader{
memoryStore: memoryStore,
workflowID: workflowID,
compID: compID,
wfm: wfm,
originalIdx: originalIdx,
binaryFetcher: binaryFetcher,
}
Expand All @@ -67,18 +75,22 @@ func NewInputReader(wfm memory.WorkflowMemory, compID string, originalIdx int, b
// ReadData() instead.
// structpb is not suitable for handling binary data and will be phased out gradually.
func (i *inputReader) read(ctx context.Context) (inputVal format.Value, err error) {
wfm, err := i.memoryStore.GetWorkflowMemory(ctx, i.workflowID)
if err != nil {
return nil, err
}

inputTemplate, err := i.wfm.GetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInputTemplate)
inputTemplate, err := wfm.GetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInputTemplate)
if err != nil {
return nil, err
}

inputVal, err = recipe.Render(ctx, inputTemplate, i.originalIdx, i.wfm, false)
inputVal, err = recipe.Render(ctx, inputTemplate, i.originalIdx, wfm, false)
if err != nil {
return nil, err
}

if err = i.wfm.SetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInput, inputVal); err != nil {
if err = wfm.SetComponentData(ctx, i.originalIdx, i.compID, memory.ComponentDataInput, inputVal); err != nil {
return nil, err
}
return inputVal, nil
Expand Down Expand Up @@ -116,16 +128,18 @@ func (i *inputReader) ReadData(ctx context.Context, input any) (err error) {
}

type outputWriter struct {
memoryStore memory.MemoryStore
workflowID string
compID string
wfm memory.WorkflowMemory
originalIdx int
streaming bool
}

func NewOutputWriter(wfm memory.WorkflowMemory, compID string, originalIdx int, streaming bool) *outputWriter {
func NewOutputWriter(memoryStore memory.MemoryStore, workflowID string, compID string, originalIdx int, streaming bool) *outputWriter {
return &outputWriter{
memoryStore: memoryStore,
workflowID: workflowID,
compID: compID,
wfm: wfm,
originalIdx: originalIdx,
streaming: streaming,
}
Expand Down Expand Up @@ -157,22 +171,26 @@ func (o *outputWriter) Write(ctx context.Context, output *structpb.Struct) (err
// Use WriteData() instead. structpb is not suitable for handling binary data
// and will be phased out gradually.
func (o *outputWriter) write(ctx context.Context, val format.Value) (err error) {
wfm, err := o.memoryStore.GetWorkflowMemory(ctx, o.workflowID)
if err != nil {
return err
}

if err := o.wfm.SetComponentData(ctx, o.originalIdx, o.compID, memory.ComponentDataOutput, val); err != nil {
if err := wfm.SetComponentData(ctx, o.originalIdx, o.compID, memory.ComponentDataOutput, val); err != nil {
return err
}

if o.streaming {
outputTemplate, err := o.wfm.Get(ctx, o.originalIdx, string(memory.PipelineOutputTemplate))
outputTemplate, err := wfm.Get(ctx, o.originalIdx, string(memory.PipelineOutputTemplate))
if err != nil {
return err
}

output, err := recipe.Render(ctx, outputTemplate, o.originalIdx, o.wfm, true)
output, err := recipe.Render(ctx, outputTemplate, o.originalIdx, wfm, true)
if err != nil {
return err
}
err = o.wfm.SetPipelineData(ctx, o.originalIdx, memory.PipelineOutput, output)
err = wfm.SetPipelineData(ctx, o.originalIdx, memory.PipelineOutput, output)
if err != nil {
return err
}
Expand All @@ -182,20 +200,44 @@ func (o *outputWriter) write(ctx context.Context, val format.Value) (err error)
}

type errorHandler struct {
memoryStore memory.MemoryStore
workflowID string
compID string
wfm memory.WorkflowMemory
originalIdx int

parentWorkflowID *string
parentCompID *string
parentOriginalIdx *int
}

func NewErrorHandler(wfm memory.WorkflowMemory, compID string, originalIdx int) *errorHandler {
func NewErrorHandler(memoryStore memory.MemoryStore, workflowID string, compID string, originalIdx int, parentWorkflowID *string, parentCompID *string, parentOriginalIdx *int) *errorHandler {
return &errorHandler{
compID: compID,
wfm: wfm,
originalIdx: originalIdx,
memoryStore: memoryStore,
workflowID: workflowID,
compID: compID,
originalIdx: originalIdx,
parentWorkflowID: parentWorkflowID,
parentCompID: parentCompID,
parentOriginalIdx: parentOriginalIdx,
}
}

func (e *errorHandler) Error(ctx context.Context, err error) {
_ = e.wfm.SetComponentStatus(ctx, e.originalIdx, e.compID, memory.ComponentStatusErrored, true)
_ = e.wfm.SetComponentErrorMessage(ctx, e.originalIdx, e.compID, errmsg.MessageOrErr(err))

wfm, wfmErr := e.memoryStore.GetWorkflowMemory(ctx, e.workflowID)
if wfmErr != nil {
return
}

_ = wfm.SetComponentStatus(ctx, e.originalIdx, e.compID, memory.ComponentStatusErrored, true)
_ = wfm.SetComponentErrorMessage(ctx, e.originalIdx, e.compID, errmsg.MessageOrErr(err))

if e.parentWorkflowID != nil {
iterWfm, iterWfmErr := e.memoryStore.GetWorkflowMemory(ctx, *e.parentWorkflowID)
if iterWfmErr != nil {
return
}
_ = iterWfm.SetComponentStatus(ctx, *e.parentOriginalIdx, *e.parentCompID, memory.ComponentStatusErrored, true)
_ = iterWfm.SetComponentErrorMessage(ctx, *e.parentOriginalIdx, *e.parentCompID, errmsg.MessageOrErr(err))
}
}
58 changes: 41 additions & 17 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type TriggerPipelineWorkflowParam struct {
Mode mgmtpb.Mode
TriggerFromAPI bool
WorkerUID uuid.UUID

// If the pipeline trigger is from an iterator, these fields will be set
ParentWorkflowID *string
ParentCompID *string
ParentOriginalIdx *int
}

type SchedulePipelineLoaderActivityParam struct {
Expand All @@ -69,6 +74,11 @@ type ComponentActivityParam struct {
Task string
SystemVariables recipe.SystemVariables // TODO: we should store vars directly in trigger memory.
Streaming bool

// If the component belongs to an iterator, these fields will be set
ParentWorkflowID *string
ParentCompID *string
ParentOriginalIdx *int
}

type PreIteratorActivityParam struct {
Expand Down Expand Up @@ -301,13 +311,16 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
}).Get(ctx, nil)

args := &ComponentActivityParam{
WorkflowID: workflowID,
ID: compID,
UpstreamIDs: upstreamIDs,
Type: comp.Type,
Task: comp.Task,
Condition: comp.Condition,
SystemVariables: param.SystemVariables,
WorkflowID: workflowID,
ID: compID,
UpstreamIDs: upstreamIDs,
Type: comp.Type,
Task: comp.Task,
Condition: comp.Condition,
SystemVariables: param.SystemVariables,
ParentWorkflowID: param.ParentWorkflowID,
ParentCompID: param.ParentCompID,
ParentOriginalIdx: param.ParentOriginalIdx,
}

futures = append(futures, workflow.ExecuteActivity(ctx, w.ComponentActivity, args))
Expand Down Expand Up @@ -351,12 +364,13 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
workflow.WithChildOptions(ctx, childWorkflowOptions),
"TriggerPipelineWorkflow",
&TriggerPipelineWorkflowParam{
TriggerFromAPI: false,
SystemVariables: param.SystemVariables,
Mode: mgmtpb.Mode_MODE_SYNC,
WorkerUID: param.WorkerUID,
// TODO: support streaming inside iterator.
// IsStreaming: param.IsStreaming,
TriggerFromAPI: false,
SystemVariables: param.SystemVariables,
Mode: mgmtpb.Mode_MODE_SYNC,
WorkerUID: param.WorkerUID,
ParentWorkflowID: &workflowID,
ParentCompID: &compID,
ParentOriginalIdx: &iter,
}))
}
for iter := 0; iter < len(itFutures); iter++ {
Expand Down Expand Up @@ -538,7 +552,7 @@ func (w *worker) ComponentActivity(ctx context.Context, param *ComponentActivity
return nil
}

setups, err := NewSetupReader(wfm, param.ID, conditionMap).Read(ctx)
setups, err := NewSetupReader(w.memoryStore, param.WorkflowID, param.ID, conditionMap).Read(ctx)
if err != nil {
return componentActivityError(ctx, wfm, err, componentActivityErrorType, param.ID)
}
Expand All @@ -563,10 +577,11 @@ func (w *worker) ComponentActivity(ctx context.Context, param *ComponentActivity

jobs := make([]*componentbase.Job, len(conditionMap))
for idx, originalIdx := range conditionMap {

jobs[idx] = &componentbase.Job{
Input: NewInputReader(wfm, param.ID, originalIdx, w.binaryFetcher),
Output: NewOutputWriter(wfm, param.ID, originalIdx, wfm.IsStreaming()),
Error: NewErrorHandler(wfm, param.ID, originalIdx),
Input: NewInputReader(w.memoryStore, param.WorkflowID, param.ID, originalIdx, w.binaryFetcher),
Output: NewOutputWriter(w.memoryStore, param.WorkflowID, param.ID, originalIdx, wfm.IsStreaming()),
Error: NewErrorHandler(w.memoryStore, param.WorkflowID, param.ID, originalIdx, param.ParentWorkflowID, param.ParentCompID, param.ParentOriginalIdx),
}
}
err = execution.Execute(
Expand Down Expand Up @@ -905,6 +920,15 @@ func (w *worker) PostIteratorActivity(ctx context.Context, param *PostIteratorAc
return componentActivityError(ctx, wfm, err, postIteratorActivityErrorType, param.ID)
}

errored, err := wfm.GetComponentStatus(ctx, originalIdx, param.ID, memory.ComponentStatusErrored)

if err != nil {
return componentActivityError(ctx, wfm, err, postIteratorActivityErrorType, param.ID)
}
if errored {
return nil
}

output := data.Map{}
for k, v := range param.OutputElements {
elemVals := data.Array{}
Expand Down

0 comments on commit bebe57f

Please sign in to comment.