Skip to content

Commit

Permalink
Use real RunningSummarizer in SummaryManager UT to test last summary …
Browse files Browse the repository at this point in the history
…path
  • Loading branch information
pleath committed Oct 7, 2021
1 parent d562a2d commit ad3c834
Showing 1 changed file with 83 additions and 17 deletions.
100 changes: 83 additions & 17 deletions packages/runtime/container-runtime/src/test/summaryManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { strict as assert } from "assert";
import sinon from "sinon";
import { Deferred, TypedEventEmitter } from "@fluidframework/common-utils";
import { IFluidHandle, IFluidLoadable, IFluidRouter } from "@fluidframework/core-interfaces";
import { MessageType } from "@fluidframework/protocol-definitions";
import { MockLogger } from "@fluidframework/telemetry-utils";
import { MockDeltaManager } from "@fluidframework/test-runtime-utils";
import {
IConnectedEvents,
IConnectedState,
Expand All @@ -22,6 +24,10 @@ import {
SummarizerStopReason,
} from "../summarizerTypes";
import { ISummarizerClientElection, ISummarizerClientElectionEvents } from "../summarizerClientElection";
import { RunningSummarizer } from "../runningSummarizer";
import { SummarizeHeuristicData } from "../summarizerHeuristics";
import { SummaryCollection, ISummaryOpMessage } from "../summaryCollection";
import { neverCancelledSummaryToken } from "../runWhileConnectedCoordinator";

describe("Summary Manager", () => {
let clock: sinon.SinonFakeTimers;
Expand All @@ -30,18 +36,14 @@ describe("Summary Manager", () => {
const flushPromises = async () => new Promise((resolve) => process.nextTick(resolve));
const thisClientId = "this";
const mockLogger = new MockLogger();
const mockDeltaManager = new MockDeltaManager();
let summaryManager: SummaryManager;
let runningSummarizer: RunningSummarizer;
// let runCount: number;
const summarizerClientId = "test";

// Fake objects
let fakeOpListener;
const summaryCollection = {
opsSinceLastAck: 0,
addOpListener: (listener) => { fakeOpListener = listener; },
removeOpListener: (listener) => {
assert.strictEqual(fakeOpListener, listener, "Re-init of fakeOpListener?");
fakeOpListener = undefined;
},
};
const summaryCollection = new SummaryCollection(mockDeltaManager, mockLogger);
const throttler = {
delayMs: 0,
numAttempts: 0,
Expand All @@ -51,6 +53,23 @@ describe("Summary Manager", () => {
delayFn: () => 0,
};

const summaryOp: ISummaryOpMessage = {
clientId: "clientId",
clientSequenceNumber: 5,
minimumSequenceNumber: 5,
referenceSequenceNumber: 5,
sequenceNumber: 6,
term: 0,
timestamp: 6,
type: MessageType.Summarize,
contents: {
handle: "OpHandle",
head: "head",
message: "message",
parents: ["parents"],
},
};

class TestConnectedState extends TypedEventEmitter<IConnectedEvents> implements IConnectedState {
public connected = false;
public clientId: string | undefined;
Expand Down Expand Up @@ -90,10 +109,35 @@ describe("Summary Manager", () => {
public async run(onBehalfOf: string): Promise<SummarizerStopReason> {
this.onBehalfOf = onBehalfOf;
this.state = "running";
runningSummarizer = await RunningSummarizer.start(
mockLogger,
summaryCollection.createWatcher(summarizerClientId),
{
idleTime: 5000, // 5 sec (idle)
maxTime: 5000 * 12, // 1 min (active)
maxOps: 1000, // 1k ops (active)
maxAckWaitTime: 120000, // 2 min
},
// submitSummaryCallback
async (/* options */) => {
return {
stage: "base",
referenceSequenceNumber: 0,
error: undefined,
} as const;
},
new SummarizeHeuristicData(0, { refSequenceNumber: 0, summaryTime: Date.now() }),
() => { },
summaryCollection,
neverCancelledSummaryToken,
// stopSummarizerCallback
(reason) => { },
);
await Promise.all([
this.stopDeferred.promise,
this.runDeferred.promise,
]);
await runningSummarizer.waitStop(true);
this.state = "stopped";
return "summarizerClientDisconnected";
}
Expand Down Expand Up @@ -177,7 +221,7 @@ describe("Summary Manager", () => {
summaryManager.removeAllListeners();
connectedState.removeAllListeners();
throttler.delayMs = 0;
summaryCollection.opsSinceLastAck = 0;
mockDeltaManager.lastSequenceNumber = 0;
requestCalls = 0;
clock.reset();
});
Expand Down Expand Up @@ -254,7 +298,7 @@ describe("Summary Manager", () => {

describe("Start Summarizer Delay", () => {
it("Should wait for initial delay before first start", async () => {
summaryCollection.opsSinceLastAck = 999; // 999 < 1000, so do not bypass
mockDeltaManager.lastSequenceNumber = 999; // 999 < 1000, so do not bypass
createSummaryManager({
initialDelayMs: 2000,
opsToBypassInitialDelay: 1000,
Expand All @@ -275,7 +319,7 @@ describe("Summary Manager", () => {
});

it("Should bypass initial delay if enough ops have already passed", async () => {
summaryCollection.opsSinceLastAck = 1000; // 1000 >= 1000, so bypass
mockDeltaManager.lastSequenceNumber = 1000; // seq >= opsToBypass, so bypass
createSummaryManager({
initialDelayMs: 2000,
opsToBypassInitialDelay: 1000,
Expand All @@ -300,7 +344,7 @@ describe("Summary Manager", () => {
// make it work in main scenario, not some corner case that does not matter.
// Issue #7273 tracks making appropriate product and test change and re-enable the test.
it("Should bypass initial delay if enough ops pass later", async () => {
summaryCollection.opsSinceLastAck = 500; // 500 < 1000, so do not bypass yet
mockDeltaManager.lastSequenceNumber = 500;
createSummaryManager({
initialDelayMs: 2000,
opsToBypassInitialDelay: 1000,
Expand All @@ -312,13 +356,13 @@ describe("Summary Manager", () => {
clock.tick(1999);
await flushPromises();
assertRequests(0, "should not have requested summarizer yet");
summaryCollection.opsSinceLastAck = 999; // 999 < 1000, still do not bypass
fakeOpListener(); // Fire a fake "op" event
mockDeltaManager.lastSequenceNumber = 999; // seq < opsToBypass. No bypass.
mockDeltaManager.emit("op", summaryOp);
clientElection.electClient(thisClientId); // force trigger refresh
await flushPromises();
assertRequests(0, "still should not have requested summarizer yet");
summaryCollection.opsSinceLastAck = 1000; // 1000 >= 1000, so should bypass now
fakeOpListener(); // Fire a fake "op" event
mockDeltaManager.lastSequenceNumber = 1000; // Bypass now
mockDeltaManager.emit("op", summaryOp);
clientElection.electClient(thisClientId); // force trigger refresh
await flushPromises();
assertRequests(1, "should request summarizer, bypassing initial delay");
Expand All @@ -327,6 +371,28 @@ describe("Summary Manager", () => {
assertState(SummaryManagerState.Running, "summarizer should be running");
});

it("Should create last summary when summarizer created without delay, then disconnected", async () => {
throttler.delayMs = 0;
createSummaryManager({
opsToBypassInitialDelay: 0,
connected: false,
});
clientElection.electClient(thisClientId);
await flushPromises();
assertState(SummaryManagerState.Off, "not connected");
mockDeltaManager.lastSequenceNumber = 10001;
connectedState.connect();
await flushPromises();
assertState(SummaryManagerState.Starting, "Summarizer should be starting");
assertRequests(1, "Should begin without delay");
completeSummarizerRequest();
await flushPromises();
assertState(SummaryManagerState.Running, "Should be running");
connectedState.disconnect();
await flushPromises();
assertState(SummaryManagerState.Stopping, "Should be stopping");
});

it("Should wait for throttler delay before starting summarizer", async () => {
throttler.delayMs = 100;
createSummaryManager({
Expand Down

0 comments on commit ad3c834

Please sign in to comment.