Skip to content

Commit

Permalink
Implement file write fix in update-compatible way.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Apr 14, 2023
1 parent 487e43e commit 33293fe
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
Expand Down Expand Up @@ -386,7 +388,7 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
(getComputeNumShards() == null) ? null : input.apply(getComputeNumShards());

boolean fixedSharding = getComputeNumShards() != null || getNumShardsProvider() != null;
PCollection<Iterable<FileResult<DestinationT>>> tempFileResults;
PCollection<List<FileResult<DestinationT>>> tempFileResults;
if (fixedSharding) {
tempFileResults =
input
Expand Down Expand Up @@ -436,15 +438,15 @@ private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations
}

private class GatherResults<ResultT>
extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> {
private final Coder<ResultT> resultCoder;

private GatherResults(Coder<ResultT> resultCoder) {
this.resultCoder = resultCoder;
}

@Override
public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
if (getWindowedWrites()) {
// Reshuffle the results to make them stable against retries.
// Use a single void key to maximize size of bundles for finalization.
Expand All @@ -453,7 +455,7 @@ public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
.apply("Reshuffle", Reshuffle.of())
.apply("Drop key", Values.create())
.apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
.setCoder(IterableCoder.of(resultCoder))
.setCoder(ListCoder.of(resultCoder))
// Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
.apply(Reshuffle.viaRandomKey());
} else {
Expand All @@ -462,8 +464,23 @@ public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
return input
.getPipeline()
.apply(
"AsPossiblyEmptyList",
Reify.viewInGlobalWindow(
input.apply(View.asIterable()), IterableCoder.of(resultCoder)));
// Insert a reshuffle before taking the view to consolidate the (typically)
// one-output-per-bundle writes.
// This avoids producing a huge number of tiny files in the case that side
// inputs are materialized to disk bundle-by-bundle.
input.apply("Consolidate", Reshuffle.viaRandomKey()).apply(View.asIterable()),
IterableCoder.of(resultCoder)))
// View.asIterable() can be (significantly) cheaper than View.asList(), as it does not
// create a backing indexable view, but we must return a list to maintain update
// compatibility for consumers that are shared between this path and the streaming one.
.apply(
"IterableToList",
MapElements.via(
new SimpleFunction<Iterable<ResultT>, List<ResultT>>(
x -> ImmutableList.copyOf(x)) {}))
.setCoder(ListCoder.of(resultCoder));
}
}
}
Expand Down Expand Up @@ -729,7 +746,7 @@ public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
}

private class WriteAutoShardedBundlesToTempFiles
extends PTransform<PCollection<UserT>, PCollection<Iterable<FileResult<DestinationT>>>> {
extends PTransform<PCollection<UserT>, PCollection<List<FileResult<DestinationT>>>> {
private final Coder<DestinationT> destinationCoder;
private final Coder<FileResult<DestinationT>> fileResultCoder;

Expand All @@ -740,7 +757,7 @@ private WriteAutoShardedBundlesToTempFiles(
}

@Override
public PCollection<Iterable<FileResult<DestinationT>>> expand(PCollection<UserT> input) {
public PCollection<List<FileResult<DestinationT>>> expand(PCollection<UserT> input) {
// Auto-sharding is achieved via GroupIntoBatches.WithShardedKey which shards, groups and at
// the same time batches the input records. The sharding behavior depends on runners. The
// batching is per window and we also emit the batches if there are a certain number of
Expand Down Expand Up @@ -835,7 +852,7 @@ public DestinationT apply(FileResult<DestinationT> input) {
ParDo.of(
new DoFn<
KV<DestinationT, Iterable<FileResult<DestinationT>>>,
Iterable<FileResult<DestinationT>>>() {
List<FileResult<DestinationT>>>() {
@ProcessElement
public void processElement(
@Element KV<DestinationT, Iterable<FileResult<DestinationT>>> element,
Expand All @@ -847,7 +864,7 @@ public void processElement(
c.output(result);
}
}))
.setCoder(IterableCoder.of(fileResultCoder));
.setCoder(ListCoder.of(fileResultCoder));
}
}

Expand Down Expand Up @@ -1021,7 +1038,7 @@ public void finishBundle(FinishBundleContext c) throws Exception {

private class FinalizeTempFileBundles
extends PTransform<
PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
private final @Nullable PCollectionView<Integer> numShardsView;
private final Coder<DestinationT> destinationCoder;

Expand All @@ -1033,7 +1050,7 @@ private FinalizeTempFileBundles(

@Override
public WriteFilesResult<DestinationT> expand(
PCollection<Iterable<FileResult<DestinationT>>> input) {
PCollection<List<FileResult<DestinationT>>> input) {

List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs());
if (numShardsView != null) {
Expand All @@ -1054,7 +1071,7 @@ public WriteFilesResult<DestinationT> expand(
}

private class FinalizeFn
extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
extends DoFn<List<FileResult<DestinationT>>, KV<DestinationT, String>> {
@ProcessElement
public void process(ProcessContext c) throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
Expand Down Expand Up @@ -1105,7 +1122,7 @@ private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
return resultsToFinalFilenames;
}

private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> {
private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
private transient @Nullable Multimap<BoundedWindow, T> bundles = null;

@StartBundle
Expand Down

0 comments on commit 33293fe

Please sign in to comment.