From 6718a9a1707d6a5bcc573acbb2d154b8840c4b72 Mon Sep 17 00:00:00 2001 From: Zach Newton Date: Mon, 25 Mar 2024 14:06:51 -0700 Subject: [PATCH] server: cover edge cases for scrubbed checkpoint users (#20259) ## Description Continuing the work from #20150 to make sure we cover all the possible edge cases. ## Reviewer Guidance The control flow in Scribe LambdaFactory was a bit confusing, so hopefully I didn't mess it up. I understand the current control flow to be: ``` if (no global checkpoint) use Default checkpoint elsif (global checkpoint was cleared or global checkpoint quorum was scrubbed) use Summary checkpoint else use latest DB checkpoint (local or global) ``` My goal was to make the control flow behave as follows: ``` if (no global and no local checkpoint and no summary checkpoint) use Default checkpoint elsif ( global checkpoint was cleared and summary checkpoint ahead of local db checkpoint or latest DB checkpoint quorum was scrubbed or summary checkpoint ahead of latest DB checkpoint ) use Summary checkpoint else use latest DB checkpoint (local or global) ``` --- .../packages/lambdas/src/scribe/lambda.ts | 30 ++++-- .../lambdas/src/scribe/lambdaFactory.ts | 100 +++++++++++++----- .../packages/lambdas/src/scribe/utils.ts | 13 ++- .../lambdas/src/test/scribe/lambda.spec.ts | 5 + .../services-core/src/checkpointService.ts | 89 +++++++++------- 5 files changed, 157 insertions(+), 80 deletions(-) diff --git a/server/routerlicious/packages/lambdas/src/scribe/lambda.ts b/server/routerlicious/packages/lambdas/src/scribe/lambda.ts index 3465bc927923..1263a5a6d069 100644 --- a/server/routerlicious/packages/lambdas/src/scribe/lambda.ts +++ b/server/routerlicious/packages/lambdas/src/scribe/lambda.ts @@ -48,7 +48,13 @@ import { DocumentCheckpointManager, } from "../utils"; import { ICheckpointManager, IPendingMessageReader, ISummaryWriter } from "./interfaces"; -import { getClientIds, initializeProtocol, isGlobalCheckpoint, sendToDeli } from "./utils"; +import { + getClientIds, + initializeProtocol, + isGlobalCheckpoint, + isScribeCheckpointQuorumScrubbed, + sendToDeli, +} from "./utils"; /** * @internal @@ -481,13 +487,14 @@ export class ScribeLambda implements IPartitionLambda { checkpointReason: CheckpointReason, skipKafkaCheckpoint?: boolean, ) { + const isGlobal = isGlobalCheckpoint( + this.documentCheckpointManager.getNoActiveClients(), + this.globalCheckpointOnly, + ); // Get checkpoint context const checkpoint = this.generateScribeCheckpoint( message.offset, - isGlobalCheckpoint( - this.documentCheckpointManager.getNoActiveClients(), - this.globalCheckpointOnly, - ) + isGlobal ? this.serviceConfiguration.scribe.scrubUserDataInGlobalCheckpoints : this.serviceConfiguration.scribe.scrubUserDataInLocalCheckpoints, ); @@ -513,6 +520,8 @@ export class ScribeLambda implements IPartitionLambda { localCheckpointEnabled: this.localCheckpointEnabled, globalCheckpointOnly: this.globalCheckpointOnly, localCheckpoint: this.localCheckpointEnabled && !this.globalCheckpointOnly, + checkpointLocation: isGlobal ? "global" : "local", + scrubbedUserData: isScribeCheckpointQuorumScrubbed(checkpoint), }; Lumberjack.info(checkpointResult, lumberjackProperties); } @@ -847,12 +856,13 @@ export class ScribeLambda implements IPartitionLambda { private readonly idleTimeCheckpoint = (initialScribeCheckpointMessage: IQueuedMessage) => { if (initialScribeCheckpointMessage) { + const isGlobal = isGlobalCheckpoint( + this.documentCheckpointManager.getNoActiveClients(), + this.globalCheckpointOnly, + ); const checkpoint = this.generateScribeCheckpoint( initialScribeCheckpointMessage.offset, - isGlobalCheckpoint( - this.documentCheckpointManager.getNoActiveClients(), - this.globalCheckpointOnly, - ) + isGlobal ? this.serviceConfiguration.scribe.scrubUserDataInGlobalCheckpoints : this.serviceConfiguration.scribe.scrubUserDataInLocalCheckpoints, ); @@ -872,6 +882,8 @@ export class ScribeLambda implements IPartitionLambda { localCheckpointEnabled: this.localCheckpointEnabled, globalCheckpointOnly: this.globalCheckpointOnly, localCheckpoint: this.localCheckpointEnabled && !this.globalCheckpointOnly, + checkpointLocation: isGlobal ? "global" : "local", + scrubbedUserData: isScribeCheckpointQuorumScrubbed(checkpoint), }; Lumberjack.info(checkpointResult, lumberjackProperties); } diff --git a/server/routerlicious/packages/lambdas/src/scribe/lambdaFactory.ts b/server/routerlicious/packages/lambdas/src/scribe/lambdaFactory.ts index 090fc1248d5b..f678bf7ca976 100644 --- a/server/routerlicious/packages/lambdas/src/scribe/lambdaFactory.ts +++ b/server/routerlicious/packages/lambdas/src/scribe/lambdaFactory.ts @@ -44,7 +44,12 @@ import { CheckpointManager } from "./checkpointManager"; import { ScribeLambda } from "./lambda"; import { SummaryReader } from "./summaryReader"; import { SummaryWriter } from "./summaryWriter"; -import { getClientIds, initializeProtocol, isCheckpointQuorumScrubbed, sendToDeli } from "./utils"; +import { + getClientIds, + initializeProtocol, + isScribeCheckpointQuorumScrubbed, + sendToDeli, +} from "./utils"; import { ILatestSummaryState } from "./interfaces"; import { PendingMessageReader } from "./pendingMessageReader"; @@ -105,6 +110,8 @@ export class ScribeLambdaFactory let lastCheckpoint: IScribe; let summaryReader: SummaryReader; let latestSummary: ILatestSummaryState; + let latestSummaryCheckpoint: IScribe | undefined; + let latestDbCheckpoint: IScribe | undefined; let opMessages: ISequencedDocumentMessage[] = []; const { tenantId, documentId } = config; @@ -119,6 +126,16 @@ export class ScribeLambdaFactory LumberEventName.ScribeSessionResult, this.serviceConfiguration, ); + const failCreation = async (error: unknown): Promise => { + const errorMessage = "Scribe lambda creation failed."; + context.log?.error(`${errorMessage} Exception: ${inspect(error)}`, { messageMetaData }); + Lumberjack.error(errorMessage, lumberProperties, error); + await this.sendLambdaStartResult(tenantId, documentId, { + lambdaName: LambdaName.Scribe, + success: false, + }); + scribeSessionMetric?.error("Scribe lambda creation failed", error); + }; const lumberProperties = getLumberBaseProperties(documentId, tenantId); @@ -176,38 +193,58 @@ export class ScribeLambdaFactory this.enableWholeSummaryUpload, ); latestSummary = await summaryReader.readLastSummary(); + latestSummaryCheckpoint = latestSummary.scribe + ? JSON.parse(latestSummary.scribe) + : undefined; + latestDbCheckpoint = (await this.checkpointService.restoreFromCheckpoint( + documentId, + tenantId, + "scribe", + document, + )) as IScribe; } catch (error) { - const errorMessage = "Scribe lambda creation failed."; - context.log?.error(`${errorMessage} Exception: ${inspect(error)}`, { messageMetaData }); - Lumberjack.error(errorMessage, lumberProperties, error); - await this.sendLambdaStartResult(tenantId, documentId, { - lambdaName: LambdaName.Scribe, - success: false, - }); - scribeSessionMetric?.error("Scribe lambda creation failed", error); - + await failCreation(error); throw error; } + // For a new document, Summary, Global (document) DB and Local DB checkpoints will not exist. + // However, it is possible that the global checkpoint was cleared to an empty string + // due to a service summary, so specifically check if global is not defined at all. + // Lastly, a new document will also not have a summary checkpoint, so if one exists without a DB checkpoint, + // we should use the summary checkpoint because there was likely a DB failure. const useDefaultCheckpointForNewDocument = - document.scribe === undefined || document.scribe === null; - const checkpointIsBlank = document.scribe === ""; - const checkpointQuorumIsScrubbed = isCheckpointQuorumScrubbed(document.scribe); + // Mongodb casts undefined as null so we are checking both to be safe. + (document.scribe === undefined || document.scribe === null) && + !latestDbCheckpoint && + !latestSummaryCheckpoint; + // Empty string for document DB checkpoint denotes a cache that was cleared due to a service summary. + // This will only happen if IServiceConfiguration.scribe.clearCacheAfterServiceSummary is true. Defaults to false. + const documentCheckpointIsCleared = document.scribe === ""; + // It's possible that a local checkpoint is written after global checkpoint was cleared for service summary. + // Similarly, it's possible that the summary checkpoint is ahead of the latest db checkpoint due to a failure. + const summaryCheckpointAheadOfLatestDbCheckpoint = + latestSummaryCheckpoint && + latestSummaryCheckpoint.sequenceNumber > latestDbCheckpoint?.sequenceNumber; + // Scrubbed users indicate that the quorum members have been scrubbed for privacy compliance. + const dbCheckpointQuorumIsScrubbed = isScribeCheckpointQuorumScrubbed(latestDbCheckpoint); + // Only use the summary checkpoint when + // 1) summary checkpoint is more recent than any DB checkpoint + // 2) the document checkpoint is cleared and there is not a more recent local checkpoint + // 3) the latest db checkpoint quorum members are scrubbed for privacy compliance const useLatestSummaryCheckpointForExistingDocument = - checkpointIsBlank || checkpointQuorumIsScrubbed; + summaryCheckpointAheadOfLatestDbCheckpoint || + (documentCheckpointIsCleared && summaryCheckpointAheadOfLatestDbCheckpoint) || + dbCheckpointQuorumIsScrubbed; if (useDefaultCheckpointForNewDocument) { - // Restore scribe state if not present in the cache. Mongodb casts undefined as null so we are checking - // both to be safe. Empty sring denotes a cache that was cleared due to a service summary + // Restore scribe state if not present in the cache. const message = "New document. Setting empty scribe checkpoint"; context.log?.info(message, { messageMetaData }); Lumberjack.info(message, lumberProperties); lastCheckpoint = DefaultScribe; } else if (useLatestSummaryCheckpointForExistingDocument) { const message = `Existing document${ - !checkpointIsBlank && checkpointQuorumIsScrubbed - ? " with invalid quorum members" - : "" + dbCheckpointQuorumIsScrubbed ? " with invalid quorum members" : "" }. Fetching checkpoint from summary`; context.log?.info(message, { messageMetaData }); Lumberjack.info(message, lumberProperties); @@ -216,13 +253,20 @@ export class ScribeLambdaFactory Lumberjack.error(`Summary can't be fetched`, lumberProperties); lastCheckpoint = DefaultScribe; } else { - if (isCheckpointQuorumScrubbed(latestSummary.scribe)) { + if (!latestSummaryCheckpoint) { + const error = new Error( + "Attempted to load from non-existent summary checkpoint.", + ); + await failCreation(error); + throw error; + } + if (isScribeCheckpointQuorumScrubbed(latestSummaryCheckpoint)) { Lumberjack.error( - "Quorum from summary is invalid. Continuing.", + "Quorum from summary is scrubbed. Continuing.", lumberProperties, ); } - lastCheckpoint = JSON.parse(latestSummary.scribe); + lastCheckpoint = latestSummaryCheckpoint; opMessages = latestSummary.messages; // Since the document was originated elsewhere or cache was cleared, logOffset info is irrelavant. // Currently the lambda checkpoints only after updating the logOffset so setting this to lower @@ -234,12 +278,12 @@ export class ScribeLambdaFactory Lumberjack.info(checkpointMessage, lumberProperties); } } else { - lastCheckpoint = (await this.checkpointService.restoreFromCheckpoint( - documentId, - tenantId, - "scribe", - document, - )) as IScribe; + if (!latestDbCheckpoint) { + const error = new Error("Attempted to load from non-existent DB checkpoint."); + await failCreation(error); + throw error; + } + lastCheckpoint = latestDbCheckpoint; try { opMessages = await this.getOpMessages(documentId, tenantId, lastCheckpoint); diff --git a/server/routerlicious/packages/lambdas/src/scribe/utils.ts b/server/routerlicious/packages/lambdas/src/scribe/utils.ts index edb51de7539a..2764f687378d 100644 --- a/server/routerlicious/packages/lambdas/src/scribe/utils.ts +++ b/server/routerlicious/packages/lambdas/src/scribe/utils.ts @@ -77,15 +77,18 @@ export const isGlobalCheckpoint = (noActiveClients: boolean, globalCheckpointOnl * Whether the quorum members represented in the checkpoint's protocol state have had their user data scrubbed * for privacy compliance. */ -export const isCheckpointQuorumScrubbed = (stringifiedCheckpoint: string): boolean => { - if (!stringifiedCheckpoint) { +export const isScribeCheckpointQuorumScrubbed = ( + checkpoint: string | IScribe | undefined, +): boolean => { + if (!checkpoint) { return false; } - const checkpoint: IScribe = JSON.parse(stringifiedCheckpoint); - for (const [, sequencedClient] of checkpoint.protocolState.members) { + const parsedCheckpoint: IScribe = + typeof checkpoint === "string" ? JSON.parse(checkpoint) : checkpoint; + for (const [, sequencedClient] of parsedCheckpoint.protocolState.members) { const user: IUser = sequencedClient.client.user; - // User information was scrubbed. if (!user.id) { + // User information was scrubbed. return true; } } diff --git a/server/routerlicious/packages/lambdas/src/test/scribe/lambda.spec.ts b/server/routerlicious/packages/lambdas/src/test/scribe/lambda.spec.ts index d63ecd9e84ea..efc558218778 100644 --- a/server/routerlicious/packages/lambdas/src/test/scribe/lambda.spec.ts +++ b/server/routerlicious/packages/lambdas/src/test/scribe/lambda.spec.ts @@ -115,6 +115,11 @@ describe("Routerlicious", () => { "getLocalCheckpointEnabled", Sinon.fake.returns(false), ); + Sinon.replace( + testCheckpointService, + "restoreFromCheckpoint", + Sinon.fake.returns(undefined), + ); testMessageCollection = new TestCollection([]); testKafka = new TestKafka(); diff --git a/server/routerlicious/packages/services-core/src/checkpointService.ts b/server/routerlicious/packages/services-core/src/checkpointService.ts index 007fb7a547cb..ecb93df08bb4 100644 --- a/server/routerlicious/packages/services-core/src/checkpointService.ts +++ b/server/routerlicious/packages/services-core/src/checkpointService.ts @@ -14,7 +14,7 @@ import { Lumberjack, } from "@fluidframework/server-services-telemetry"; import { ICheckpointRepository, IDocumentRepository } from "./database"; -import { IDeliState, IDocument, IScribe } from "./document"; +import { IDeliState, IDocument, IScribe, type ICheckpoint } from "./document"; type DocumentLambda = "deli" | "scribe"; @@ -204,46 +204,73 @@ export class CheckpointService implements ICheckpointService { service: DocumentLambda, document: IDocument, ) { - let checkpoint; - let lastCheckpoint: IDeliState | IScribe; + let lastCheckpoint: IDeliState | IScribe | undefined; let isLocalCheckpoint = false; - let localLogOffset; - let globalLogOffset; - let localSequenceNumber; - let globalSequenceNumber; + let localLogOffset: number | undefined; + let globalLogOffset: number | undefined; + let localSequenceNumber: number | undefined; + let globalSequenceNumber: number | undefined; + let checkpointSource = "notFound"; const restoreFromCheckpointMetric = Lumberjack.newLumberMetric( LumberEventName.RestoreFromCheckpoint, ); - let checkpointSource = "defaultGlobalCollection"; + const parseCheckpointString = ( + checkpointString: string | undefined, + ): IDeliState | IScribe | undefined => + checkpointString ? (JSON.parse(checkpointString) as IDeliState | IScribe) : undefined; try { if (!this.localCheckpointEnabled || !this.checkpointRepository) { // If we cannot checkpoint locally, use document - lastCheckpoint = JSON.parse(document[service]); + lastCheckpoint = parseCheckpointString(document[service]); globalLogOffset = lastCheckpoint.logOffset; globalSequenceNumber = lastCheckpoint.sequenceNumber; + checkpointSource = "defaultGlobalCollection"; } else { // Search checkpoints collection for checkpoint - try { - checkpoint = await this.checkpointRepository.getCheckpoint( - documentId, - tenantId, + const checkpoint: ICheckpoint | undefined = await this.checkpointRepository + .getCheckpoint(documentId, tenantId) + .catch((error) => { + Lumberjack.error( + `Error retrieving local checkpoint`, + getLumberBaseProperties(documentId, tenantId), + ); + return undefined; + }); + + const localCheckpoint: IDeliState | IScribe | undefined = parseCheckpointString( + checkpoint?.[service], + ); + const globalCheckpoint: IDeliState | IScribe | undefined = parseCheckpointString( + document[service], + ); + localLogOffset = localCheckpoint?.logOffset; + globalLogOffset = globalCheckpoint?.logOffset; + localSequenceNumber = localCheckpoint?.sequenceNumber; + globalSequenceNumber = globalCheckpoint?.sequenceNumber; + + if (localCheckpoint && !globalCheckpoint) { + // If checkpoint does not exist in document (global), use local + Lumberjack.info( + `Global checkpoint not found.`, + getLumberBaseProperties(documentId, tenantId), ); - } catch (error) { - checkpoint = undefined; - Lumberjack.error( - `Error retrieving local checkpoint`, + lastCheckpoint = localCheckpoint; + checkpointSource = "notFoundInGlobalCollection"; + isLocalCheckpoint = true; + } else if (!localCheckpoint && globalCheckpoint) { + // If checkpoint does not exist in local, use document (global) + Lumberjack.info( + `Local checkpoint not found.`, getLumberBaseProperties(documentId, tenantId), ); checkpointSource = "notFoundInLocalCollection"; - } - - if (checkpoint?.[service]) { - const localCheckpoint: IDeliState | IScribe = JSON.parse(checkpoint[service]); - const globalCheckpoint: IDeliState | IScribe = JSON.parse(document[service]); - - // Compare local and global checkpoints to use latest version + lastCheckpoint = globalCheckpoint; + isLocalCheckpoint = false; + } else if (localCheckpoint && globalCheckpoint) { + // If both checkpoints exist, + // compare local and global checkpoints to use latest version if (localCheckpoint.sequenceNumber < globalCheckpoint.sequenceNumber) { // if local checkpoint is behind global, use global lastCheckpoint = globalCheckpoint; @@ -254,20 +281,6 @@ export class CheckpointService implements ICheckpointService { checkpointSource = "latestFoundInLocalCollection"; isLocalCheckpoint = true; } - localLogOffset = localCheckpoint.logOffset; - globalLogOffset = globalCheckpoint.logOffset; - localSequenceNumber = localCheckpoint.sequenceNumber; - globalSequenceNumber = globalCheckpoint.sequenceNumber; - } else { - // If checkpoint does not exist, use document - Lumberjack.info( - `Local checkpoint not found.`, - getLumberBaseProperties(documentId, tenantId), - ); - checkpointSource = "notFoundInLocalCollection"; - lastCheckpoint = JSON.parse(document[service]); - globalLogOffset = lastCheckpoint.logOffset; - globalSequenceNumber = lastCheckpoint.sequenceNumber; } } restoreFromCheckpointMetric.setProperties({