Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove refresh from server path and refresh from network call #16657

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 31 additions & 56 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,6 @@ export class ContainerRuntime
private emitDirtyDocumentEvent = true;
private readonly enableOpReentryCheck: boolean;
private readonly disableAttachReorder: boolean | undefined;
private readonly summaryStateUpdateMethod: string | undefined;
private readonly closeSummarizerDelayMs: number;
/**
* If true, summary generated is validate before uploading it to the server. With single commit summaries,
Expand Down Expand Up @@ -1546,9 +1545,6 @@ export class ContainerRuntime
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._audience = audience!;

this.summaryStateUpdateMethod = this.mc.config.getString(
"Fluid.ContainerRuntime.Test.SummaryStateUpdateMethodV2",
);
const closeSummarizerDelayOverride = this.mc.config.getNumber(
"Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs",
);
Expand Down Expand Up @@ -1703,7 +1699,6 @@ export class ContainerRuntime
disableAttachReorder: this.disableAttachReorder,
disablePartialFlush,
idCompressorEnabled: this.idCompressorEnabled,
summaryStateUpdateMethod: this.summaryStateUpdateMethod,
closeSummarizerDelayOverride,
}),
telemetryDocumentId: this.telemetryDocumentId,
Expand Down Expand Up @@ -3662,7 +3657,7 @@ export class ContainerRuntime
// It should only be done by the summarizerNode, if required.
// When fetching from storage we will always get the latest version and do not use the ackHandle.
const fetchLatestSnapshot: () => Promise<IFetchSnapshotResult> = async () => {
let fetchResult = await this.fetchSnapshotFromStorageAndMaybeClose(
let fetchResult = await this.fetchSnapshotFromStorageAndClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryAckFetch",
Expand All @@ -3680,7 +3675,7 @@ export class ContainerRuntime
* change that started fetching latest snapshot always.
*/
if (fetchResult.latestSnapshotRefSeq < summaryRefSeq) {
markfields marked this conversation as resolved.
Show resolved Hide resolved
fetchResult = await this.fetchSnapshotFromStorageAndMaybeClose(
fetchResult = await this.fetchSnapshotFromStorageAndClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryAckFetchBackCompat",
Expand Down Expand Up @@ -3733,13 +3728,14 @@ export class ContainerRuntime
const result = await this.summarizerNode.refreshLatestSummary(
proposalHandle,
summaryRefSeq,
fetchLatestSnapshot,
readAndParseBlob,
summaryLogger,
);

if (result.latestSummaryUpdated && !result.wasSummaryTracked) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a comment here explaning why in this case fetch snapshot is called.

Also nit: The fetchLatestSnapshot() const is no longer required. You can directly call fetchSnapshotFromStorageAndClose here.

Copy link
Member

@markfields markfields Aug 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I had left a comment about this too but VS Code ate it. There's lots of logic in there that doesn't necessarily make sense anymore, would be good to clean up here or in a follow-up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most troubling is waitForDeltaManagerToCatchup which I'm pretty sure will hang if we're not already caught up (It should throw IMO since the Container is disposed). I wonder how often this is happening already.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That wait function is also called in the refreshLatestAck codepath in submitSummary and this concern should be addressed there too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. It's a little weird that there is code running after the container runtime is explicitly disposed. For example, when refreshLatestSummaryAckFromServer is called from submitSummary, it would dispose the container runtime but would continue summarization and eventually fail when checkContinue is called later (and the failure reason would be different I believe). We should short circuit this whole thing and return a failure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe that be cleaned up as part of AB#5417

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaning up / short-circuiting later sounds fine, as long as we're sure we don't have a potential deadlock - I think we do. That needs to be sorted out immediately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to AB#5417

Good point. It's a little weird that there is code running after the container runtime is explicitly disposed. For example, when refreshLatestSummaryAckFromServer is called from submitSummary, it would dispose the container runtime but would continue summarization and eventually fail when checkContinue is called later (and the failure reason would be different I believe). We should short circuit this whole thing and return a failure.

await fetchLatestSnapshot();
}

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(proposalHandle, result, readAndParseBlob);
await this.garbageCollector.refreshLatestSummary(result);
}

