Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use real RunningSummarizer in SummaryManager UT to test last summary #7762

Merged
merged 1 commit into from
Apr 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 84 additions & 17 deletions packages/runtime/container-runtime/src/test/summaryManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { strict as assert } from "assert";
import sinon from "sinon";
import { Deferred, TypedEventEmitter } from "@fluidframework/common-utils";
import { IFluidHandle, IFluidLoadable } from "@fluidframework/core-interfaces";
import { MessageType } from "@fluidframework/protocol-definitions";
import { MockLogger } from "@fluidframework/telemetry-utils";
import { MockDeltaManager } from "@fluidframework/test-runtime-utils";
import {
IConnectedEvents,
IConnectedState,
Expand All @@ -22,6 +24,10 @@ import {
SummarizerStopReason,
} from "../summarizerTypes";
import { ISummarizerClientElection, ISummarizerClientElectionEvents } from "../summarizerClientElection";
import { RunningSummarizer } from "../runningSummarizer";
import { SummarizeHeuristicData } from "../summarizerHeuristics";
import { SummaryCollection, ISummaryOpMessage } from "../summaryCollection";
import { neverCancelledSummaryToken } from "../runWhileConnectedCoordinator";

describe("Summary Manager", () => {
let clock: sinon.SinonFakeTimers;
Expand All @@ -30,18 +36,14 @@ describe("Summary Manager", () => {
const flushPromises = async () => new Promise((resolve) => process.nextTick(resolve));
const thisClientId = "this";
const mockLogger = new MockLogger();
const mockDeltaManager = new MockDeltaManager();
let summaryManager: SummaryManager;
let runningSummarizer: RunningSummarizer;
// let runCount: number;
const summarizerClientId = "test";

// Fake objects
let fakeOpListener;
const summaryCollection = {
opsSinceLastAck: 0,
addOpListener: (listener) => { fakeOpListener = listener; },
removeOpListener: (listener) => {
assert.strictEqual(fakeOpListener, listener, "Re-init of fakeOpListener?");
fakeOpListener = undefined;
},
};
const summaryCollection = new SummaryCollection(mockDeltaManager, mockLogger);
const throttler = {
delayMs: 0,
numAttempts: 0,
Expand All @@ -51,6 +53,23 @@ describe("Summary Manager", () => {
delayFn: () => 0,
};

const summaryOp: ISummaryOpMessage = {
clientId: "clientId",
clientSequenceNumber: 5,
minimumSequenceNumber: 5,
referenceSequenceNumber: 5,
sequenceNumber: 6,
term: 0,
timestamp: 6,
type: MessageType.Summarize,
contents: {
handle: "OpHandle",
head: "head",
message: "message",
parents: ["parents"],
},
};

class TestConnectedState extends TypedEventEmitter<IConnectedEvents> implements IConnectedState {
public connected = false;
public clientId: string | undefined;
Expand Down Expand Up @@ -90,10 +109,36 @@ describe("Summary Manager", () => {
public async run(onBehalfOf: string): Promise<SummarizerStopReason> {
this.onBehalfOf = onBehalfOf;
this.state = "running";
runningSummarizer = await RunningSummarizer.start(
mockLogger,
summaryCollection.createWatcher(summarizerClientId),
{
idleTime: 5000, // 5 sec (idle)
maxTime: 5000 * 12, // 1 min (active)
maxOps: 1000, // 1k ops (active)
maxAckWaitTime: 120000, // 2 min
},
// submitSummaryCallback
async (options) => {
return {
stage: "base",
minimumSequenceNumber: 0,
referenceSequenceNumber: 0,
error: undefined,
} as const;
},
new SummarizeHeuristicData(0, { refSequenceNumber: 0, summaryTime: Date.now() }),
() => { },
summaryCollection,
neverCancelledSummaryToken,
// stopSummarizerCallback
(reason) => { },
);
await Promise.all([
this.stopDeferred.promise,
this.runDeferred.promise,
]);
await runningSummarizer.waitStop(true);
this.state = "stopped";
return "summarizerClientDisconnected";
}
Expand Down Expand Up @@ -175,7 +220,7 @@ describe("Summary Manager", () => {
summarizer.removeAllListeners();
connectedState.removeAllListeners();
throttler.delayMs = 0;
summaryCollection.opsSinceLastAck = 0;
mockDeltaManager.lastSequenceNumber = 0;
requestCalls = 0;
clock.reset();
});
Expand Down Expand Up @@ -252,7 +297,7 @@ describe("Summary Manager", () => {

describe("Start Summarizer Delay", () => {
it("Should wait for initial delay before first start", async () => {
summaryCollection.opsSinceLastAck = 999; // 999 < 1000, so do not bypass
mockDeltaManager.lastSequenceNumber = 999; // 999 < 1000, so do not bypass
createSummaryManager({
initialDelayMs: 2000,
opsToBypassInitialDelay: 1000,
Expand All @@ -273,7 +318,7 @@ describe("Summary Manager", () => {
});

it("Should bypass initial delay if enough ops have already passed", async () => {
summaryCollection.opsSinceLastAck = 1000; // 1000 >= 1000, so bypass
mockDeltaManager.lastSequenceNumber = 1000; // seq >= opsToBypass, so bypass
createSummaryManager({
initialDelayMs: 2000,
opsToBypassInitialDelay: 1000,
Expand All @@ -298,7 +343,7 @@ describe("Summary Manager", () => {
// make it work in main scenario, not some corner case that does not matter.
// Issue #7273 tracks making appropriate product and test change and re-enable the test.
it("Should bypass initial delay if enough ops pass later", async () => {
summaryCollection.opsSinceLastAck = 500; // 500 < 1000, so do not bypass yet
mockDeltaManager.lastSequenceNumber = 500;
createSummaryManager({
initialDelayMs: 2000,
opsToBypassInitialDelay: 1000,
Expand All @@ -310,13 +355,13 @@ describe("Summary Manager", () => {
clock.tick(1999);
await flushPromises();
assertRequests(0, "should not have requested summarizer yet");
summaryCollection.opsSinceLastAck = 999; // 999 < 1000, still do not bypass
fakeOpListener(); // Fire a fake "op" event
mockDeltaManager.lastSequenceNumber = 999; // seq < opsToBypass. No bypass.
mockDeltaManager.emit("op", summaryOp);
clientElection.electClient(thisClientId); // force trigger refresh
await flushPromises();
assertRequests(0, "still should not have requested summarizer yet");
summaryCollection.opsSinceLastAck = 1000; // 1000 >= 1000, so should bypass now
fakeOpListener(); // Fire a fake "op" event
mockDeltaManager.lastSequenceNumber = 1000; // Bypass now
mockDeltaManager.emit("op", summaryOp);
clientElection.electClient(thisClientId); // force trigger refresh
await flushPromises();
assertRequests(1, "should request summarizer, bypassing initial delay");
Expand All @@ -325,6 +370,28 @@ describe("Summary Manager", () => {
assertState(SummaryManagerState.Running, "summarizer should be running");
});

it("Should create last summary when summarizer created without delay, then disconnected", async () => {
throttler.delayMs = 0;
createSummaryManager({
opsToBypassInitialDelay: 0,
connected: false,
});
clientElection.electClient(thisClientId);
await flushPromises();
assertState(SummaryManagerState.Off, "not connected");
mockDeltaManager.lastSequenceNumber = 10001;
connectedState.connect();
await flushPromises();
assertState(SummaryManagerState.Running, "Summarizer should be starting");
assertRequests(1, "Should begin without delay");
completeSummarizerRequest();
await flushPromises();
assertState(SummaryManagerState.Running, "Should be running");
connectedState.disconnect();
await flushPromises();
assertState(SummaryManagerState.Stopping, "Should be stopping");
});

it("Should wait for throttler delay before starting summarizer", async () => {
throttler.delayMs = 100;
createSummaryManager({
Expand Down