diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 6b45044d2138..530ead673a7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -86,6 +86,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap; @@ -462,7 +463,24 @@ public PCollection> expand(PCollection input) { // iterable to finalize if there are no results. return input .getPipeline() - .apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder))); + .apply( + "AsPossiblyEmptyList", + Reify.viewInGlobalWindow( + // Insert a reshuffle before taking the view to consolidate the (typically) + // one-output-per-bundle writes. + // This avoids producing a huge number of tiny files in the case that side + // inputs are materialized to disk bundle-by-bundle. + input.apply("Consolidate", Reshuffle.viaRandomKey()).apply(View.asIterable()), + IterableCoder.of(resultCoder))) + // View.asIterable() can be (significantly) cheaper than View.asList(), as it does not + // create a backing indexable view, but we must return a list to maintain update + // compatibility for consumers that are shared between this path and the streaming one. + .apply( + "IterableToList", + MapElements.via( + new SimpleFunction, List>( + x -> ImmutableList.copyOf(x)) {})) + .setCoder(ListCoder.of(resultCoder)); } } }