Skip to content

Commit

Permalink
server: cover edge cases for scrubbed checkpoint users (#20259)
Browse files Browse the repository at this point in the history
## Description

Continuing the work from #20150 to make sure we cover all the possible
edge cases.

## Reviewer Guidance

The control flow in Scribe LambdaFactory was a bit confusing, so
hopefully I didn't mess it up.
I understand the current control flow to be:

```
if (no global checkpoint)
  use Default checkpoint
elsif (global checkpoint was cleared or global checkpoint quorum was scrubbed)
  use Summary checkpoint
else
  use latest DB checkpoint (local or global)
```

My goal was to make the control flow behave as follows:

```
if (no global and no local checkpoint and no summary checkpoint)
  use Default checkpoint
elsif (
    global checkpoint was cleared and summary checkpoint ahead of local db checkpoint
    or latest DB checkpoint quorum was scrubbed
    or summary checkpoint ahead of latest DB checkpoint
  )
  use Summary checkpoint
else
  use latest DB checkpoint (local or global)
```
  • Loading branch information
znewton authored Mar 25, 2024
1 parent d6f2749 commit 6718a9a
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 80 deletions.
30 changes: 21 additions & 9 deletions server/routerlicious/packages/lambdas/src/scribe/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ import {
DocumentCheckpointManager,
} from "../utils";
import { ICheckpointManager, IPendingMessageReader, ISummaryWriter } from "./interfaces";
import { getClientIds, initializeProtocol, isGlobalCheckpoint, sendToDeli } from "./utils";
import {
getClientIds,
initializeProtocol,
isGlobalCheckpoint,
isScribeCheckpointQuorumScrubbed,
sendToDeli,
} from "./utils";

/**
* @internal
Expand Down Expand Up @@ -481,13 +487,14 @@ export class ScribeLambda implements IPartitionLambda {
checkpointReason: CheckpointReason,
skipKafkaCheckpoint?: boolean,
) {
const isGlobal = isGlobalCheckpoint(
this.documentCheckpointManager.getNoActiveClients(),
this.globalCheckpointOnly,
);
// Get checkpoint context
const checkpoint = this.generateScribeCheckpoint(
message.offset,
isGlobalCheckpoint(
this.documentCheckpointManager.getNoActiveClients(),
this.globalCheckpointOnly,
)
isGlobal
? this.serviceConfiguration.scribe.scrubUserDataInGlobalCheckpoints
: this.serviceConfiguration.scribe.scrubUserDataInLocalCheckpoints,
);
Expand All @@ -513,6 +520,8 @@ export class ScribeLambda implements IPartitionLambda {
localCheckpointEnabled: this.localCheckpointEnabled,
globalCheckpointOnly: this.globalCheckpointOnly,
localCheckpoint: this.localCheckpointEnabled && !this.globalCheckpointOnly,
checkpointLocation: isGlobal ? "global" : "local",
scrubbedUserData: isScribeCheckpointQuorumScrubbed(checkpoint),
};
Lumberjack.info(checkpointResult, lumberjackProperties);
}
Expand Down Expand Up @@ -847,12 +856,13 @@ export class ScribeLambda implements IPartitionLambda {

private readonly idleTimeCheckpoint = (initialScribeCheckpointMessage: IQueuedMessage) => {
if (initialScribeCheckpointMessage) {
const isGlobal = isGlobalCheckpoint(
this.documentCheckpointManager.getNoActiveClients(),
this.globalCheckpointOnly,
);
const checkpoint = this.generateScribeCheckpoint(
initialScribeCheckpointMessage.offset,
isGlobalCheckpoint(
this.documentCheckpointManager.getNoActiveClients(),
this.globalCheckpointOnly,
)
isGlobal
? this.serviceConfiguration.scribe.scrubUserDataInGlobalCheckpoints
: this.serviceConfiguration.scribe.scrubUserDataInLocalCheckpoints,
);
Expand All @@ -872,6 +882,8 @@ export class ScribeLambda implements IPartitionLambda {
localCheckpointEnabled: this.localCheckpointEnabled,
globalCheckpointOnly: this.globalCheckpointOnly,
localCheckpoint: this.localCheckpointEnabled && !this.globalCheckpointOnly,
checkpointLocation: isGlobal ? "global" : "local",
scrubbedUserData: isScribeCheckpointQuorumScrubbed(checkpoint),
};
Lumberjack.info(checkpointResult, lumberjackProperties);
}
Expand Down
100 changes: 72 additions & 28 deletions server/routerlicious/packages/lambdas/src/scribe/lambdaFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ import { CheckpointManager } from "./checkpointManager";
import { ScribeLambda } from "./lambda";
import { SummaryReader } from "./summaryReader";
import { SummaryWriter } from "./summaryWriter";
import { getClientIds, initializeProtocol, isCheckpointQuorumScrubbed, sendToDeli } from "./utils";
import {
getClientIds,
initializeProtocol,
isScribeCheckpointQuorumScrubbed,
sendToDeli,
} from "./utils";
import { ILatestSummaryState } from "./interfaces";
import { PendingMessageReader } from "./pendingMessageReader";

Expand Down Expand Up @@ -105,6 +110,8 @@ export class ScribeLambdaFactory
let lastCheckpoint: IScribe;
let summaryReader: SummaryReader;
let latestSummary: ILatestSummaryState;
let latestSummaryCheckpoint: IScribe | undefined;
let latestDbCheckpoint: IScribe | undefined;
let opMessages: ISequencedDocumentMessage[] = [];

const { tenantId, documentId } = config;
Expand All @@ -119,6 +126,16 @@ export class ScribeLambdaFactory
LumberEventName.ScribeSessionResult,
this.serviceConfiguration,
);
/**
 * Reports a failed scribe lambda creation: writes the error to the context log and to Lumberjack,
 * sends an unsuccessful lambda start result for this document, and marks the session metric as errored.
 * It does not rethrow; callers are expected to throw the error themselves afterwards.
 *
 * @param error - The failure that aborted lambda creation.
 */
const failCreation = async (error: unknown): Promise<void> => {
	const failureText = "Scribe lambda creation failed.";

	// Log to both sinks before awaiting anything.
	context.log?.error(failureText + " Exception: " + inspect(error), { messageMetaData });
	Lumberjack.error(failureText, lumberProperties, error);

	// Send the unsuccessful start result for this tenant/document.
	await this.sendLambdaStartResult(tenantId, documentId, {
		lambdaName: LambdaName.Scribe,
		success: false,
	});

	// The session metric may be absent, hence the optional call.
	scribeSessionMetric?.error("Scribe lambda creation failed", error);
};

const lumberProperties = getLumberBaseProperties(documentId, tenantId);

Expand Down Expand Up @@ -176,38 +193,58 @@ export class ScribeLambdaFactory
this.enableWholeSummaryUpload,
);
latestSummary = await summaryReader.readLastSummary();
latestSummaryCheckpoint = latestSummary.scribe
? JSON.parse(latestSummary.scribe)
: undefined;
latestDbCheckpoint = (await this.checkpointService.restoreFromCheckpoint(
documentId,
tenantId,
"scribe",
document,
)) as IScribe;
} catch (error) {
const errorMessage = "Scribe lambda creation failed.";
context.log?.error(`${errorMessage} Exception: ${inspect(error)}`, { messageMetaData });
Lumberjack.error(errorMessage, lumberProperties, error);
await this.sendLambdaStartResult(tenantId, documentId, {
lambdaName: LambdaName.Scribe,
success: false,
});
scribeSessionMetric?.error("Scribe lambda creation failed", error);

await failCreation(error);
throw error;
}

