From ad3c8344fd59b1f09c639773ab95465e644ae1e9 Mon Sep 17 00:00:00 2001 From: Paul Leathers Date: Thu, 7 Oct 2021 10:51:54 -0700 Subject: [PATCH] Use real RunningSummarizer in SummaryManager UT to test last summary path --- .../src/test/summaryManager.spec.ts | 100 +++++++++++++++--- 1 file changed, 83 insertions(+), 17 deletions(-) diff --git a/packages/runtime/container-runtime/src/test/summaryManager.spec.ts b/packages/runtime/container-runtime/src/test/summaryManager.spec.ts index 83b468360a0f..8afc4fb9cb1c 100644 --- a/packages/runtime/container-runtime/src/test/summaryManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/summaryManager.spec.ts @@ -7,7 +7,9 @@ import { strict as assert } from "assert"; import sinon from "sinon"; import { Deferred, TypedEventEmitter } from "@fluidframework/common-utils"; import { IFluidHandle, IFluidLoadable, IFluidRouter } from "@fluidframework/core-interfaces"; +import { MessageType } from "@fluidframework/protocol-definitions"; import { MockLogger } from "@fluidframework/telemetry-utils"; +import { MockDeltaManager } from "@fluidframework/test-runtime-utils"; import { IConnectedEvents, IConnectedState, @@ -22,6 +24,10 @@ import { SummarizerStopReason, } from "../summarizerTypes"; import { ISummarizerClientElection, ISummarizerClientElectionEvents } from "../summarizerClientElection"; +import { RunningSummarizer } from "../runningSummarizer"; +import { SummarizeHeuristicData } from "../summarizerHeuristics"; +import { SummaryCollection, ISummaryOpMessage } from "../summaryCollection"; +import { neverCancelledSummaryToken } from "../runWhileConnectedCoordinator"; describe("Summary Manager", () => { let clock: sinon.SinonFakeTimers; @@ -30,18 +36,14 @@ describe("Summary Manager", () => { const flushPromises = async () => new Promise((resolve) => process.nextTick(resolve)); const thisClientId = "this"; const mockLogger = new MockLogger(); + const mockDeltaManager = new MockDeltaManager(); let summaryManager: SummaryManager; + let runningSummarizer: RunningSummarizer; + // let runCount: number; + const summarizerClientId = "test"; // Fake objects - let fakeOpListener; - const summaryCollection = { - opsSinceLastAck: 0, - addOpListener: (listener) => { fakeOpListener = listener; }, - removeOpListener: (listener) => { - assert.strictEqual(fakeOpListener, listener, "Re-init of fakeOpListener?"); - fakeOpListener = undefined; - }, - }; + const summaryCollection = new SummaryCollection(mockDeltaManager, mockLogger); const throttler = { delayMs: 0, numAttempts: 0, @@ -51,6 +53,23 @@ describe("Summary Manager", () => { delayFn: () => 0, }; + const summaryOp: ISummaryOpMessage = { + clientId: "clientId", + clientSequenceNumber: 5, + minimumSequenceNumber: 5, + referenceSequenceNumber: 5, + sequenceNumber: 6, + term: 0, + timestamp: 6, + type: MessageType.Summarize, + contents: { + handle: "OpHandle", + head: "head", + message: "message", + parents: ["parents"], + }, + }; + class TestConnectedState extends TypedEventEmitter implements IConnectedState { public connected = false; public clientId: string | undefined; @@ -90,10 +109,35 @@ describe("Summary Manager", () => { public async run(onBehalfOf: string): Promise { this.onBehalfOf = onBehalfOf; this.state = "running"; + runningSummarizer = await RunningSummarizer.start( + mockLogger, + summaryCollection.createWatcher(summarizerClientId), + { + idleTime: 5000, // 5 sec (idle) + maxTime: 5000 * 12, // 1 min (active) + maxOps: 1000, // 1k ops (active) + maxAckWaitTime: 120000, // 2 min + }, + // submitSummaryCallback + async (/* options */) => { + return { + stage: "base", + referenceSequenceNumber: 0, + error: undefined, + } as const; + }, + new SummarizeHeuristicData(0, { refSequenceNumber: 0, summaryTime: Date.now() }), + () => { }, + summaryCollection, + neverCancelledSummaryToken, + // stopSummarizerCallback + (reason) => { }, + ); await Promise.all([ this.stopDeferred.promise, this.runDeferred.promise, ]); + await runningSummarizer.waitStop(true); this.state = "stopped"; return "summarizerClientDisconnected"; } @@ -177,7 +221,7 @@ describe("Summary Manager", () => { summaryManager.removeAllListeners(); connectedState.removeAllListeners(); throttler.delayMs = 0; - summaryCollection.opsSinceLastAck = 0; + mockDeltaManager.lastSequenceNumber = 0; requestCalls = 0; clock.reset(); }); @@ -254,7 +298,7 @@ describe("Summary Manager", () => { describe("Start Summarizer Delay", () => { it("Should wait for initial delay before first start", async () => { - summaryCollection.opsSinceLastAck = 999; // 999 < 1000, so do not bypass + mockDeltaManager.lastSequenceNumber = 999; // 999 < 1000, so do not bypass createSummaryManager({ initialDelayMs: 2000, opsToBypassInitialDelay: 1000, @@ -275,7 +319,7 @@ describe("Summary Manager", () => { }); it("Should bypass initial delay if enough ops have already passed", async () => { - summaryCollection.opsSinceLastAck = 1000; // 1000 >= 1000, so bypass + mockDeltaManager.lastSequenceNumber = 1000; // seq >= opsToBypass, so bypass createSummaryManager({ initialDelayMs: 2000, opsToBypassInitialDelay: 1000, @@ -300,7 +344,7 @@ describe("Summary Manager", () => { // make it work in main scenario, not some corner case that does not matter. // Issue #7273 tracks making appropriate product and test change and re-enable the test. it("Should bypass initial delay if enough ops pass later", async () => { - summaryCollection.opsSinceLastAck = 500; // 500 < 1000, so do not bypass yet + mockDeltaManager.lastSequenceNumber = 500; createSummaryManager({ initialDelayMs: 2000, opsToBypassInitialDelay: 1000, @@ -312,13 +356,13 @@ describe("Summary Manager", () => { clock.tick(1999); await flushPromises(); assertRequests(0, "should not have requested summarizer yet"); - summaryCollection.opsSinceLastAck = 999; // 999 < 1000, still do not bypass - fakeOpListener(); // Fire a fake "op" event + mockDeltaManager.lastSequenceNumber = 999; // seq < opsToBypass. No bypass. + mockDeltaManager.emit("op", summaryOp); clientElection.electClient(thisClientId); // force trigger refresh await flushPromises(); assertRequests(0, "still should not have requested summarizer yet"); - summaryCollection.opsSinceLastAck = 1000; // 1000 >= 1000, so should bypass now - fakeOpListener(); // Fire a fake "op" event + mockDeltaManager.lastSequenceNumber = 1000; // Bypass now + mockDeltaManager.emit("op", summaryOp); clientElection.electClient(thisClientId); // force trigger refresh await flushPromises(); assertRequests(1, "should request summarizer, bypassing initial delay"); @@ -327,6 +371,28 @@ describe("Summary Manager", () => { assertState(SummaryManagerState.Running, "summarizer should be running"); }); + it("Should create last summary when summarizer created without delay, then disconnected", async () => { + throttler.delayMs = 0; + createSummaryManager({ + opsToBypassInitialDelay: 0, + connected: false, + }); + clientElection.electClient(thisClientId); + await flushPromises(); + assertState(SummaryManagerState.Off, "not connected"); + mockDeltaManager.lastSequenceNumber = 10001; + connectedState.connect(); + await flushPromises(); + assertState(SummaryManagerState.Starting, "Summarizer should be starting"); + assertRequests(1, "Should begin without delay"); + completeSummarizerRequest(); + await flushPromises(); + assertState(SummaryManagerState.Running, "Should be running"); + connectedState.disconnect(); + await flushPromises(); + assertState(SummaryManagerState.Stopping, "Should be stopping"); + }); + it("Should wait for throttler delay before starting summarizer", async () => { throttler.delayMs = 100; createSummaryManager({