/**
Expand All @@ -3752,32 +3748,13 @@ export class ContainerRuntime
summaryLogger: ITelemetryLoggerExt,
): Promise<{ latestSnapshotRefSeq: number; latestSnapshotVersionId: string | undefined }> {
const readAndParseBlob = async <T>(id: string) => readAndParse<T>(this.storage, id);
const { snapshotTree, versionId, latestSnapshotRefSeq } =
await this.fetchSnapshotFromStorageAndMaybeClose(
summaryLogger,
{
eventName: "RefreshLatestSummaryFromServerFetch",
},
readAndParseBlob,
null,
);
const fetchLatestSnapshot: IFetchSnapshotResult = {
snapshotTree,
snapshotRefSeq: latestSnapshotRefSeq,
};
const result = await this.summarizerNode.refreshLatestSummary(
undefined /* proposalHandle */,
latestSnapshotRefSeq,
async () => fetchLatestSnapshot,
readAndParseBlob,
const { versionId, latestSnapshotRefSeq } = await this.fetchSnapshotFromStorageAndClose(
summaryLogger,
);

// Notify the garbage collector so it can update its latest summary state.
await this.garbageCollector.refreshLatestSummary(
undefined /* proposalHandle */,
result,
{
eventName: "RefreshLatestSummaryFromServerFetch",
},
readAndParseBlob,
null,
);

return { latestSnapshotRefSeq, latestSnapshotVersionId: versionId };
Expand All @@ -3788,7 +3765,7 @@ export class ContainerRuntime
* By default, it also closes the container after downloading the snapshot. However, this may be
* overridden via options.
*/
private async fetchSnapshotFromStorageAndMaybeClose(
private async fetchSnapshotFromStorageAndClose(
logger: ITelemetryLoggerExt,
event: ITelemetryGenericEvent,
readAndParseBlob: ReadAndParseBlob,
Expand Down Expand Up @@ -3841,27 +3818,25 @@ export class ContainerRuntime
},
);