// For a new document, Summary, Global (document) DB and Local DB checkpoints will not exist.
// However, it is possible that the global checkpoint was cleared to an empty string
// due to a service summary, so specifically check if global is not defined at all.
// Lastly, a new document will also not have a summary checkpoint, so if one exists without a DB checkpoint,
// we should use the summary checkpoint because there was likely a DB failure.
const useDefaultCheckpointForNewDocument =
document.scribe === undefined || document.scribe === null;
const checkpointIsBlank = document.scribe === "";
const checkpointQuorumIsScrubbed = isCheckpointQuorumScrubbed(document.scribe);
// Mongodb casts undefined as null so we are checking both to be safe.
(document.scribe === undefined || document.scribe === null) &&
!latestDbCheckpoint &&
!latestSummaryCheckpoint;
// Empty string for document DB checkpoint denotes a cache that was cleared due to a service summary.
// This will only happen if IServiceConfiguration.scribe.clearCacheAfterServiceSummary is true. Defaults to false.
const documentCheckpointIsCleared = document.scribe === "";
// It's possible that a local checkpoint is written after global checkpoint was cleared for service summary.
// Similarly, it's possible that the summary checkpoint is ahead of the latest db checkpoint due to a failure.
const summaryCheckpointAheadOfLatestDbCheckpoint =
latestSummaryCheckpoint &&
latestSummaryCheckpoint.sequenceNumber > latestDbCheckpoint?.sequenceNumber;
// Scrubbed users indicate that the quorum members have been scrubbed for privacy compliance.
const dbCheckpointQuorumIsScrubbed = isScribeCheckpointQuorumScrubbed(latestDbCheckpoint);
// Only use the summary checkpoint when
// 1) summary checkpoint is more recent than any DB checkpoint
// 2) the document checkpoint is cleared and there is not a more recent local checkpoint
// 3) the latest db checkpoint quorum members are scrubbed for privacy compliance
const useLatestSummaryCheckpointForExistingDocument =
checkpointIsBlank || checkpointQuorumIsScrubbed;
summaryCheckpointAheadOfLatestDbCheckpoint ||
(documentCheckpointIsCleared && summaryCheckpointAheadOfLatestDbCheckpoint) ||
dbCheckpointQuorumIsScrubbed;

