In-place cleanup of several abstractions #17

Merged · 2 commits · Oct 25, 2023
1 change: 1 addition & 0 deletions src/main/java/rockset/RequestWrapper.java
@@ -2,6 +2,7 @@

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
import rockset.parser.RecordParser;

public interface RequestWrapper {

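Editor's note (not part of the diff): the only change to this file is that RecordParser now resolves from the relocated rockset.parser package. For reference, a minimal sketch of the interface shape implied by this PR; the addDoc signature is taken from the implementing class further down, and any other members the real interface may declare are omitted here.

package rockset;

import java.util.Collection;
import org.apache.kafka.connect.sink.SinkRecord;
import rockset.parser.RecordParser;

// Sketch only: addDoc's signature mirrors the @Override in RocksetRequestWrapper below.
public interface RequestWrapper {
  void addDoc(String topic, Collection<SinkRecord> records, RecordParser recordParser, int batchSize);
}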
12 changes: 0 additions & 12 deletions src/main/java/rockset/RocksetConnectorConfig.java
@@ -139,22 +139,10 @@ public String getRocksetApiServerUrl() {
return this.getString(ROCKSET_APISERVER_URL);
}

public String getRocksetApikey() {
return this.getString(ROCKSET_APIKEY);
}

public String getRocksetIntegrationKey() {
return this.getString(ROCKSET_INTEGRATION_KEY);
}

public String getRocksetCollection() {
return this.getString(ROCKSET_COLLECTION);
}

public String getRocksetWorkspace() {
return this.getString(ROCKSET_WORKSPACE);
}

public int getRocksetTaskThreads() {
return this.getInt(ROCKSET_TASK_THREADS);
}
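Editor's note (sketch, not part of the diff): after this cleanup, only the integration key, API server URL, and task-thread settings keep their getters; the API key, collection, and workspace accessors are gone. The property key strings below are assumptions for illustration only, as the diff does not show the constants' string values.

import java.util.HashMap;
import java.util.Map;

// Hypothetical usage sketch of the surviving accessors. Key names are assumed.
class ConfigSketch {
  static RocksetConnectorConfig minimalConfig() {
    Map<String, String> settings = new HashMap<>();
    settings.put("rockset.integration.key", "kafka://<integration-key>");      // assumed key name
    settings.put("rockset.apiserver.url", "https://api.rs2.usw2.rockset.com"); // assumed key name
    settings.put("rockset.task.threads", "5");                                 // assumed key name
    RocksetConnectorConfig config = new RocksetConnectorConfig(settings);
    // Accessors that remain after this PR:
    config.getRocksetIntegrationKey();
    config.getRocksetApiServerUrl();
    config.getRocksetTaskThreads();
    return config;
  }
}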
54 changes: 21 additions & 33 deletions src/main/java/rockset/RocksetRequestWrapper.java
@@ -9,7 +9,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -22,6 +21,7 @@
import org.slf4j.LoggerFactory;
import rockset.models.KafkaDocumentsRequest;
import rockset.models.KafkaMessage;
import rockset.parser.RecordParser;

public class RocksetRequestWrapper implements RequestWrapper {
private static Logger log = LoggerFactory.getLogger(RocksetRequestWrapper.class);
@@ -35,25 +35,11 @@ public class RocksetRequestWrapper implements RequestWrapper {
private String integrationKeyEncoded;
private String apiServer;

public RocksetRequestWrapper(RocksetConnectorConfig config) {
if (client == null) {
client =
new OkHttpClient.Builder()
.connectTimeout(1, TimeUnit.MINUTES)
.writeTimeout(1, TimeUnit.MINUTES)
.readTimeout(1, TimeUnit.MINUTES)
.build();
}

parseConnectionString(config.getRocksetIntegrationKey());
this.apiServer = config.getRocksetApiServerUrl();
}

// used for testing
public RocksetRequestWrapper(RocksetConnectorConfig config, OkHttpClient client) {
this.client = client;

parseConnectionString(config.getRocksetApiServerUrl());
parseConnectionString(config.getRocksetIntegrationKey());
this.apiServer = config.getRocksetApiServerUrl();
}

@@ -66,14 +52,6 @@ private static String base64EncodeAsUserPassword(String integrationKey) {
return Base64.getEncoder().encodeToString(userPassword.getBytes(StandardCharsets.UTF_8));
}

private boolean isInternalError(int code) {
return code == 500 // INTERNALERROR
|| code == 502
|| code == 503 // NOT_READY
|| code == 504
|| code == 429; // RESOURCEEXCEEDED
}

@Override
public void addDoc(
String topic, Collection<SinkRecord> records, RecordParser recordParser, int batchSize) {
@@ -105,6 +83,14 @@ public void addDoc(
sendDocs(topic, messages);
}

private boolean isRetriableHttpCode(int code) {
return code == 429 || code >= 500;
}

private boolean isSuccessHttpCode(int code) {
return code == 200;
}

private void sendDocs(String topic, List<KafkaMessage> messages) {
Preconditions.checkArgument(!messages.isEmpty());
log.debug("Sending batch of {} messages for topic: {} to Rockset", messages.size(), topic);
@@ -123,22 +109,24 @@ private void sendDocs(String topic, List<KafkaMessage> messages) {
.build();

try (Response response = client.newCall(request).execute()) {
if (isInternalError(response.code())) {
// internal errors are retriable
if (isSuccessHttpCode(response.code())) {
// Nothing to do, write succeeded
return;
}
if (isRetriableHttpCode(response.code())) {
throw new RetriableException(
String.format(
"Received internal error code: %s, message: %s. Can Retry.",
"Received retriable http status code %d, message: %s. Can Retry.",
response.code(), response.message()));
}

if (response.code() != 200) {
throw new ConnectException(
String.format(
"Unable to write document" + " in Rockset, cause: %s", response.message()));
}
// Anything that is not retriable and is not a success is a permanent error
throw new ConnectException(
String.format(
"Unable to write document" + " in Rockset, cause: %s", response.message()));
}
} catch (SocketTimeoutException ste) {
throw new RetriableException("Encountered socket timeout exception. Can Retry", ste);
throw new RetriableException("Encountered socket timeout exception. Can Retry.", ste);
} catch (RetriableException e) {
throw e;
} catch (Exception e) {
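Editor's note (sketch, not the PR's code verbatim): the response handling sendDocs() now applies is 200 succeeds, 429 and any 5xx status are retriable, and every other status is a permanent failure. A standalone restatement of that classification:

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

// Sketch of the classification introduced by isSuccessHttpCode / isRetriableHttpCode above.
final class ResponseClassificationSketch {
  static void check(int code, String message) {
    if (code == 200) {
      return; // write succeeded, nothing to do
    }
    if (code == 429 || code >= 500) {
      throw new RetriableException(
          String.format("Received retriable http status code %d, message: %s. Can Retry.", code, message));
    }
    // Anything that is neither success nor retriable is a permanent error.
    throw new ConnectException(
        String.format("Unable to write document in Rockset, cause: %s", message));
  }
}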
122 changes: 77 additions & 45 deletions src/main/java/rockset/RocksetSinkTask.java
@@ -1,6 +1,7 @@
package rockset;

import com.github.jcustenborder.kafka.connect.utils.VersionUtil;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -13,14 +14,19 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rockset.parser.AvroParser;
import rockset.parser.JsonParser;
import rockset.parser.RecordParser;
import rockset.utils.BlockingExecutor;
import rockset.utils.RetriableTask;

@@ -57,8 +63,15 @@ private RecordParser getRecordParser(String format) {

@Override
public void start(Map<String, String> settings) {
log.info("Starting Rockset Kafka Connect Plugin");
this.config = new RocksetConnectorConfig(settings);
this.rw = new RocksetRequestWrapper(config);
OkHttpClient httpClient =
new OkHttpClient.Builder()
.connectTimeout(1, TimeUnit.MINUTES)
.writeTimeout(1, TimeUnit.MINUTES)
.readTimeout(1, TimeUnit.MINUTES)
.build();
this.rw = new RocksetRequestWrapper(config, httpClient);

int numThreads = this.config.getRocksetTaskThreads();
this.taskExecutorService =
@@ -76,6 +89,7 @@ public void start(Map<String, String> settings) {
this.recordParser = getRecordParser(config.getFormat());
}

@TestOnly
public void start(
Map<String, String> settings,
RequestWrapper rw,
@@ -88,51 +102,92 @@ public void start(
this.retryExecutorService = retryExecutorService;
this.futureMap = new HashMap<>();
this.recordParser = getRecordParser(config.getFormat());
log.info("Starting Rockset Kafka Connect Plugin");
}

@Override
public void put(Collection<SinkRecord> records) {
if (records.size() == 0) {
log.debug("zero records in put call, returning");
return;
public void stop() {
log.info("Stopping Rockset Kafka Connect Plugin, waiting for active tasks to complete");
if (taskExecutorService != null) {
taskExecutorService.shutdownNow();
}

submitForProcessing(records);
log.info("Stopped Rockset Kafka Connect Plugin");
}

private Map<TopicPartition, Collection<SinkRecord>> partitionRecordsByTopic(
Collection<SinkRecord> records) {
Map<TopicPartition, Collection<SinkRecord>> topicPartitionedRecords = new HashMap<>();
for (SinkRecord record : records) {
TopicPartition key = new TopicPartition(record.topic(), record.kafkaPartition());
topicPartitionedRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
}
// open() will be called for a partition before any put() is called for it
@Override
public void open(Collection<TopicPartition> partitions) {
log.debug(String.format("Opening %d partitions: %s", partitions.size(), partitions));
partitions.forEach(
tp -> {
Preconditions.checkState(!futureMap.containsKey(tp));
futureMap.put(tp, new ArrayList<>());
});
}

return topicPartitionedRecords;
@Override
public void close(Collection<TopicPartition> partitions) {
log.debug(String.format("Closing %d partitions: %s", partitions.size(), partitions));
partitions.forEach(
tp -> {
Preconditions.checkState(futureMap.containsKey(tp));
futureMap.remove(tp);
});
}

private void submitForProcessing(Collection<SinkRecord> records) {
partitionRecordsByTopic(records)
// put() doesn't need to block until the writes complete, that is what flush() is for
@Override
public void put(Collection<SinkRecord> records) {
if (records.size() == 0) {
log.debug("zero records in put call, returning");
return;
}

groupRecordsByTopicPartition(records)
.forEach(
(toppar, recordBatch) -> {
(tp, recordBatch) -> {
try {
RetriableTask task =
new RetriableTask(
taskExecutorService,
retryExecutorService,
() -> addDocs(toppar.topic(), recordBatch));
() -> addDocs(tp.topic(), recordBatch));

// this should only block if all the threads are busy
taskExecutorService.submit(task);

futureMap.computeIfAbsent(toppar, k -> new ArrayList<>()).add(task);
Preconditions.checkState(futureMap.containsKey(tp));
futureMap.get(tp).add(task);
} catch (InterruptedException e) {
throw new ConnectException("Failed to put records", e);
}
});
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
map.forEach(
(tp, offsetAndMetadata) -> {
log.debug(
"Flushing for topic: {}, partition: {}, offset: {}, metadata: {}",
tp.topic(),
tp.partition(),
offsetAndMetadata.offset(),
offsetAndMetadata.metadata());
checkForFailures(tp);
});
}

private Map<TopicPartition, Collection<SinkRecord>> groupRecordsByTopicPartition(
Collection<SinkRecord> records) {
Map<TopicPartition, Collection<SinkRecord>> topicPartitionedRecords = new HashMap<>();
for (SinkRecord record : records) {
TopicPartition key = new TopicPartition(record.topic(), record.kafkaPartition());
topicPartitionedRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
}

return topicPartitionedRecords;
}

private boolean isRetriableException(Throwable e) {
return (e.getCause() != null && e.getCause() instanceof RetriableException);
}
@@ -145,7 +200,7 @@ private void checkForFailures(TopicPartition tp) {
List<RetriableTask> futures = futureMap.get(tp);
Iterator<RetriableTask> futureIterator = futures.iterator();
while (futureIterator.hasNext()) {
Future future = futureIterator.next();
Future<Void> future = futureIterator.next();
try {
future.get();
} catch (Exception e) {
@@ -174,29 +229,6 @@ private void addDocs(String topic, Collection<SinkRecord> records) {
this.rw.addDoc(topic, records, recordParser, this.config.getRocksetBatchSize());
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
map.forEach(
(toppar, offsetAndMetadata) -> {
log.debug(
"Flushing for topic: {}, partition: {}, offset: {}, metadata: {}",
toppar.topic(),
toppar.partition(),
offsetAndMetadata.offset(),
offsetAndMetadata.metadata());
checkForFailures(toppar);
});
}

@Override
public void stop() {
log.info("Stopping Rockset Kafka Connect Plugin, waiting for active tasks to complete");
if (taskExecutorService != null) {
taskExecutorService.shutdownNow();
}
log.info("Stopped Rockset Kafka Connect Plugin");
}

@Override
public String version() {
return VersionUtil.version(this.getClass());
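Editor's note (hypothetical driver, not part of the PR): the reordered methods above follow the SinkTask lifecycle, and the new Preconditions rely on it: open() registers each partition in futureMap before any put(), put() only enqueues work, flush() surfaces failures for the flushed partitions, and close() drops their entries. A sketch of the call order the Connect runtime follows:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative only; topic name and offset are made up.
class LifecycleSketch {
  static void drive(RocksetSinkTask task, Map<String, String> settings, List<SinkRecord> records) {
    TopicPartition tp = new TopicPartition("orders", 0);

    task.start(settings);                                    // builds the OkHttpClient + RocksetRequestWrapper
    task.open(Collections.singletonList(tp));                // registers tp in futureMap before any put()
    task.put(records);                                       // enqueues RetriableTasks; does not block on writes
    task.flush(Collections.singletonMap(tp, new OffsetAndMetadata(42L))); // checkForFailures(tp) runs here
    task.close(Collections.singletonList(tp));               // removes tp from futureMap
    task.stop();                                             // shuts the task executor down
  }
}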
src/main/java/rockset/parser/AvroParser.java (moved from src/main/java/rockset/AvroParser.java)
@@ -1,4 +1,4 @@
package rockset;
package rockset.parser;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -20,12 +20,12 @@
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import rockset.LogicalConverters.DateConverter;
import rockset.LogicalConverters.LogicalTypeConverter;
import rockset.LogicalConverters.TimeConverter;
import rockset.LogicalConverters.TimestampConverter;
import rockset.parser.LogicalConverters.DateConverter;
import rockset.parser.LogicalConverters.LogicalTypeConverter;
import rockset.parser.LogicalConverters.TimeConverter;
import rockset.parser.LogicalConverters.TimestampConverter;

class AvroParser implements RecordParser {
public class AvroParser implements RecordParser {
private static final Map<String, LogicalTypeConverter> LOGICAL_TYPE_CONVERTERS =
ImmutableMap.of(
Time.SCHEMA.name(), new TimeConverter(),
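Editor's note (sketch, not part of the diff): with the parsers moved into rockset.parser and AvroParser made public, the format-to-parser selection done by the task's getRecordParser() can be expressed roughly as below. The accepted format strings and the parsers' no-arg constructors are assumptions for illustration.

import rockset.parser.AvroParser;
import rockset.parser.JsonParser;
import rockset.parser.RecordParser;

// Sketch of format -> parser selection; format strings are assumed, not taken from the diff.
class ParserSelectionSketch {
  static RecordParser parserFor(String format) {
    switch (format.toLowerCase()) {
      case "avro":
        return new AvroParser(); // reachable outside the package now that AvroParser is public
      case "json":
        return new JsonParser();
      default:
        throw new IllegalArgumentException("Unknown format: " + format);
    }
  }
}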