[Bug]: Python AfterProcessingTime behaves different than Java #23071

Closed
InigoSJ opened this issue Sep 7, 2022 · 8 comments · Fixed by #23100
Labels
bug, core, done & done, P2, python

Comments

@InigoSJ
Contributor

InigoSJ commented Sep 7, 2022

What happened?

The Python trigger AfterProcessingTime behaves differently from Java's AfterProcessingTime.pastFirstElementInPane().plusDelayOf.

Java waits X time after the first element in the pane and then triggers. Python instead behaves like a session window: the wait is measured from the most recent element rather than from the first element in the pane, because every element resets the timer:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py#L387

  def on_element(self, element, window, context):
    context.set_timer(
        '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
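
To make the difference concrete, here is a minimal, self-contained sketch. It is not Beam's code: `FakeContext`, `ResetOnEveryElement`, `FirstElementInPane` and `deadlines` are hypothetical names made up for illustration, and the only thing taken from the issue is the idea of arming the timer on every element versus only on the first element of a pane.

```python
class FakeContext:
    """Hypothetical stand-in for a trigger context: one clock, one timer."""

    def __init__(self):
        self.now = 0.0
        self.timer = None  # processing time at which the pane would fire

    def get_current_time(self):
        return self.now

    def set_timer(self, fire_at):
        self.timer = fire_at


class ResetOnEveryElement:
    """Behavior described in the issue: each element pushes the timer back."""

    def __init__(self, delay):
        self.delay = delay

    def on_element(self, context):
        context.set_timer(context.get_current_time() + self.delay)


class FirstElementInPane:
    """Java-like behavior: only the first element of a pane arms the timer."""

    def __init__(self, delay):
        self.delay = delay

    def on_element(self, context):
        if context.timer is None:
            context.set_timer(context.get_current_time() + self.delay)


def deadlines(trigger_impl, arrival_times):
    """Return the timer value after each arrival, assuming nothing fires in between."""
    context = FakeContext()
    result = []
    for t in arrival_times:
        context.now = t
        trigger_impl.on_element(context)
        result.append(context.timer)
    return result


arrivals = [0.0, 1.5]  # two elements arriving 1.5 s apart, delay of 2 s
print(deadlines(ResetOnEveryElement(2), arrivals))  # [2.0, 3.5]
print(deadlines(FirstElementInPane(2), arrivals))   # [2.0, 2.0]
```

With the reset-on-every-element variant the deadline moves from 2.0 to 3.5 when the second element arrives, so a pane that should fire at processing time 2 keeps getting postponed for as long as elements arrive less than `delay` apart.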

You can see this in this example:

Python

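# Imports needed to run this example
from apache_beam import GroupByKey, Map, WindowInto
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger
from apache_beam.transforms.window import FixedWindows, TimestampedValue

def get_input_stream():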
  stream = (
      TestStream().add_elements([
          TimestampedValue("1", timestamp=0)
      ])
      .advance_watermark_to(1.5)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("2", timestamp=1)
      ])
      .advance_watermark_to(2.5)
      .advance_processing_time(1) # Running Processing time 2.5 (it should trigger now)
      .add_elements([
          TimestampedValue("3", timestamp=3)
      ])
      .advance_watermark_to(3)
      .advance_processing_time(0.5) # Running Processing time 3
      .add_elements([
          TimestampedValue("4", timestamp=4)
      ])
      .advance_watermark_to(5)
      .advance_processing_time(2) # Running Processing time 5, it triggers now since it's 2s since last element
      .add_elements([
          TimestampedValue("5", timestamp=4)
      ])
      .advance_watermark_to(6)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("6", timestamp=8)
      ])
      .advance_watermark_to(9)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("7", timestamp=7),
      ])
      .advance_watermark_to(10)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("8", timestamp=8),
      ])
      .advance_watermark_to(11)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("9", timestamp=7),
      ])
      .advance_watermark_to(12)
      .advance_processing_time(1.5) # Running Processing time 12.5, nothing has triggered since 5
      .add_elements([
          TimestampedValue("10", timestamp=9),
      ])
      .advance_processing_time(2)
      .advance_watermark_to_infinity()
  )
  return stream

options = PipelineOptions(streaming=True)

p = TestPipeline(options=options)

window_size_seconds = 10
window_allowed_lateness_seconds = 5
count_pass = 3
delay = 2

stream = get_input_stream()

(p | stream
   | WindowInto(
      FixedWindows(size=window_size_seconds),
      allowed_lateness=window_allowed_lateness_seconds,
      accumulation_mode=trigger.AccumulationMode.DISCARDING,
      trigger=trigger.Repeatedly(trigger.AfterProcessingTime(delay))
  )
 | Map(lambda e: ("key", e))
 | GroupByKey()
 | Map(print)
)

p.run()

Java

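// Imports needed by this fragment; it also assumes that `options` and a logger `LOG` are defined elsewhere.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

        Pipeline p = Pipeline.create(options);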

        Integer windowLength = 10;
        Integer allowLateSize = 5;
        Integer delay = 2;

        TestStream<String> streamEvents = TestStream.create(StringUtf8Coder.of())
                .addElements(
                        TimestampedValue.of("1", new Instant(0))
                )
                .advanceWatermarkTo(new Instant(1500))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("2", new Instant(1000))
                )
                .advanceWatermarkTo(new Instant(2500))
                .advanceProcessingTime(Duration.millis(1000))  // Running Processing time 2.5, it triggers here
                .addElements(
                        TimestampedValue.of("3", new Instant(3000))
                )
                .advanceWatermarkTo(new Instant(3000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("4", new Instant(4000))
                )
                .advanceWatermarkTo(new Instant(5000))
                .advanceProcessingTime(Duration.standardSeconds(2))
                .addElements(
                        TimestampedValue.of("5", new Instant(4000))
                )
                .advanceWatermarkTo(new Instant(6000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("6", new Instant(8000))
                        )
                .advanceWatermarkTo(new Instant(9000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("7", new Instant(7000))
                )
                .advanceWatermarkTo(new Instant(10000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("8", new Instant(8000))
                )
                .advanceWatermarkTo(new Instant(11000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("9", new Instant(7000))
                )
                .advanceWatermarkTo(new Instant(12000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("10", new Instant(9000))
                )
                .advanceWatermarkToInfinity();

        p.apply(streamEvents)
                .apply("KVs", ParDo.of(new DoFn<String, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of("key", c.element()));
                    }
                }))
                .apply(Window.<KV<String, String>>into(
                        FixedWindows.of(Duration.standardSeconds(windowLength)))
                            .withAllowedLateness(Duration.standardSeconds(allowLateSize))
                            .triggering(
                                Repeatedly.forever(
                                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(delay)))
                                )
                        .discardingFiredPanes()
                )
                .apply(GroupByKey.create())
                .apply("Log", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        LOG.info("\n TRIGGER " + c.element().getValue().toString());
                        c.output(c.pane().toString());
                    }
                }));


        p.run();

The Python output is two panes, ['1', '2', '3', '4'] and ['5', '6', '7', '8', '9', '10'], while Java produces the "right" output: ['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10'].
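
Both outputs can be reproduced without Beam by replaying the processing-time schedule of the Python TestStream by hand. The sketch below is a simplification under stated assumptions: it models a single key and a single window, ignores watermarks and allowed lateness, and checks the timer once after each clock advance. `replay` and `STEPS` are hypothetical names, not Beam APIs.

```python
def replay(steps, delay, reset_on_every_element):
    """Replay (element, advance) steps and return the panes that fire.

    Each step adds one element at the current processing time and then
    advances the clock by `advance` seconds.
    """
    now = 0.0
    fire_at = None  # pending processing-time timer, if any
    pane, panes = [], []
    for element, advance in steps:
        pane.append(element)
        # Arm the timer on every element (behavior reported for Python) or
        # only when no timer is pending (first element in pane, as in Java).
        if reset_on_every_element or fire_at is None:
            fire_at = now + delay
        now += advance
        if fire_at <= now:
            panes.append(pane)
            pane, fire_at = [], None
    return panes


# (element, seconds of processing time advanced afterwards), copied from
# the Python TestStream in this issue.
STEPS = [("1", 1.5), ("2", 1), ("3", 0.5), ("4", 2), ("5", 1.5),
         ("6", 1.5), ("7", 1.5), ("8", 1.5), ("9", 1.5), ("10", 2)]

print(replay(STEPS, delay=2, reset_on_every_element=True))
# [['1', '2', '3', '4'], ['5', '6', '7', '8', '9', '10']]
print(replay(STEPS, delay=2, reset_on_every_element=False))
# [['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10']]
```

In the first run the pane only fires when the clock jumps by a full 2 seconds with no new element (at processing times 5 and 14.5), which is why just two panes come out.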


The fix doesn't seem hard (worst thing to say ever), but given that users may already be relying on this trigger, I am not sure how to proceed.

Issue Priority

Priority: 2

Issue Component

Component: sdk-py-core

@InigoSJ
Contributor Author

InigoSJ commented Sep 7, 2022

I have made a fix for this that changes the behavior to match Java's. I will submit the PR tomorrow.

@TheNeuralBit
Member

Is there a possibility this will be a breaking change for people that rely on the old behavior?

@pabloem
Member

pabloem commented Sep 13, 2022

Hm, there is. However, this code change only affects DirectRunner pipelines. For other runners, the trigger code is executed by the runners...

@InigoSJ
Contributor Author

InigoSJ commented Sep 13, 2022

Just for clarification, I tested the trigger in Dataflow (both Legacy and UW) and it works as in Java (as it should).

I am not aware how this works in other runners, but I think users would expect it to work as in Java and how it's explained in the documentation.

https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.trigger.html?#apache_beam.transforms.trigger.AfterProcessingTime

Do you have any suggestion on how to approach this @TheNeuralBit ? Maybe adding notes in the release?

Let me know if you want to discuss this offline

@TheNeuralBit
Member

TheNeuralBit commented Sep 13, 2022

> Just for clarification, I tested the trigger in Dataflow (both Legacy and UW) and it works as in Java (as it should).

Do you mean it worked as in Java on Dataflow, even before your change?

> I am not aware how this works in other runners, but I think users would expect it to work as in Java and how it's explained in the documentation.

Yes but users sometimes come to rely on buggy behavior :) (obligatory xkcd: https://xkcd.com/1172/)

> Do you have any suggestion on how to approach this @TheNeuralBit ? Maybe adding notes in the release?

I don't think I have enough context here to advise on a specific action. If the change really is just affecting the DirectRunner (non-production) I think it makes sense to just mention this in the release notes. If there's a possibility it will affect production pipelines on Dataflow or other runners we might consider a deprecation cycle (add a warning that behavior will change in a future release, possibly with a new trigger option that preserves existing behavior).
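
For illustration only, a deprecation cycle of the kind described above could look roughly like this. `ProcessingTimeDelay` and its `reset_on_each_element` option are hypothetical and do not exist in Beam; the sketch only shows the general pattern of warning when the caller has not chosen a behavior explicitly.

```python
import warnings


class ProcessingTimeDelay:
    """Hypothetical trigger configuration, not a Beam class."""

    def __init__(self, delay, reset_on_each_element=None):
        if reset_on_each_element is None:
            # The caller did not choose: keep the old behavior for now, but warn.
            warnings.warn(
                "The delay is currently measured from the last element; a "
                "future release will measure it from the first element in "
                "the pane. Pass reset_on_each_element=True to keep the "
                "current behavior.",
                FutureWarning,
                stacklevel=2)
            reset_on_each_element = True
        self.delay = delay
        self.reset_on_each_element = reset_on_each_element


# Explicit choice: no warning is emitted.
new_style = ProcessingTimeDelay(2, reset_on_each_element=False)

# Implicit choice: old behavior is kept and a FutureWarning is emitted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    old_style = ProcessingTimeDelay(2)
print(old_style.reset_on_each_element, len(caught))  # True 1
```

Once the cycle ends, the default would flip to the first-element behavior and the warning would be dropped.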

@InigoSJ
Contributor Author

InigoSJ commented Sep 13, 2022

> Do you mean it worked as in Java on Dataflow, even before your change?

Yes, it works as Java on Dataflow (both Legacy and UW)

> I don't think I have enough context here to advise on a specific action. If the change really is just affecting the DirectRunner (non-production) I think it makes sense to just mention this in the release notes. If there's a possibility it will affect production pipelines on Dataflow or other runners we might consider a deprecation cycle (add a warning that behavior will change in a future release, possibly with a new trigger option that preserves existing behavior).

I don't have the capacity to test on other runners besides DF and DirectRunner, so I'm not sure how to proceed.

To be fair, the documentation marks the trigger as experimental. Considering the description, how Java works, what most users would actually want (*), and the fact that this is experimental, I think this change will help users more than it hinders them. I'd say it's worse for a customer to rely on the trigger working as described when it doesn't than for a customer to rely on something buggy.

(*) As of now (without the fix), the trigger just works like a session window, which doesn't really add value to the pool of triggers, whereas something like "wait X time and then trigger" does.

Of course, this is just my personal opinion and it's not for me to decide what to do. Let me know what you want to do and I'll do my best :D

@TheNeuralBit
Member

Got it, maybe let's just record it as a bugfix in CHANGES.md

@InigoSJ
Contributor Author

InigoSJ commented Sep 19, 2022

Created this PR

#23297

tvalentyn added the "done & done" label (Issue has been reviewed after it was closed for verification, followups, etc.) on Sep 27, 2022