diff --git a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java index 1dee6f1749df..f1c3edd51730 100644 --- a/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java +++ b/cdap-messaging-ext-spanner/src/main/java/io/cdap/cdap/messaging/spanner/SpannerMessagingService.java @@ -187,12 +187,15 @@ private String getCreateTopicMetadataDDLStatement() { *

*/ private String getCreateTopicDDLStatement(TopicId topicId) { - return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s" - + " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s INT64, %s BYTES(MAX) )" - + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(%s, INTERVAL 7 DAY))", - getTableName(topicId), SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD, - PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, - PUBLISH_TS_FIELD, PUBLISH_TS_FIELD); + return String.format( + "CREATE TABLE IF NOT EXISTS %s ( %s TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true)," + + " %s INT64, %s INT64, %s INT64, %s BYTES(MAX) )" + + " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + + " (OLDER_THAN(%s, INTERVAL 7 DAY))", getTableName(topicId), + PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, + PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD, + PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, + PUBLISH_TS_FIELD); } private void updateTopicMetadataTable(List topics) throws IOException { @@ -482,9 +485,9 @@ public CloseableIterator fetch(MessageFetchRequest messageFetchReque // order by // publish_ts, sequence_id, payload_sequence_id String sqlStatement = String.format( - "SELECT %s, %s, UNIX_MICROS(%s) %s, %s, %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or" + "SELECT UNIX_MICROS(%s) %s, %s, %s, %s, %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or" + " (%s = TIMESTAMP_MICROS(%s) and %s > %s) order by" + " %s, %s, %s LIMIT %s", - SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD, PUBLISH_TS_MICROS_FIELD, + PUBLISH_TS_FIELD, PUBLISH_TS_MICROS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD, getTableName(messageFetchRequest.getTopicId()), PUBLISH_TS_FIELD, startTime, PUBLISH_TS_FIELD, startTime, SEQUENCE_ID_FIELD, sequenceId, PUBLISH_TS_FIELD,