From 406ca84be6bd09115134dcf4fcc27e358204dad5 Mon Sep 17 00:00:00 2001
From: Steven Baldwin <87034744+sbaldwin-rs@users.noreply.github.com>
Date: Wed, 24 Jan 2024 17:01:42 -0800
Subject: [PATCH] Propagate record timestamps (#21)
Propagates kafka record timestamps for detection latency calculations
---
CHANGELOG.md | 4 +
pom.xml | 2 +-
.../java/rockset/RocksetRequestWrapper.java | 10 +++
src/main/java/rockset/RocksetSinkTask.java | 2 +-
.../java/rockset/models/KafkaMessage.java | 79 +++++++++++++------
5 files changed, 71 insertions(+), 26 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b6a3a66..493fcdf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
# Rockset Kafka Connect Changelog
+
+## v2.1.0 2024-01-24
+- Propagate kafka record timestamps for source detection latency
+
## v2.0.0 2023-10-30
- New configuration option `rockset.retry.backoff.ms`
- Removed deprecated configurations `rockset.apikey`, `rockset.collection`, and `rockset.workspace`
diff --git a/pom.xml b/pom.xml
index fb2441b..ddd3099 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
rockset
kafka-connect-rockset
- 2.0.0
+ 2.1.0
jar
kafka-connect-rockset
diff --git a/src/main/java/rockset/RocksetRequestWrapper.java b/src/main/java/rockset/RocksetRequestWrapper.java
index 2f98c55..5e8233c 100644
--- a/src/main/java/rockset/RocksetRequestWrapper.java
+++ b/src/main/java/rockset/RocksetRequestWrapper.java
@@ -14,6 +14,7 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -68,6 +69,15 @@ public void addDoc(
.key(key)
.offset(record.kafkaOffset())
.partition(record.kafkaPartition());
+
+ if(record.timestamp() != null){
+ if (record.timestampType() == TimestampType.CREATE_TIME){
+ message.createTime(record.timestamp());
+ } else if (record.timestampType() == TimestampType.LOG_APPEND_TIME){
+ message.logAppendTime(record.timestamp());
+ }
+ }
+
messages.add(message);
} catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream ", e);
diff --git a/src/main/java/rockset/RocksetSinkTask.java b/src/main/java/rockset/RocksetSinkTask.java
index 7fe51c6..85ca2ce 100644
--- a/src/main/java/rockset/RocksetSinkTask.java
+++ b/src/main/java/rockset/RocksetSinkTask.java
@@ -106,7 +106,7 @@ public void put(Collection records) {
e ->
executorService.submit(
() ->
- requestWrapper.addDoc(
+ requestWrapper.addDoc(
e.getKey().topic(),
e.getValue(),
recordParser,
diff --git a/src/main/java/rockset/models/KafkaMessage.java b/src/main/java/rockset/models/KafkaMessage.java
index fceed73..1e34491 100644
--- a/src/main/java/rockset/models/KafkaMessage.java
+++ b/src/main/java/rockset/models/KafkaMessage.java
@@ -1,9 +1,9 @@
package rockset.models;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Objects;
public class KafkaMessage {
@@ -23,6 +23,12 @@ public class KafkaMessage {
@SerializedName("key")
public Object key;
+ @SerializedName("create_time")
+ public Long createTime;
+
+ @SerializedName("log_append_time")
+ public Long logAppendTime;
+
/*
* Getters
*/
@@ -51,6 +57,19 @@ public Object getKey() {
return this.key;
}
+ @JsonProperty("create_time")
+ @ApiModelProperty(required = false, value = "Create time of the message")
+ public Long getCreateTime() {
+ return this.createTime;
+ }
+
+ @JsonProperty("log_append_time")
+ @ApiModelProperty(required = false, value = "Log append time of the message")
+ public Long getLogAppendTime() {
+ return this.logAppendTime;
+ }
+
+
/*
* Setters
*/
@@ -71,6 +90,16 @@ public void setKey(Object key) {
this.key = key;
}
+
+ public void setLogAppendTime(Long timestamp) {
+ this.logAppendTime = timestamp;
+ }
+
+ public void setCreateTime(Long timestamp) {
+ this.createTime = timestamp;
+ }
+
+
/*
* Builders
*/
@@ -95,6 +124,16 @@ public KafkaMessage key(Object key) {
return this;
}
+ public KafkaMessage logAppendTime(Long timestamp) {
+ this.logAppendTime = timestamp;
+ return this;
+ }
+
+ public KafkaMessage createTime(Long timestamp) {
+ this.createTime = timestamp;
+ return this;
+ }
+
/*
* Utilities
*/
@@ -107,35 +146,27 @@ private String toIndentedString(java.lang.Object o) {
}
@Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("class KafkaMessage {\n");
-
- sb.append(" partition: ").append(this.toIndentedString(this.partition)).append("\n");
- sb.append(" offset: ").append(this.toIndentedString(this.offset)).append("\n");
- sb.append(" document: ").append(this.toIndentedString(this.document)).append("\n");
- sb.append(" key: ").append(this.toIndentedString(this.key)).append("\n");
- sb.append("}");
- return sb.toString();
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ KafkaMessage that = (KafkaMessage) o;
+ return partition == that.partition && offset == that.offset && Objects.equal(document, that.document) && Objects.equal(key, that.key) && Objects.equal(createTime, that.createTime) && Objects.equal(logAppendTime, that.logAppendTime);
}
@Override
public int hashCode() {
- return Objects.hash(this.document, this.partition, this.offset, this.key);
+ return Objects.hashCode(document, partition, offset, key, createTime, logAppendTime);
}
@Override
- public boolean equals(java.lang.Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || this.getClass() != o.getClass()) {
- return false;
- }
- final KafkaMessage kafkaMessage = (KafkaMessage) o;
- return this.getPartition() == kafkaMessage.getPartition()
- && this.getOffset() == kafkaMessage.getOffset()
- && Objects.equals(this.document, kafkaMessage.document)
- && Objects.equals(this.key, kafkaMessage.key);
+ public String toString() {
+ return "KafkaMessage{" +
+ "document=" + document +
+ ", partition=" + partition +
+ ", offset=" + offset +
+ ", key=" + key +
+ ", createTime=" + createTime +
+ ", logAppendTime=" + logAppendTime +
+ '}';
}
}