Skip to content

Commit

Permalink
[apache#32221] [prism] Terminate streams for each timerfamily+transfo…
Browse files Browse the repository at this point in the history
…rm pair. (apache#32223)

* [apache#32221] Mark is-last for each timer stream correctly.

* Remove test override for clear, since it now passes unmodified.

* delint

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
2 people authored and reeba212 committed Dec 4, 2024
1 parent be1f371 commit 0e8ed92
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 44 deletions.
22 changes: 12 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ type link struct {
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
stateful bool
hasTimers []string
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
stateful bool
// hasTimers indicates the transform+timerfamily pairs that need to be waited on for
// the stage to be considered complete.
hasTimers []struct{ Transform, TimerFamily string }
processingTimeTimers map[string]bool

exe transformExecuter
Expand Down Expand Up @@ -396,7 +398,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
}
}
for timerID, v := range pardo.GetTimerFamilySpecs() {
stg.hasTimers = append(stg.hasTimers, tid)
stg.hasTimers = append(stg.hasTimers, struct{ Transform, TimerFamily string }{Transform: tid, TimerFamily: timerID})
if v.TimeDomain == pipepb.TimeDomain_PROCESSING_TIME {
if stg.processingTimeTimers == nil {
stg.processingTimeTimers = map[string]bool{}
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type B struct {
InputTransformID string
Input []*engine.Block // Data and Timers for this bundle.
EstimatedInputElements int
HasTimers []string
HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate.

// IterableSideInputData is a map from transformID + inputID, to window, to data.
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
Expand Down Expand Up @@ -175,7 +175,8 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
for _, tid := range b.HasTimers {
timers = append(timers, &fnpb.Elements_Timers{
InstructionId: b.InstID,
TransformId: tid,
TransformId: tid.Transform,
TimerFamilyId: tid.TimerFamily,
IsLast: true,
})
}
Expand Down
32 changes: 0 additions & 32 deletions sdks/python/apache_beam/runners/portability/prism_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from apache_beam.runners.portability import portable_runner_test
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import userstate
from apache_beam.transforms import window
from apache_beam.utils import timestamp

Expand Down Expand Up @@ -195,37 +194,6 @@ def test_windowing(self):
assert_that(
res, equal_to([('k', [1, 2]), ('k', [100, 101, 102]), ('k', [123])]))

# The fn_runner_test.py version of this test doesn't execute the process
# method for some reason. Overridden here to validate that the cleared
# timer won't re-fire.
def test_pardo_timers_clear(self):
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

class TimerDoFn(beam.DoFn):
def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
unused_key, ts = element
timer.set(ts)
timer.set(2 * ts)

@userstate.on_timer(timer_spec)
def process_timer(
self,
ts=beam.DoFn.TimestampParam,
timer=beam.DoFn.TimerParam(timer_spec)):
timer.set(timestamp.Timestamp(micros=2 * ts.micros))
timer.clear() # Shouldn't fire again
yield 'fired'

with self.create_pipeline() as p:
actual = (
p
| beam.Create([('k1', 10), ('k2', 100)])
| beam.ParDo(TimerDoFn())
| beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))

expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))

# Can't read host files from within docker, read a "local" file there.
def test_read(self):
print('name:', __name__)
Expand Down

0 comments on commit 0e8ed92

Please sign in to comment.