Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove refresh from server path and refresh from network call #16657

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3739,7 +3739,7 @@ export class ContainerRuntime
);

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(proposalHandle, result, readAndParseBlob);
await this.garbageCollector.refreshLatestSummary(result);
}

/**
Expand Down Expand Up @@ -3774,11 +3774,7 @@ export class ContainerRuntime
);

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(
undefined /* proposalHandle */,
result,
readAndParseBlob,
);
await this.garbageCollector.refreshLatestSummary(result);

return { latestSnapshotRefSeq, latestSnapshotVersionId: versionId };
}
Expand Down Expand Up @@ -3844,24 +3840,22 @@ export class ContainerRuntime
// We choose to close the summarizer after the snapshot cache is updated to avoid
// situations which the main client (which is likely to be re-elected as the leader again)
// loads the summarizer from cache.
tyler-cai-microsoft marked this conversation as resolved.
Show resolved Hide resolved
if (this.summaryStateUpdateMethod !== "refreshFromSnapshot") {
agarwal-navin marked this conversation as resolved.
Show resolved Hide resolved
this.mc.logger.sendTelemetryEvent(
{
...event,
eventName: "ClosingSummarizerOnSummaryStale",
codePath: event.eventName,
message: "Stopping fetch from storage",
versionId: versionId != null ? versionId : undefined,
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);
this.mc.logger.sendTelemetryEvent(
{
...event,
eventName: "ClosingSummarizerOnSummaryStale",
codePath: event.eventName,
message: "Stopping fetch from storage",
versionId: versionId != null ? versionId : undefined,
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);

// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();
}
// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();

return snapshotResults;
}
Expand Down
39 changes: 3 additions & 36 deletions packages/runtime/container-runtime/src/gc/garbageCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import {
ISummarizeResult,
ITelemetryContext,
} from "@fluidframework/runtime-definitions";
import {
ReadAndParseBlob,
createResponseError,
responseToException,
} from "@fluidframework/runtime-utils";
import { createResponseError, responseToException } from "@fluidframework/runtime-utils";
import {
createChildLogger,
createChildMonitoringContext,
Expand Down Expand Up @@ -850,37 +846,8 @@ export class GarbageCollector implements IGarbageCollector {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void> {
const latestSnapshotData = await this.summaryStateTracker.refreshLatestSummary(
proposalHandle,
result,
readAndParseBlob,
);

// If the latest summary was updated but it was not tracked by this client, our state needs to be updated from
// this snapshot data.
if (this.shouldRunGC && result.latestSummaryUpdated && !result.wasSummaryTracked) {
// The current reference timestamp should be available if we are refreshing state from a snapshot. There has
// to be at least one op (summary op / ack, if nothing else) if a snapshot was taken.
const currentReferenceTimestampMs = this.runtime.getCurrentReferenceTimestampMs();
if (currentReferenceTimestampMs === undefined) {
throw DataProcessingError.create(
"No reference timestamp when updating GC state from snapshot",
"refreshLatestSummary",
undefined,
{
proposalHandle,
summaryRefSeq: result.summaryRefSeq,
gcConfigs: JSON.stringify(this.configs),
},
);
}
this.updateStateFromSnapshotData(latestSnapshotData, currentReferenceTimestampMs);
markfields marked this conversation as resolved.
Show resolved Hide resolved
}
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
markfields marked this conversation as resolved.
Show resolved Hide resolved
await this.summaryStateTracker.refreshLatestSummary(result);
tyler-cai-microsoft marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
6 changes: 1 addition & 5 deletions packages/runtime/container-runtime/src/gc/gcDefinitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ export interface IGarbageCollector {
/** Returns the GC details generated from the base snapshot. */
getBaseGCDetails(): Promise<IGarbageCollectionDetailsBase>;
/** Called when the latest summary of the system has been refreshed. */
refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void>;
refreshLatestSummary(result: RefreshSummaryResult): Promise<void>;
/** Called when a node is updated. Used to detect and log when an inactive node is changed or loaded. */
nodeUpdated(
nodePath: string,
Expand Down
53 changes: 6 additions & 47 deletions packages/runtime/container-runtime/src/gc/gcSummaryStateTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import {
ISummarizeResult,
ISummaryTreeWithStats,
} from "@fluidframework/runtime-definitions";
import { mergeStats, ReadAndParseBlob, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { IContainerRuntimeMetadata, metadataBlobName, RefreshSummaryResult } from "../summary";
import { mergeStats, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { RefreshSummaryResult } from "../summary";
import { GCVersion, IGCStats } from "./gcDefinitions";
import { getGCDataFromSnapshot, generateSortedGCState, getGCVersion } from "./gcHelpers";
import { generateSortedGCState } from "./gcHelpers";
import { IGarbageCollectionSnapshotData, IGarbageCollectionState } from "./gcSummaryDefinitions";
import { IGarbageCollectorConfigs } from ".";

Expand Down Expand Up @@ -267,11 +267,7 @@ export class GCSummaryStateTracker {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<IGarbageCollectionSnapshotData | undefined> {
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
// If the latest summary was updated and the summary was tracked, this client is the one that generated this
// summary. So, update wasGCRunInLatestSummary.
// Note that this has to be updated if GC did not run too. Otherwise, `gcStateNeedsReset` will always return
Expand All @@ -281,7 +277,7 @@ export class GCSummaryStateTracker {
}

if (!result.latestSummaryUpdated || !this.configs.shouldRunGC) {
return undefined;
return;
}

// If the summary was tracked by this client, it was the one that generated the summary in the first place.
Expand All @@ -291,45 +287,8 @@ export class GCSummaryStateTracker {
this.latestSummaryData = this.pendingSummaryData;
this.pendingSummaryData = undefined;
this.updatedDSCountSinceLastSummary = 0;
return undefined;
}

// If the summary was not tracked by this client, the state should be updated from the downloaded snapshot.
const snapshotTree = result.snapshotTree;
const metadataBlobId = snapshotTree.blobs[metadataBlobName];
const metadata = metadataBlobId
? await readAndParseBlob<IContainerRuntimeMetadata>(metadataBlobId)
: undefined;
this.latestSummaryGCVersion = getGCVersion(metadata);

const gcSnapshotTree = snapshotTree.trees[gcTreeKey];
// If GC ran in the container that generated this snapshot, it will have a GC tree.
this.wasGCRunInLatestSummary = gcSnapshotTree !== undefined;

if (gcSnapshotTree === undefined) {
return undefined;
}

let snapshotData = await getGCDataFromSnapshot(gcSnapshotTree, readAndParseBlob);

// If the GC version in the snapshot does not match the GC version currently in effect, the GC data
// in the snapshot cannot be interpreted correctly. Set everything to undefined except for deletedNodes
// because irrespective of GC versions, these nodes have been deleted and cannot be brought back. The
// deletedNodes info is needed to identify when these nodes are used.
if (getGCVersion(metadata) !== this.configs.gcVersionInEffect) {
snapshotData = {
gcState: undefined,
tombstones: undefined,
deletedNodes: snapshotData.deletedNodes,
};
return;
}

this.latestSummaryData = {
serializedGCState: JSON.stringify(snapshotData.gcState),
serializedTombstones: JSON.stringify(snapshotData.tombstones),
serializedDeletedNodes: JSON.stringify(snapshotData.deletedNodes),
};
return snapshotData;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,6 @@ export class SummarizerNode implements IRootSummarizerNode {
proposalHandle: string | undefined,
summaryRefSeq: number,
fetchLatestSnapshot: () => Promise<IFetchSnapshotResult>,
markfields marked this conversation as resolved.
Show resolved Hide resolved
readAndParseBlob: ReadAndParseBlob,
correlatedSummaryLogger: ITelemetryLoggerExt,
): Promise<RefreshSummaryResult> {
const eventProps: {
proposalHandle: string | undefined;
Expand Down Expand Up @@ -450,41 +448,18 @@ export class SummarizerNode implements IRootSummarizerNode {
if (this.referenceSequenceNumber >= summaryRefSeq) {
eventProps.latestSummaryUpdated = false;
event.end(eventProps);
return { latestSummaryUpdated: false };
return { latestSummaryUpdated: false, networkFetchMade: false };
}

// Fetch the latest snapshot and refresh state from it. Note that we need to use the reference sequence number
markfields marked this conversation as resolved.
Show resolved Hide resolved
// of the fetched snapshot and not the "summaryRefSeq" that was passed in.
const { snapshotTree, snapshotRefSeq: fetchedSnapshotRefSeq } =
await fetchLatestSnapshot();
await fetchLatestSnapshot();
agarwal-navin marked this conversation as resolved.
Show resolved Hide resolved

// Possible re-entrancy. We may have updated latest summary state while fetching the snapshot. If the fetched
// snapshot is older than the latest tracked summary, ignore it.
if (this.referenceSequenceNumber >= fetchedSnapshotRefSeq) {
eventProps.latestSummaryUpdated = false;
event.end(eventProps);
return { latestSummaryUpdated: false };
}

await this.refreshLatestSummaryFromSnapshot(
agarwal-navin marked this conversation as resolved.
Show resolved Hide resolved
fetchedSnapshotRefSeq,
snapshotTree,
undefined,
EscapedPath.create(""),
correlatedSummaryLogger,
readAndParseBlob,
);

eventProps.latestSummaryUpdated = true;
eventProps.wasSummaryTracked = false;
eventProps.summaryRefSeq = fetchedSnapshotRefSeq;
eventProps.latestSummaryUpdated = false;
event.end(eventProps);
return {
latestSummaryUpdated: true,
wasSummaryTracked: false,
snapshotTree,
summaryRefSeq: fetchedSnapshotRefSeq,
};
return { latestSummaryUpdated: false, networkFetchMade: false };
},
{ start: true, end: true, cancel: "error" },
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ import { ReadAndParseBlob } from "@fluidframework/runtime-utils";
export type RefreshSummaryResult =
| {
latestSummaryUpdated: false;
networkFetchMade: false;
}
| {
latestSummaryUpdated: true;
wasSummaryTracked: true;
summaryRefSeq: number;
}
| {
latestSummaryUpdated: true;
wasSummaryTracked: false;
snapshotTree: ISnapshotTree;
markfields marked this conversation as resolved.
Show resolved Hide resolved
summaryRefSeq: number;
latestSummaryUpdated: false;
networkFetchMade: true;
};

/**
Expand Down
Loading