Guarantee in-order delivery for partitions
Ari Ekmekji committed Oct 30, 2023
1 parent 6525cc2 commit e0afd4c
Showing 9 changed files with 107 additions and 453 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,8 @@
# Rockset Kafka Connect Changelog
## v2.0.0 2023-10-30
- New configuration option `rockset.retry.backoff.ms`
- Bug fix for potential out-of-order message delivery

## v1.4.3 2023-09-15
- Update rockset-java client dependency

2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>rockset</groupId>
<artifactId>kafka-connect-rockset</artifactId>
<version>1.4.3</version>
<version>2.0.0</version>
<packaging>jar</packaging>

<name>kafka-connect-rockset</name>
13 changes: 12 additions & 1 deletion src/main/java/rockset/RocksetConnectorConfig.java
@@ -23,6 +23,7 @@ public class RocksetConnectorConfig extends AbstractConfig {
public static final String ROCKSET_WORKSPACE = "rockset.workspace";
public static final String ROCKSET_TASK_THREADS = "rockset.task.threads";
public static final String ROCKSET_BATCH_SIZE = "rockset.batch.size";
public static final String ROCKSET_RETRY_BACKOFF_MS = "rockset.retry.backoff.ms";

private RocksetConnectorConfig(ConfigDef config, Map<String, String> originals) {
super(config, originals, true);
@@ -44,7 +45,7 @@ public static ConfigDef config() {
.documentation("Rockset API Server URL")
.importance(Importance.HIGH)
.validator(RocksetConnectorConfig::validateApiServer)
.defaultValue("https://api.rs2.usw2.rockset.com")
.defaultValue("https://api.usw2a1.rockset.com")
.build())
.define(
ConfigKeyBuilder.of(ROCKSET_INTEGRATION_KEY, Type.STRING)
@@ -91,6 +92,12 @@ public static ConfigDef config() {
"(Deprecated) Rockset workspace that incoming documents will be written to.")
.importance(Importance.HIGH)
.defaultValue("commons")
.build())
.define(
ConfigKeyBuilder.of(ROCKSET_RETRY_BACKOFF_MS, Type.INT)
.documentation("How long to backoff in milliseconds between retriable errors")
.importance(Importance.MEDIUM)
.defaultValue(5000)
.build());
}

@@ -154,4 +161,8 @@ public int getRocksetBatchSize() {
public String getFormat() {
return this.getString(FORMAT);
}

public int getRetryBackoffMs() {
return this.getInt(ROCKSET_RETRY_BACKOFF_MS);
}
}
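
The new getter getRetryBackoffMs() above is how the backoff surfaces at runtime. A minimal sketch of how the option resolves, assuming the required connection settings are supplied (the integration key is a placeholder and may be subject to further validation; this snippet is not part of the commit):

// Hedged sketch: the option falls back to its 5000 ms default when omitted.
Map<String, String> settings = new HashMap<>();
settings.put("rockset.apiserver.url", "https://api.usw2a1.rockset.com");
settings.put("rockset.integration.key", "kafka://placeholder");

RocksetConnectorConfig config = new RocksetConnectorConfig(settings);
config.getRetryBackoffMs(); // 5000, the default declared in define() above

settings.put("rockset.retry.backoff.ms", "1000");
new RocksetConnectorConfig(settings).getRetryBackoffMs(); // now 1000
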
205 changes: 61 additions & 144 deletions src/main/java/rockset/RocksetSinkTask.java
@@ -1,19 +1,12 @@
package rockset;

import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import okhttp3.OkHttpClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -28,25 +21,15 @@
import rockset.parser.JsonParser;
import rockset.parser.RecordParser;
import rockset.utils.BlockingExecutor;
import rockset.utils.RetriableTask;

public class RocksetSinkTask extends SinkTask {
private static Logger log = LoggerFactory.getLogger(RocksetSinkTask.class);
private RequestWrapper rw;

// taskExecutorService is responsible for running the tasks that send data
// to Rockset. If a task fails, it is handed to retryExecutorService,
// which resubmits it to taskExecutorService after a delay.
private BlockingExecutor taskExecutorService;

// retryExecutorService schedules tasks to be retried after a delay and
// submits them back to taskExecutorService. If retryExecutorService is full,
// it fails the task immediately (retrying is best effort).
// Make sure this always has more threads than the task executor thread pool,
// otherwise it can lead to deadlock.
private ExecutorService retryExecutorService;

private Map<TopicPartition, List<RetriableTask>> futureMap;

private static final Logger log = LoggerFactory.getLogger(RocksetSinkTask.class);

private RequestWrapper requestWrapper;

private BlockingExecutor executorService;

private RocksetConnectorConfig config;
private RecordParser recordParser;

@@ -71,164 +54,98 @@ public void start(Map<String, String> settings) {
.writeTimeout(1, TimeUnit.MINUTES)
.readTimeout(1, TimeUnit.MINUTES)
.build();
this.rw = new RocksetRequestWrapper(config, httpClient);

int numThreads = this.config.getRocksetTaskThreads();
this.taskExecutorService =
new BlockingExecutor(numThreads, Executors.newFixedThreadPool(numThreads));

this.retryExecutorService =
new ThreadPoolExecutor(
numThreads * 2,
numThreads * 2,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(numThreads * 10));

this.futureMap = new HashMap<>();
this.requestWrapper = new RocksetRequestWrapper(config, httpClient);
this.executorService =
new BlockingExecutor(
config.getRocksetTaskThreads(),
Executors.newScheduledThreadPool(config.getRocksetTaskThreads()));
this.recordParser = getRecordParser(config.getFormat());
}

@TestOnly
public void start(
Map<String, String> settings,
RequestWrapper rw,
ExecutorService executorService,
ExecutorService retryExecutorService) {
RequestWrapper requestWrapper,
BlockingExecutor executorService) {
this.config = new RocksetConnectorConfig(settings);
this.rw = rw;
this.taskExecutorService =
new BlockingExecutor(config.getRocksetTaskThreads(), executorService);
this.retryExecutorService = retryExecutorService;
this.futureMap = new HashMap<>();
this.requestWrapper = requestWrapper;
this.executorService = executorService;
this.recordParser = getRecordParser(config.getFormat());
}

@Override
public void stop() {
log.info("Stopping Rockset Kafka Connect Plugin, waiting for active tasks to complete");
if (taskExecutorService != null) {
taskExecutorService.shutdownNow();
if (executorService != null) {
executorService.shutdownNow();
}
log.info("Stopped Rockset Kafka Connect Plugin");
}

// open() will be called for a partition before any put() is called for it
@Override
public void open(Collection<TopicPartition> partitions) {
log.debug(String.format("Opening %d partitions: %s", partitions.size(), partitions));
partitions.forEach(
tp -> {
Preconditions.checkState(!futureMap.containsKey(tp));
futureMap.put(tp, new ArrayList<>());
});
log.debug("Opening {} partitions: {}", partitions.size(), partitions);
}

@Override
public void close(Collection<TopicPartition> partitions) {
log.debug(String.format("Closing %d partitions: %s", partitions.size(), partitions));
partitions.forEach(
tp -> {
Preconditions.checkState(futureMap.containsKey(tp));
futureMap.remove(tp);
});
log.debug("Closing {} partitions: {}", partitions.size(), partitions);
}

// put() doesn't need to block until the writes complete; that is what flush() is for
@Override
public void put(Collection<SinkRecord> records) {
if (records.size() == 0) {
log.debug("zero records in put call, returning");
return;
}

groupRecordsByTopicPartition(records)
.forEach(
(tp, recordBatch) -> {
try {
RetriableTask task =
new RetriableTask(
taskExecutorService,
retryExecutorService,
() -> addDocs(tp.topic(), recordBatch));

// this should only block if all the threads are busy
taskExecutorService.submit(task);

Preconditions.checkState(futureMap.containsKey(tp));
futureMap.get(tp).add(task);
} catch (InterruptedException e) {
throw new ConnectException("Failed to put records", e);
Map<TopicPartition, Future<?>> futures =
records.stream()
.collect(Collectors.groupingBy(r -> new TopicPartition(r.topic(), r.kafkaPartition())))
.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
executorService.submit(
() ->
requestWrapper.addDoc(
e.getKey().topic(),
e.getValue(),
recordParser,
config.getRocksetBatchSize()))));
futures.forEach(
(tp, f) -> {
try {
f.get();
} catch (Exception e) {
// Stop all tasks still executing since this put() will be retried anyway
futures.values().forEach(g -> g.cancel(true));
if (isRetriableException(e)) {
if (context != null) {
context.timeout(config.getRetryBackoffMs());
}
});
throw new RetriableException(
String.format(
"Encountered a retriable exception for topic %s partition %s",
tp.topic(), tp.partition()),
e);
}
throw new ConnectException(
String.format(
"Encountered an unexpected exception for topic %s partition %s",
tp.topic(), tp.partition()),
e);
}
});
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
map.forEach(
(tp, offsetAndMetadata) -> {
log.debug(
"Flushing for topic: {}, partition: {}, offset: {}, metadata: {}",
tp.topic(),
tp.partition(),
offsetAndMetadata.offset(),
offsetAndMetadata.metadata());
checkForFailures(tp);
});
}

private Map<TopicPartition, Collection<SinkRecord>> groupRecordsByTopicPartition(
Collection<SinkRecord> records) {
Map<TopicPartition, Collection<SinkRecord>> topicPartitionedRecords = new HashMap<>();
for (SinkRecord record : records) {
TopicPartition key = new TopicPartition(record.topic(), record.kafkaPartition());
topicPartitionedRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
}

return topicPartitionedRecords;
// Not needed
}

private boolean isRetriableException(Throwable e) {
return (e.getCause() != null && e.getCause() instanceof RetriableException);
}

private void checkForFailures(TopicPartition tp) {
if (futureMap.get(tp) == null) {
return;
}

List<RetriableTask> futures = futureMap.get(tp);
Iterator<RetriableTask> futureIterator = futures.iterator();
while (futureIterator.hasNext()) {
Future<Void> future = futureIterator.next();
try {
future.get();
} catch (Exception e) {
if (isRetriableException(e)) {
throw new RetriableException(
String.format(
"Unable to write document for topic: %s, partition: %s, in Rockset,"
+ " should retry, cause: %s",
tp.topic(), tp.partition(), e.getMessage()),
e);
}

throw new RuntimeException(
String.format(
"Unable to write document for topic: %s, partition: %s, in Rockset," + " cause: %s",
tp.topic(), tp.partition(), e.getMessage()),
e);
} finally {
futureIterator.remove();
}
}
}

private void addDocs(String topic, Collection<SinkRecord> records) {
log.debug("Adding {} records to Rockset for topic: {}", records.size(), topic);
this.rw.addDoc(topic, records, recordParser, this.config.getRocksetBatchSize());
}

@Override
public String version() {
return VersionUtil.version(this.getClass());
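
The ordering guarantee leans on Kafka Connect's retry contract: when put() calls context.timeout() and then throws RetriableException, the framework pauses the task and redelivers the same records to the same put(), so a failed batch can never be overtaken by a later one for that partition. A minimal sketch of that contract, assuming a hypothetical writeBatch helper (this is the framework's general pattern, not the connector's code):

import java.io.IOException;
import java.util.Collection;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class RetryContractSketch extends SinkTask {
  @Override
  public void put(Collection<SinkRecord> records) {
    try {
      writeBatch(records); // hypothetical helper that writes synchronously
    } catch (IOException e) {
      // Ask the framework to back off, then signal a retriable failure;
      // Connect will call put() again with the same records, in order.
      context.timeout(5000); // e.g. the configured rockset.retry.backoff.ms
      throw new RetriableException("write failed, will be retried", e);
    }
  }

  abstract void writeBatch(Collection<SinkRecord> records) throws IOException;
}
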
12 changes: 10 additions & 2 deletions src/main/java/rockset/utils/BlockingExecutor.java
@@ -1,8 +1,10 @@
package rockset.utils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import org.apache.kafka.connect.errors.RetriableException;

/**
* Use BlockingExecutor to block threads submitting tasks, when the executor is completely occupied.
@@ -19,8 +21,14 @@ public BlockingExecutor(int numThreads, ExecutorService executorService) {

// returns immediately if a thread is available to run the task,
// else blocks until one of the active tasks completes
public Future<?> submit(RetriableTask task) throws InterruptedException {
semaphore.acquire();
public Future<?> submit(Runnable task) {
try {
semaphore.acquire();
} catch (InterruptedException unused) {
CompletableFuture<Void> a = new CompletableFuture<>();
a.completeExceptionally(new RetriableException("Thread interrrupted"));
return a;
}
return executorService.submit(
() -> {
try {
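
The semaphore is what makes this executor blocking: each submit() takes a permit, and the permit is returned when the task completes, so callers stall once every worker slot is busy. A hedged usage sketch, assuming the permit is released in a finally block as in the full class:

// Two worker threads: the third submit() blocks until a running task
// finishes and releases its semaphore permit.
Runnable work = () -> {
  try { Thread.sleep(100); } catch (InterruptedException ignored) { }
};
BlockingExecutor executor =
    new BlockingExecutor(2, Executors.newScheduledThreadPool(2));
executor.submit(work); // returns immediately
executor.submit(work); // returns immediately
executor.submit(work); // blocks for roughly 100 ms before returning
executor.shutdownNow(); // as RocksetSinkTask.stop() does
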