Offline: Model empty batches differently, exposing the empty Grouped Batch message to more of the system (microsoft#22343)

Empty batches are a special case: we submit a Grouped Batch message
with no inner runtime messages. This is needed to track batches
across forks of a container (each fork trying to resubmit the same
local content).

There are several flows that need the message or at least
sequenceNumber-related metadata:
* BatchStart/BatchEnd events
* DeltaScheduler
* savedOps mechanism in PendingStateManager
* The DataProcessingError thrown if the container forks
* Future (See microsoft#22310): MinimumSequenceNumber

We used to include `emptyBatchSequenceNumber`, which partially
addressed this, but it's better to include the whole message. So add
it to the `InboundBatch` type as `keyMessage` - for non-empty
batches this is just the first message (which we were already
referring to in certain places).
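
For illustration, here is a minimal TypeScript sketch of the idea (not the actual Fluid Framework code - `SequencedMessage`, `InboundBatchSketch`, and `assembleInboundBatch` are hypothetical simplifications; the real `InboundBatch` and selection logic are in the diff below): `keyMessage` is the first inner message for a non-empty batch, and the empty Grouped Batch envelope itself when the batch is empty.

```typescript
// Minimal local stand-ins for illustration only; the real types live in the
// Fluid Framework packages changed in this commit.
interface SequencedMessage {
	clientId: string;
	sequenceNumber: number;
	clientSequenceNumber: number;
	contents?: unknown;
}

interface InboundBatchSketch {
	readonly messages: SequencedMessage[];
	readonly batchStartCsn: number;
	// First inner message, or the Grouped Batch envelope for an empty batch.
	readonly keyMessage: SequencedMessage;
}

// Hypothetical helper showing how keyMessage could be chosen when unpacking
// a Grouped Batch: there is always a message to read sequence numbers from,
// even when the batch has no inner runtime messages.
function assembleInboundBatch(
	groupedBatchEnvelope: SequencedMessage,
	innerMessages: SequencedMessage[],
): InboundBatchSketch {
	return {
		messages: innerMessages,
		batchStartCsn: groupedBatchEnvelope.clientSequenceNumber,
		keyMessage: innerMessages[0] ?? groupedBatchEnvelope,
	};
}

// Usage: an empty resubmitted batch still exposes a sequence number via keyMessage.
const emptyBatch = assembleInboundBatch(
	{ clientId: "CLIENT_ID", sequenceNumber: 10, clientSequenceNumber: 8 },
	[],
);
console.log(emptyBatch.keyMessage.sequenceNumber); // 10
```

With this shape, consumers listed above (BatchStart/BatchEnd events, DeltaScheduler, savedOps, fork detection) can always read sequence-number metadata off `keyMessage`, whether or not the batch was empty.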
markfields authored Aug 29, 2024
1 parent aa88509 commit ce0a14c
Showing 5 changed files with 112 additions and 102 deletions.
54 changes: 26 additions & 28 deletions packages/runtime/container-runtime/src/containerRuntime.ts
@@ -2750,7 +2750,7 @@ export class ContainerRuntime
this.ensureNoDataModelChanges(() => this.processRuntimeMessage(msg));
});
} else {
this.ensureNoDataModelChanges(() => this.processEmptyBatch(inboundBatch, local));
this.ensureNoDataModelChanges(() => this.processEmptyBatch(inboundBatch));
}
} else {
// Check if message.type is one of the values in ContainerMessageType
@@ -2777,6 +2777,13 @@
);
}
}

if (local) {
// If we have processed a local op, this means that the container is
// making progress and we can reset the counter for how many times
// we have consecutively replayed the pending states
this.resetReconnectCount();
}
}

private _processedClientSequenceNumber: number | undefined;
@@ -2789,7 +2796,7 @@
private processRuntimeMessage(
messageWithContext: MessageWithContext & { isRuntimeMessage: true },
) {
const { message, local, localOpMetadata } = messageWithContext;
const { message, localOpMetadata } = messageWithContext;

// Set the minimum sequence number to the containerRuntime's understanding of minimum sequence number.
if (
@@ -2825,36 +2832,34 @@
this.emit("op", message, messageWithContext.isRuntimeMessage);

this.scheduleManager.afterOpProcessing(undefined, message);

if (local) {
// If we have processed a local op, this means that the container is
// making progress and we can reset the counter for how many times
// we have consecutively replayed the pending states
this.resetReconnectCount();
}
} catch (e) {
this.scheduleManager.afterOpProcessing(e, message);
throw e;
}
}

/**
* Process an empty batch, which will execute expected actions while processing even if there are no messages.
* This is a separate function because the processCore function expects at least one message to process.
* It is expected to happen only when the outbox produces an empty batch due to a resubmit flow.
* Process an empty batch, which will execute expected actions while processing even if there are no inner runtime messages.
*
* @remarks - Empty batches are produced by the outbox on resubmit when the resubmit flow resulted in no runtime messages.
* This can happen if changes from a remote client "cancel out" the pending changes being resubmitted by this client.
* We submit an empty batch if "offline load" (aka rehydrating from stashed state) is enabled,
* to ensure we account for this batch when comparing batchIds, checking for a forked container.
* Otherwise, we would not realize this container has forked in the case where it did fork, and a batch became empty but wasn't submitted as such.
*/
private processEmptyBatch(emptyBatch: InboundBatch, local: boolean) {
const { emptyBatchSequenceNumber: sequenceNumber, batchStartCsn } = emptyBatch;
assert(sequenceNumber !== undefined, 0x9fa /* emptyBatchSequenceNumber must be defined */);
this.emit("batchBegin", { sequenceNumber });
private processEmptyBatch(emptyBatch: InboundBatch) {
const { keyMessage, batchStartCsn } = emptyBatch;
this.scheduleManager.beforeOpProcessing(keyMessage);

this._processedClientSequenceNumber = batchStartCsn;
if (!this.hasPendingMessages()) {
this.updateDocumentDirtyState(false);
}
this.emit("batchEnd", undefined, { sequenceNumber });
if (local) {
this.resetReconnectCount();
}

// We emit this event but say isRuntimeMessage is false, because there are no actual runtime messages here being processed.
// But someone listening to this event expecting to be notified whenever a message arrives would want to know about this.
this.emit("op", keyMessage, false /* isRuntimeMessage */);
this.scheduleManager.afterOpProcessing(undefined /* error */, keyMessage);
}

/**
@@ -2864,7 +2869,7 @@
private observeNonRuntimeMessage(
messageWithContext: MessageWithContext & { isRuntimeMessage: false },
) {
const { message, local } = messageWithContext;
const { message } = messageWithContext;

// Intercept to reduce minimum sequence number to the delta manager's minimum sequence number.
// Sequence numbers are not guaranteed to follow any sort of order. Re-entrancy is one of those situations
@@ -2893,13 +2898,6 @@
this.emit("op", message, messageWithContext.isRuntimeMessage);

this.scheduleManager.afterOpProcessing(undefined, message);

if (local) {
// If we have processed a local op, this means that the container is
// making progress and we can reset the counter for how many times
// we have consecutively replayed the pending states
this.resetReconnectCount();
}
} catch (e) {
this.scheduleManager.afterOpProcessing(e, message);
throw e;
@@ -30,15 +30,20 @@ export interface InboundBatch {
/** clientId that sent this batch. Used to compute Batch ID if needed */
readonly clientId: string;
/**
* Client Sequence Number of the first message in the batch.
* Client Sequence Number of the Grouped Batch message, or the first message in the ungrouped batch.
* Used to compute Batch ID if needed
*
* @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk).
* For grouped batches, clientSequenceNumber on messages is overwritten, so we track this original value here.
*/
readonly batchStartCsn: number;
/** For an empty batch (with no messages), we need to remember the empty grouped batch's sequence number */
readonly emptyBatchSequenceNumber?: number;
/**
* The first message in the batch, or if the batch is empty, the empty grouped batch message
* Used for accessing the sequence numbers for the (start of the) batch.
*
* @remarks Do not use clientSequenceNumber here, use batchStartCsn instead.
*/
readonly keyMessage: ISequencedDocumentMessage;
}

function assertHasClientId(
@@ -141,9 +146,7 @@ export class RemoteMessageProcessor {
batchStartCsn: message.clientSequenceNumber,
clientId,
batchId,
// If the batch is empty, we need to return the sequence number aside
emptyBatchSequenceNumber:
groupedMessages.length === 0 ? message.sequenceNumber : undefined,
keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch
};
}

@@ -184,6 +187,7 @@
batchId: asBatchMetadata(message.metadata)?.batchId,
clientId: message.clientId,
batchStartCsn: message.clientSequenceNumber,
keyMessage: message,
};

return { batchEnded: false };
@@ -195,6 +199,7 @@
batchStartCsn: message.clientSequenceNumber,
clientId: message.clientId,
batchId: asBatchMetadata(message.metadata)?.batchId,
keyMessage: message,
};
return { batchEnded: true };
}
8 changes: 2 additions & 6 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
@@ -386,7 +386,7 @@ export class PendingStateManager implements IDisposable {
throw DataProcessingError.create(
"Forked Container Error! Matching batchIds but mismatched clientId",
"PendingStateManager.processInboundBatch",
batch.messages[0], // Note: if it's an empty batch, we won't get message metadata added to the error/log here
batch.keyMessage,
);
}

@@ -409,11 +409,7 @@

// Empty batch
if (batch.messages.length === 0) {
assert(
batch.emptyBatchSequenceNumber !== undefined,
0x9fb /* Expected sequence number for empty batch */,
);
const localOpMetadata = this.processNextPendingMessage(batch.emptyBatchSequenceNumber);
const localOpMetadata = this.processNextPendingMessage(batch.keyMessage.sequenceNumber);
assert(
asEmptyBatchLocalOpMetadata(localOpMetadata)?.emptyBatch === true,
0xa20 /* Expected empty batch marker */,
@@ -244,88 +244,97 @@ describe("RemoteMessageProcessor", () => {
processor.process(message, () => {}),
);

// Expected results
const messagesA = [
{
"contents": "A1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 1,
"metadata": { "batch": true },
"clientId": "CLIENT_ID",
},
{
"contents": "A2",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 2,
"clientId": "CLIENT_ID",
},
{
"contents": "A3",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 3,
"metadata": { "batch": false },
"clientId": "CLIENT_ID",
},
];
const messagesB = [
{
"contents": "B1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 4,
"clientId": "CLIENT_ID",
},
];
const messagesC = [
{
"contents": "C1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 5,
"metadata": { "batch": true, "batchId": "C" },
"clientId": "CLIENT_ID",
},
{
"contents": "C2",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 6,
"metadata": { "batch": false },
"clientId": "CLIENT_ID",
},
];
const messagesD = [
{
"contents": "D1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 7,
"metadata": { "batchId": "D" },
"clientId": "CLIENT_ID",
},
];
const expectedResults = [
// A
undefined,
undefined,
{
messages: [
{
"contents": "A1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 1,
"metadata": { "batch": true },
"clientId": "CLIENT_ID",
},
{
"contents": "A2",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 2,
"clientId": "CLIENT_ID",
},
{
"contents": "A3",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 3,
"metadata": { "batch": false },
"clientId": "CLIENT_ID",
},
],
messages: messagesA,
clientId: "CLIENT_ID",
batchId: undefined,
batchStartCsn: 1,
keyMessage: messagesA[0],
},
// B
{
messages: [
{
"contents": "B1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 4,
"clientId": "CLIENT_ID",
},
],
messages: messagesB,
clientId: "CLIENT_ID",
batchId: undefined,
batchStartCsn: 4,
keyMessage: messagesB[0],
},
// C
undefined,
{
messages: [
{
"contents": "C1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 5,
"metadata": { "batch": true, "batchId": "C" },
"clientId": "CLIENT_ID",
},
{
"contents": "C2",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 6,
"metadata": { "batch": false },
"clientId": "CLIENT_ID",
},
],
messages: messagesC,
batchId: "C",
clientId: "CLIENT_ID",
batchStartCsn: 5,
keyMessage: messagesC[0],
},
// D
{
messages: [
{
"contents": "D1",
"referenceSequenceNumber": 1,
"clientSequenceNumber": 7,
"metadata": { "batchId": "D" },
"clientId": "CLIENT_ID",
},
],
messages: messagesD,
clientId: "CLIENT_ID",
batchId: "D",
batchStartCsn: 7,
keyMessage: messagesD[0],
},
];

@@ -517,7 +526,7 @@ describe("RemoteMessageProcessor", () => {
batchStartCsn: 12,
clientId: "CLIENT_ID",
batchId: "BATCH_ID",
emptyBatchSequenceNumber: undefined,
keyMessage: expected[0],
},
"unexpected processing of groupedBatch",
);
@@ -549,7 +558,7 @@ describe("RemoteMessageProcessor", () => {
batchStartCsn: 8,
clientId: "CLIENT_ID",
batchId: "BATCH_ID",
emptyBatchSequenceNumber: 10,
keyMessage: groupedBatch,
},
"unexpected processing of empty groupedBatch",
);
@@ -167,7 +167,9 @@ describe("Pending State Manager", () => {
{
messages: messages as InboundSequencedContainerRuntimeMessage[],
batchStartCsn,
emptyBatchSequenceNumber,
keyMessage: {
sequenceNumber: emptyBatchSequenceNumber,
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
clientId,
batchId: resubmittedBatchId,
},
@@ -552,15 +554,15 @@ describe("Pending State Manager", () => {
],
1,
);
const inboundMessage = futureRuntimeMessage as ISequencedDocumentMessage &
UnknownContainerRuntimeMessage;
pendingStateManager.processInboundBatch(
{
messages: [
futureRuntimeMessage as ISequencedDocumentMessage &
UnknownContainerRuntimeMessage,
],
messages: [inboundMessage],
batchStartCsn: 1 /* batchStartCsn */,
batchId: undefined,
clientId: "clientId",
keyMessage: inboundMessage,
},
true /* local */,
);
