Offset-based deduplication for Unbounded Source and Dataflow Runner (#…
tomstepp authored Jan 24, 2025
1 parent b8bd935 commit 6b52518
Showing 5 changed files with 92 additions and 1 deletion.
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.internal;

import static com.google.api.client.util.Base64.encodeBase64String;
import static org.apache.beam.runners.dataflow.util.Structs.addBoolean;
import static org.apache.beam.runners.dataflow.util.Structs.addString;
import static org.apache.beam.runners.dataflow.util.Structs.addStringList;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
@@ -46,6 +47,10 @@ public class CustomSources {
private static final String SERIALIZED_SOURCE = "serialized_source";
@VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";

@VisibleForTesting
static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION =
"serialized_offset_based_deduplication";

private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);

private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
@@ -93,6 +98,10 @@ public static com.google.api.services.dataflow.model.Source serializeToCloudSour
}
checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
addBoolean(
cloudSource.getSpec(),
SERIALIZED_OFFSET_BASED_DEDUPLICATION,
unboundedSource.offsetBasedDeduplicationSupported());
} else {
throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
}
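The hunk above records whether the pipeline's UnboundedSource supports offset-based deduplication in the serialized cloud source spec, alongside the existing serialized source and splits. A minimal sketch of that flag's round trip, using a plain map in place of the real spec and hypothetical names throughout (the real code writes the flag with Structs.addBoolean on the CloudObject spec):

import java.util.HashMap;
import java.util.Map;

public class OffsetDedupFlagSketch {
  static final String SERIALIZED_OFFSET_BASED_DEDUPLICATION =
      "serialized_offset_based_deduplication";

  public static void main(String[] args) {
    // Stand-in for unboundedSource.offsetBasedDeduplicationSupported().
    boolean offsetDedupSupported = true;

    // Stand-in for the cloud source spec populated above.
    Map<String, Object> spec = new HashMap<>();
    spec.put(SERIALIZED_OFFSET_BASED_DEDUPLICATION, offsetDedupSupported);

    // Read-back; an absent key falls back to false in this sketch.
    boolean enabled =
        (Boolean) spec.getOrDefault(SERIALIZED_OFFSET_BASED_DEDUPLICATION, false);
    System.out.println("offset-based deduplication enabled: " + enabled);
  }
}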
@@ -195,6 +195,27 @@ public boolean workIsFailed() {
return work != null && work.isFailed();
}

public boolean offsetBasedDeduplicationSupported() {
return activeReader != null
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
}

public byte[] getCurrentRecordId() {
if (!offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected getCurrentRecordId() while offset-based deduplication is not enabled.");
}
return activeReader.getCurrentRecordId();
}

public byte[] getCurrentRecordOffset() {
if (!offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected getCurrentRecordOffset() while offset-based deduplication is not enabled.");
}
return activeReader.getCurrentRecordOffset();
}

public void start(
@Nullable Object key,
Work work,
@@ -440,6 +461,13 @@ public Map<Long, Runnable> flushState() {
throw new RuntimeException("Exception while encoding checkpoint", e);
}
sourceStateBuilder.setState(stream.toByteString());
if (activeReader.getCurrentSource().offsetBasedDeduplicationSupported()) {
byte[] offsetLimit = checkpointMark.getOffsetLimit();
if (offsetLimit.length == 0) {
throw new RuntimeException("Checkpoint offset limit must be non-empty.");
}
sourceStateBuilder.setOffsetLimit(ByteString.copyFrom(offsetLimit));
}
}
outputBuilder.setSourceWatermark(WindmillTimeUtils.harnessToWindmillTimestamp(watermark));

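In flushState() above, the checkpoint's offset limit is copied into the Windmill source state and an empty limit is rejected. Below is a sketch of a CheckpointMark that meets that contract for a source whose offsets are non-negative longs; the class and field names are hypothetical, not part of this commit. Big-endian encoding keeps byte-wise lexicographic order consistent with numeric order.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.beam.sdk.io.UnboundedSource;

class LongOffsetCheckpointMark implements UnboundedSource.CheckpointMark {
  // The smallest offset that has not been read yet; everything already read is below it.
  private final long nextOffsetToRead;

  LongOffsetCheckpointMark(long nextOffsetToRead) {
    this.nextOffsetToRead = nextOffsetToRead;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Nothing to finalize in this sketch.
  }

  @Override
  public byte[] getOffsetLimit() {
    // Always 8 bytes, so it can never trip the non-empty check in flushState().
    return ByteBuffer.allocate(Long.BYTES).putLong(nextOffsetToRead).array();
  }
}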
@@ -214,8 +214,33 @@ public long add(WindowedValue<T> data) throws IOException {
.setData(value)
.setMetadata(metadata);
keyedOutput.addMessages(builder.build());

long offsetSize = 0;
if (context.offsetBasedDeduplicationSupported()) {
if (id.size() > 0) {
throw new RuntimeException(
"Unexpected record ID via ValueWithRecordIdCoder while offset-based deduplication enabled.");
}
byte[] rawId = context.getCurrentRecordId();
if (rawId.length == 0) {
throw new RuntimeException(
"Unexpected empty record ID while offset-based deduplication enabled.");
}
id = ByteString.copyFrom(rawId);

byte[] rawOffset = context.getCurrentRecordOffset();
if (rawOffset.length == 0) {
throw new RuntimeException(
"Unexpected empty record offset while offset-based deduplication enabled.");
}
ByteString offset = ByteString.copyFrom(rawOffset);
offsetSize = offset.size();
keyedOutput.addMessageOffsets(offset);
}

keyedOutput.addMessagesIds(id);
-   return (long) key.size() + value.size() + metadata.size() + id.size();
+   return (long) key.size() + value.size() + metadata.size() + id.size() + offsetSize;
}

@Override
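When the execution context reports offset-based deduplication, add() above takes the record ID from the reader instead of the ValueWithRecordIdCoder, ships the record offset alongside it, and counts both toward the returned byte size. A sketch of the kind of values a reader could hand back, assuming offsets are a (shard, sequence number) pair; the helper and its names are illustrative only:

import java.nio.ByteBuffer;

final class OffsetDedupEncodingSketch {
  private OffsetDedupEncodingSketch() {}

  // Fixed-width big-endian encoding: within a shard, byte-wise lexicographic
  // order matches read order, and the result is never empty.
  static byte[] recordOffset(int shard, long sequenceNumber) {
    return ByteBuffer.allocate(Integer.BYTES + Long.BYTES)
        .putInt(shard)
        .putLong(sequenceNumber)
        .array();
  }

  // Reusing the offset bytes as the record ID is enough for deduplication as
  // long as the value is unique per element, which (shard, sequence) guarantees.
  static byte[] recordId(int shard, long sequenceNumber) {
    return recordOffset(shard, sequenceNumber);
  }
}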
@@ -56,6 +56,7 @@ message KeyedMessageBundle {
optional fixed64 sharding_key = 4;
repeated Message messages = 2;
repeated bytes messages_ids = 3;
repeated bytes message_offsets = 5;
}

message LatencyAttribution {
@@ -410,6 +411,7 @@ message SourceState {
optional bytes state = 1;
repeated fixed64 finalize_ids = 2;
optional bool only_finalize = 3;
optional bytes offset_limit = 4;
}

message WatermarkHold {
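On the wire, message_offsets carries one offset per message id in a KeyedMessageBundle when offset-based deduplication is enabled, and SourceState.offset_limit carries the value returned by the checkpoint's getOffsetLimit(). Under the UnboundedSource contract documented further down, every offset already emitted compares strictly below that limit in unsigned lexicographic byte order; a tiny sketch of that comparison (the helper is hypothetical):

import java.util.Arrays;

final class OffsetLimitCheckSketch {
  private OffsetLimitCheckSketch() {}

  // True when an emitted message offset is consistent with the checkpoint's
  // offset_limit: strictly smaller under unsigned lexicographic byte order.
  static boolean withinLimit(byte[] messageOffset, byte[] offsetLimit) {
    return Arrays.compareUnsigned(messageOffset, offsetLimit) < 0;
  }
}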
@@ -93,6 +93,21 @@ public boolean requiresDeduping() {
return false;
}

/**
* If offsetBasedDeduplicationSupported returns true, then the UnboundedSource needs to provide
* the following:
*
* <ul>
* <li>UnboundedReader which provides offsets that are unique for each element and
* lexicographically ordered.
<li>CheckpointMark which provides an offset greater than the offsets of all elements read and
less than or equal to the next offset that will be read.
* </ul>
*/
public boolean offsetBasedDeduplicationSupported() {
return false;
}

/**
* A marker representing the progress and state of an {@link
* org.apache.beam.sdk.io.UnboundedSource.UnboundedReader}.
@@ -139,6 +154,12 @@ public void finalizeCheckpoint() throws IOException {
// nothing to do
}
}

/* Get offset limit for unbounded source split checkpoint. */
default byte[] getOffsetLimit() {
throw new RuntimeException(
"CheckpointMark must override getOffsetLimit() if offset-based deduplication is enabled for the UnboundedSource.");
}
}

/**
@@ -203,6 +224,12 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
return EMPTY;
}

/* Returns the offset for the current record of this unbounded reader. */
public byte[] getCurrentRecordOffset() {
throw new RuntimeException(
"UnboundedReader must override getCurrentRecordOffset() if offset-based deduplication is enabled for the UnboundedSource.");
}

/**
* Returns a timestamp before or at the timestamps of all future elements read by this reader.
*
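The SDK surface gives a source three hooks: opt in through offsetBasedDeduplicationSupported(), report per-element IDs and offsets from the UnboundedReader, and report an offset limit from the CheckpointMark. Below is a compact, hypothetical in-memory reader showing how the overrides fit together; none of the class names come from this commit. It uses the element index as the offset, encoded big-endian so lexicographic order matches read order, and processing-time timestamps purely to keep the sketch short.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.joda.time.Instant;

class DedupListReaderSketch extends UnboundedSource.UnboundedReader<String> {
  // The owning source; it would return true from offsetBasedDeduplicationSupported().
  private final UnboundedSource<String, ?> source;
  private final List<String> elements;
  private long index = -1;

  DedupListReaderSketch(UnboundedSource<String, ?> source, List<String> elements) {
    this.source = source;
    this.elements = elements;
  }

  @Override
  public boolean start() throws IOException {
    return advance();
  }

  @Override
  public boolean advance() throws IOException {
    if (index + 1 >= elements.size()) {
      return false; // no new data available right now
    }
    index++;
    return true;
  }

  @Override
  public String getCurrent() throws NoSuchElementException {
    if (index < 0) {
      throw new NoSuchElementException();
    }
    return elements.get((int) index);
  }

  // Unique per element, as required once offset-based deduplication is enabled.
  @Override
  public byte[] getCurrentRecordId() {
    return encode(index);
  }

  // Lexicographically ordered because of the fixed-width big-endian encoding.
  @Override
  public byte[] getCurrentRecordOffset() {
    return encode(index);
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    return Instant.now(); // processing-time timestamp, for brevity only
  }

  @Override
  public Instant getWatermark() {
    return Instant.now();
  }

  @Override
  public UnboundedSource.CheckpointMark getCheckpointMark() {
    // Greater than every offset read so far, equal to the next offset to be read.
    long nextOffset = index + 1;
    return new UnboundedSource.CheckpointMark() {
      @Override
      public void finalizeCheckpoint() {}

      @Override
      public byte[] getOffsetLimit() {
        return encode(nextOffset);
      }
    };
  }

  @Override
  public UnboundedSource<String, ?> getCurrentSource() {
    return source;
  }

  @Override
  public void close() throws IOException {}

  private static byte[] encode(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }
}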
