Skip to content

Commit

Permalink
Merge pull request #26289 Use iterable side inputs rather than list side inputs in file writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Apr 17, 2023
2 parents 9c00dca + 33293fe commit 80f1f6c
Showing 1 changed file with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
Expand Down Expand Up @@ -462,7 +463,24 @@ public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
// iterable to finalize if there are no results.
return input
.getPipeline()
.apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
.apply(
"AsPossiblyEmptyList",
Reify.viewInGlobalWindow(
// Insert a reshuffle before taking the view to consolidate the (typically)
// one-output-per-bundle writes.
// This avoids producing a huge number of tiny files in the case that side
// inputs are materialized to disk bundle-by-bundle.
input.apply("Consolidate", Reshuffle.viaRandomKey()).apply(View.asIterable()),
IterableCoder.of(resultCoder)))
// View.asIterable() can be (significantly) cheaper than View.asList(), as it does not
// create a backing indexable view, but we must return a list to maintain update
// compatibility for consumers that are shared between this path and the streaming one.
.apply(
"IterableToList",
MapElements.via(
new SimpleFunction<Iterable<ResultT>, List<ResultT>>(
x -> ImmutableList.copyOf(x)) {}))
.setCoder(ListCoder.of(resultCoder));
}
}
}
Expand Down

0 comments on commit 80f1f6c

Please sign in to comment.