diff --git a/pkg/component/base/execution_wrapper.go b/pkg/component/base/execution_wrapper.go index 701f0df54..a57ce0a9d 100644 --- a/pkg/component/base/execution_wrapper.go +++ b/pkg/component/base/execution_wrapper.go @@ -76,6 +76,10 @@ func (e *ExecutionWrapper) Execute(ctx context.Context, jobs []*Job) (err error) inputs := make([]*structpb.Struct, len(jobs)) outputs := make([]*structpb.Struct, len(jobs)) + validInputs := make([]*structpb.Struct, 0, len(jobs)) + validJobs := make([]*Job, 0, len(jobs)) + validJobIdx := make([]int, 0, len(jobs)) + // Note: We need to check usage of all inputs simultaneously, so all inputs // must be read before execution. for batchIdx, job := range jobs { @@ -84,16 +88,19 @@ func (e *ExecutionWrapper) Execute(ctx context.Context, jobs []*Job) (err error) job.Error.Error(ctx, err) continue } + validInputs = append(validInputs, inputs[batchIdx]) + validJobs = append(validJobs, job) + validJobIdx = append(validJobIdx, batchIdx) } if err = h.Check(ctx, inputs); err != nil { return err } - wrappedJobs := make([]*Job, len(jobs)) - for batchIdx, job := range jobs { + wrappedJobs := make([]*Job, len(validJobs)) + for batchIdx, job := range validJobs { wrappedJobs[batchIdx] = &Job{ - Input: NewInputReader(inputs[batchIdx], e.GetTaskInputSchema()), + Input: NewInputReader(validInputs[batchIdx], e.GetTaskInputSchema()), Output: NewOutputWriter(job.Output, e.GetTaskOutputSchema()), Error: job.Error, } @@ -106,7 +113,7 @@ func (e *ExecutionWrapper) Execute(ctx context.Context, jobs []*Job) (err error) // Since there might be multiple writes, we collect the usage at the end of // the execution.​ for batchIdx, job := range wrappedJobs { - outputs[batchIdx] = job.Output.(*outputWriter).GetOutput() + outputs[validJobIdx[batchIdx]] = job.Output.(*outputWriter).GetOutput() } if err := h.Collect(ctx, inputs, outputs); err != nil { return err