// We choose to close the summarizer after the snapshot cache is updated to avoid
// situations which the main client (which is likely to be re-elected as the leader again)
// loads the summarizer from cache.
if (this.summaryStateUpdateMethod !== "refreshFromSnapshot") {
agarwal-navin marked this conversation as resolved.
Show resolved Hide resolved
this.mc.logger.sendTelemetryEvent(
{
...event,
eventName: "ClosingSummarizerOnSummaryStale",
codePath: event.eventName,
message: "Stopping fetch from storage",
versionId: versionId != null ? versionId : undefined,
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);
// We wait to close the summarizer until after the snapshot cache is updated.
// It's likely that this client will be re-elected as leader, in which case
// it will have the latest snapshot available in the cache (not the stale one it used this time)
this.mc.logger.sendTelemetryEvent(
{
...event,
eventName: "ClosingSummarizerOnSummaryStale",
codePath: event.eventName,
message: "Stopping fetch from storage",
versionId: versionId != null ? versionId : undefined,
closeSummarizerDelayMs: this.closeSummarizerDelayMs,
},
new GenericError("Restarting summarizer instead of refreshing"),
);

// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();
}
// Delay before restarting summarizer to prevent the summarizer from restarting too frequently.
await delay(this.closeSummarizerDelayMs);
this._summarizer?.stop("latestSummaryStateStale");
this.disposeFn();

return snapshotResults;
}
Expand Down
39 changes: 3 additions & 36 deletions packages/runtime/container-runtime/src/gc/garbageCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ import {
ISummarizeResult,
ITelemetryContext,
} from "@fluidframework/runtime-definitions";
import {
ReadAndParseBlob,
createResponseError,
responseToException,
} from "@fluidframework/runtime-utils";
import { createResponseError, responseToException } from "@fluidframework/runtime-utils";
import {
createChildLogger,
createChildMonitoringContext,
Expand Down Expand Up @@ -850,37 +846,8 @@ export class GarbageCollector implements IGarbageCollector {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void> {
const latestSnapshotData = await this.summaryStateTracker.refreshLatestSummary(
proposalHandle,
result,
readAndParseBlob,
);

// If the latest summary was updated but it was not tracked by this client, our state needs to be updated from
// this snapshot data.
if (this.shouldRunGC && result.latestSummaryUpdated && !result.wasSummaryTracked) {
// The current reference timestamp should be available if we are refreshing state from a snapshot. There has
// to be at least one op (summary op / ack, if nothing else) if a snapshot was taken.
const currentReferenceTimestampMs = this.runtime.getCurrentReferenceTimestampMs();
if (currentReferenceTimestampMs === undefined) {
throw DataProcessingError.create(
"No reference timestamp when updating GC state from snapshot",
"refreshLatestSummary",
undefined,
{
proposalHandle,
summaryRefSeq: result.summaryRefSeq,
gcConfigs: JSON.stringify(this.configs),
},
);
}
this.updateStateFromSnapshotData(latestSnapshotData, currentReferenceTimestampMs);
markfields marked this conversation as resolved.
Show resolved Hide resolved
}
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
markfields marked this conversation as resolved.
Show resolved Hide resolved
return this.summaryStateTracker.refreshLatestSummary(result);
}

/**
Expand Down
6 changes: 1 addition & 5 deletions packages/runtime/container-runtime/src/gc/gcDefinitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ export interface IGarbageCollector {
/** Returns the GC details generated from the base snapshot. */
getBaseGCDetails(): Promise<IGarbageCollectionDetailsBase>;
/** Called when the latest summary of the system has been refreshed. */
refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<void>;
refreshLatestSummary(result: RefreshSummaryResult): Promise<void>;
/** Called when a node is updated. Used to detect and log when an inactive node is changed or loaded. */
nodeUpdated(
nodePath: string,
Expand Down
53 changes: 6 additions & 47 deletions packages/runtime/container-runtime/src/gc/gcSummaryStateTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import {
ISummarizeResult,
ISummaryTreeWithStats,
} from "@fluidframework/runtime-definitions";
import { mergeStats, ReadAndParseBlob, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { IContainerRuntimeMetadata, metadataBlobName, RefreshSummaryResult } from "../summary";
import { mergeStats, SummaryTreeBuilder } from "@fluidframework/runtime-utils";
import { RefreshSummaryResult } from "../summary";
import { GCVersion, IGCStats } from "./gcDefinitions";
import { getGCDataFromSnapshot, generateSortedGCState, getGCVersion } from "./gcHelpers";
import { generateSortedGCState } from "./gcHelpers";
import { IGarbageCollectionSnapshotData, IGarbageCollectionState } from "./gcSummaryDefinitions";
import { IGarbageCollectorConfigs } from ".";

Expand Down Expand Up @@ -267,11 +267,7 @@ export class GCSummaryStateTracker {
* Called to refresh the latest summary state. This happens when either a pending summary is acked or a snapshot
* is downloaded and should be used to update the state.
*/
public async refreshLatestSummary(
proposalHandle: string | undefined,
result: RefreshSummaryResult,
readAndParseBlob: ReadAndParseBlob,
): Promise<IGarbageCollectionSnapshotData | undefined> {
public async refreshLatestSummary(result: RefreshSummaryResult): Promise<void> {
// If the latest summary was updated and the summary was tracked, this client is the one that generated this
// summary. So, update wasGCRunInLatestSummary.
// Note that this has to be updated if GC did not run too. Otherwise, `gcStateNeedsReset` will always return
Expand All @@ -281,7 +277,7 @@ export class GCSummaryStateTracker {
}

if (!result.latestSummaryUpdated || !this.configs.shouldRunGC) {
return undefined;
return;
}

// If the summary was tracked by this client, it was the one that generated the summary in the first place.
Expand All @@ -291,45 +287,8 @@ export class GCSummaryStateTracker {
this.latestSummaryData = this.pendingSummaryData;
this.pendingSummaryData = undefined;
this.updatedDSCountSinceLastSummary = 0;
return undefined;
}

// If the summary was not tracked by this client, the state should be updated from the downloaded snapshot.
const snapshotTree = result.snapshotTree;
const metadataBlobId = snapshotTree.blobs[metadataBlobName];
const metadata = metadataBlobId
? await readAndParseBlob<IContainerRuntimeMetadata>(metadataBlobId)
: undefined;
this.latestSummaryGCVersion = getGCVersion(metadata);

const gcSnapshotTree = snapshotTree.trees[gcTreeKey];
// If GC ran in the container that generated this snapshot, it will have a GC tree.
this.wasGCRunInLatestSummary = gcSnapshotTree !== undefined;

if (gcSnapshotTree === undefined) {
return undefined;
}

let snapshotData = await getGCDataFromSnapshot(gcSnapshotTree, readAndParseBlob);

// If the GC version in the snapshot does not match the GC version currently in effect, the GC data
// in the snapshot cannot be interpreted correctly. Set everything to undefined except for deletedNodes
// because irrespective of GC versions, these nodes have been deleted and cannot be brought back. The
// deletedNodes info is needed to identify when these nodes are used.
if (getGCVersion(metadata) !== this.configs.gcVersionInEffect) {
snapshotData = {
gcState: undefined,
tombstones: undefined,
deletedNodes: snapshotData.deletedNodes,
};
return;
}

this.latestSummaryData = {
serializedGCState: JSON.stringify(snapshotData.gcState),
serializedTombstones: JSON.stringify(snapshotData.tombstones),
serializedDeletedNodes: JSON.stringify(snapshotData.deletedNodes),
};
return snapshotData;
}

/**
Expand Down
Loading