Skip to content

Commit

Permalink
Remove refresh from server path and refresh from network call (#16657)
Browse files Browse the repository at this point in the history
[AB#5118](https://dev.azure.com/fluidframework/235294da-091d-4c29-84fc-cdfc3d90890b/_workitems/edit/5118)

Remove "refresh from server" logic. We are making this PR into `next`
instead of `main` as we want to wait a bit before we commit to removing
"refresh from server" logic. Instead of refreshing we are closing the
container and restarting. More details here:
#15140

This requires updating the `summarizerNode` logic and `containerRuntime`
logic. Work to move when we decide to close the container sits in
[AB#5152](https://dev.azure.com/fluidframework/235294da-091d-4c29-84fc-cdfc3d90890b/_workitems/edit/5152)

---------

Co-authored-by: Navin Agarwal <[email protected]>
Co-authored-by: Mark Fields <[email protected]>
  • Loading branch information
3 people authored Aug 30, 2023
1 parent 7829d8a commit 21d00fb
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 1,056 deletions.
144 changes: 43 additions & 101 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ import {
IContainerRuntimeMetadata,
ICreateContainerMetadata,
idCompressorBlobName,
IFetchSnapshotResult,
IRootSummarizerNodeWithGC,
ISummaryMetadataMessage,
metadataBlobName,
Expand Down Expand Up @@ -1066,7 +1065,6 @@ export class ContainerRuntime
private emitDirtyDocumentEvent = true;
private readonly enableOpReentryCheck: boolean;
private readonly disableAttachReorder: boolean | undefined;
private readonly summaryStateUpdateMethod: string | undefined;
private readonly closeSummarizerDelayMs: number;
/**
* If true, summary generated is validate before uploading it to the server. With single commit summaries,
Expand Down Expand Up @@ -1546,9 +1544,6 @@ export class ContainerRuntime
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._audience = audience!;

this.summaryStateUpdateMethod = this.mc.config.getString(
"Fluid.ContainerRuntime.Test.SummaryStateUpdateMethodV2",
);
const closeSummarizerDelayOverride = this.mc.config.getNumber(
"Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs",
);
Expand Down Expand Up @@ -1703,7 +1698,6 @@ export class ContainerRuntime
disableAttachReorder: this.disableAttachReorder,
disablePartialFlush,
idCompressorEnabled: this.idCompressorEnabled,
summaryStateUpdateMethod: this.summaryStateUpdateMethod,
closeSummarizerDelayOverride,
}),
telemetryDocumentId: this.telemetryDocumentId,
Expand Down Expand Up @@ -3658,11 +3652,19 @@ export class ContainerRuntime
public async refreshLatestSummaryAck(options: IRefreshSummaryAckOptions) {
const { proposalHandle, ackHandle, summaryRefSeq, summaryLogger } = options;
const readAndParseBlob = async <T>(id: string) => readAndParse<T>(this.storage, id);
// The call to fetch the snapshot is very expensive and not always needed.
// It should only be done by the summarizerNode, if required.
// When fetching from storage we will always get the latest version and do not use the ackHandle.
const fetchLatestSnapshot: () => Promise<IFetchSnapshotResult> = async () => {
let fetchResult = await this.fetchSnapshotFromStorageAndMaybeClose(
const result = await this.summarizerNode.refreshLatestSummary(
proposalHandle,
summaryRefSeq,
);

/**
* When refreshing a summary ack, this check indicates a new ack of a summary that was newer than the
* current summary that is tracked, but this summarizer runtime did not produce/track that summary. Thus
* it needs to refresh its state. Today refresh is done by fetching the latest snapshot to update the cache
* and then close as the current main client is likely to be re-elected as the parent summarizer again.
*/
if (result.latestSummaryUpdated && !result.wasSummaryTracked) {
const fetchResult = await this.fetchSnapshotFromStorage(
summaryLogger,
{
eventName: "RefreshLatestSummaryAckFetch",
Expand All @@ -3673,25 +3675,6 @@ export class ContainerRuntime
null,
);

/**
* back-compat - Older loaders and drivers (pre 2.0.0-internal.1.4) don't have fetchSource as a param in the
* getVersions API. So, they will not fetch the latest snapshot from network in the previous fetch call. For
* these scenarios, fetch the snapshot corresponding to the ack handle to have the same behavior before the
* change that started fetching latest snapshot always.
*/
if (fetchResult.latestSnapshotRefSeq < summaryRefSeq) {
fetchResult = await this.fetchSnapshotFromStorageAndMaybeClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryAckFetchBackCompat",
ackHandle,
targetSequenceNumber: summaryRefSeq,
},
readAndParseBlob,
ackHandle,
);
}

/**
* If the fetched snapshot is older than the one for which the ack was received, close the container.
* This should never happen because an ack should be sent after the latest summary is updated in the server.
Expand All @@ -3717,29 +3700,12 @@ export class ContainerRuntime
throw error;
}

// In case we had to retrieve the latest snapshot and it is different than summaryRefSeq,
// wait for the delta manager to catch up before refreshing the latest Summary.
await this.waitForDeltaManagerToCatchup(
fetchResult.latestSnapshotRefSeq,
summaryLogger,
);

return {
snapshotTree: fetchResult.snapshotTree,
snapshotRefSeq: fetchResult.latestSnapshotRefSeq,
};
};

const result = await this.summarizerNode.refreshLatestSummary(
proposalHandle,
summaryRefSeq,
fetchLatestSnapshot,
readAndParseBlob,
summaryLogger,
);
await this.closeStaleSummarizer("RefreshLatestSummaryAckFetch");
return;
}

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(proposalHandle, result, readAndParseBlob);
await this.garbageCollector.refreshLatestSummary(result);
}

/**
Expand All @@ -3752,49 +3718,49 @@ export class ContainerRuntime
summaryLogger: ITelemetryLoggerExt,
): Promise<{ latestSnapshotRefSeq: number; latestSnapshotVersionId: string | undefined }> {
const readAndParseBlob = async <T>(id: string) => readAndParse<T>(this.storage, id);
const { snapshotTree, versionId, latestSnapshotRefSeq } =
await this.fetchSnapshotFromStorageAndMaybeClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryFromServerFetch",
},
readAndParseBlob,
null,
);
const fetchLatestSnapshot: IFetchSnapshotResult = {
snapshotTree,
snapshotRefSeq: latestSnapshotRefSeq,
};
const result = await this.summarizerNode.refreshLatestSummary(
undefined /* proposalHandle */,
latestSnapshotRefSeq,
async () => fetchLatestSnapshot,
readAndParseBlob,
const { versionId, latestSnapshotRefSeq } = await this.fetchSnapshotFromStorage(
summaryLogger,
);

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(
undefined /* proposalHandle */,
result,
{
eventName: "RefreshLatestSummaryFromServerFetch",
},
readAndParseBlob,
null,
);

await this.closeStaleSummarizer("RefreshLatestSummaryFromServerFetch");

return { latestSnapshotRefSeq, latestSnapshotVersionId: versionId };
}

private async closeStaleSummarizer(codePath: string): Promise<void> {
this.mc.logger.sendTelemetryEvent(
{
eventName: "ClosingSummarizerOnSummaryStale",
codePath,
message: "Stopping fetch from storage",
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);

// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();
}

/**
* Downloads snapshot from storage with the given versionId or latest if versionId is null.
* By default, it also closes the container after downloading the snapshot. However, this may be
* overridden via options.
*/
private async fetchSnapshotFromStorageAndMaybeClose(
private async fetchSnapshotFromStorage(
logger: ITelemetryLoggerExt,
event: ITelemetryGenericEvent,
readAndParseBlob: ReadAndParseBlob,
versionId: string | null,
): Promise<{ snapshotTree: ISnapshotTree; versionId: string; latestSnapshotRefSeq: number }> {
const snapshotResults = await PerformanceEvent.timedExecAsync(
return PerformanceEvent.timedExecAsync(
logger,
event,
async (perfEvent: {
Expand Down Expand Up @@ -3840,30 +3806,6 @@ export class ContainerRuntime
};
},
);

// We choose to close the summarizer after the snapshot cache is updated to avoid
// situations which the main client (which is likely to be re-elected as the leader again)
// loads the summarizer from cache.
if (this.summaryStateUpdateMethod !== "refreshFromSnapshot") {
this.mc.logger.sendTelemetryEvent(
{
...event,
eventName: "ClosingSummarizerOnSummaryStale",
codePath: event.eventName,
message: "Stopping fetch from storage",
versionId: versionId != null ? versionId : undefined,
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);

// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();
}

return snapshotResults;
}

public notifyAttaching() {} // do nothing (deprecated method)
Expand Down
39 changes: 3 additions & 36 deletions packages/runtime/container-runtime/src/gc/garbageCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import {
ISummarizeResult,
ITelemetryContext,
} from "@fluidframework/runtime-definitions";
import {
ReadAndParseBlob,
createResponseError,
responseToException,
} from "@fluidframework/runtime-utils";
import { createResponseError, responseToException } from "@fluidframework/runtime-utils";
import {
createChildLogger,
createChildMonitoringContext,
Expand Down Expand Up @@ -850,37 +846,8 @@ export class GarbageCollector implements IGarbageCollector {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void> {
const latestSnapshotData = await this.summaryStateTracker.refreshLatestSummary(
proposalHandle,
result,
readAndParseBlob,
);

// If the latest summary was updated but it was not tracked by this client, our state needs to be updated from
// this snapshot data.
if (this.shouldRunGC && result.latestSummaryUpdated && !result.wasSummaryTracked) {
// The current reference timestamp should be available if we are refreshing state from a snapshot. There has
// to be at least one op (summary op / ack, if nothing else) if a snapshot was taken.
const currentReferenceTimestampMs = this.runtime.getCurrentReferenceTimestampMs();
if (currentReferenceTimestampMs === undefined) {
throw DataProcessingError.create(
"No reference timestamp when updating GC state from snapshot",
"refreshLatestSummary",
undefined,
{
proposalHandle,
summaryRefSeq: result.summaryRefSeq,
gcConfigs: JSON.stringify(this.configs),
},
);
}
this.updateStateFromSnapshotData(latestSnapshotData, currentReferenceTimestampMs);
}
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
return this.summaryStateTracker.refreshLatestSummary(result);
}

/**
Expand Down
6 changes: 1 addition & 5 deletions packages/runtime/container-runtime/src/gc/gcDefinitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ export interface IGarbageCollector {
/** Returns the GC details generated from the base snapshot. */
getBaseGCDetails(): Promise<IGarbageCollectionDetailsBase>;
/** Called when the latest summary of the system has been refreshed. */
refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void>;
refreshLatestSummary(result: RefreshSummaryResult): Promise<void>;
/** Called when a node is updated. Used to detect and log when an inactive node is changed or loaded. */
nodeUpdated(
nodePath: string,
Expand Down
53 changes: 6 additions & 47 deletions packages/runtime/container-runtime/src/gc/gcSummaryStateTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import {
ISummarizeResult,
ISummaryTreeWithStats,
} from "@fluidframework/runtime-definitions";
import { mergeStats, ReadAndParseBlob, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { IContainerRuntimeMetadata, metadataBlobName, RefreshSummaryResult } from "../summary";
import { mergeStats, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { RefreshSummaryResult } from "../summary";
import { GCVersion, IGCStats } from "./gcDefinitions";
import { getGCDataFromSnapshot, generateSortedGCState, getGCVersion } from "./gcHelpers";
import { generateSortedGCState } from "./gcHelpers";
import { IGarbageCollectionSnapshotData, IGarbageCollectionState } from "./gcSummaryDefinitions";
import { IGarbageCollectorConfigs } from ".";

Expand Down Expand Up @@ -267,11 +267,7 @@ export class GCSummaryStateTracker {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<IGarbageCollectionSnapshotData | undefined> {
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
// If the latest summary was updated and the summary was tracked, this client is the one that generated this
// summary. So, update wasGCRunInLatestSummary.
// Note that this has to be updated if GC did not run too. Otherwise, `gcStateNeedsReset` will always return
Expand All @@ -281,7 +277,7 @@ export class GCSummaryStateTracker {
}

if (!result.latestSummaryUpdated || !this.configs.shouldRunGC) {
return undefined;
return;
}

// If the summary was tracked by this client, it was the one that generated the summary in the first place.
Expand All @@ -291,45 +287,8 @@ export class GCSummaryStateTracker {
this.latestSummaryData = this.pendingSummaryData;
this.pendingSummaryData = undefined;
this.updatedDSCountSinceLastSummary = 0;
return undefined;
}

// If the summary was not tracked by this client, the state should be updated from the downloaded snapshot.
const snapshotTree = result.snapshotTree;
const metadataBlobId = snapshotTree.blobs[metadataBlobName];
const metadata = metadataBlobId
? await readAndParseBlob<IContainerRuntimeMetadata>(metadataBlobId)
: undefined;
this.latestSummaryGCVersion = getGCVersion(metadata);

const gcSnapshotTree = snapshotTree.trees[gcTreeKey];
// If GC ran in the container that generated this snapshot, it will have a GC tree.
this.wasGCRunInLatestSummary = gcSnapshotTree !== undefined;

if (gcSnapshotTree === undefined) {
return undefined;
}

let snapshotData = await getGCDataFromSnapshot(gcSnapshotTree, readAndParseBlob);

// If the GC version in the snapshot does not match the GC version currently in effect, the GC data
// in the snapshot cannot be interpreted correctly. Set everything to undefined except for deletedNodes
// because irrespective of GC versions, these nodes have been deleted and cannot be brought back. The
// deletedNodes info is needed to identify when these nodes are used.
if (getGCVersion(metadata) !== this.configs.gcVersionInEffect) {
snapshotData = {
gcState: undefined,
tombstones: undefined,
deletedNodes: snapshotData.deletedNodes,
};
return;
}

this.latestSummaryData = {
serializedGCState: JSON.stringify(snapshotData.gcState),
serializedTombstones: JSON.stringify(snapshotData.tombstones),
serializedDeletedNodes: JSON.stringify(snapshotData.deletedNodes),
};
return snapshotData;
}

/**
Expand Down
1 change: 0 additions & 1 deletion packages/runtime/container-runtime/src/gc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export {
export {
cloneGCData,
concatGarbageCollectionStates,
getGCDataFromSnapshot,
shouldAllowGcTombstoneEnforcement,
shouldAllowGcSweep,
trimLeadingAndTrailingSlashes,
Expand Down
Loading

0 comments on commit 21d00fb

Please sign in to comment.