Skip to content

Commit

Permalink
[#33513][prism]Handle Time sorted requirement and drop late data. (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Jan 7, 2025
1 parent 6618968 commit 6509e51
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
16 changes: 9 additions & 7 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
Expand Down Expand Up @@ -160,6 +161,14 @@ def sickbayTests = [
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',


// These tests fail once Late Data was being precisely dropped.
// They set a single element to be late data, and expect it (correctly) to be preserved.
// Since presently, these are treated as No-ops, the fix is to disable the
// dropping behavior when a stage's input is a Reshuffle/Redistribute transform.
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
Expand All @@ -177,13 +186,6 @@ def sickbayTests = [
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',

// Requires Time Sorted Input
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',

// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

Expand Down
12 changes: 12 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st
func (ss *stageState) AddPending(newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Data that arrives after the *output* watermark is late.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if e.window.MaxTimestamp() < threshold {
continue
}
origPending = append(origPending, e)
}
newPending = origPending
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
Expand Down
7 changes: 5 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb

// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.

// StatefulDoFns need to be marked as being roots.
// ForceRoots cause fusion breaks in the optimized graph.
// StatefulDoFns need to be marked as being roots, for correct per-key state handling.
// Prism already sorts input elements for a stage by EventTime, so a fusion break enables the sorted behavior.
var forcedRoots []string
if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 {
if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 ||
pdo.GetRequiresTimeSortedInput() {
forcedRoots = append(forcedRoots, tid)
}

Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
urns.RequirementOnWindowExpiration: {},
urns.RequirementTimeSortedInput: {},
}

// TODO, move back to main package, and key off of executor handlers?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3764,7 +3764,9 @@ public void testRequiresTimeSortedInputWithLateData() {
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
input =
input.advanceWatermarkTo(
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
}
}
testTimeSortedInput(
Expand Down Expand Up @@ -3796,7 +3798,9 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
if (stamp == 100) {
// advance watermark when we have 100 remaining elements
// all the rest are going to be late elements
input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp));
input =
input.advanceWatermarkTo(
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1)));
}
}
// apply the sorted function for the first time
Expand Down

0 comments on commit 6509e51

Please sign in to comment.