Skip to content

Commit

Permalink
Merge pull request #15795 from cdapio/sidhdirenge-patch-2
Browse files Browse the repository at this point in the history
[CDAP-21079] Update primary key sequencing for better indexing in Spanner Messaging Service
  • Loading branch information
sidhdirenge authored Jan 9, 2025
2 parents a3e1b36 + 36dbaff commit 9e317c1
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,15 @@ private String getCreateTopicMetadataDDLStatement() {
* </p>
*/
private String getCreateTopicDDLStatement(TopicId topicId) {
return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s"
+ " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s INT64, %s BYTES(MAX) )"
+ " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(%s, INTERVAL 7 DAY))",
getTableName(topicId), SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD,
PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD,
PUBLISH_TS_FIELD, PUBLISH_TS_FIELD);
return String.format(
"CREATE TABLE IF NOT EXISTS %s ( %s TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),"
+ " %s INT64, %s INT64, %s INT64, %s BYTES(MAX) )"
+ " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY"
+ " (OLDER_THAN(%s, INTERVAL 7 DAY))", getTableName(topicId),
PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD,
PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD,
PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD,
PUBLISH_TS_FIELD);
}

private void updateTopicMetadataTable(List<TopicMetadata> topics) throws IOException {
Expand Down Expand Up @@ -482,9 +485,9 @@ public CloseableIterator<RawMessage> fetch(MessageFetchRequest messageFetchReque
// order by
// publish_ts, sequence_id, payload_sequence_id
String sqlStatement = String.format(
"SELECT %s, %s, UNIX_MICROS(%s) %s, %s, %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or"
"SELECT UNIX_MICROS(%s) %s, %s, %s, %s, %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or"
+ " (%s = TIMESTAMP_MICROS(%s) and %s > %s) order by" + " %s, %s, %s LIMIT %s",
SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD, PUBLISH_TS_MICROS_FIELD,
PUBLISH_TS_FIELD, PUBLISH_TS_MICROS_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD,
PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD,
getTableName(messageFetchRequest.getTopicId()), PUBLISH_TS_FIELD, startTime,
PUBLISH_TS_FIELD, startTime, SEQUENCE_ID_FIELD, sequenceId, PUBLISH_TS_FIELD,
Expand Down

0 comments on commit 9e317c1

Please sign in to comment.