if (useDefaultCheckpointForNewDocument) {
// Restore scribe state if not present in the cache. Mongodb casts undefined as null so we are checking
both to be safe. Empty string denotes a cache that was cleared due to a service summary
// Restore scribe state if not present in the cache.
const message = "New document. Setting empty scribe checkpoint";
context.log?.info(message, { messageMetaData });
Lumberjack.info(message, lumberProperties);
lastCheckpoint = DefaultScribe;
} else if (useLatestSummaryCheckpointForExistingDocument) {
const message = `Existing document${
!checkpointIsBlank && checkpointQuorumIsScrubbed
? " with invalid quorum members"
: ""
dbCheckpointQuorumIsScrubbed ? " with invalid quorum members" : ""
}. Fetching checkpoint from summary`;
context.log?.info(message, { messageMetaData });
Lumberjack.info(message, lumberProperties);
Expand All @@ -216,13 +253,20 @@ export class ScribeLambdaFactory
Lumberjack.error(`Summary can't be fetched`, lumberProperties);
lastCheckpoint = DefaultScribe;
} else {
if (isCheckpointQuorumScrubbed(latestSummary.scribe)) {
if (!latestSummaryCheckpoint) {
const error = new Error(
"Attempted to load from non-existent summary checkpoint.",
);
await failCreation(error);
throw error;
}
if (isScribeCheckpointQuorumScrubbed(latestSummaryCheckpoint)) {
Lumberjack.error(
"Quorum from summary is invalid. Continuing.",
"Quorum from summary is scrubbed. Continuing.",
lumberProperties,
);
}
lastCheckpoint = JSON.parse(latestSummary.scribe);
lastCheckpoint = latestSummaryCheckpoint;
opMessages = latestSummary.messages;
// Since the document was originated elsewhere or cache was cleared, logOffset info is irrelevant.
// Currently the lambda checkpoints only after updating the logOffset so setting this to lower
Expand All @@ -234,12 +278,12 @@ export class ScribeLambdaFactory
Lumberjack.info(checkpointMessage, lumberProperties);
}
} else {
lastCheckpoint = (await this.checkpointService.restoreFromCheckpoint(
documentId,
tenantId,
"scribe",
document,
)) as IScribe;
if (!latestDbCheckpoint) {
const error = new Error("Attempted to load from non-existent DB checkpoint.");
await failCreation(error);
throw error;
}
lastCheckpoint = latestDbCheckpoint;

try {
opMessages = await this.getOpMessages(documentId, tenantId, lastCheckpoint);
Expand Down
13 changes: 8 additions & 5 deletions server/routerlicious/packages/lambdas/src/scribe/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ export const isGlobalCheckpoint = (noActiveClients: boolean, globalCheckpointOnl
* Whether the quorum members represented in the checkpoint's protocol state have had their user data scrubbed
* for privacy compliance.
*/
export const isCheckpointQuorumScrubbed = (stringifiedCheckpoint: string): boolean => {
if (!stringifiedCheckpoint) {
export const isScribeCheckpointQuorumScrubbed = (
checkpoint: string | IScribe | undefined,
): boolean => {
if (!checkpoint) {
return false;
}
const checkpoint: IScribe = JSON.parse(stringifiedCheckpoint);
for (const [, sequencedClient] of checkpoint.protocolState.members) {
const parsedCheckpoint: IScribe =
typeof checkpoint === "string" ? JSON.parse(checkpoint) : checkpoint;
for (const [, sequencedClient] of parsedCheckpoint.protocolState.members) {
const user: IUser = sequencedClient.client.user;
// User information was scrubbed.
if (!user.id) {
// User information was scrubbed.
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ describe("Routerlicious", () => {
"getLocalCheckpointEnabled",
Sinon.fake.returns(false),
);
Sinon.replace(
testCheckpointService,
"restoreFromCheckpoint",
Sinon.fake.returns(undefined),
);

testMessageCollection = new TestCollection([]);
testKafka = new TestKafka();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
Lumberjack,
} from "@fluidframework/server-services-telemetry";
import { ICheckpointRepository, IDocumentRepository } from "./database";
import { IDeliState, IDocument, IScribe } from "./document";
import { IDeliState, IDocument, IScribe, type ICheckpoint } from "./document";

type DocumentLambda = "deli" | "scribe";

Expand Down Expand Up @@ -204,46 +204,73 @@ export class CheckpointService implements ICheckpointService {
service: DocumentLambda,
document: IDocument,
) {
let checkpoint;
let lastCheckpoint: IDeliState | IScribe;
let lastCheckpoint: IDeliState | IScribe | undefined;
let isLocalCheckpoint = false;
let localLogOffset;
let globalLogOffset;
let localSequenceNumber;
let globalSequenceNumber;
let localLogOffset: number | undefined;
let globalLogOffset: number | undefined;
let localSequenceNumber: number | undefined;
let globalSequenceNumber: number | undefined;
let checkpointSource = "notFound";

const restoreFromCheckpointMetric = Lumberjack.newLumberMetric(
LumberEventName.RestoreFromCheckpoint,
);
let checkpointSource = "defaultGlobalCollection";
/**
 * Parses a stringified checkpoint.
 *
 * @param checkpointString - JSON text of a checkpoint; may be undefined or empty.
 * @returns The parsed checkpoint, or undefined when the input is falsy (undefined or "").
 * @throws SyntaxError (from JSON.parse) when the input is non-empty but not valid JSON.
 */
const parseCheckpointString = (
	checkpointString: string | undefined,
): IDeliState | IScribe | undefined => {
	// Falsy input (undefined or empty string) yields undefined rather than a parse attempt.
	if (!checkpointString) {
		return undefined;
	}
	return JSON.parse(checkpointString) as IDeliState | IScribe;
};

try {
if (!this.localCheckpointEnabled || !this.checkpointRepository) {
// If we cannot checkpoint locally, use document
lastCheckpoint = JSON.parse(document[service]);
lastCheckpoint = parseCheckpointString(document[service]);
globalLogOffset = lastCheckpoint.logOffset;
globalSequenceNumber = lastCheckpoint.sequenceNumber;
checkpointSource = "defaultGlobalCollection";
} else {
// Search checkpoints collection for checkpoint
try {
checkpoint = await this.checkpointRepository.getCheckpoint(
documentId,
tenantId,
const checkpoint: ICheckpoint | undefined = await this.checkpointRepository
.getCheckpoint(documentId, tenantId)
.catch((error) => {
Lumberjack.error(
`Error retrieving local checkpoint`,
getLumberBaseProperties(documentId, tenantId),
);
return undefined;
});

const localCheckpoint: IDeliState | IScribe | undefined = parseCheckpointString(
checkpoint?.[service],
);
const globalCheckpoint: IDeliState | IScribe | undefined = parseCheckpointString(
document[service],
);
localLogOffset = localCheckpoint?.logOffset;
globalLogOffset = globalCheckpoint?.logOffset;
localSequenceNumber = localCheckpoint?.sequenceNumber;
globalSequenceNumber = globalCheckpoint?.sequenceNumber;

if (localCheckpoint && !globalCheckpoint) {
// If checkpoint does not exist in document (global), use local
Lumberjack.info(
`Global checkpoint not found.`,
getLumberBaseProperties(documentId, tenantId),
);
} catch (error) {
checkpoint = undefined;
Lumberjack.error(
`Error retrieving local checkpoint`,
lastCheckpoint = localCheckpoint;
checkpointSource = "notFoundInGlobalCollection";
isLocalCheckpoint = true;
} else if (!localCheckpoint && globalCheckpoint) {
// If checkpoint does not exist in local, use document (global)
Lumberjack.info(
`Local checkpoint not found.`,
getLumberBaseProperties(documentId, tenantId),
);
checkpointSource = "notFoundInLocalCollection";
}

if (checkpoint?.[service]) {
const localCheckpoint: IDeliState | IScribe = JSON.parse(checkpoint[service]);
const globalCheckpoint: IDeliState | IScribe = JSON.parse(document[service]);

// Compare local and global checkpoints to use latest version
lastCheckpoint = globalCheckpoint;
isLocalCheckpoint = false;
} else if (localCheckpoint && globalCheckpoint) {
// If both checkpoints exist,
// compare local and global checkpoints to use latest version
if (localCheckpoint.sequenceNumber < globalCheckpoint.sequenceNumber) {
// if local checkpoint is behind global, use global
lastCheckpoint = globalCheckpoint;
Expand All @@ -254,20 +281,6 @@ export class CheckpointService implements ICheckpointService {
checkpointSource = "latestFoundInLocalCollection";
isLocalCheckpoint = true;
}
localLogOffset = localCheckpoint.logOffset;
globalLogOffset = globalCheckpoint.logOffset;
localSequenceNumber = localCheckpoint.sequenceNumber;
globalSequenceNumber = globalCheckpoint.sequenceNumber;
} else {
// If checkpoint does not exist, use document
Lumberjack.info(
`Local checkpoint not found.`,
getLumberBaseProperties(documentId, tenantId),
);
checkpointSource = "notFoundInLocalCollection";
lastCheckpoint = JSON.parse(document[service]);
globalLogOffset = lastCheckpoint.logOffset;
globalSequenceNumber = lastCheckpoint.sequenceNumber;
}
}
restoreFromCheckpointMetric.setProperties({
Expand Down

0 comments on commit 6718a9a

Please sign in to comment.