From e26735d69f0935c6f7ac14dd0fb0e58ff390392a Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Fri, 25 Aug 2023 17:01:03 -0700 Subject: [PATCH] connect err (#28170) Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- sdks/go/pkg/beam/runners/prism/internal/execute.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index e9c898699c72..496767103434 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -263,7 +263,9 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro wk.Descriptors[stage.ID] = stage.desc case wk.ID: // Great! this is for this environment. // Broken abstraction. - buildDescriptor(stage, comps, wk) + if err := buildDescriptor(stage, comps, wk); err != nil { + return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err) + } stages[stage.ID] = stage slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName()))) outputs := maps.Keys(stage.OutputsToCoders)