Kafka source offset-based deduplication.
tomstepp committed Jan 23, 2025
1 parent 952610e commit 9b30e79
Showing 10 changed files with 129 additions and 1,142 deletions.

This file was deleted.

This file was deleted.
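For orientation, here is a minimal usage sketch of the option this commit introduces. The pipeline setup, topic, brokers, and deserializers below are illustrative placeholders rather than code from the commit; only withOffsetDeduplication(true) is the new API.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaOffsetDedupExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline.apply(
        "ReadFromKafka",
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("my-topic") // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // New in this commit: deduplicate replayed records by Kafka offset
            // on the legacy UnboundedSource read path, where supported.
            .withOffsetDeduplication(true)
            .withoutMetadata());
    pipeline.run();
  }
}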

@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;

public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
if (!reader.isPresent()) {
throw new RuntimeException(
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
}
if (!reader.get().offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
}

// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
PartitionMark partition = partitions.get(/* index= */ 0);
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
}

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
.setRedistributed(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
.setOffsetDeduplication(false)
.build();
}

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract boolean isOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -892,6 +898,10 @@ static <K, V> void setupExternalBuilder(
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
// TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
if (config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
@@ -137,7 +137,12 @@ Object getDefaultValue() {
return false;
}
},
;
OFFSET_DEDUPLICATION(LEGACY) {
@Override
Object getDefaultValue() {
return false;
}
};

private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;

@@ -19,6 +19,8 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -142,4 +144,17 @@ void update(double quantity) {
return avg;
}
}

  static final class OffsetBasedDeduplication {

    static byte[] encodeOffset(long offset) {
      // Allocate a fresh buffer per call: a shared static buffer would hand the same
      // backing array to every caller and would not be thread-safe.
      return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
    }

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
}
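A quick standalone sketch (not part of the commit) of what the helpers above produce: encodeOffset yields the 8-byte big-endian encoding of the offset, and getUniqueId yields a UTF-8 "topic-partition-offset" key. The topic name and values are illustrative only.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetEncodingDemo {
  public static void main(String[] args) {
    long offset = 42L;

    // Same encoding as OffsetBasedDeduplication.encodeOffset(42L): big-endian long.
    byte[] encoded = ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
    System.out.println(ByteBuffer.wrap(encoded).getLong()); // 42

    // Same shape as OffsetBasedDeduplication.getUniqueId("events", 3, 42L).
    byte[] uniqueId = ("events" + "-" + 3 + "-" + offset).getBytes(StandardCharsets.UTF_8);
    System.out.println(new String(uniqueId, StandardCharsets.UTF_8)); // events-3-42
  }
}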
@@ -214,6 +214,12 @@ public boolean advance() throws IOException {
curTimestamp =
pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
curRecord = record;
if (this.offsetBasedDeduplicationSupported) {
curOffset = KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(offset);
curId =
KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
}

int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +305,28 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curId == null) {
throw new NoSuchElementException("KafkaUnboundedReader's curId is null.");
}
return curId;
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curOffset == null) {
throw new NoSuchElementException("KafkaUnboundedReader's curOffset is null.");
}
return curOffset;
}

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetBasedDeduplicationSupported() {
return this.offsetBasedDeduplicationSupported;
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
private @Nullable Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private final boolean offsetBasedDeduplicationSupported;
private byte[] curOffset = new byte[0];
private byte[] curId = new byte[0];

private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;

@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
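For context, a conceptual sketch (hypothetical, not runner code from this commit) of how a runner that supports offset-based deduplication could combine the reader's getCurrentRecordOffset() with the checkpoint's getOffsetLimit(): offsets below the restored limit were already covered by the checkpoint and can be dropped on replay.

import java.nio.ByteBuffer;

public class OffsetLimitCheckSketch {
  // Hypothetical helper: both byte arrays are the 8-byte big-endian values produced by
  // KafkaIOUtils.OffsetBasedDeduplication.encodeOffset().
  static boolean alreadyCovered(byte[] currentRecordOffset, byte[] checkpointOffsetLimit) {
    long recordOffset = ByteBuffer.wrap(currentRecordOffset).getLong();
    long offsetLimit = ByteBuffer.wrap(checkpointOffsetLimit).getLong(); // next offset to deliver
    return recordOffset < offsetLimit;
  }
}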
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
// XXX make all splits have the same # of partitions
while (partitions.size() % numSplits > 0) {
++numSplits;
int numSplits;
if (offsetBasedDeduplicationSupported()) {
// Enforce 1:1 split to partition ratio for offset deduplication.
numSplits = partitions.size();
LOG.info(
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
+ "Forcing the number of splits to equal the number of total partitions: {}.",
numSplits);
} else {
numSplits = Math.min(desiredNumSplits, partitions.size());
// Make all splits have the same # of partitions.
while (partitions.size() % numSplits > 0) {
++numSplits;
}
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
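To illustrate the split-sizing rules above with concrete (hypothetical) numbers: without offset deduplication the split count grows until it divides the partition count evenly, while with offset deduplication every partition becomes its own split, so each checkpoint carries exactly one partition offset.

public class SplitCountDemo {
  public static void main(String[] args) {
    int partitions = 10;
    int desiredNumSplits = 4;

    // Default sizing: grow numSplits until it divides the partition count evenly.
    int numSplits = Math.min(desiredNumSplits, partitions);
    while (partitions % numSplits > 0) {
      ++numSplits;
    }
    System.out.println(numSplits); // 5 -> two partitions per split

    // With offset-based deduplication: numSplits == partitions (10), one partition per split.
    System.out.println(partitions);
  }
}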

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetBasedDeduplicationSupported() {
return spec.isOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
Field.of("consumer_polling_timeout", FieldType.INT64),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
@@ -123,6 +124,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
Field.of("timestamp_policy", FieldType.STRING),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
@@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
.addBooleanField("redistribute")
.addBooleanField("allows_duplicates")
.addNullableInt32Field("redistribute_num_keys")
.addBooleanField("offset_deduplication")
.addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
.addByteArrayField("timestamp_policy_factory")
.addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
fieldValues.put("redistribute", transform.isRedistributed());
fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}

@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
}
}
}
Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
if (offsetDeduplication != null) {
transform = transform.withOffsetDeduplication(offsetDeduplication);
}
Duration maxReadTime = configRow.getValue("max_read_time");
if (maxReadTime != null) {
transform =