Skip to content

Commit

Permalink
Use WindowedValue.withValue on hot paths apache#21250
Browse files Browse the repository at this point in the history
This removes about half of the overhead of outputting a value in the common scenario where the output reuses an already valid timestamp (the input timestamp), so the timestamp check can be skipped, and where we can use the `withValue` hot path, which is optimized for certain use cases (e.g. the globally windowed value case).

Before:
```
Benchmark                                Mode  Cnt     Score     Error  Units
ProcessBundleBenchmark.testLargeBundle  thrpt   15  3616.761 ± 157.844  ops/s
```

After:
```
Benchmark                                Mode  Cnt     Score     Error  Units
ProcessBundleBenchmark.testLargeBundle  thrpt   15  3666.889 ± 151.448  ops/s
```

This is for apache#21250.
  • Loading branch information
lukecwik committed Feb 16, 2023
1 parent 3e080ff commit d8f63c9
Showing 1 changed file with 49 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,29 @@ public TimerMap timerFamily(String timerFamilyId) {
currentElement.getPane());
}

/**
 * Emits {@code output} through {@code mainOutputConsumer}, wrapping it with the current
 * element's timestamp and pane and with {@code currentWindow}.
 */
@Override
public void output(OutputT output) {
  // The input element's timestamp is reused as-is, so no timestamp validation is needed here.
  WindowedValue<OutputT> windowedOutput =
      WindowedValue.of(
          output, currentElement.getTimestamp(), currentWindow, currentElement.getPane());
  outputTo(mainOutputConsumer, windowedOutput);
}

/**
 * Emits {@code output} to the consumer registered under the id of {@code tag}, wrapping it
 * with the current element's timestamp and pane and with {@code currentWindow}.
 *
 * @throws IllegalArgumentException if no consumer is registered for {@code tag}
 */
@Override
public <T> void output(TupleTag<T> tag, T output) {
  // Resolve the receiver first so an unknown tag fails before any value is built.
  FnDataReceiver<WindowedValue<T>> taggedConsumer =
      (FnDataReceiver) localNameToConsumer.get(tag.getId());
  if (taggedConsumer == null) {
    String message = String.format("Unknown output tag %s", tag);
    throw new IllegalArgumentException(message);
  }

  // The input element's timestamp is reused as-is, so no timestamp validation is needed here.
  WindowedValue<T> windowedOutput =
      WindowedValue.of(
          output, currentElement.getTimestamp(), currentWindow, currentElement.getPane());
  outputTo(taggedConsumer, windowedOutput);
}

@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
// TODO: Check that timestamp is valid once all runners can provide proper timestamps.
Expand Down Expand Up @@ -2350,6 +2373,24 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {

/** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
private class NonWindowObservingProcessBundleContext extends ProcessBundleContextBase {

/**
 * Emits {@code output} through {@code mainOutputConsumer} as a windowed value derived from the
 * current element via {@code withValue}.
 */
@Override
public void output(OutputT output) {
  // The input element's timestamp is reused as-is, so no timestamp validation is needed here.
  WindowedValue<OutputT> windowedOutput = currentElement.withValue(output);
  outputTo(mainOutputConsumer, windowedOutput);
}

/**
 * Emits {@code output} to the consumer registered under the id of {@code tag} as a windowed
 * value derived from the current element via {@code withValue}.
 *
 * @throws IllegalArgumentException if no consumer is registered for {@code tag}
 */
@Override
public <T> void output(TupleTag<T> tag, T output) {
  // Resolve the receiver first so an unknown tag fails before any value is built.
  FnDataReceiver<WindowedValue<T>> taggedConsumer =
      (FnDataReceiver) localNameToConsumer.get(tag.getId());
  if (taggedConsumer == null) {
    String message = String.format("Unknown output tag %s", tag);
    throw new IllegalArgumentException(message);
  }

  // The input element's timestamp is reused as-is, so no timestamp validation is needed here.
  WindowedValue<T> windowedOutput = currentElement.withValue(output);
  outputTo(taggedConsumer, windowedOutput);
}

@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
checkTimestamp(timestamp);
Expand Down Expand Up @@ -2489,8 +2530,7 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {

@Override
public void output(Row output) {
ProcessBundleContextBase.this.outputWithTimestamp(
fromRowFunction.apply(output), currentElement.getTimestamp());
ProcessBundleContextBase.this.output(fromRowFunction.apply(output));
}

@Override
Expand Down Expand Up @@ -2523,8 +2563,7 @@ private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) {
return new OutputReceiver<T>() {
@Override
public void output(T output) {
ProcessBundleContextBase.this.outputWithTimestamp(
tag, output, currentElement.getTimestamp());
ProcessBundleContextBase.this.output(tag, output);
}

@Override
Expand Down Expand Up @@ -2555,8 +2594,7 @@ private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {

@Override
public void output(Row output) {
ProcessBundleContextBase.this.outputWithTimestamp(
tag, fromRowFunction.apply(output), currentElement.getTimestamp());
ProcessBundleContextBase.this.output(tag, fromRowFunction.apply(output));
}

@Override
Expand Down Expand Up @@ -2615,16 +2653,6 @@ public PipelineOptions pipelineOptions() {
return pipelineOptions;
}

@Override
public void output(OutputT output) {
outputWithTimestamp(output, currentElement.getTimestamp());
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
outputWithTimestamp(tag, output, currentElement.getTimestamp());
}

@Override
public InputT element() {
return currentElement.getValue();
Expand Down Expand Up @@ -2777,8 +2805,7 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {

@Override
public void output(Row output) {
context.outputWithTimestamp(
fromRowFunction.apply(output), currentElement.getTimestamp());
context.output(fromRowFunction.apply(output));
}

@Override
Expand Down Expand Up @@ -2810,7 +2837,7 @@ private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) {
return new OutputReceiver<T>() {
@Override
public void output(T output) {
context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
context.output(tag, output);
}

@Override
Expand Down Expand Up @@ -2841,8 +2868,7 @@ private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {

@Override
public void output(Row output) {
context.outputWithTimestamp(
tag, fromRowFunction.apply(output), currentElement.getTimestamp());
context.output(tag, fromRowFunction.apply(output));
}

@Override
Expand Down Expand Up @@ -3071,7 +3097,7 @@ private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) {
return new OutputReceiver<T>() {
@Override
public void output(T output) {
context.outputWithTimestamp(tag, output, currentElement.getTimestamp());
context.output(tag, output);
}

@Override
Expand Down Expand Up @@ -3102,8 +3128,7 @@ private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {

@Override
public void output(Row output) {
context.outputWithTimestamp(
tag, fromRowFunction.apply(output), currentElement.getTimestamp());
context.output(tag, fromRowFunction.apply(output));
}

@Override
Expand Down

0 comments on commit d8f63c9

Please sign in to comment.