Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove refresh from server path and refresh from network call #16657

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 43 additions & 101 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ import {
IContainerRuntimeMetadata,
ICreateContainerMetadata,
idCompressorBlobName,
IFetchSnapshotResult,
IRootSummarizerNodeWithGC,
ISummaryMetadataMessage,
metadataBlobName,
Expand Down Expand Up @@ -1066,7 +1065,6 @@ export class ContainerRuntime
private emitDirtyDocumentEvent = true;
private readonly enableOpReentryCheck: boolean;
private readonly disableAttachReorder: boolean | undefined;
private readonly summaryStateUpdateMethod: string | undefined;
private readonly closeSummarizerDelayMs: number;
/**
* If true, summary generated is validate before uploading it to the server. With single commit summaries,
Expand Down Expand Up @@ -1546,9 +1544,6 @@ export class ContainerRuntime
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._audience = audience!;

this.summaryStateUpdateMethod = this.mc.config.getString(
"Fluid.ContainerRuntime.Test.SummaryStateUpdateMethodV2",
);
const closeSummarizerDelayOverride = this.mc.config.getNumber(
"Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs",
);
Expand Down Expand Up @@ -1703,7 +1698,6 @@ export class ContainerRuntime
disableAttachReorder: this.disableAttachReorder,
disablePartialFlush,
idCompressorEnabled: this.idCompressorEnabled,
summaryStateUpdateMethod: this.summaryStateUpdateMethod,
closeSummarizerDelayOverride,
}),
telemetryDocumentId: this.telemetryDocumentId,
Expand Down Expand Up @@ -3658,11 +3652,19 @@ export class ContainerRuntime
public async refreshLatestSummaryAck(options: IRefreshSummaryAckOptions) {
const { proposalHandle, ackHandle, summaryRefSeq, summaryLogger } = options;
const readAndParseBlob = async <T>(id: string) => readAndParse<T>(this.storage, id);
// The call to fetch the snapshot is very expensive and not always needed.
// It should only be done by the summarizerNode, if required.
// When fetching from storage we will always get the latest version and do not use the ackHandle.
const fetchLatestSnapshot: () => Promise<IFetchSnapshotResult> = async () => {
let fetchResult = await this.fetchSnapshotFromStorageAndMaybeClose(
const result = await this.summarizerNode.refreshLatestSummary(
proposalHandle,
summaryRefSeq,
);

/**
* When refreshing a summary ack, this check indicates a new ack of a summary that was newer than the
* current summary that is tracked, but this summarizer runtime did not produce/track that summary. Thus
* it needs to refresh its state. Today refresh is done by fetching the latest snapshot to update the cache
* and then close as the current main client is likely to be re-elected as the parent summarizer again.
*/
if (result.latestSummaryUpdated && !result.wasSummaryTracked) {
const fetchResult = await this.fetchSnapshotFromStorage(
summaryLogger,
{
eventName: "RefreshLatestSummaryAckFetch",
Expand All @@ -3673,25 +3675,6 @@ export class ContainerRuntime
null,
);

/**
* back-compat - Older loaders and drivers (pre 2.0.0-internal.1.4) don't have fetchSource as a param in the
* getVersions API. So, they will not fetch the latest snapshot from network in the previous fetch call. For
* these scenarios, fetch the snapshot corresponding to the ack handle to have the same behavior before the
* change that started fetching latest snapshot always.
*/
if (fetchResult.latestSnapshotRefSeq < summaryRefSeq) {
fetchResult = await this.fetchSnapshotFromStorageAndMaybeClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryAckFetchBackCompat",
ackHandle,
targetSequenceNumber: summaryRefSeq,
},
readAndParseBlob,
ackHandle,
);
}

/**
* If the fetched snapshot is older than the one for which the ack was received, close the container.
* This should never happen because an ack should be sent after the latest summary is updated in the server.
Expand All @@ -3717,29 +3700,12 @@ export class ContainerRuntime
throw error;
markfields marked this conversation as resolved.
Show resolved Hide resolved
}

// In case we had to retrieve the latest snapshot and it is different than summaryRefSeq,
// wait for the delta manager to catch up before refreshing the latest Summary.
await this.waitForDeltaManagerToCatchup(
fetchResult.latestSnapshotRefSeq,
summaryLogger,
);

return {
snapshotTree: fetchResult.snapshotTree,
snapshotRefSeq: fetchResult.latestSnapshotRefSeq,
};
};

const result = await this.summarizerNode.refreshLatestSummary(
proposalHandle,
summaryRefSeq,
fetchLatestSnapshot,
readAndParseBlob,
summaryLogger,
);
await this.closeStaleSummarizer("RefreshLatestSummaryAckFetch");
return;
}

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(proposalHandle, result, readAndParseBlob);
await this.garbageCollector.refreshLatestSummary(result);
}

/**
Expand All @@ -3752,49 +3718,49 @@ export class ContainerRuntime
summaryLogger: ITelemetryLoggerExt,
): Promise<{ latestSnapshotRefSeq: number; latestSnapshotVersionId: string | undefined }> {
const readAndParseBlob = async <T>(id: string) => readAndParse<T>(this.storage, id);
const { snapshotTree, versionId, latestSnapshotRefSeq } =
await this.fetchSnapshotFromStorageAndMaybeClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryFromServerFetch",
},
readAndParseBlob,
null,
);
const fetchLatestSnapshot: IFetchSnapshotResult = {
snapshotTree,
snapshotRefSeq: latestSnapshotRefSeq,
};
const result = await this.summarizerNode.refreshLatestSummary(
undefined /* proposalHandle */,
latestSnapshotRefSeq,
async () => fetchLatestSnapshot,
readAndParseBlob,
const { versionId, latestSnapshotRefSeq } = await this.fetchSnapshotFromStorage(
summaryLogger,
);

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(
undefined /* proposalHandle */,
result,
{
eventName: "RefreshLatestSummaryFromServerFetch",
},
readAndParseBlob,
null,
);

await this.closeStaleSummarizer("RefreshLatestSummaryFromServerFetch");

return { latestSnapshotRefSeq, latestSnapshotVersionId: versionId };
}

