From 213dca3ed2c3de98b208ffd1301468cc23b7eeab Mon Sep 17 00:00:00 2001
From: Mark Fields
Date: Thu, 29 Aug 2024 14:53:54 -0700
Subject: [PATCH] (Porting ce0a14ca to 2.2 release branch) Offline: Model empty batches differently, exposing the empty Grouped Batch message to more of the system (#22343)

Empty batches are a unique case, where we submit a Grouped Batch message with no inner runtime messages. This is needed for tracking batches between forks of a container (each trying to resubmit the same local content).

There are several flows that need the message or at least sequenceNumber-related metadata:

* BatchStart/BatchEnd events
* DeltaScheduler
* savedOps mechanism in PendingStateManager
* The DataProcessingError thrown if the container forks
* Future (See #22310): MinimumSequenceNumber

We used to include the `emptyBatchSequenceNumber` and that partially addressed this, but it's better to include the whole message. So add it as `keyMessage` on the `InboundBatch` type - for non-empty batches this is just the first message (we were already referring to that in certain places).
--- .../container-runtime/src/containerRuntime.ts | 54 ++++---- .../src/opLifecycle/remoteMessageProcessor.ts | 17 ++- .../src/pendingStateManager.ts | 6 +- .../remoteMessageProcessor.spec.ts | 123 ++++++++++-------- .../src/test/pendingStateManager.spec.ts | 12 +- 5 files changed, 111 insertions(+), 101 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index b337e78b741c..74ab9b44443a 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -2690,7 +2690,7 @@ export class ContainerRuntime this.ensureNoDataModelChanges(() => this.processRuntimeMessage(msg)); }); } else { - this.ensureNoDataModelChanges(() => this.processEmptyBatch(inboundBatch, local)); + this.ensureNoDataModelChanges(() => this.processEmptyBatch(inboundBatch)); } } else { // Check if message.type is one of values in ContainerMessageType @@ -2717,6 +2717,13 @@ export class ContainerRuntime ); } } + + if (local) { + // If we have processed a local op, this means that the container is + // making progress and we can reset the counter for how many times + // we have consecutively replayed the pending states + this.resetReconnectCount(); + } } private _processedClientSequenceNumber: number | undefined; @@ -2729,7 +2736,7 @@ export class ContainerRuntime private processRuntimeMessage( messageWithContext: MessageWithContext & { isRuntimeMessage: true }, ) { - const { message, local, localOpMetadata } = messageWithContext; + const { message, localOpMetadata } = messageWithContext; // Intercept to reduce minimum sequence number to the delta manager's minimum sequence number. // Sequence numbers are not guaranteed to follow any sort of order. 
Re-entrancy is one of those situations @@ -2767,13 +2774,6 @@ export class ContainerRuntime this.emit("op", message, messageWithContext.isRuntimeMessage); this.scheduleManager.afterOpProcessing(undefined, message); - - if (local) { - // If we have processed a local op, this means that the container is - // making progress and we can reset the counter for how many times - // we have consecutively replayed the pending states - this.resetReconnectCount(); - } } catch (e) { this.scheduleManager.afterOpProcessing(e, message); throw e; @@ -2781,22 +2781,27 @@ export class ContainerRuntime } /** - * Process an empty batch, which will execute expected actions while processing even if there are no messages. - * This is a separate function because the processCore function expects at least one message to process. - * It is expected to happen only when the outbox produces an empty batch due to a resubmit flow. + * Process an empty batch, which will execute expected actions while processing even if there are no inner runtime messages. + * + * @remarks - Empty batches are produced by the outbox on resubmit when the resubmit flow resulted in no runtime messages. + * This can happen if changes from a remote client "cancel out" the pending changes being resubmitted by this client. + * We submit an empty batch if "offline load" (aka rehydrating from stashed state) is enabled, + * to ensure we account for this batch when comparing batchIds, checking for a forked container. + * Otherwise, we would not realize this container has forked in the case where it did fork, and a batch became empty but wasn't submitted as such. 
*/ - private processEmptyBatch(emptyBatch: InboundBatch, local: boolean) { - const { emptyBatchSequenceNumber: sequenceNumber, batchStartCsn } = emptyBatch; - assert(sequenceNumber !== undefined, 0x9fa /* emptyBatchSequenceNumber must be defined */); - this.emit("batchBegin", { sequenceNumber }); + private processEmptyBatch(emptyBatch: InboundBatch) { + const { keyMessage, batchStartCsn } = emptyBatch; + this.scheduleManager.beforeOpProcessing(keyMessage); + this._processedClientSequenceNumber = batchStartCsn; if (!this.hasPendingMessages()) { this.updateDocumentDirtyState(false); } - this.emit("batchEnd", undefined, { sequenceNumber }); - if (local) { - this.resetReconnectCount(); - } + + // We emit this event but say isRuntimeMessage is false, because there are no actual runtime messages here being processed. + // But someone listening to this event expecting to be notified whenever a message arrives would want to know about this. + this.emit("op", keyMessage, false /* isRuntimeMessage */); + this.scheduleManager.afterOpProcessing(undefined /* error */, keyMessage); } /** @@ -2806,7 +2811,7 @@ export class ContainerRuntime private observeNonRuntimeMessage( messageWithContext: MessageWithContext & { isRuntimeMessage: false }, ) { - const { message, local } = messageWithContext; + const { message } = messageWithContext; // Intercept to reduce minimum sequence number to the delta manager's minimum sequence number. // Sequence numbers are not guaranteed to follow any sort of order. 
Re-entrancy is one of those situations @@ -2835,13 +2840,6 @@ export class ContainerRuntime this.emit("op", message, messageWithContext.isRuntimeMessage); this.scheduleManager.afterOpProcessing(undefined, message); - - if (local) { - // If we have processed a local op, this means that the container is - // making progress and we can reset the counter for how many times - // we have consecutively replayed the pending states - this.resetReconnectCount(); - } } catch (e) { this.scheduleManager.afterOpProcessing(e, message); throw e; diff --git a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts index 99bd163aa7cc..b14e0c787ab4 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts @@ -30,15 +30,20 @@ export interface InboundBatch { /** clientId that sent this batch. Used to compute Batch ID if needed */ readonly clientId: string; /** - * Client Sequence Number of the first message in the batch. + * Client Sequence Number of the Grouped Batch message, or the first message in the ungrouped batch. * Used to compute Batch ID if needed * * @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk). * For grouped batches, clientSequenceNumber on messages is overwritten, so we track this original value here. */ readonly batchStartCsn: number; - /** For an empty batch (with no messages), we need to remember the empty grouped batch's sequence number */ - readonly emptyBatchSequenceNumber?: number; + /** + * The first message in the batch, or if the batch is empty, the empty grouped batch message + * Used for accessing the sequence numbers for the (start of the) batch. + * + * @remarks Do not use clientSequenceNumber here, use batchStartCsn instead. 
+ */ + readonly keyMessage: ISequencedDocumentMessage; } function assertHasClientId( @@ -141,9 +146,7 @@ export class RemoteMessageProcessor { batchStartCsn: message.clientSequenceNumber, clientId, batchId, - // If the batch is empty, we need to return the sequence number aside - emptyBatchSequenceNumber: - groupedMessages.length === 0 ? message.sequenceNumber : undefined, + keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch }; } @@ -184,6 +187,7 @@ export class RemoteMessageProcessor { batchId: asBatchMetadata(message.metadata)?.batchId, clientId: message.clientId, batchStartCsn: message.clientSequenceNumber, + keyMessage: message, }; return { batchEnded: false }; @@ -195,6 +199,7 @@ export class RemoteMessageProcessor { batchStartCsn: message.clientSequenceNumber, clientId: message.clientId, batchId: asBatchMetadata(message.metadata)?.batchId, + keyMessage: message, }; return { batchEnded: true }; } diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index cfb6918e9740..01faa700d34b 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -364,11 +364,7 @@ export class PendingStateManager implements IDisposable { // Empty batch if (batch.messages.length === 0) { - assert( - batch.emptyBatchSequenceNumber !== undefined, - 0x9fb /* Expected sequence number for empty batch */, - ); - const localOpMetadata = this.processNextPendingMessage(batch.emptyBatchSequenceNumber); + const localOpMetadata = this.processNextPendingMessage(batch.keyMessage.sequenceNumber); assert( asEmptyBatchLocalOpMetadata(localOpMetadata)?.emptyBatch === true, 0xa20 /* Expected empty batch marker */, diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts 
b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts index 6101f30764f9..8c8ef8814dd9 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts @@ -244,88 +244,97 @@ describe("RemoteMessageProcessor", () => { processor.process(message, () => {}), ); + // Expected results + const messagesA = [ + { + "contents": "A1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 1, + "metadata": { "batch": true }, + "clientId": "CLIENT_ID", + }, + { + "contents": "A2", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 2, + "clientId": "CLIENT_ID", + }, + { + "contents": "A3", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 3, + "metadata": { "batch": false }, + "clientId": "CLIENT_ID", + }, + ]; + const messagesB = [ + { + "contents": "B1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 4, + "clientId": "CLIENT_ID", + }, + ]; + const messagesC = [ + { + "contents": "C1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 5, + "metadata": { "batch": true, "batchId": "C" }, + "clientId": "CLIENT_ID", + }, + { + "contents": "C2", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 6, + "metadata": { "batch": false }, + "clientId": "CLIENT_ID", + }, + ]; + const messagesD = [ + { + "contents": "D1", + "referenceSequenceNumber": 1, + "clientSequenceNumber": 7, + "metadata": { "batchId": "D" }, + "clientId": "CLIENT_ID", + }, + ]; const expectedResults = [ // A undefined, undefined, { - messages: [ - { - "contents": "A1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 1, - "metadata": { "batch": true }, - "clientId": "CLIENT_ID", - }, - { - "contents": "A2", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 2, - "clientId": "CLIENT_ID", - }, - { - "contents": "A3", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 3, - "metadata": { "batch": 
false }, - "clientId": "CLIENT_ID", - }, - ], + messages: messagesA, clientId: "CLIENT_ID", batchId: undefined, batchStartCsn: 1, + keyMessage: messagesA[0], }, // B { - messages: [ - { - "contents": "B1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 4, - "clientId": "CLIENT_ID", - }, - ], + messages: messagesB, clientId: "CLIENT_ID", batchId: undefined, batchStartCsn: 4, + keyMessage: messagesB[0], }, // C undefined, { - messages: [ - { - "contents": "C1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 5, - "metadata": { "batch": true, "batchId": "C" }, - "clientId": "CLIENT_ID", - }, - { - "contents": "C2", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 6, - "metadata": { "batch": false }, - "clientId": "CLIENT_ID", - }, - ], + messages: messagesC, batchId: "C", clientId: "CLIENT_ID", batchStartCsn: 5, + keyMessage: messagesC[0], }, // D { - messages: [ - { - "contents": "D1", - "referenceSequenceNumber": 1, - "clientSequenceNumber": 7, - "metadata": { "batchId": "D" }, - "clientId": "CLIENT_ID", - }, - ], + messages: messagesD, clientId: "CLIENT_ID", batchId: "D", batchStartCsn: 7, + keyMessage: messagesD[0], }, ]; @@ -517,7 +526,7 @@ describe("RemoteMessageProcessor", () => { batchStartCsn: 12, clientId: "CLIENT_ID", batchId: "BATCH_ID", - emptyBatchSequenceNumber: undefined, + keyMessage: expected[0], }, "unexpected processing of groupedBatch", ); @@ -549,7 +558,7 @@ describe("RemoteMessageProcessor", () => { batchStartCsn: 8, clientId: "CLIENT_ID", batchId: "BATCH_ID", - emptyBatchSequenceNumber: 10, + keyMessage: groupedBatch, }, "unexpected processing of empty groupedBatch", ); diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index d81c39420410..8e6c61203901 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ 
b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -166,7 +166,9 @@ describe("Pending State Manager", () => { { messages: messages as InboundSequencedContainerRuntimeMessage[], batchStartCsn, - emptyBatchSequenceNumber, + keyMessage: { + sequenceNumber: emptyBatchSequenceNumber, + } satisfies Partial as ISequencedDocumentMessage, clientId, batchId: generateBatchId(clientId, batchStartCsn), }, @@ -546,15 +548,15 @@ describe("Pending State Manager", () => { ], 1, ); + const inboundMessage = futureRuntimeMessage as ISequencedDocumentMessage & + UnknownContainerRuntimeMessage; pendingStateManager.processInboundBatch( { - messages: [ - futureRuntimeMessage as ISequencedDocumentMessage & - UnknownContainerRuntimeMessage, - ], + messages: [inboundMessage], batchStartCsn: 1 /* batchStartCsn */, batchId: "batchId", clientId: "clientId", + keyMessage: inboundMessage, }, true /* local */, );