Merge pull request #15784 from cdapio/spanner-messaging-3
[CDAP-21094] Spanner Messaging - Break down bigger message payloads into parts.
sidhdirenge authored Jan 9, 2025
2 parents 58890c5 + 468339c commit a3e1b36
Showing 3 changed files with 281 additions and 64 deletions.
@@ -48,13 +48,18 @@
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.TopicId;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,13 +71,23 @@ public class SpannerMessagingService implements MessagingService {
public static final String PUBLISH_TS_FIELD = "publish_ts";
public static final String PUBLISH_TS_MICROS_FIELD = "publish_ts_micros";
public static final String PAYLOAD_SEQUENCE_ID_FIELD = "payload_sequence_id";
public static final String PAYLOAD_REMAINING_CHUNKS_FIELD = "payload_remaining_chunks";
public static final String SEQUENCE_ID_FIELD = "sequence_id";
public static final String TOPIC_METADATA_TABLE = "topic_metadata";
public static final String TOPIC_ID_FIELD = "topic_id";
public static final String PROPERTIES_FIELD = "properties";
public static final String NAMESPACE_FIELD = "namespace";
public static final String TOPIC_TABLE_PREFIX = "messaging";

  // The maximum size of data per cell in Spanner is 10 MB.
  // Messages larger than that must therefore be chunked across multiple rows.
  // The limit is kept at 9 MB so that we stay safely below the Spanner threshold.
private static final int MAX_PAYLOAD_SIZE_IN_BYTES = 9 * 1024 * 1024;

  // The maximum commit size per Spanner transaction is 100 MB.
  // The limit is kept at 99 MB so that we stay safely below the Spanner threshold.
private static final int MAX_BATCH_SIZE_IN_BYTES = 99 * 1024 * 1024;
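  // Illustrative example of the limits above: a single 25 MB payload is stored as
  // ceil(25 / 9) = 3 chunk rows, while a StoreRequest totalling 120 MB is rejected
  // outright in publish() because it exceeds MAX_BATCH_SIZE_IN_BYTES.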

private DatabaseClient client;

private DatabaseAdminClient adminClient;
@@ -87,7 +102,7 @@ public class SpannerMessagingService implements MessagingService {

private int publishDelayMillis;

private final ConcurrentLinkedQueue<StoreRequest> batch = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<RequestFuture> batch = new ConcurrentLinkedQueue<>();

@Override
public void initialize(MessagingServiceContext context) throws IOException {
@@ -160,6 +175,11 @@ private String getCreateTopicMetadataDDLStatement() {
* Guaranteed to be monotonically increasing and unique across transactions
* that modify the same fields.
* </li>
* <li>**`payload_remaining_chunks`:** This field helps reassemble large messages
* that are split across multiple rows. It indicates how many more parts are needed
* to reconstruct the complete message.
* A value of 0 means this is the final part.
* </li>
* <li>**`payload`:** The message body.</li>
 * <li>Message durability is currently set to 7 days. This means that the messaging service
 * allows consumers to fetch messages up to 7 days old.</li>
@@ -168,11 +188,11 @@
*/
private String getCreateTopicDDLStatement(TopicId topicId) {
return String.format("CREATE TABLE IF NOT EXISTS %s ( %s INT64, %s INT64, %s"
+ " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s BYTES(MAX) )"
+ " TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true), %s INT64, %s BYTES(MAX) )"
+ " PRIMARY KEY (%s, %s, %s), ROW DELETION POLICY" + " (OLDER_THAN(%s, INTERVAL 7 DAY))",
getTableName(topicId), SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD,
PAYLOAD_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD,
PUBLISH_TS_FIELD);
PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD, SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD,
PUBLISH_TS_FIELD, PUBLISH_TS_FIELD);
}
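  // For illustration only: for a hypothetical topic whose table name resolves to
  // "messaging_default_tms", the statement above renders roughly as:
  //
  //   CREATE TABLE IF NOT EXISTS messaging_default_tms ( sequence_id INT64,
  //     payload_sequence_id INT64, publish_ts TIMESTAMP NOT NULL OPTIONS
  //     (allow_commit_timestamp=true), payload_remaining_chunks INT64, payload BYTES(MAX) )
  //     PRIMARY KEY (sequence_id, payload_sequence_id, publish_ts),
  //     ROW DELETION POLICY (OLDER_THAN(publish_ts, INTERVAL 7 DAY))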

private void updateTopicMetadataTable(List<TopicMetadata> topics) throws IOException {
@@ -275,57 +295,132 @@ public List<TopicId> listTopics(NamespaceId namespaceId)
* shows how messages would be persisted in the topic tables.
*
* <pre>
* TXN sequence_id payload_sequence_id publish_ts payload
* TXN1 0 0 ts1 msg1
* TXN1 1 0 ts1 msg2_p0
* TXN1 1 1 ts1 msg2_p1
* TXN2 0 0 ts2 msg3
* TXN3 0 0 ts3 msg4
* TXN sequence_id payload_sequence_id publish_ts payload payload_remaining_chunks
* TXN1 0 0 ts1 msg1 0
* TXN1 1 0 ts1 msg2_p0 2
* TXN1 1 1 ts1 msg2_p1 1
* TXN1 1 2 ts1 msg2_p2 0
* TXN2 0 0 ts2 msg3 0
* TXN3 0 0 ts3 msg4 0
* </pre>
*/
@Nullable
@Override
public RollbackDetail publish(StoreRequest request)
throws TopicNotFoundException, IOException, UnauthorizedException {
    // All messages within a StoreRequest are published in a single batch to maintain atomicity.
    // The caller should ensure that the request size does not exceed the maximum allowed size.
long requestSize = StreamSupport.stream(request.spliterator(), false)
.mapToLong(payload -> payload.length)
.sum();
if (requestSize > MAX_BATCH_SIZE_IN_BYTES) {
throw new IllegalArgumentException(String.format(
"Request size %d bytes exceeds the maximum allowed size of %d bytes for Cloud Spanner.",
requestSize, MAX_BATCH_SIZE_IN_BYTES));
}

CompletableFuture<Void> publishFuture = new CompletableFuture<>();
batch.add(new RequestFuture(request, publishFuture, requestSize));
long start = System.currentTimeMillis();
writeData(start);
// Wait for the corresponding future to finish.
publishFuture.join();

batch.add(request);
if (!batch.isEmpty()) {
int sequenceId = 0;
List<Mutation> batchCopy = new ArrayList<>(batch.size());
// We need to batch less than fetch limit since we read for publish_ts >= last_message.publish_ts
// see fetch for explanation of why we read for publish_ts >= last_message.publish_ts and
// not publish_ts > last_message.publish_ts
while (!batch.isEmpty()) {
StoreRequest headRequest = batch.poll();
for (byte[] payload : headRequest) {
//TODO: [CDAP-21094] Breakdown messages with larger payload into parts.
Mutation mutation = Mutation.newInsertBuilder(getTableName(headRequest.getTopicId()))
.set(SEQUENCE_ID_FIELD).to(sequenceId++).set(PAYLOAD_SEQUENCE_ID_FIELD).to(0)
.set(PUBLISH_TS_FIELD).to("spanner.commit_timestamp()").set(PAYLOAD_FIELD)
.to(ByteArray.copyFrom(payload)).build();
batchCopy.add(mutation);
}
//TODO: Add a RollbackDetail implementation that throws exceptions if any of the methods is called.
return null;
}

if (batch.isEmpty() && (sequenceId < publishBatchSize
|| System.currentTimeMillis() - start < publishBatchTimeoutMillis)) {
try {
Thread.sleep(publishDelayMillis);
} catch (InterruptedException e) {
throw new IOException(e);
  /**
   * Drains the queued publish requests, converts them into mutations, and commits them to
   * Cloud Spanner in batches. Each request's future is completed normally on success or
   * exceptionally on failure, so a StoreRequest is either fully persisted or not at all.
   */
private synchronized void writeData(long start) {
AtomicInteger sequenceId = new AtomicInteger(0);
long estimatedBatchSize = 0;
List<Mutation> mutations = new ArrayList<>(batch.size());
List<CompletableFuture<Void>> futures = new ArrayList<>(batch.size());

    // Check whether adding the mutations for the head of the queue would exceed the batch size
    // limit. If it would, commit the accumulated mutations first; the remaining items are
    // picked up by a subsequent drain.
while (!batch.isEmpty() && (estimatedBatchSize + batch.peek().getSize()
< MAX_BATCH_SIZE_IN_BYTES)) {
      // All messages within a StoreRequest are published in a single batch to maintain
      // atomicity. If the batch publish succeeds, all of the request's messages are persisted
      // and the corresponding future completes successfully. If it fails, none of them are
      // persisted and the future completes exceptionally, allowing the caller to retry the
      // entire StoreRequest.
RequestFuture headRequest = batch.poll();
List<Mutation> mutationsForRequest = createMutations(headRequest.getStoreRequest(),
sequenceId);
mutations.addAll(mutationsForRequest);
futures.add(headRequest.getFuture());
estimatedBatchSize += headRequest.getSize();

// For better performance, wait for more messages to accumulate before committing the batch.
if (batch.isEmpty() && (sequenceId.get() < publishBatchSize
|| System.currentTimeMillis() - start < publishBatchTimeoutMillis)) {
try {
Thread.sleep(publishDelayMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and fail the pending requests so that callers can retry;
        // return without committing so no message is persisted behind a failed future.
        Thread.currentThread().interrupt();
        for (CompletableFuture<Void> future : futures) {
          future.completeExceptionally(e);
        }
        return;
}
}
if (!batchCopy.isEmpty()) {
try {
client.write(batchCopy);
} catch (SpannerException e) {
throw new IOException(e);
}
commitPublishMutations(mutations, futures);
}
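  // Illustrative batching behaviour, assuming the 99 MB commit limit above: if the queue
  // holds two requests of 60 MB each, only the first is drained into this commit, since
  // 60 MB + 60 MB would exceed MAX_BATCH_SIZE_IN_BYTES; the second stays queued and is
  // committed by a subsequent writeData() invocation.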

  /**
   * Creates mutations for all the messages that are part of the StoreRequest. Messages with a
   * payload larger than MAX_PAYLOAD_SIZE_IN_BYTES are split across multiple rows.
   */
private List<Mutation> createMutations(StoreRequest request, AtomicInteger sequenceId) {
List<Mutation> mutations = new ArrayList<>();
TopicId topicId = request.getTopicId();
for (byte[] payload : request) {
int payloadSequenceId = 0;
int remainingChunksCount = (payload.length - 1) / MAX_PAYLOAD_SIZE_IN_BYTES;
int length = payload.length;
int offset = 0;
while (offset < length) {
int chunkSize = Math.min(MAX_PAYLOAD_SIZE_IN_BYTES, length - offset);
ByteBuffer payloadChunk = ByteBuffer.wrap(payload, offset, chunkSize);
Mutation mutation = createMutation(topicId, sequenceId.get(), payloadSequenceId++,
remainingChunksCount--, payloadChunk);
mutations.add(mutation);
offset += chunkSize;
}
sequenceId.set(sequenceId.get() + 1);
}
return mutations;
}
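  // Worked example, assuming MAX_PAYLOAD_SIZE_IN_BYTES = 9 MB: a 25 MB payload yields
  // remainingChunksCount = (25 MB - 1) / 9 MB = 2 and is written as three rows:
  //   payload_sequence_id = 0, payload_remaining_chunks = 2, 9 MB chunk
  //   payload_sequence_id = 1, payload_remaining_chunks = 1, 9 MB chunk
  //   payload_sequence_id = 2, payload_remaining_chunks = 0, 7 MB chunk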

private Mutation createMutation(TopicId topicId, int sequenceId, int payloadSequenceId,
int remainingParts, ByteBuffer payload) {
Mutation mutation = Mutation.newInsertBuilder(getTableName(topicId)).set(SEQUENCE_ID_FIELD)
.to(sequenceId).set(PAYLOAD_SEQUENCE_ID_FIELD).to(payloadSequenceId).set(PUBLISH_TS_FIELD)
.to("spanner.commit_timestamp()").set(PAYLOAD_FIELD).to(ByteArray.copyFrom(payload))
.set(PAYLOAD_REMAINING_CHUNKS_FIELD).to(remainingParts).build();
LOG.trace("mutation to publish {}", mutation);
return mutation;
}

private void commitPublishMutations(List<Mutation> mutations,
List<CompletableFuture<Void>> futures) {
if (!mutations.isEmpty()) {
try {
client.write(mutations);
LOG.trace("published mutations : {}", mutations.size());
for (CompletableFuture<Void> future : futures) {
future.complete(null);
}
} catch (SpannerException e) {
for (CompletableFuture<Void> future : futures) {
future.completeExceptionally(e);
}
}
}
//TODO: Add a RollbackDetail implementation that throws exceptions if any of the methods is called.
return null;
}
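  // Caller-side sketch (hypothetical names, not part of this change): publish() blocks on
  // the request's future, so a failed batch commit surfaces to the caller, which may then
  // retry the whole StoreRequest.
  //
  //   StoreRequest request = ...;
  //   try {
  //     messagingService.publish(request); // waits until the containing batch commits
  //   } catch (CompletionException e) {
  //     // none of the request's messages were persisted; the request is safe to retry
  //   }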

@Override
@@ -385,19 +480,15 @@ public CloseableIterator<RawMessage> fetch(MessageFetchRequest messageFetchRequest)
// or
// publish_ts = TIMESTAMP_MICROS(startTime) and sequence_id > sequenceId
// order by
// publish_ts, sequence_id
// publish_ts, sequence_id, payload_sequence_id
String sqlStatement = String.format(
"SELECT %s, %s, UNIX_MICROS(%s) %s, %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or"
+ " (%s = TIMESTAMP_MICROS(%s) and %s > %s) order by" + " %s, %s LIMIT %s",
SpannerMessagingService.SEQUENCE_ID_FIELD,
SpannerMessagingService.PAYLOAD_SEQUENCE_ID_FIELD,
SpannerMessagingService.PUBLISH_TS_FIELD, SpannerMessagingService.PUBLISH_TS_MICROS_FIELD,
SpannerMessagingService.PAYLOAD_FIELD,
SpannerMessagingService.getTableName(messageFetchRequest.getTopicId()),
PUBLISH_TS_FIELD, startTime, PUBLISH_TS_FIELD, startTime,
SEQUENCE_ID_FIELD, sequenceId, PUBLISH_TS_FIELD, SEQUENCE_ID_FIELD,
messageFetchRequest.getLimit());

"SELECT %s, %s, UNIX_MICROS(%s) %s, %s, %s FROM %s where (%s > TIMESTAMP_MICROS(%s)) or"
+ " (%s = TIMESTAMP_MICROS(%s) and %s > %s) order by" + " %s, %s, %s LIMIT %s",
SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, PUBLISH_TS_FIELD, PUBLISH_TS_MICROS_FIELD,
PAYLOAD_REMAINING_CHUNKS_FIELD, PAYLOAD_FIELD,
getTableName(messageFetchRequest.getTopicId()), PUBLISH_TS_FIELD, startTime,
PUBLISH_TS_FIELD, startTime, SEQUENCE_ID_FIELD, sequenceId, PUBLISH_TS_FIELD,
SEQUENCE_ID_FIELD, PAYLOAD_SEQUENCE_ID_FIELD, messageFetchRequest.getLimit());
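      // For illustration only: with a hypothetical table "messaging_default_tms",
      // startTime = 1736400000000000, sequenceId = 5 and a limit of 100, the rendered
      // query is roughly:
      //
      //   SELECT sequence_id, payload_sequence_id, UNIX_MICROS(publish_ts) publish_ts_micros,
      //     payload_remaining_chunks, payload FROM messaging_default_tms
      //     where (publish_ts > TIMESTAMP_MICROS(1736400000000000))
      //     or (publish_ts = TIMESTAMP_MICROS(1736400000000000) and sequence_id > 5)
      //     order by publish_ts, sequence_id, payload_sequence_id LIMIT 100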
try {
ResultSet resultSet = client.singleUse().executeQuery(Statement.of(sqlStatement));
return new SpannerResultSetClosableIterator<>(resultSet);
@@ -420,13 +511,29 @@ protected io.cdap.cdap.messaging.spi.RawMessage computeNext() {
if (!resultSet.next()) {
return endOfData();
}
long remainingParts = resultSet.getLong(PAYLOAD_REMAINING_CHUNKS_FIELD);
ByteArrayOutputStream payload = new ByteArrayOutputStream();
try {
payload.write(resultSet.getBytes(PAYLOAD_FIELD).toByteArray());
while (remainingParts > 0) {
if (!resultSet.next()) {
              // Note: resultSet.next() advances the cursor to the next row of the Spanner results.
              LOG.trace("Fetch batch is incomplete. Expected {} more rows.", remainingParts);
return endOfData();
}
payload.write(resultSet.getBytes(PAYLOAD_FIELD).toByteArray());
remainingParts--;
}
} catch (IOException e) {
throw new RuntimeException(e);
}

byte[] id = getMessageId(resultSet.getLong(SEQUENCE_ID_FIELD),
resultSet.getLong(PAYLOAD_SEQUENCE_ID_FIELD), resultSet.getLong(PUBLISH_TS_MICROS_FIELD));
byte[] payload = resultSet.getBytes(PAYLOAD_FIELD).toByteArray();

return new io.cdap.cdap.messaging.spi.RawMessage.Builder().setId(id).setPayload(payload)
.build();
    // The payload sequence id in the returned message id is always 0, since fetch() returns a
    // single reassembled message built from all of its payload chunks.
byte[] id = getMessageId(resultSet.getLong(SEQUENCE_ID_FIELD), 0,
resultSet.getLong(PUBLISH_TS_MICROS_FIELD));
return new io.cdap.cdap.messaging.spi.RawMessage.Builder().setId(id)
.setPayload(payload.toByteArray()).build();
}
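  // Reassembly example for computeNext() above, following the table in the publish()
  // Javadoc: the rows msg2_p0 (remaining 2), msg2_p1 (remaining 1) and msg2_p2
  // (remaining 0) are concatenated into a single RawMessage whose id carries
  // payload_sequence_id 0.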

@Override
@@ -447,8 +554,34 @@ static byte[] getMessageId(long sequenceId, long payloadSequenceId, long timestamp)
offset = Bytes.putShort(result, offset, (short) sequenceId);
    // This 0 corresponds to the write timestamp, which the Spanner messaging service does not maintain.
offset = Bytes.putLong(result, offset, 0);
//TODO: [CDAP-21094] Handle messages with larger payload which were broken into parts during publish.
Bytes.putShort(result, offset, (short) payloadSequenceId);
return result;
}
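  // Resulting id layout, assuming the lines above this hunk first write the 8-byte publish
  // timestamp:
  //   [publish_ts: 8 bytes][sequence_id: 2 bytes][write_ts: 8 bytes, always 0][payload_sequence_id: 2 bytes]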

// Helper class to hold the publish request and its future.
private static class RequestFuture {

private final StoreRequest storeRequest;
private final CompletableFuture<Void> future;
private final long size;

private RequestFuture(StoreRequest storeRequest, CompletableFuture<Void> future, long size) {
this.storeRequest = storeRequest;
this.future = future;
this.size = size;
}

private StoreRequest getStoreRequest() {
return storeRequest;
}

private long getSize() {
return size;
}

private CompletableFuture<Void> getFuture() {
return future;
}
}

}