Kafka source offset-based deduplication.
tomstepp committed Jan 24, 2025
1 parent 3cb1440 commit ef33cba
Showing 9 changed files with 136 additions and 10 deletions.
Changed file 1 of 9
@@ -198,7 +198,8 @@ public static void verifyDeterministic(Coder<?> target, String message, Iterable
}
}

public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value) throws Exception {
public static <T> long getEncodedElementByteSizeUsingCoder(Coder<T> target, T value)
throws Exception {
return target.getEncodedElementByteSize(value);
}
/**

Changed file 2 of 9
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;

public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
if (!reader.isPresent()) {
throw new RuntimeException(
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
}
if (!reader.get().offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
}

// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
PartitionMark partition = partitions.get(/* index= */ 0);
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
}
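
Aside (not part of the diff): the limit bytes returned here are simply the partition's next offset encoded with Guava's Longs, via the KafkaIOUtils helper added further down, so they round-trip to the numeric value. A minimal sketch, using an illustrative offset of 57:

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;

// What getOffsetLimit() would return for a split whose single partition has nextOffset == 57
// (57 is an illustrative value only).
byte[] limit = Longs.toByteArray(57L);      // 8 bytes, big-endian: 00 00 00 00 00 00 00 39
long decoded = Longs.fromByteArray(limit);  // 57 again
// Big-endian encoding of non-negative offsets preserves numeric order, so encoded limits
// can be compared byte-for-byte against encoded record offsets.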

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.

Changed file 3 of 9
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
.setRedistributed(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
.setOffsetDeduplication(false)
.build();
}

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract boolean isOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -886,11 +892,20 @@ static <K, V> void setupExternalBuilder(
if (config.allowDuplicates != null) {
builder.setAllowDuplicates(config.allowDuplicates);
}

/*
* TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
* Until then, enforce explicit enablement only with redistributed without duplicates.
*/
if (config.redistribute
&& (config.allowDuplicates == null || !config.allowDuplicates)
&& config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
} else {
builder.setRedistributed(false);
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
builder.setOffsetDeduplication(false);
}
}
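
Aside (not part of the diff): in this cross-language builder path the flag is only forwarded when redistribution is enabled and duplicates are disallowed. A sketch of that gating as a hypothetical standalone helper:

// Hypothetical helper mirroring the condition above; not in the commit.
static boolean shouldApplyOffsetDeduplication(
    boolean redistribute, Boolean allowDuplicates, Boolean offsetDeduplication) {
  return redistribute
      && (allowDuplicates == null || !allowDuplicates)
      && offsetDeduplication != null;
}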

@@ -959,6 +974,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1031,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1073,7 +1093,7 @@ public Read<K, V> withRedistribute() {
}

public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
if (!isAllowDuplicates()) {
if (!isRedistributed()) {
LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
}
return toBuilder().setAllowDuplicates(allowDuplicates).build();
@@ -1086,6 +1106,17 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
/*
* TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
* Until then, enforce explicit enablement only with redistributed without duplicates.
*/
checkState(
isRedistributed() && !isAllowDuplicates(),
"withOffsetDeduplication is currently only supported with: withRedistribute() and withAllowDuplicates(false).");
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}
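
Aside (not part of the diff): a usage sketch of the new option on a KafkaIO read. The pipeline variable, broker address, topic name, and deserializers are illustrative placeholders; per the checkState above, withRedistribute() and withAllowDuplicates(false) must be set for the call to succeed:

PCollection<KafkaRecord<Long, String>> records =
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092")        // placeholder broker
            .withTopic("my-topic")                         // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withRedistribute()
            .withAllowDuplicates(false)
            .withOffsetDeduplication(true));               // option added in this commit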

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.

Changed file 4 of 9
@@ -137,7 +137,12 @@ Object getDefaultValue() {
return false;
}
},
;
OFFSET_DEDUPLICATION(LEGACY) {
@Override
Object getDefaultValue() {
return false;
}
};

private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;


Changed file 5 of 9
@@ -19,11 +19,13 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -142,4 +144,15 @@ void update(double quantity) {
return avg;
}
}

static final class OffsetBasedDeduplication {

static byte[] encodeOffset(long offset) {
return Longs.toByteArray(offset);
}

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
}
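
Aside (not part of the diff): what the two helpers above produce, with an illustrative topic "orders", partition 3, offset 42 (the class is package-private, so this only works from within org.apache.beam.sdk.io.kafka):

// Unique record id: UTF-8 bytes of "<topic>-<partition>-<offset>".
byte[] recordId = KafkaIOUtils.OffsetBasedDeduplication.getUniqueId("orders", 3, 42L);
// -> the same bytes as "orders-3-42".getBytes(StandardCharsets.UTF_8)

// Offset bytes: Guava's big-endian encoding of the long offset.
byte[] offsetBytes = KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(42L);
// -> {0, 0, 0, 0, 0, 0, 0, 42}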

Changed file 6 of 9
@@ -299,6 +299,29 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curRecord != null) {
return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
}
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curRecord != null) {
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(curRecord.getOffset());
}
throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
}

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +337,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetBasedDeduplicationSupported() {
return this.offsetBasedDeduplicationSupported;
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +363,8 @@ public long getSplitBacklogBytes() {
private @Nullable Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private final boolean offsetBasedDeduplicationSupported;

private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;

@@ -507,6 +536,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

Changed file 7 of 9
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
// XXX make all splits have the same # of partitions
while (partitions.size() % numSplits > 0) {
++numSplits;
int numSplits;
if (offsetBasedDeduplicationSupported()) {
// Enforce 1:1 split to partition ratio for offset deduplication.
numSplits = partitions.size();
LOG.info(
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
+ "Forcing the number of splits to equal the number of total partitions: {}.",
numSplits);
} else {
numSplits = Math.min(desiredNumSplits, partitions.size());
// Make all splits have the same # of partitions.
while (partitions.size() % numSplits > 0) {
++numSplits;
}
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
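
Aside (not part of the diff): the split-count rule above, restated as a hypothetical standalone helper. With offset deduplication the split count always equals the partition count; otherwise the desired count is rounded up until it divides the partition count evenly:

// Hypothetical illustration of the numSplits logic in split(); not in the commit.
static int chooseNumSplits(int desiredNumSplits, int numPartitions, boolean offsetDedup) {
  if (offsetDedup) {
    return numPartitions;          // enforce a 1:1 split-to-partition ratio
  }
  int numSplits = Math.min(desiredNumSplits, numPartitions);
  while (numPartitions % numSplits > 0) {
    ++numSplits;                   // e.g. 10 partitions with 4 desired splits -> 5 splits
  }
  return numSplits;
}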

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetBasedDeduplicationSupported() {
return spec.isOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());

Changed file 8 of 9
@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
Field.of("consumer_polling_timeout", FieldType.INT64),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
Expand All @@ -123,6 +124,7 @@ public void testConstructKafkaRead() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
Field.of("timestamp_policy", FieldType.STRING),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
Expand All @@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();

Changed file 9 of 9
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
.addBooleanField("redistribute")
.addBooleanField("allows_duplicates")
.addNullableInt32Field("redistribute_num_keys")
.addBooleanField("offset_deduplication")
.addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
.addByteArrayField("timestamp_policy_factory")
.addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
fieldValues.put("redistribute", transform.isRedistributed());
fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}

@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
}
}
}
Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
if (offsetDeduplication != null) {
transform = transform.withOffsetDeduplication(offsetDeduplication);
}
Duration maxReadTime = configRow.getValue("max_read_time");
if (maxReadTime != null) {
transform =