Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Prism] Support BundleFinalization DoFn parameter #32425

Merged
merged 11 commits into from
Sep 21, 2024
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
m.requirements[URNRequiresSplittableDoFn] = true
}
if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
payload.RequestsFinalization = true
m.requirements[URNRequiresBundleFinalization] = true
}
if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok {
Expand Down
12 changes: 11 additions & 1 deletion sdks/go/pkg/beam/core/typex/special.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,18 @@ type Window interface {
Equals(o Window) bool
}

// BundleFinalization allows registering callbacks to be performed after the runner durably persists bundle results.
// BundleFinalization allows registering callbacks for the runner to invoke after the bundle completes and the runner
// commits the output. Parameter is accessible during DoFn StartBundle, ProcessElement, FinishBundle.
// However, if your DoFn implementation requires BundleFinalization in StartBundle or FinishBundle, it is needed in the
// ProcessElement signature, even if not invoked,
// Common use cases for BundleFinalization would be to perform work after elements in a bundle have been processed.
// See beam.ParDo for documentation on these DoFn lifecycle methods.
type BundleFinalization interface {

// RegisterCallback registers the runner to invoke func() after the runner persists the bundle of processed elements.
// The time.Duration configures the callback expiration, after which the runner will not invoke func().
// Returning error communicates to the runner that bundle finalization failed and the runner may choose to attempt
// finalization again.
RegisterCallback(time.Duration, func() error)
}

Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ type Window = typex.Window

// BundleFinalization represents the parameter used to register callbacks to
// be run once the runner has durably persisted output for a bundle.
// See typex.BundleFinalization for more details.
type BundleFinalization = typex.BundleFinalization

// These are the reflect.Type instances of the universal types, which are used
Expand Down
6 changes: 1 addition & 5 deletions sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb
}

// Lets check for and remove anything that makes things less simple.
if pdo.OnWindowExpirationTimerFamilySpec == "" &&
!pdo.RequestsFinalization &&
!pdo.RequiresStableInput &&
!pdo.RequiresTimeSortedInput &&
pdo.RestrictionCoderId == "" {
if pdo.RestrictionCoderId == "" {
// Which inputs are Side inputs don't change the graph further,
// so they're not included here. Any nearly any ParDo can have them.

Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
var supportedRequirements = map[string]struct{}{
urns.RequirementSplittableDoFn: {},
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
}

// TODO, move back to main package, and key off of executor handlers?
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
if err := (proto.UnmarshalOptions{}).Unmarshal(t.GetSpec().GetPayload(), pardo); err != nil {
return fmt.Errorf("unable to decode ParDoPayload for %v", link.Transform)
}
stg.finalize = pardo.RequestsFinalization
if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 {
stg.stateful = true
}
Expand Down
12 changes: 11 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"runtime/debug"
"sync/atomic"
"time"

Expand Down Expand Up @@ -63,6 +64,7 @@ type stage struct {
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
finalize bool
stateful bool
// hasTimers indicates the transform+timerfamily pairs that need to be waited on for
// the stage to be considered complete.
Expand Down Expand Up @@ -106,7 +108,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
defer func() {
// Convert execution panics to errors to fail the bundle.
if e := recover(); e != nil {
err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s)
err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack())
}
}()
slog.Debug("Execute: starting bundle", "bundle", rb)
Expand Down Expand Up @@ -322,6 +324,14 @@ progress:
slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
}
em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
if s.finalize {
_, err := b.Finalize(ctx, wk)
if err != nil {
slog.Error("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
panic(err)
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
}
slog.Info("finalized bundle", "bundle", rb)
}
b.OutputData = engine.TentativeData{} // Clear the data.
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.Checkpoints},
{pipeline: primitives.CoGBK},
{pipeline: primitives.ReshuffleKV},
{pipeline: primitives.ParDoProcessElementBundleFinalizer},

// The following have been "allowed" to unblock further development
// But it's not clear these tests truly validate the expected behavior
Expand Down
11 changes: 11 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}

func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) {
resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_FinalizeBundle{
FinalizeBundle: &fnpb.FinalizeBundleRequest{
InstructionId: b.InstID,
},
},
})
return resp.GetFinalizeBundle(), nil
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
}

