From 6b525186caeb69132062bb6e0fd9c2e7f1bd182f Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 24 Jan 2025 01:38:31 -0800 Subject: [PATCH] Offset-based deduplication for Unbounded Source and Dataflow Runner (#33591) --- .../dataflow/internal/CustomSources.java | 9 ++++++ .../worker/StreamingModeExecutionContext.java | 28 +++++++++++++++++++ .../runners/dataflow/worker/WindmillSink.java | 27 +++++++++++++++++- .../windmill/src/main/proto/windmill.proto | 2 ++ .../apache/beam/sdk/io/UnboundedSource.java | 27 ++++++++++++++++++ 5 files changed, 92 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java index fcfe3fe3ce05..007402baf71b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.internal; import static com.google.api.client.util.Base64.encodeBase64String; +import static org.apache.beam.runners.dataflow.util.Structs.addBoolean; import static org.apache.beam.runners.dataflow.util.Structs.addString; import static org.apache.beam.runners.dataflow.util.Structs.addStringList; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; @@ -46,6 +47,10 @@ public class CustomSources { private static final String SERIALIZED_SOURCE = "serialized_source"; @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits"; + @VisibleForTesting + static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION = + "serialized_offset_based_deduplication"; + private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class); private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) { @@ -93,6 +98,10 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSour } checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split"); addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits); + addBoolean( + cloudSource.getSpec(), + SERIALIZED_OFFSET_BASED_DEDUPLICATION, + unboundedSource.offsetBasedDeduplicationSupported()); } else { throw new IllegalArgumentException("Unexpected source kind: " + source.getClass()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 2463ca682ca2..097032e9822e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -195,6 +195,27 @@ public boolean workIsFailed() { return work != null && work.isFailed(); } + public boolean offsetBasedDeduplicationSupported() { + return activeReader != null + && activeReader.getCurrentSource().offsetBasedDeduplicationSupported(); + } + + public byte[] getCurrentRecordId() { + if (!offsetBasedDeduplicationSupported()) { + throw new RuntimeException( + "Unexpected getCurrentRecordId() while offset-based 
deduplication is not enabled."); + } + return activeReader.getCurrentRecordId(); + } + + public byte[] getCurrentRecordOffset() { + if (!offsetBasedDeduplicationSupported()) { + throw new RuntimeException( + "Unexpected getCurrentRecordOffset() while offset-based deduplication is not enabled."); + } + return activeReader.getCurrentRecordOffset(); + } + public void start( @Nullable Object key, Work work, @@ -440,6 +461,13 @@ public Map flushState() { throw new RuntimeException("Exception while encoding checkpoint", e); } sourceStateBuilder.setState(stream.toByteString()); + if (activeReader.getCurrentSource().offsetBasedDeduplicationSupported()) { + byte[] offsetLimit = checkpointMark.getOffsetLimit(); + if (offsetLimit.length == 0) { + throw new RuntimeException("Checkpoint offset limit must be non-empty."); + } + sourceStateBuilder.setOffsetLimit(ByteString.copyFrom(offsetLimit)); + } } outputBuilder.setSourceWatermark(WindmillTimeUtils.harnessToWindmillTimestamp(watermark)); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index f83c68ab3c90..52d7b2563f5a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -214,8 +214,33 @@ public long add(WindowedValue data) throws IOException { .setData(value) .setMetadata(metadata); keyedOutput.addMessages(builder.build()); + + long offsetSize = 0; + if (context.offsetBasedDeduplicationSupported()) { + if (id.size() > 0) { + throw new RuntimeException( + "Unexpected record ID via ValueWithRecordIdCoder while offset-based deduplication enabled."); + } + byte[] rawId = context.getCurrentRecordId(); + if (rawId.length == 0) { + throw new RuntimeException( + "Unexpected empty record ID while offset-based deduplication enabled."); + } + id = ByteString.copyFrom(rawId); + + byte[] rawOffset = context.getCurrentRecordOffset(); + if (rawOffset.length == 0) { + throw new RuntimeException( + "Unexpected empty record offset while offset-based deduplication enabled."); + } + ByteString offset = ByteString.copyFrom(rawOffset); + offsetSize = offset.size(); + keyedOutput.addMessageOffsets(offset); + } + keyedOutput.addMessagesIds(id); - return (long) key.size() + value.size() + metadata.size() + id.size(); + + return (long) key.size() + value.size() + metadata.size() + id.size() + offsetSize; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index b30cba60d65b..e1b52547fe73 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -56,6 +56,7 @@ message KeyedMessageBundle { optional fixed64 sharding_key = 4; repeated Message messages = 2; repeated bytes messages_ids = 3; + repeated bytes message_offsets = 5; } message LatencyAttribution { @@ -410,6 +411,7 @@ message SourceState { optional bytes state = 1; repeated fixed64 finalize_ids = 2; optional bool only_finalize = 3; + optional bytes offset_limit = 4; } message WatermarkHold { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java index 840e4910e2a2..f38715b37e1e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java @@ -93,6 +93,21 @@ public boolean requiresDeduping() { return false; } + /** + * If offsetBasedDeduplicationSupported returns true, then the UnboundedSource needs to provide + * the following: + * + *
+ * <ul>
+ *   <li>UnboundedReader which provides offsets that are unique for each element and
+ *       lexicographically ordered.
+ *   <li>CheckpointMark which provides an offset greater than all elements read and less than or
+ *       equal to the next offset that will be read.
+ * </ul>
+ */ + public boolean offsetBasedDeduplicationSupported() { + return false; + } + /** * A marker representing the progress and state of an {@link * org.apache.beam.sdk.io.UnboundedSource.UnboundedReader}. @@ -139,6 +154,12 @@ public void finalizeCheckpoint() throws IOException { // nothing to do } } + + /* Get offset limit for unbounded source split checkpoint. */ + default byte[] getOffsetLimit() { + throw new RuntimeException( + "CheckpointMark must override getOffsetLimit() if offset-based deduplication is enabled for the UnboundedSource."); + } } /** @@ -203,6 +224,12 @@ public byte[] getCurrentRecordId() throws NoSuchElementException { return EMPTY; } + /* Returns the offset for the current record of this unbounded reader. */ + public byte[] getCurrentRecordOffset() { + throw new RuntimeException( + "UnboundedReader must override getCurrentRecordOffset() if offset-based deduplication is enabled for the UnboundedSource."); + } + /** * Returns a timestamp before or at the timestamps of all future elements read by this reader. *