From 7e5392c0ffe9527de1c6b8c4f98788ccc28f5cbc Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 11 Sep 2024 01:17:36 +0000 Subject: [PATCH 01/10] Support BundleFinalization DoFn parameter --- .../pkg/beam/core/runtime/graphx/translate.go | 1 + sdks/go/pkg/beam/core/typex/special.go | 12 ++- sdks/go/pkg/beam/forward.go | 1 + .../runners/prism/internal/handlepardo.go | 6 +- .../runners/prism/internal/jobservices/job.go | 1 + .../beam/runners/prism/internal/preprocess.go | 1 + .../pkg/beam/runners/prism/internal/stage.go | 12 ++- .../runners/prism/internal/worker/bundle.go | 14 +++ sdks/go/test/integration/primitives/pardo.go | 102 +++++++++++++++++- .../test/integration/primitives/pardo_test.go | 97 +++++++++++++++++ 10 files changed, 239 insertions(+), 8 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 65280ef6b930..1e30d4258507 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -494,6 +494,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { m.requirements[URNRequiresSplittableDoFn] = true } if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok { + payload.RequestsFinalization = true m.requirements[URNRequiresBundleFinalization] = true } if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok { diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go index edc1249fe763..af36ba92d280 100644 --- a/sdks/go/pkg/beam/core/typex/special.go +++ b/sdks/go/pkg/beam/core/typex/special.go @@ -69,8 +69,18 @@ type Window interface { Equals(o Window) bool } -// BundleFinalization allows registering callbacks to be performed after the runner durably persists bundle results. +// BundleFinalization allows registering callbacks for the runner to invoke after the bundle completes and the runner +// commits the output. 
Parameter is accessible during DoFn StartBundle, ProcessElement, FinishBundle. +// However, if your DoFn implementation requires BundleFinalization in StartBundle or FinishBundle, it must also appear in the +// ProcessElement signature, even if not invoked. +// Common use cases for BundleFinalization would be to perform work after elements in a bundle have been processed. +// See beam.ParDo for documentation on these DoFn lifecycle methods. type BundleFinalization interface { + + // RegisterCallback registers func() for the runner to invoke after the runner persists the bundle of processed elements. + // The time.Duration configures the callback expiration, after which the runner will not invoke func(). + // Returning error communicates to the runner that bundle finalization failed and the runner may choose to attempt + // finalization again. RegisterCallback(time.Duration, func() error) } diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go index 210c39ab4e49..b2f610b703e9 100644 --- a/sdks/go/pkg/beam/forward.go +++ b/sdks/go/pkg/beam/forward.go @@ -204,6 +204,7 @@ type Window = typex.Window // BundleFinalization represents the parameter used to register callbacks to // be run once the runner has durably persisted output for a bundle. +// See typex.BundleFinalization for more details. type BundleFinalization = typex.BundleFinalization // These are the reflect.Type instances of the universal types, which are used diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go index 2d3425af33c6..13e9b6f1b79d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go @@ -78,11 +78,7 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb } // Lets check for and remove anything that makes things less simple. 
- if pdo.OnWindowExpirationTimerFamilySpec == "" && - !pdo.RequestsFinalization && - !pdo.RequiresStableInput && - !pdo.RequiresTimeSortedInput && - pdo.RestrictionCoderId == "" { + if pdo.RestrictionCoderId == "" { // Which inputs are Side inputs don't change the graph further, // so they're not included here. Any nearly any ParDo can have them. diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 6cde48ded9ac..1407feafe325 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -44,6 +44,7 @@ import ( var supportedRequirements = map[string]struct{}{ urns.RequirementSplittableDoFn: {}, urns.RequirementStatefulProcessing: {}, + urns.RequirementBundleFinalization: {}, } // TODO, move back to main package, and key off of executor handlers? diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index ed7f168e36ee..7de32f85b7ee 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -445,6 +445,7 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa if err := (proto.UnmarshalOptions{}).Unmarshal(t.GetSpec().GetPayload(), pardo); err != nil { return fmt.Errorf("unable to decode ParDoPayload for %v", link.Transform) } + stg.finalize = pardo.RequestsFinalization if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 { stg.stateful = true } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index da23ca8ccce1..7a9e73752ac2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "runtime/debug" "time" 
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" @@ -62,6 +63,7 @@ type stage struct { sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers internalCols []string // PCollections that escape. Used for precise coder sending. envID string + finalize bool stateful bool // hasTimers indicates the transform+timerfamily pairs that need to be waited on for // the stage to be considered complete. @@ -82,7 +84,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c defer func() { // Convert execution panics to errors to fail the bundle. if e := recover(); e != nil { - err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s) + err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack()) } }() slog.Debug("Execute: starting bundle", "bundle", rb) @@ -278,6 +280,14 @@ progress: slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput)) } em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals) + if s.finalize { + _, err := b.Finalize(ctx, wk) + if err != nil { + slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error()) + panic(err) + } + slog.Info("finalized bundle", "bundle", rb) + } b.OutputData = engine.TentativeData{} // Clear the data. 
return nil } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 50e427ca36f5..e09494734199 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -206,6 +206,20 @@ func (b *B) Cleanup(wk *W) { wk.mu.Unlock() } +func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) { + resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{ + Request: &fnpb.InstructionRequest_FinalizeBundle{ + FinalizeBundle: &fnpb.FinalizeBundleRequest{ + InstructionId: b.InstID, + }, + }, + }) + if resp.GetError() != "" { + return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError()) + } + return resp.GetFinalizeBundle(), nil +} + // Progress sends a progress request for the given bundle to the passed in worker, blocking on the response. func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error) { resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{ diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go index 2c2383ea90ba..df62b615d9c2 100644 --- a/sdks/go/test/integration/primitives/pardo.go +++ b/sdks/go/test/integration/primitives/pardo.go @@ -18,10 +18,11 @@ package primitives import ( "flag" "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "reflect" + "time" ) func init() { @@ -32,6 +33,9 @@ func init() { register.Function3x2(asymJoinFn) register.Function5x0(splitByName) register.Function2x0(emitPipelineOptions) + beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem()) + beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem()) + beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem()) register.Iter1[int]() 
register.Iter2[int, int]() @@ -192,3 +196,99 @@ func emitPipelineOptions(_ []byte, emit func(string)) { emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B"))) emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C"))) } + +// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a +// beam.BundleFinalization in its ProcessElement method. +func ParDoProcessElementBundleFinalizer(fn beam.EncodedFunc) *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + + imp := beam.Impulse(s) + beam.ParDo0(s, &processElemBundleFinalizer{fn}, imp) + + return p +} + +type processElemBundleFinalizer struct { + Callback beam.EncodedFunc +} + +func (fn *processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, _ []byte) { + bf.RegisterCallback(time.Second, func() error { + ret := fn.Callback.Fn.Call([]any{})[0] + if ret != nil { + return ret.(error) + } + return nil + }) +} + +// ParDoFinishBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a noop +// beam.BundleFinalization in its ProcessElement method and a beam.BundleFinalization in its FinishBundle method. +func ParDoFinishBundleFinalizer(fn beam.EncodedFunc) *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + + imp := beam.Impulse(s) + beam.ParDo0(s, &finalizerInFinishBundle{fn}, imp) + + return p +} + +type finalizerInFinishBundle struct { + Callback beam.EncodedFunc +} + +// ProcessElement requires beam.BundleFinalization in its method signature in order for FinishBundle's +// beam.BundleFinalization to be invoked. 
+func (fn *finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ []byte) {} + +func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) { + bf.RegisterCallback(time.Second, func() error { + ret := fn.Callback.Fn.Call([]any{})[0] + if ret != nil { + return ret.(error) + } + return nil + }) +} + +// ParDoFinalizerInAll creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a beam.BundleFinalization +// in all three lifecycle methods StartBundle, ProcessElement, FinishBundle. +func ParDoFinalizerInAll(start, process, finish beam.EncodedFunc) *beam.Pipeline { + p, s := beam.NewPipelineWithRoot() + + imp := beam.Impulse(s) + beam.ParDo0(s, &finalizerInAll{ + Start: start, + Process: process, + Finish: finish, + }, imp) + + return p +} + +type finalizerInAll struct { + Start beam.EncodedFunc + Process beam.EncodedFunc + Finish beam.EncodedFunc +} + +func (fn *finalizerInAll) StartBundle(bf beam.BundleFinalization) { + bf.RegisterCallback(time.Second, func() error { + fn.Start.Fn.Call([]any{}) + return nil + }) +} + +func (fn *finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) { + bf.RegisterCallback(time.Second, func() error { + fn.Process.Fn.Call([]any{}) + return nil + }) +} + +func (fn *finalizerInAll) FinishBundle(bf beam.BundleFinalization) { + bf.RegisterCallback(time.Second, func() error { + fn.Finish.Fn.Call([]any{}) + return nil + }) +} diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index d2ad57b350b3..56489ef7431b 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -16,6 +16,10 @@ package primitives import ( + "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "sync/atomic" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" @@ -46,3 +50,96 @@ func 
TestParDoPipelineOptions(t *testing.T) { integration.CheckFilters(t) ptest.RunAndValidate(t, ParDoPipelineOptions()) } + +var countInvokeBundleFinalizer atomic.Int32 + +func TestParDoBundleFinalizer(t *testing.T) { + integration.CheckFilters(t) + for _, tt := range []struct { + name string + fn func() error + pipelineFn func(beam.EncodedFunc) *beam.Pipeline + want int32 + wantErr bool + }{ + { + name: "InProcessElement", + fn: func() error { + countInvokeBundleFinalizer.Add(1) + return nil + }, + pipelineFn: ParDoProcessElementBundleFinalizer, + want: 1, + }, + { + name: "InProcessElement_withErr", + fn: func() error { + return fmt.Errorf("error") + }, + pipelineFn: ParDoProcessElementBundleFinalizer, + wantErr: true, + }, + { + name: "InFinishBundle", + fn: func() error { + countInvokeBundleFinalizer.Add(1) + return nil + }, + pipelineFn: ParDoFinishBundleFinalizer, + want: 1, + }, + { + name: "InFinishBundle_withError", + fn: func() error { + return fmt.Errorf("error") + }, + pipelineFn: ParDoFinishBundleFinalizer, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + countInvokeBundleFinalizer.Store(0) + enc := beam.EncodedFunc{ + Fn: reflectx.MakeFunc(tt.fn), + } + p := tt.pipelineFn(enc) + _, err := ptest.RunWithMetrics(p) + if err == nil && tt.wantErr { + t.Errorf("error nil from pipeline Job, wantErr: %v", tt.wantErr) + } + if tt.wantErr { + return + } + if err != nil { + t.Fatalf("Failed to execute job: %v", err) + } + if got := countInvokeBundleFinalizer.Load(); got != tt.want { + t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, tt.want) + } + }) + } +} + +func TestParDoBundleFinalizerInAll(t *testing.T) { + var want int32 = 7 + countInvokeBundleFinalizer.Store(0) + startFn := func() { + countInvokeBundleFinalizer.Add(1) + } + startEnc := beam.EncodedFunc{Fn: reflectx.MakeFunc(startFn)} + processFn := func() { + countInvokeBundleFinalizer.Add(2) + } + processEnc := 
beam.EncodedFunc{Fn: reflectx.MakeFunc(processFn)} + finishFn := func() { + countInvokeBundleFinalizer.Add(4) + } + finishEnc := beam.EncodedFunc{Fn: reflectx.MakeFunc(finishFn)} + + p := ParDoFinalizerInAll(startEnc, processEnc, finishEnc) + ptest.RunAndValidate(t, p) + + if got := countInvokeBundleFinalizer.Load(); got != want { + t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, want) + } +} From ff8d1766b2e152840f3d5a53bbb3f8685466ff6d Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 11 Sep 2024 16:26:28 +0000 Subject: [PATCH 02/10] Replace beam.Register with register.DoFn2x0 --- sdks/go/test/integration/primitives/pardo.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go index df62b615d9c2..2214ee04cb29 100644 --- a/sdks/go/test/integration/primitives/pardo.go +++ b/sdks/go/test/integration/primitives/pardo.go @@ -21,7 +21,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" - "reflect" "time" ) @@ -33,9 +32,9 @@ func init() { register.Function3x2(asymJoinFn) register.Function5x0(splitByName) register.Function2x0(emitPipelineOptions) - beam.RegisterDoFn(reflect.TypeOf((*processElemBundleFinalizer)(nil)).Elem()) - beam.RegisterDoFn(reflect.TypeOf((*finalizerInFinishBundle)(nil)).Elem()) - beam.RegisterDoFn(reflect.TypeOf((*finalizerInAll)(nil)).Elem()) + register.DoFn2x0[beam.BundleFinalization, []byte]((*processElemBundleFinalizer)(nil)) + register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInFinishBundle)(nil)) + register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInAll)(nil)) register.Iter1[int]() register.Iter2[int, int]() From 255f469260f68503c94f5ae6df4cb147bf5a05d0 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 17 Sep 2024 
17:35:19 +0000 Subject: [PATCH 03/10] Add TestParDoBundleFinalizer.* to filters --- sdks/go/test/integration/integration.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index aec69036eeb5..68c0735c437d 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -104,6 +104,9 @@ var directFilters = []string{ "TestSetState", "TestSetStateClear", "TestTimers.*", // no timer support for the go direct runner. + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var portableFilters = []string{ @@ -190,6 +193,9 @@ var flinkFilters = []string{ "TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup)) "TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers. + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var samzaFilters = []string{ @@ -231,6 +237,9 @@ var samzaFilters = []string{ // Samza does not support state. "TestTimers.*", + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var sparkFilters = []string{ @@ -265,6 +274,9 @@ var sparkFilters = []string{ "TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported. "TestTimers_ProcessingTime_Infinity", // Spark doesn't support test stream. 
+ + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var dataflowFilters = []string{ From 7475036b093bd01240cec71dd7cb3a21257ffc19 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 17 Sep 2024 20:03:35 +0000 Subject: [PATCH 04/10] Register test funcs --- .../test/integration/primitives/pardo_test.go | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index 56489ef7431b..6d315321e886 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -53,6 +53,11 @@ func TestParDoPipelineOptions(t *testing.T) { var countInvokeBundleFinalizer atomic.Int32 +func init() { + beam.RegisterFunction(incrInvokeBF) + beam.RegisterFunction(retError) +} + func TestParDoBundleFinalizer(t *testing.T) { integration.CheckFilters(t) for _, tt := range []struct { @@ -63,11 +68,8 @@ func TestParDoBundleFinalizer(t *testing.T) { wantErr bool }{ { - name: "InProcessElement", - fn: func() error { - countInvokeBundleFinalizer.Add(1) - return nil - }, + name: "InProcessElement", + fn: incrInvokeBF, pipelineFn: ParDoProcessElementBundleFinalizer, want: 1, }, @@ -80,19 +82,14 @@ func TestParDoBundleFinalizer(t *testing.T) { wantErr: true, }, { - name: "InFinishBundle", - fn: func() error { - countInvokeBundleFinalizer.Add(1) - return nil - }, + name: "InFinishBundle", + fn: incrInvokeBF, pipelineFn: ParDoFinishBundleFinalizer, want: 1, }, { - name: "InFinishBundle_withError", - fn: func() error { - return fmt.Errorf("error") - }, + name: "InFinishBundle_withError", + fn: retError, pipelineFn: ParDoFinishBundleFinalizer, wantErr: true, }, @@ -143,3 +140,12 @@ func TestParDoBundleFinalizerInAll(t *testing.T) { t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, want) } } + +func incrInvokeBF() error { + 
countInvokeBundleFinalizer.Add(1) + return nil +} + +func retError() error { + return fmt.Errorf("error") +} From 25c16ef4899ea5581b1d04c38bb0c23be1c69053 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 17 Sep 2024 22:18:50 +0000 Subject: [PATCH 05/10] Add filter to portable runner tests --- sdks/go/test/integration/integration.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 68c0735c437d..de782daa2d5d 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -137,6 +137,9 @@ var portableFilters = []string{ // The portable runner does not uniquify timers. (data elements re-fired) "TestTimers.*", + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var prismFilters = []string{ From 96912211654083a7197b2657e31a88ec1c30dc77 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 17 Sep 2024 22:49:05 +0000 Subject: [PATCH 06/10] Temporarily skip test --- sdks/go/test/integration/primitives/pardo_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index 6d315321e886..98d3d21db99b 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -74,10 +74,8 @@ func TestParDoBundleFinalizer(t *testing.T) { want: 1, }, { - name: "InProcessElement_withErr", - fn: func() error { - return fmt.Errorf("error") - }, + name: "InProcessElement_withErr", + fn: retError, pipelineFn: ParDoProcessElementBundleFinalizer, wantErr: true, }, @@ -118,6 +116,7 @@ func TestParDoBundleFinalizer(t *testing.T) { } func TestParDoBundleFinalizerInAll(t *testing.T) { + t.Skip() var want int32 = 7 countInvokeBundleFinalizer.Store(0) startFn := func() { From 46816665128ffe6bbdf6338093b244afb28572a6 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: 
Wed, 18 Sep 2024 18:20:16 +0000 Subject: [PATCH 07/10] Simplify tests; refactor per PR comments --- .../pkg/beam/runners/prism/internal/stage.go | 2 +- .../runners/prism/internal/worker/bundle.go | 3 - sdks/go/test/integration/primitives/pardo.go | 52 +++++------- .../test/integration/primitives/pardo_test.go | 82 ++----------------- 4 files changed, 32 insertions(+), 107 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 7a9e73752ac2..86744c16dddc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -283,7 +283,7 @@ progress: if s.finalize { _, err := b.Finalize(ctx, wk) if err != nil { - slog.Debug("SDK Error from bundle finalization", "bundle", rb, "error", err.Error()) + slog.Error("SDK Error from bundle finalization", "bundle", rb, "error", err.Error()) panic(err) } slog.Info("finalized bundle", "bundle", rb) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index e09494734199..3ccafdb81e9a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -214,9 +214,6 @@ func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, }, }, }) - if resp.GetError() != "" { - return nil, fmt.Errorf("finalize[%v] error from SDK: %v", b.InstID, resp.GetError()) - } return resp.GetFinalizeBundle(), nil } diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go index 2214ee04cb29..5b04293c9e08 100644 --- a/sdks/go/test/integration/primitives/pardo.go +++ b/sdks/go/test/integration/primitives/pardo.go @@ -21,6 +21,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "sync/atomic" "time" ) @@ 
-196,45 +197,47 @@ func emitPipelineOptions(_ []byte, emit func(string)) { emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C"))) } +var CountInvokeBundleFinalizer atomic.Int32 + +const ( + BundleFinalizerStart = 1 + BundleFinalizerProcess = 2 + BundleFinalizerFinish = 4 +) + // ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a // beam.BundleFinalization in its ProcessElement method. -func ParDoProcessElementBundleFinalizer(fn beam.EncodedFunc) *beam.Pipeline { +func ParDoProcessElementBundleFinalizer() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() imp := beam.Impulse(s) - beam.ParDo0(s, &processElemBundleFinalizer{fn}, imp) + beam.ParDo0(s, &processElemBundleFinalizer{}, imp) return p } type processElemBundleFinalizer struct { - Callback beam.EncodedFunc } func (fn *processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, _ []byte) { bf.RegisterCallback(time.Second, func() error { - ret := fn.Callback.Fn.Call([]any{})[0] - if ret != nil { - return ret.(error) - } + CountInvokeBundleFinalizer.Add(BundleFinalizerProcess) return nil }) } // ParDoFinishBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a noop // beam.BundleFinalization in its ProcessElement method and a beam.BundleFinalization in its FinishBundle method. -func ParDoFinishBundleFinalizer(fn beam.EncodedFunc) *beam.Pipeline { +func ParDoFinishBundleFinalizer() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() imp := beam.Impulse(s) - beam.ParDo0(s, &finalizerInFinishBundle{fn}, imp) + beam.ParDo0(s, &finalizerInFinishBundle{}, imp) return p } -type finalizerInFinishBundle struct { - Callback beam.EncodedFunc -} +type finalizerInFinishBundle struct{} // ProcessElement requires beam.BundleFinalization in its method signature in order for FinishBundle's // beam.BundleFinalization to be invoked. 
@@ -242,52 +245,41 @@ func (fn *finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ [ func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) { bf.RegisterCallback(time.Second, func() error { - ret := fn.Callback.Fn.Call([]any{})[0] - if ret != nil { - return ret.(error) - } + CountInvokeBundleFinalizer.Add(BundleFinalizerFinish) return nil }) } // ParDoFinalizerInAll creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a beam.BundleFinalization // in all three lifecycle methods StartBundle, ProcessElement, FinishBundle. -func ParDoFinalizerInAll(start, process, finish beam.EncodedFunc) *beam.Pipeline { +func ParDoFinalizerInAll() *beam.Pipeline { p, s := beam.NewPipelineWithRoot() imp := beam.Impulse(s) - beam.ParDo0(s, &finalizerInAll{ - Start: start, - Process: process, - Finish: finish, - }, imp) + beam.ParDo0(s, &finalizerInAll{}, imp) return p } -type finalizerInAll struct { - Start beam.EncodedFunc - Process beam.EncodedFunc - Finish beam.EncodedFunc -} +type finalizerInAll struct{} func (fn *finalizerInAll) StartBundle(bf beam.BundleFinalization) { bf.RegisterCallback(time.Second, func() error { - fn.Start.Fn.Call([]any{}) + CountInvokeBundleFinalizer.Add(BundleFinalizerStart) return nil }) } func (fn *finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) { bf.RegisterCallback(time.Second, func() error { - fn.Process.Fn.Call([]any{}) + CountInvokeBundleFinalizer.Add(BundleFinalizerProcess) return nil }) } func (fn *finalizerInAll) FinishBundle(bf beam.BundleFinalization) { bf.RegisterCallback(time.Second, func() error { - fn.Finish.Fn.Call([]any{}) + CountInvokeBundleFinalizer.Add(BundleFinalizerFinish) return nil }) } diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index 98d3d21db99b..5451ccbede9c 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ 
-16,10 +16,7 @@ package primitives import ( - "fmt" "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" - "sync/atomic" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" @@ -51,100 +48,39 @@ func TestParDoPipelineOptions(t *testing.T) { ptest.RunAndValidate(t, ParDoPipelineOptions()) } -var countInvokeBundleFinalizer atomic.Int32 - -func init() { - beam.RegisterFunction(incrInvokeBF) - beam.RegisterFunction(retError) -} - func TestParDoBundleFinalizer(t *testing.T) { integration.CheckFilters(t) for _, tt := range []struct { name string - fn func() error - pipelineFn func(beam.EncodedFunc) *beam.Pipeline + pipelineFn func() *beam.Pipeline want int32 - wantErr bool }{ { name: "InProcessElement", - fn: incrInvokeBF, - pipelineFn: ParDoProcessElementBundleFinalizer, - want: 1, - }, - { - name: "InProcessElement_withErr", - fn: retError, pipelineFn: ParDoProcessElementBundleFinalizer, - wantErr: true, + want: BundleFinalizerProcess, }, { name: "InFinishBundle", - fn: incrInvokeBF, pipelineFn: ParDoFinishBundleFinalizer, - want: 1, + want: BundleFinalizerFinish, }, { - name: "InFinishBundle_withError", - fn: retError, - pipelineFn: ParDoFinishBundleFinalizer, - wantErr: true, + name: "InStartProcessFinishBundle", + pipelineFn: ParDoFinalizerInAll, + want: BundleFinalizerStart + BundleFinalizerProcess + BundleFinalizerFinish, }, } { t.Run(tt.name, func(t *testing.T) { - countInvokeBundleFinalizer.Store(0) - enc := beam.EncodedFunc{ - Fn: reflectx.MakeFunc(tt.fn), - } - p := tt.pipelineFn(enc) + CountInvokeBundleFinalizer.Store(0) + p := tt.pipelineFn() _, err := ptest.RunWithMetrics(p) - if err == nil && tt.wantErr { - t.Errorf("error nil from pipeline Job, wantErr: %v", tt.wantErr) - } - if tt.wantErr { - return - } if err != nil { t.Fatalf("Failed to execute job: %v", err) } - if got := countInvokeBundleFinalizer.Load(); got != tt.want { + if got := CountInvokeBundleFinalizer.Load(); got != 
tt.want { t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, tt.want) } }) } } - -func TestParDoBundleFinalizerInAll(t *testing.T) { - t.Skip() - var want int32 = 7 - countInvokeBundleFinalizer.Store(0) - startFn := func() { - countInvokeBundleFinalizer.Add(1) - } - startEnc := beam.EncodedFunc{Fn: reflectx.MakeFunc(startFn)} - processFn := func() { - countInvokeBundleFinalizer.Add(2) - } - processEnc := beam.EncodedFunc{Fn: reflectx.MakeFunc(processFn)} - finishFn := func() { - countInvokeBundleFinalizer.Add(4) - } - finishEnc := beam.EncodedFunc{Fn: reflectx.MakeFunc(finishFn)} - - p := ParDoFinalizerInAll(startEnc, processEnc, finishEnc) - ptest.RunAndValidate(t, p) - - if got := countInvokeBundleFinalizer.Load(); got != want { - t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, want) - } -} - -func incrInvokeBF() error { - countInvokeBundleFinalizer.Add(1) - return nil -} - -func retError() error { - return fmt.Errorf("error") -} From 5ee1c710f0f7016f9b73f59692ce7d431fe9a43d Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 19 Sep 2024 18:11:36 +0000 Subject: [PATCH 08/10] Skip tests for not lookback mode --- sdks/go/test/integration/primitives/pardo_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index 5451ccbede9c..7f803c11380f 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -17,6 +17,7 @@ package primitives import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" @@ -50,6 +51,9 @@ func TestParDoPipelineOptions(t *testing.T) { func TestParDoBundleFinalizer(t *testing.T) { integration.CheckFilters(t) + if 
!jobopts.IsLoopback() { + t.Skip("Only Loopback mode is supported") + } for _, tt := range []struct { name string pipelineFn func() *beam.Pipeline From 2d7ddc249200c3cb4279671017683ad159703e54 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Fri, 20 Sep 2024 22:27:49 +0000 Subject: [PATCH 09/10] Clean up tests; add to coverage --- .../prism/internal/unimplemented_test.go | 1 + sdks/go/test/integration/primitives/pardo.go | 23 +++++-------------- .../test/integration/primitives/pardo_test.go | 5 ++-- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 6afb04521af0..f8917c72ccde 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -83,6 +83,7 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.Checkpoints}, {pipeline: primitives.CoGBK}, {pipeline: primitives.ReshuffleKV}, + {pipeline: primitives.ParDoProcessElementBundleFinalizer}, // The following have been "allowed" to unblock further development // But it's not clear these tests truly validate the expected behavior diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go index 5b04293c9e08..dc59d8f67b80 100644 --- a/sdks/go/test/integration/primitives/pardo.go +++ b/sdks/go/test/integration/primitives/pardo.go @@ -18,11 +18,12 @@ package primitives import ( "flag" "fmt" + "sync/atomic" + "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" - "sync/atomic" - "time" ) func init() { @@ -207,13 +208,9 @@ const ( // ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a // beam.BundleFinalization in its ProcessElement method. 
-func ParDoProcessElementBundleFinalizer() *beam.Pipeline { - p, s := beam.NewPipelineWithRoot() - +func ParDoProcessElementBundleFinalizer(s beam.Scope) { imp := beam.Impulse(s) beam.ParDo0(s, &processElemBundleFinalizer{}, imp) - - return p } type processElemBundleFinalizer struct { @@ -228,13 +225,9 @@ func (fn *processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, // ParDoFinishBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a noop // beam.BundleFinalization in its ProcessElement method and a beam.BundleFinalization in its FinishBundle method. -func ParDoFinishBundleFinalizer() *beam.Pipeline { - p, s := beam.NewPipelineWithRoot() - +func ParDoFinishBundleFinalizer(s beam.Scope) { imp := beam.Impulse(s) beam.ParDo0(s, &finalizerInFinishBundle{}, imp) - - return p } type finalizerInFinishBundle struct{} @@ -252,13 +245,9 @@ func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) { // ParDoFinalizerInAll creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a beam.BundleFinalization // in all three lifecycle methods StartBundle, ProcessElement, FinishBundle. 
-func ParDoFinalizerInAll() *beam.Pipeline { - p, s := beam.NewPipelineWithRoot() - +func ParDoFinalizerInAll(s beam.Scope) { imp := beam.Impulse(s) beam.ParDo0(s, &finalizerInAll{}, imp) - - return p } type finalizerInAll struct{} diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index 7f803c11380f..18ca58968118 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -56,7 +56,7 @@ func TestParDoBundleFinalizer(t *testing.T) { } for _, tt := range []struct { name string - pipelineFn func() *beam.Pipeline + pipelineFn func(s beam.Scope) want int32 }{ { @@ -77,7 +77,8 @@ func TestParDoBundleFinalizer(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { CountInvokeBundleFinalizer.Store(0) - p := tt.pipelineFn() + p, s := beam.NewPipelineWithRoot() + tt.pipelineFn(s) _, err := ptest.RunWithMetrics(p) if err != nil { t.Fatalf("Failed to execute job: %v", err) From 2690975d9ae7d85c78f18e099e9b022c409c266a Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Fri, 20 Sep 2024 22:51:20 +0000 Subject: [PATCH 10/10] Fix import ordering --- sdks/go/test/integration/primitives/pardo_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index 18ca58968118..aa6cb3de2008 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -16,10 +16,10 @@ package primitives import ( - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/apache/beam/sdks/v2/go/test/integration" )