Skip to content

Commit

Permalink
Fix the Missing Deltas when Switch the Cluster (#11919)
Browse files Browse the repository at this point in the history
When we switch clusters, a GetDeltas_Exception can occur. It can be
reproduced with the following scenario:

1. Cluster A opens a new session and adds ops [1, 2, 3, 4] to Cluster
A's Cosmos DB. There is a client summary at seq 2. When the session ends,
it writes the service summary = the client summary at seq 2 + [3, 4].
2. Cluster B opens the session and adds ops [5, 6] to Cluster B's
Cosmos DB. It reads the correct service summary from step 1. No new
client summary is produced. However, when the session ends, the new
service summary = the client summary at seq 2 + [5, 6]; ops [3, 4] are
lost because they exist only in Cluster A's Cosmos DB.
3. Cluster A opens a new session again and reads the wrong service
summary from step 2. Meanwhile, it requests the ops after the last
client summary at seq 2, but operations [5, 6] are missing because they
exist only in Cluster B's Cosmos DB.


To fix this bug, when we fetch the logtail, we fill in the
missing operations from the messages of the last summary.

This PR also makes the session stickiness duration a cluster-specific
configuration variable.
  • Loading branch information
tianzhu007 authored Sep 14, 2022
1 parent efceccf commit 122a587
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ data:
"maxMessageSize": "16KB",
"maxNumberOfClientsPerDocument": {{ .Values.alfred.maxNumberOfClientsPerDocument }},
"numberOfMessagesPerTrace": {{ .Values.alfred.numberOfMessagesPerTrace }},
"sessionStickinessDurationMs": {{ .Values.alfred.sessionStickinessDurationMs }},
"throttling": {
"restCalls": {
"maxPerMs": {{ .Values.alfred.throttling.restCalls.maxPerMs }},
Expand Down
1 change: 1 addition & 0 deletions server/routerlicious/kubernetes/routerlicious/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ alfred:
key: jwt_key
maxNumberOfClientsPerDocument: 1000000
numberOfMessagesPerTrace: 100
sessionStickinessDurationMs: 3600000
throttling:
restCalls:
maxPerMs: 1000000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,14 @@ export class ScribeLambdaFactory extends EventEmitter implements IPartitionLambd

const protocolHandler = initializeProtocol(lastCheckpoint.protocolState, latestSummary.term);

const lastSummaryMessages = latestSummary.messages;
const summaryWriter = new SummaryWriter(
tenantId,
documentId,
gitManager,
this.messageCollection,
this.enableWholeSummaryUpload);
this.enableWholeSummaryUpload,
lastSummaryMessages);
const checkpointManager = new CheckpointManager(
context,
tenantId,
Expand Down
51 changes: 47 additions & 4 deletions server/routerlicious/packages/lambdas/src/scribe/summaryWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class SummaryWriter implements ISummaryWriter {
private readonly summaryStorage: IGitManager,
private readonly opStorage: ICollection<ISequencedOperationMessage>,
private readonly enableWholeSummaryUpload: boolean,
private readonly lastSummaryMessages: ISequencedDocumentMessage[],
private readonly maxRetriesOnError: number = 6,
) {
this.lumberProperties = getLumberBaseProperties(this.documentId, this.tenantId);
Expand Down Expand Up @@ -179,7 +180,7 @@ export class SummaryWriter implements ISummaryWriter {

// Generate a tree of logTail starting from protocol sequence number to summarySequenceNumber
const logTailEntries = await requestWithRetry(
async () => this.generateLogtailEntries(checkpoint.protocolState.sequenceNumber, op.sequenceNumber + 1, pendingOps),
async () => this.generateLogtailEntries(checkpoint.protocolState.sequenceNumber, op.sequenceNumber + 1, pendingOps, this.lastSummaryMessages),
"writeClientSummary_generateLogtailEntries",
this.lumberProperties,
shouldRetryNetworkError,
Expand Down Expand Up @@ -369,7 +370,8 @@ export class SummaryWriter implements ISummaryWriter {
async () => this.generateLogtailEntries(
currentProtocolHead,
op.sequenceNumber + 1,
pendingOps),
pendingOps,
this.lastSummaryMessages),
"writeServiceSummary_generateLogtailEntries",
this.lumberProperties,
shouldRetryNetworkError,
Expand Down Expand Up @@ -501,22 +503,63 @@ export class SummaryWriter implements ISummaryWriter {
private async generateLogtailEntries(
from: number,
to: number,
pending: ISequencedOperationMessage[]): Promise<ITreeEntry[]> {
pending: ISequencedOperationMessage[],
lastSummaryMessages: ISequencedDocumentMessage[] | undefined): Promise<ITreeEntry[]> {
const logTail = await this.getLogTail(from, to, pending);

// Some ops would be missing if we switch cluster during routing.
// We need to load these missing ops from the last summary.
const missingOps = await this.getMissingOpsFromLastSummaryLogtail(from, to, logTail, lastSummaryMessages);
const fullLogTail = missingOps ?
(missingOps.concat(logTail)).sort((op1, op2) => op1.sequenceNumber - op2.sequenceNumber) :
logTail;

// Check the missing operations in the fullLogTail
if (fullLogTail.length !== (to - from - 1)) {
const missingOpsSequenceNumbers: number[] = [];
const fullLogTailSequenceNumbers = fullLogTail.map((ms) => ms.sequenceNumber);
let j = 0;
for (let i = from + 1; i < to; i++) {
if (i === fullLogTailSequenceNumbers[j]) {
j++;
continue;
}
missingOpsSequenceNumbers.push(i);
}
Lumberjack.error(`Missing ops in the fullLogTail: ${JSON.stringify(missingOpsSequenceNumbers)}`
, this.lumberProperties);
}

const logTailEntries: ITreeEntry[] = [
{
mode: FileMode.File,
path: "logTail",
type: TreeEntry.Blob,
value: {
contents: JSON.stringify(logTail),
contents: JSON.stringify(fullLogTail),
encoding: "utf-8",
},
},
];
return logTailEntries;
}

/**
 * Collects the ops from the last summary's messages that are absent from the
 * freshly fetched logtail, so the caller can merge them back in.
 *
 * Only ops whose sequence number lies strictly between `gt` and `lt` are
 * returned: the caller expects the merged logtail to contain exactly the
 * sequence numbers `gt + 1 .. lt - 1`, so out-of-range ops from the last
 * summary must not be added.
 *
 * @param gt - Exclusive lower bound of the requested sequence number range.
 * @param lt - Exclusive upper bound of the requested sequence number range.
 * @param logTail - Ops already fetched for the range.
 * @param lastSummaryMessages - Messages stored with the last summary, if any.
 * @returns The in-range ops that are in `lastSummaryMessages` but not in `logTail`,
 * or `undefined` when the range is empty or no last summary messages were provided.
 */
private async getMissingOpsFromLastSummaryLogtail(
    gt: number,
    lt: number,
    logTail: ISequencedDocumentMessage[],
    lastSummaryMessages: ISequencedDocumentMessage[] | undefined):
    Promise<ISequencedDocumentMessage[] | undefined> {
    // The exclusive range (gt, lt) contains no sequence numbers, so nothing can be missing.
    if (lt - gt <= 1) {
        return undefined;
    }
    const logtailSequenceNumbers = new Set<number>(logTail.map((ms) => ms.sequenceNumber));
    // Keep only ops inside the requested range that the fetched logtail does not already have.
    return lastSummaryMessages?.filter((ms) =>
        ms.sequenceNumber > gt
        && ms.sequenceNumber < lt
        && !logtailSequenceNumbers.has(ms.sequenceNumber));
}

private async getLogTail(
gt: number,
lt: number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,15 @@ export class LocalOrderer implements IOrderer {
() => -1,
);

const summaryReader = new SummaryReader(this.tenantId, this.documentId, this.gitManager, false);
const latestSummary = await summaryReader.readLastSummary();
const summaryWriter = new SummaryWriter(
this.tenantId,
this.documentId,
this.gitManager,
scribeMessagesCollection,
false);
const summaryReader = new SummaryReader(this.tenantId, this.documentId, this.gitManager, false);
false,
latestSummary.messages);
const checkpointManager = new CheckpointManager(
context,
this.tenantId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"verifyMaxMessageSize": false,
"maxNumberOfClientsPerDocument": 1000000,
"numberOfMessagesPerTrace": 100,
"sessionStickinessDurationMs": 3600000,
"throttling": {
"restCalls": {
"maxPerMs": 1000000,
Expand Down Expand Up @@ -107,8 +108,7 @@
"enforceServerGeneratedDocumentId": false,
"socketIo": {
"perMessageDeflate": true
},
"sessionStickinessDurationMs": 3600000
}
},
"client": {
"type": "browser",
Expand Down

0 comments on commit 122a587

Please sign in to comment.