private async closeStaleSummarizer(codePath: string): Promise<void> {
this.mc.logger.sendTelemetryEvent(
{
eventName: "ClosingSummarizerOnSummaryStale",
codePath,
message: "Stopping fetch from storage",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this field for? It's redundant with eventName, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it will prevent the error's message from being copied over to the event, not that we typically look for that, but JFYI.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I realized this is just a code move, not actually new here, but something minor to consider in your next PRs)

closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);

// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();
}

/**
* Downloads snapshot from storage with the given versionId or latest if versionId is null.
* By default, it also closes the container after downloading the snapshot. However, this may be
* overridden via options.
*/
private async fetchSnapshotFromStorageAndMaybeClose(
private async fetchSnapshotFromStorage(
logger: ITelemetryLoggerExt,
event: ITelemetryGenericEvent,
readAndParseBlob: ReadAndParseBlob,
versionId: string | null,
): Promise<{ snapshotTree: ISnapshotTree; versionId: string; latestSnapshotRefSeq: number }> {
const snapshotResults = await PerformanceEvent.timedExecAsync(
return PerformanceEvent.timedExecAsync(
logger,
event,
async (perfEvent: {
Expand Down Expand Up @@ -3840,30 +3806,6 @@ export class ContainerRuntime
};
},
);

// We choose to close the summarizer after the snapshot cache is updated to avoid
// situations which the main client (which is likely to be re-elected as the leader again)
// loads the summarizer from cache.
if (this.summaryStateUpdateMethod !== "refreshFromSnapshot") {
agarwal-navin marked this conversation as resolved.
Show resolved Hide resolved
this.mc.logger.sendTelemetryEvent(
{
...event,
eventName: "ClosingSummarizerOnSummaryStale",
codePath: event.eventName,
message: "Stopping fetch from storage",
versionId: versionId != null ? versionId : undefined,
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);

// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();
}

return snapshotResults;
}

public notifyAttaching() {} // do nothing (deprecated method)
Expand Down
39 changes: 3 additions & 36 deletions packages/runtime/container-runtime/src/gc/garbageCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import {
ISummarizeResult,
ITelemetryContext,
} from "@fluidframework/runtime-definitions";
import {
ReadAndParseBlob,
createResponseError,
responseToException,
} from "@fluidframework/runtime-utils";
import { createResponseError, responseToException } from "@fluidframework/runtime-utils";
import {
createChildLogger,
createChildMonitoringContext,
Expand Down Expand Up @@ -850,37 +846,8 @@ export class GarbageCollector implements IGarbageCollector {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void> {
const latestSnapshotData = await this.summaryStateTracker.refreshLatestSummary(
proposalHandle,
result,
readAndParseBlob,
);

// If the latest summary was updated but it was not tracked by this client, our state needs to be updated from
// this snapshot data.
if (this.shouldRunGC && result.latestSummaryUpdated && !result.wasSummaryTracked) {
// The current reference timestamp should be available if we are refreshing state from a snapshot. There has
// to be at least one op (summary op / ack, if nothing else) if a snapshot was taken.
const currentReferenceTimestampMs = this.runtime.getCurrentReferenceTimestampMs();
if (currentReferenceTimestampMs === undefined) {
throw DataProcessingError.create(
"No reference timestamp when updating GC state from snapshot",
"refreshLatestSummary",
undefined,
{
proposalHandle,
summaryRefSeq: result.summaryRefSeq,
gcConfigs: JSON.stringify(this.configs),
},
);
}
this.updateStateFromSnapshotData(latestSnapshotData, currentReferenceTimestampMs);
markfields marked this conversation as resolved.
Show resolved Hide resolved
}
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
markfields marked this conversation as resolved.
Show resolved Hide resolved
return this.summaryStateTracker.refreshLatestSummary(result);
}

/**
Expand Down
6 changes: 1 addition & 5 deletions packages/runtime/container-runtime/src/gc/gcDefinitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ export interface IGarbageCollector {
/** Returns the GC details generated from the base snapshot. */
getBaseGCDetails(): Promise<IGarbageCollectionDetailsBase>;
/** Called when the latest summary of the system has been refreshed. */
refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void>;
refreshLatestSummary(result: RefreshSummaryResult): Promise<void>;
/** Called when a node is updated. Used to detect and log when an inactive node is changed or loaded. */
nodeUpdated(
nodePath: string,
Expand Down
53 changes: 6 additions & 47 deletions packages/runtime/container-runtime/src/gc/gcSummaryStateTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import {
ISummarizeResult,
ISummaryTreeWithStats,
} from "@fluidframework/runtime-definitions";
import { mergeStats, ReadAndParseBlob, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { IContainerRuntimeMetadata, metadataBlobName, RefreshSummaryResult } from "../summary";
import { mergeStats, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { RefreshSummaryResult } from "../summary";
import { GCVersion, IGCStats } from "./gcDefinitions";
import { getGCDataFromSnapshot, generateSortedGCState, getGCVersion } from "./gcHelpers";
import { generateSortedGCState } from "./gcHelpers";
import { IGarbageCollectionSnapshotData, IGarbageCollectionState } from "./gcSummaryDefinitions";
import { IGarbageCollectorConfigs } from ".";

Expand Down Expand Up @@ -267,11 +267,7 @@ export class GCSummaryStateTracker {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<IGarbageCollectionSnapshotData | undefined> {
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
// If the latest summary was updated and the summary was tracked, this client is the one that generated this
// summary. So, update wasGCRunInLatestSummary.
// Note that this has to be updated if GC did not run too. Otherwise, `gcStateNeedsReset` will always return
Expand All @@ -281,7 +277,7 @@ export class GCSummaryStateTracker {
}

if (!result.latestSummaryUpdated || !this.configs.shouldRunGC) {
return undefined;
return;
}

// If the summary was tracked by this client, it was the one that generated the summary in the first place.
Expand All @@ -291,45 +287,8 @@ export class GCSummaryStateTracker {
this.latestSummaryData = this.pendingSummaryData;
this.pendingSummaryData = undefined;
this.updatedDSCountSinceLastSummary = 0;
return undefined;
}

// If the summary was not tracked by this client, the state should be updated from the downloaded snapshot.
const snapshotTree = result.snapshotTree;
const metadataBlobId = snapshotTree.blobs[metadataBlobName];
const metadata = metadataBlobId
? await readAndParseBlob<IContainerRuntimeMetadata>(metadataBlobId)
: undefined;
this.latestSummaryGCVersion = getGCVersion(metadata);

const gcSnapshotTree = snapshotTree.trees[gcTreeKey];
// If GC ran in the container that generated this snapshot, it will have a GC tree.
this.wasGCRunInLatestSummary = gcSnapshotTree !== undefined;

if (gcSnapshotTree === undefined) {
return undefined;
}

let snapshotData = await getGCDataFromSnapshot(gcSnapshotTree, readAndParseBlob);

// If the GC version in the snapshot does not match the GC version currently in effect, the GC data
// in the snapshot cannot be interpreted correctly. Set everything to undefined except for deletedNodes
// because irrespective of GC versions, these nodes have been deleted and cannot be brought back. The
// deletedNodes info is needed to identify when these nodes are used.
if (getGCVersion(metadata) !== this.configs.gcVersionInEffect) {
snapshotData = {
gcState: undefined,
tombstones: undefined,
deletedNodes: snapshotData.deletedNodes,
};
return;
}

this.latestSummaryData = {
serializedGCState: JSON.stringify(snapshotData.gcState),
serializedTombstones: JSON.stringify(snapshotData.tombstones),
serializedDeletedNodes: JSON.stringify(snapshotData.deletedNodes),
};
return snapshotData;
}

/**
Expand Down
1 change: 0 additions & 1 deletion packages/runtime/container-runtime/src/gc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export {
export {
cloneGCData,
concatGarbageCollectionStates,
getGCDataFromSnapshot,
shouldAllowGcTombstoneEnforcement,
shouldAllowGcSweep,
trimLeadingAndTrailingSlashes,
Expand Down
Loading