// Progress sends a progress request for the given bundle to the passed in worker, blocking on the response.
func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ var directFilters = []string{
"TestSetState",
"TestSetStateClear",
"TestTimers.*", // no timer support for the go direct runner.

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var portableFilters = []string{
Expand Down Expand Up @@ -134,6 +137,9 @@ var portableFilters = []string{

// The portable runner does not uniquify timers. (data elements re-fired)
"TestTimers.*",

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var prismFilters = []string{
Expand Down Expand Up @@ -190,6 +196,9 @@ var flinkFilters = []string{

"TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup))
"TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers.

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var samzaFilters = []string{
Expand Down Expand Up @@ -231,6 +240,9 @@ var samzaFilters = []string{

// Samza does not support state.
"TestTimers.*",

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var sparkFilters = []string{
Expand Down Expand Up @@ -265,6 +277,9 @@ var sparkFilters = []string{

"TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported.
"TestTimers_ProcessingTime_Infinity", // Spark doesn't support test stream.

// no support for BundleFinalizer
"TestParDoBundleFinalizer.*",
}

var dataflowFilters = []string{
Expand Down
80 changes: 80 additions & 0 deletions sdks/go/test/integration/primitives/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package primitives
import (
"flag"
"fmt"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
Expand All @@ -32,6 +34,9 @@ func init() {
register.Function3x2(asymJoinFn)
register.Function5x0(splitByName)
register.Function2x0(emitPipelineOptions)
register.DoFn2x0[beam.BundleFinalization, []byte]((*processElemBundleFinalizer)(nil))
register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInFinishBundle)(nil))
register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInAll)(nil))

register.Iter1[int]()
register.Iter2[int, int]()
Expand Down Expand Up @@ -192,3 +197,78 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
}

var CountInvokeBundleFinalizer atomic.Int32

const (
BundleFinalizerStart = 1
BundleFinalizerProcess = 2
BundleFinalizerFinish = 4
)

// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a
// beam.BundleFinalization in its ProcessElement method.
func ParDoProcessElementBundleFinalizer(s beam.Scope) {
imp := beam.Impulse(s)
beam.ParDo0(s, &processElemBundleFinalizer{}, imp)
}

type processElemBundleFinalizer struct {
}

func (fn *processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, _ []byte) {
bf.RegisterCallback(time.Second, func() error {
CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
return nil
})
}

// ParDoFinishBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a noop
// beam.BundleFinalization in its ProcessElement method and a beam.BundleFinalization in its FinishBundle method.
func ParDoFinishBundleFinalizer(s beam.Scope) {
imp := beam.Impulse(s)
beam.ParDo0(s, &finalizerInFinishBundle{}, imp)
}

type finalizerInFinishBundle struct{}

// ProcessElement requires beam.BundleFinalization in its method signature in order for FinishBundle's
// beam.BundleFinalization to be invoked.
func (fn *finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ []byte) {}

func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) {
bf.RegisterCallback(time.Second, func() error {
CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
return nil
})
}

// ParDoFinalizerInAll creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a beam.BundleFinalization
// in all three lifecycle methods StartBundle, ProcessElement, FinishBundle.
func ParDoFinalizerInAll(s beam.Scope) {
imp := beam.Impulse(s)
beam.ParDo0(s, &finalizerInAll{}, imp)
}

type finalizerInAll struct{}

func (fn *finalizerInAll) StartBundle(bf beam.BundleFinalization) {
bf.RegisterCallback(time.Second, func() error {
CountInvokeBundleFinalizer.Add(BundleFinalizerStart)
return nil
})
}

func (fn *finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) {
bf.RegisterCallback(time.Second, func() error {
CountInvokeBundleFinalizer.Add(BundleFinalizerProcess)
return nil
})
}

func (fn *finalizerInAll) FinishBundle(bf beam.BundleFinalization) {
bf.RegisterCallback(time.Second, func() error {
CountInvokeBundleFinalizer.Add(BundleFinalizerFinish)
return nil
})
}
43 changes: 43 additions & 0 deletions sdks/go/test/integration/primitives/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package primitives
import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)
Expand Down Expand Up @@ -46,3 +48,44 @@ func TestParDoPipelineOptions(t *testing.T) {
integration.CheckFilters(t)
ptest.RunAndValidate(t, ParDoPipelineOptions())
}

func TestParDoBundleFinalizer(t *testing.T) {
integration.CheckFilters(t)
if !jobopts.IsLoopback() {
t.Skip("Only Loopback mode is supported")
}
for _, tt := range []struct {
name string
pipelineFn func(s beam.Scope)
want int32
}{
{
name: "InProcessElement",
pipelineFn: ParDoProcessElementBundleFinalizer,
want: BundleFinalizerProcess,
},
{
name: "InFinishBundle",
pipelineFn: ParDoFinishBundleFinalizer,
want: BundleFinalizerFinish,
},
{
name: "InStartProcessFinishBundle",
pipelineFn: ParDoFinalizerInAll,
want: BundleFinalizerStart + BundleFinalizerProcess + BundleFinalizerFinish,
},
} {
t.Run(tt.name, func(t *testing.T) {
CountInvokeBundleFinalizer.Store(0)
p, s := beam.NewPipelineWithRoot()
tt.pipelineFn(s)
_, err := ptest.RunWithMetrics(p)
if err != nil {
t.Fatalf("Failed to execute job: %v", err)
}
if got := CountInvokeBundleFinalizer.Load(); got != tt.want {
t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, tt.want)
}
})
}
}
Loading