diff --git a/packages/loader/container-loader/src/serializedStateManager.ts b/packages/loader/container-loader/src/serializedStateManager.ts index 89f930baeba3..15c51126a85e 100644 --- a/packages/loader/container-loader/src/serializedStateManager.ts +++ b/packages/loader/container-loader/src/serializedStateManager.ts @@ -387,8 +387,12 @@ export class SerializedStateManager { this.mc.logger, { eventName: "getPendingLocalState", - notifyImminentClosure: props.notifyImminentClosure, - processedOpsSize: this.processedOps.length, + details: { + notifyImminentClosure: props.notifyImminentClosure, + sessionExpiryTimerStarted: props.sessionExpiryTimerStarted, + snapshotSequenceNumber: props.snapshotSequenceNumber, + processedOpsSize: this.processedOps.length, + }, clientId, }, async () => { diff --git a/packages/test/test-end-to-end-tests/src/test/stashedOps.spec.ts b/packages/test/test-end-to-end-tests/src/test/stashedOps.spec.ts index 944f2ff74e62..d72927e6d39a 100644 --- a/packages/test/test-end-to-end-tests/src/test/stashedOps.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/stashedOps.spec.ts @@ -88,6 +88,144 @@ type SharedObjCallback = ( dataStore: ITestFluidObject, ) => void | Promise; +/** + * load container, pause, create (local) ops from callback, then optionally send ops before closing container + */ +const getPendingOps = async ( + testContainerConfig: ITestContainerConfig, + testObjectProvider: ITestObjectProvider, + send: boolean, + cb: SharedObjCallback = () => undefined, +) => { + const container: IContainerExperimental = + await testObjectProvider.loadTestContainer(testContainerConfig); + await waitForContainerConnection(container); + const dataStore = (await container.getEntryPoint()) as ITestFluidObject; + + [...Array(lots).keys()].map((i) => + dataStore.root.set(`make sure csn is > 1 so it doesn't hide bugs ${i}`, i), + ); + + await testObjectProvider.ensureSynchronized(); + await testObjectProvider.opProcessingController.pauseProcessing(container); + assert(toDeltaManagerInternal(dataStore.runtime.deltaManager).outbound.paused); + + await cb(container, dataStore); + + let pendingState: string | undefined; + if (send) { + pendingState = await container.getPendingLocalState?.(); + await testObjectProvider.ensureSynchronized(); // Note: This will resume processing to get synchronized + container.close(); + } else { + pendingState = await container.closeAndGetPendingLocalState?.(); + } + + testObjectProvider.opProcessingController.resumeProcessing(); + + assert.ok(pendingState); + return pendingState; +}; + +/** + * Load a Container using testContainerConfig and the given testObjectProvider, + * Deferring connection to the service until the returned connect function is called + * (simulating returning from offline) + * + * @param testObjectProvider - For accessing Loader/Driver + * @param request - Request to use when loading + * @param pendingLocalState - (Optional) custom PendingLocalState to load from. Defaults to using getPendingOps helper if omitted. + * @returns A container instance with a connect function to unblock the Driver (simulating coming back from offline) + */ +async function loadOffline( + testContainerConfig: ITestContainerConfig, + testObjectProvider: ITestObjectProvider, + request: IRequest, + pendingLocalState?: string, +): Promise<{ container: IContainerExperimental; connect: () => void }> { + const p = new Deferred(); + // This documentServiceFactory will wait for the promise p to resolve before connecting to the service + const documentServiceFactory = wrapObjectAndOverride( + testObjectProvider.documentServiceFactory, + { + createDocumentService: { + connectToDeltaStream: (ds) => async (client) => { + await p.promise; + return ds.connectToDeltaStream(client); + }, + connectToDeltaStorage: (ds) => async () => { + await p.promise; + return ds.connectToDeltaStorage(); + }, + connectToStorage: (ds) => async () => { + await p.promise; + return ds.connectToStorage(); + }, + }, + }, + ); + + const loader = testObjectProvider.createLoader( + [ + [ + testObjectProvider.defaultCodeDetails, + testObjectProvider.createFluidEntryPoint(testContainerConfig), + ], + ], + { ...testContainerConfig.loaderProps, documentServiceFactory }, + ); + const container = await loader.resolve( + request, + pendingLocalState ?? + (await getPendingOps(testContainerConfig, testObjectProvider, false /* send */)), + ); + return { container, connect: () => p.resolve(undefined) }; +} + +const assertIntervals = ( + sharedString: SharedString, + intervalCollection: IIntervalCollection, + expected: readonly { start: number; end: number }[], + validateOverlapping: boolean = true, +) => { + const actual = Array.from(intervalCollection); + if (validateOverlapping && sharedString.getLength() > 0) { + const overlapping = intervalCollection.findOverlappingIntervals( + 0, + sharedString.getLength() - 1, + ); + assert.deepEqual(actual, overlapping, "Interval search returned inconsistent results"); + } + assert.strictEqual( + actual.length, + expected.length, + `findOverlappingIntervals() must return the expected number of intervals`, + ); + + const actualPos = actual.map((interval) => { + assert(interval); + const start = sharedString.localReferencePositionToPosition(interval.start); + const end = sharedString.localReferencePositionToPosition(interval.end); + return { start, end }; + }); + assert.deepEqual(actualPos, expected, "intervals are not as expected"); +}; + +/** Returns a new promise that will resolve once we process a summary op followed by a summary ack */ +const waitForSummary = async (container: IContainer) => + new Promise((resolve, reject) => { + let summarized = false; + container.on("op", (op) => { + if (op.type === "summarize") { + summarized = true; + } else if (summarized && op.type === "summaryAck") { + resolve(); + } else if (op.type === "summaryNack") { + reject(new Error("summaryNack")); + } + }); + }); + // Introduced in 0.37 // REVIEW: enable compat testing describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { @@ -136,113 +274,6 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }, }; - // load container, pause, create (local) ops from callback, then optionally send ops before closing container - const getPendingOps = async ( - args: ITestObjectProvider, - send: boolean, - cb: SharedObjCallback = () => undefined, - ) => { - const container: IContainerExperimental = await args.loadTestContainer(testContainerConfig); - await waitForContainerConnection(container); - const dataStore = (await container.getEntryPoint()) as ITestFluidObject; - - [...Array(lots).keys()].map((i) => - dataStore.root.set(`make sure csn is > 1 so it doesn't hide bugs ${i}`, i), - ); - - await args.ensureSynchronized(); - await args.opProcessingController.pauseProcessing(container); - assert(toDeltaManagerInternal(dataStore.runtime.deltaManager).outbound.paused); - - await cb(container, dataStore); - - let pendingState: string | undefined; - if (send) { - pendingState = await container.getPendingLocalState?.(); - await args.ensureSynchronized(); - container.close(); - } else { - pendingState = await container.closeAndGetPendingLocalState?.(); - } - - args.opProcessingController.resumeProcessing(); - - assert.ok(pendingState); - return pendingState; - }; - - const assertIntervals = ( - sharedString: SharedString, - intervalCollection: IIntervalCollection, - expected: readonly { start: number; end: number }[], - validateOverlapping: boolean = true, - ) => { - const actual = Array.from(intervalCollection); - if (validateOverlapping && sharedString.getLength() > 0) { - const overlapping = intervalCollection.findOverlappingIntervals( - 0, - sharedString.getLength() - 1, - ); - assert.deepEqual(actual, overlapping, "Interval search returned inconsistent results"); - } - assert.strictEqual( - actual.length, - expected.length, - `findOverlappingIntervals() must return the expected number of intervals`, - ); - - const actualPos = actual.map((interval) => { - assert(interval); - const start = sharedString.localReferencePositionToPosition(interval.start); - const end = sharedString.localReferencePositionToPosition(interval.end); - return { start, end }; - }); - assert.deepEqual(actualPos, expected, "intervals are not as expected"); - }; - - async function loadOffline( - testObjectProvider: ITestObjectProvider, - request: IRequest, - pendingLocalState?: string, - ): Promise<{ container: IContainerExperimental; connect: () => void }> { - const p = new Deferred(); - const documentServiceFactory = wrapObjectAndOverride( - provider.documentServiceFactory, - { - createDocumentService: { - connectToDeltaStream: (ds) => async (client) => { - await p.promise; - return ds.connectToDeltaStream(client); - }, - connectToDeltaStorage: (ds) => async () => { - await p.promise; - return ds.connectToDeltaStorage(); - }, - connectToStorage: (ds) => async () => { - await p.promise; - return ds.connectToStorage(); - }, - }, - }, - ); - - // eslint-disable-next-line @typescript-eslint/no-shadow - const loader = testObjectProvider.createLoader( - [ - [ - testObjectProvider.defaultCodeDetails, - testObjectProvider.createFluidEntryPoint(testContainerConfig), - ], - ], - { ...testContainerConfig.loaderProps, documentServiceFactory }, - ); - const container = await loader.resolve( - request, - pendingLocalState ?? (await getPendingOps(testObjectProvider, false)), - ); - return { container, connect: () => p.resolve(undefined) }; - } - const sf = new SchemaFactory("stashedTests"); class Root extends sf.object("Root", { @@ -287,7 +318,6 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { let counter1: SharedCounter; let directory1: ISharedDirectory; let collection1: IIntervalCollection; - let waitForSummary: () => Promise; beforeEach("setup", async () => { provider = getTestObjectProvider(); @@ -307,51 +337,41 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { string1 = await dataStore1.getSharedObject(stringId); collection1 = string1.getIntervalCollection(collectionId); string1.insertText(0, "hello"); - - waitForSummary = async () => { - await new Promise((resolve, reject) => { - let summarized = false; - container1.on("op", (op) => { - if (op.type === "summarize") { - summarized = true; - } else if (summarized && op.type === "summaryAck") { - resolve(); - } else if (op.type === "summaryNack") { - reject(new Error("summaryNack")); - } - }); - }); - }; }); it("resends op", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.set(testKey, testValue); - const cell = await d.getSharedObject(cellId); - cell.set(testValue); - const counter = await d.getSharedObject(counterId); - counter.increment(testIncrementValue); - const directory = await d.getSharedObject(directoryId); - directory.set(testKey, testValue); - const string = await d.getSharedObject(stringId); - const collection = string.getIntervalCollection(collectionId); - collection.add({ start: testStart, end: testEnd }); - // Submit a message with an unrecognized type - // Super rare corner case where you stash an op and then roll back to a previous runtime version that doesn't recognize it - ( - d.context.containerRuntime as unknown as { - submit: ( - containerRuntimeMessage: RecentlyAddedContainerRuntimeMessageDetails & - Record, - ) => void; - } - ).submit({ - type: "FROM_THE_FUTURE", - contents: "Hello", - compatDetails: { behavior: "Ignore" }, - }); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.set(testKey, testValue); + const cell = await d.getSharedObject(cellId); + cell.set(testValue); + const counter = await d.getSharedObject(counterId); + counter.increment(testIncrementValue); + const directory = await d.getSharedObject(directoryId); + directory.set(testKey, testValue); + const string = await d.getSharedObject(stringId); + const collection = string.getIntervalCollection(collectionId); + collection.add({ start: testStart, end: testEnd }); + // Submit a message with an unrecognized type + // Super rare corner case where you stash an op and then roll back to a previous runtime version that doesn't recognize it + ( + d.context.containerRuntime as unknown as { + submit: ( + containerRuntimeMessage: RecentlyAddedContainerRuntimeMessageDetails & + Record, + ) => void; + } + ).submit({ + type: "FROM_THE_FUTURE", + contents: "Hello", + compatDetails: { behavior: "Ignore" }, + }); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -387,28 +407,37 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { let sessionId; - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - assert((map as any).runtime.idCompressor !== undefined); - mapCompressedId = (map as any).runtime.idCompressor.generateCompressedId(); - mapDecompressedId = (map as any).runtime.idCompressor.decompress(mapCompressedId); - map.set(mapDecompressedId, testValue); - const cell = await d.getSharedObject(cellId); - assert((cell as any).runtime.idCompressor !== undefined); - cellCompressedId = (cell as any).runtime.idCompressor.generateCompressedId(); - cellDecompressedId = (cell as any).runtime.idCompressor.decompress(cellCompressedId); - cell.set(cellDecompressedId); - const directory = await d.getSharedObject(directoryId); - assert((directory as any).runtime.idCompressor !== undefined); - directoryCompressedId = (directory as any).runtime.idCompressor.generateCompressedId(); - directoryDecompressedId = (directory as any).runtime.idCompressor.decompress( - directoryCompressedId, - ); - directory.set(directoryDecompressedId, testValue); - - // All will have the same sessionId, it doesn't matter which DDS I use - sessionId = (map as any).runtime.idCompressor.localSessionId; - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance + async (c, d) => { + const map = await d.getSharedObject(mapId); + assert((map as any).runtime.idCompressor !== undefined); + mapCompressedId = (map as any).runtime.idCompressor.generateCompressedId(); + mapDecompressedId = (map as any).runtime.idCompressor.decompress(mapCompressedId); + map.set(mapDecompressedId, testValue); + const cell = await d.getSharedObject(cellId); + assert((cell as any).runtime.idCompressor !== undefined); + cellCompressedId = (cell as any).runtime.idCompressor.generateCompressedId(); + cellDecompressedId = (cell as any).runtime.idCompressor.decompress( + cellCompressedId, + ); + cell.set(cellDecompressedId); + const directory = await d.getSharedObject(directoryId); + assert((directory as any).runtime.idCompressor !== undefined); + directoryCompressedId = ( + directory as any + ).runtime.idCompressor.generateCompressedId(); + directoryDecompressedId = (directory as any).runtime.idCompressor.decompress( + directoryCompressedId, + ); + directory.set(directoryDecompressedId, testValue); + + // All will have the same sessionId, it doesn't matter which DDS I use + sessionId = (map as any).runtime.idCompressor.localSessionId; + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -454,16 +483,21 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("connects in write mode and resends op when loaded with no delta connection", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.set(testKey, testValue); - const cell = await d.getSharedObject(cellId); - cell.set(testValue); - const counter = await d.getSharedObject(counterId); - counter.increment(testIncrementValue); - const directory = await d.getSharedObject(directoryId); - directory.set(testKey, testValue); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.set(testKey, testValue); + const cell = await d.getSharedObject(cellId); + cell.set(testValue); + const counter = await d.getSharedObject(counterId); + counter.increment(testIncrementValue); + const directory = await d.getSharedObject(directoryId); + directory.set(testKey, testValue); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const headers: IRequestHeader = { [LoaderHeader.loadMode]: { deltaConnection: "none" } }; @@ -492,16 +526,21 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { ].forEach(({ name, getMap }) => { it(`doesn't resend successful op (${name})`, async function () { const map = await getMapFromProvider(getMap); - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const mapPre = await getMap(d); - mapPre.set(testKey, "something unimportant"); - const cell = await d.getSharedObject(cellId); - cell.set("something unimportant"); - const counter = await d.getSharedObject(counterId); - counter.increment(3); - const directory = await d.getSharedObject(directoryId); - directory.set(testKey, "I will be erased"); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const mapPre = await getMap(d); + mapPre.set(testKey, "something unimportant"); + const cell = await d.getSharedObject(cellId); + cell.set("something unimportant"); + const counter = await d.getSharedObject(counterId); + counter.increment(3); + const directory = await d.getSharedObject(directoryId); + directory.set(testKey, "I will be erased"); + }, + ); map.set(testKey, testValue); cell1.set(testValue); @@ -530,10 +569,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it(`resends delete op and can set after (${name})`, async function () { const map = await getMapFromProvider(getMap); - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const mapPre = await getMap(d); - mapPre.delete("clear"); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const mapPre = await getMap(d); + mapPre.delete("clear"); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -551,10 +595,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it(`resends a lot of ops (${name})`, async function () { const map = await getMapFromProvider(getMap); - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const mapPre = await getMap(d); - [...Array(lots).keys()].map((i) => mapPre.set(i.toString(), i.toString())); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const mapPre = await getMap(d); + [...Array(lots).keys()].map((i) => mapPre.set(i.toString(), i.toString())); + }, + ); // load container with pending ops, which should resend the ops not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -580,10 +629,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it(`doesn't resend a lot of successful ops (${name})`, async function () { const map = await getMapFromProvider(getMap); - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const mapPre = await getMap(d); - [...Array(lots).keys()].map((i) => map.set(i.toString(), i.toString())); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const mapPre = await getMap(d); + [...Array(lots).keys()].map((i) => map.set(i.toString(), i.toString())); + }, + ); // send a bunch from first container that should not be overwritten [...Array(lots).keys()].map((i) => map.set(i.toString(), testValue)); @@ -605,15 +659,20 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("resends all shared directory ops", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const directory = await d.getSharedObject(directoryId); - directory.set("key1", "value1"); - directory.set("key2", "value2"); - directory.createSubDirectory("subdir1"); - directory.createSubDirectory("subdir2"); - directory.delete("key2"); - directory.deleteSubDirectory("subdir2"); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const directory = await d.getSharedObject(directoryId); + directory.set("key1", "value1"); + directory.set("key2", "value2"); + directory.createSubDirectory("subdir1"); + directory.createSubDirectory("subdir2"); + directory.delete("key2"); + directory.deleteSubDirectory("subdir2"); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -632,12 +691,17 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("resends batched ops", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - (c as any).runtime.orderSequentially(() => { - [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); - }); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + (c as any).runtime.orderSequentially(() => { + [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); + }); + }, + ); // load container with pending ops, which should resend the ops not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -662,12 +726,17 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("doesn't resend successful batched ops", async function () { - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const map = await d.getSharedObject(mapId); - (c as any).runtime.orderSequentially(() => { - [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); - }); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + (c as any).runtime.orderSequentially(() => { + [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); + }); + }, + ); // send a bunch from first container that should not be overwritten [...Array(lots).keys()].map((i) => map1.set(i.toString(), testValue)); @@ -684,10 +753,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it("resends chunked op", async function () { const bigString = "a".repeat(container1.deltaManager.maxMessageSize); - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.set(testKey, bigString); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.set(testKey, bigString); + }, + ); // load container with pending ops, which should resend the ops not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -710,11 +784,16 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it("doesn't resend successful chunked op", async function () { const bigString = "a".repeat(container1.deltaManager.maxMessageSize); - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.set(testKey, bigString); - map.set(testKey2, bigString); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.set(testKey, bigString); + map.set(testKey2, bigString); + }, + ); // set on first container which should not be overwritten map1.set(testKey, testValue); @@ -735,10 +814,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { [...Array(lots).keys()].map((i) => map1.set(i.toString(), testValue)); await provider.ensureSynchronized(); - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.clear(); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.clear(); + }, + ); const container2 = await loader.resolve({ url }, pendingOps); const dataStore2 = (await container2.getEntryPoint()) as ITestFluidObject; @@ -754,10 +838,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("successful map clear no resend", async function () { - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.clear(); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.clear(); + }, + ); [...Array(lots).keys()].map((i) => map1.set(i.toString(), testValue)); await provider.ensureSynchronized(); @@ -780,10 +869,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("resends string insert op", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.insertText(s.getLength(), " world!"); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.insertText(s.getLength(), " world!"); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -796,10 +890,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("doesn't resend successful string insert op", async function () { - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.insertText(s.getLength(), " world!"); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.insertText(s.getLength(), " world!"); + }, + ); // load with pending ops, which it should not resend because they were already sent successfully const container2 = await loader.resolve({ url }, pendingOps); @@ -812,10 +911,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("resends string remove op", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.removeText(0, s.getLength()); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.removeText(0, s.getLength()); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -828,10 +932,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("doesn't resend successful string remove op", async function () { - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.removeText(0, s.getLength()); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.removeText(0, s.getLength()); + }, + ); string1.insertText(0, "goodbye cruel world"); @@ -846,10 +955,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("resends string annotate op", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.annotateRange(0, s.getLength(), { bold: true }); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.annotateRange(0, s.getLength(), { bold: true }); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -862,10 +976,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("doesn't resend successful string annotate op", async function () { - const pendingOps = await getPendingOps(provider, true, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.annotateRange(0, s.getLength(), { bold: true }); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.annotateRange(0, s.getLength(), { bold: true }); + }, + ); // change annotation, which should not be overwritten by successful stashed ops string1.annotateRange(0, string1.getLength(), { bold: false }); @@ -880,18 +999,23 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("resends marker ops", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const s = await d.getSharedObject(stringId); - s.insertMarker(s.getLength(), ReferenceType.Simple, { - [reservedMarkerIdKey]: "markerId", - [reservedMarkerSimpleTypeKey]: "markerKeyValue", - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const s = await d.getSharedObject(stringId); + s.insertMarker(s.getLength(), ReferenceType.Simple, { + [reservedMarkerIdKey]: "markerId", + [reservedMarkerSimpleTypeKey]: "markerKeyValue", + }); - s.insertMarker(0, ReferenceType.Tile, { - [reservedTileLabelsKey]: ["tileLabel"], - [reservedMarkerIdKey]: "tileMarkerId", - }); - }); + s.insertMarker(0, ReferenceType.Tile, { + [reservedTileLabelsKey]: ["tileLabel"], + [reservedMarkerIdKey]: "tileMarkerId", + }); + }, + ); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps); @@ -939,24 +1063,29 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it("resends attach op", async function () { const newMapId = "newMap"; let id; - const pendingOps = await getPendingOps(provider, false, async (container, d) => { - const defaultDataStore = (await container.getEntryPoint()) as ITestFluidObject; - const runtime = defaultDataStore.context.containerRuntime; - - const createdDataStore = await runtime.createDataStore(["default"]); - const dataStore = (await createdDataStore.entryPoint.get()) as ITestFluidObject; - id = dataStore.context.id; - - const channel = dataStore.runtime.createChannel( - newMapId, - "https://graph.microsoft.com/types/map", - ); - assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); - - ((await channel.handle.get()) as SharedObject).bindToContext(); - defaultDataStore.root.set("someDataStore", dataStore.handle); - (channel as ISharedMap).set(testKey, testValue); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (container, d) => { + const defaultDataStore = (await container.getEntryPoint()) as ITestFluidObject; + const runtime = defaultDataStore.context.containerRuntime; + + const createdDataStore = await runtime.createDataStore(["default"]); + const dataStore = (await createdDataStore.entryPoint.get()) as ITestFluidObject; + id = dataStore.context.id; + + const channel = dataStore.runtime.createChannel( + newMapId, + "https://graph.microsoft.com/types/map", + ); + assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); + + ((await channel.handle.get()) as SharedObject).bindToContext(); + defaultDataStore.root.set("someDataStore", dataStore.handle); + (channel as ISharedMap).set(testKey, testValue); + }, + ); const container2 = await loader.resolve({ url }, pendingOps); await waitForContainerConnection(container2); @@ -974,23 +1103,28 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it("doesn't resend successful attach op", async function () { const newMapId = "newMap"; - const pendingOps = await getPendingOps(provider, true, async (container, d) => { - const defaultDataStore = (await container.getEntryPoint()) as ITestFluidObject; - const runtime = defaultDataStore.context.containerRuntime; - - const createdDataStore = await runtime.createDataStore(["default"]); - const dataStore = (await createdDataStore.entryPoint.get()) as ITestFluidObject; - - const channel = dataStore.runtime.createChannel( - newMapId, - "https://graph.microsoft.com/types/map", - ); - assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); - - ((await channel.handle.get()) as SharedObject).bindToContext(); - defaultDataStore.root.set("someDataStore", dataStore.handle); - (channel as ISharedMap).set(testKey, testValue); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + true, // Do send ops from first container instance before closing + async (container, d) => { + const defaultDataStore = (await container.getEntryPoint()) as ITestFluidObject; + const runtime = defaultDataStore.context.containerRuntime; + + const createdDataStore = await runtime.createDataStore(["default"]); + const dataStore = (await createdDataStore.entryPoint.get()) as ITestFluidObject; + + const channel = dataStore.runtime.createChannel( + newMapId, + "https://graph.microsoft.com/types/map", + ); + assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); + + ((await channel.handle.get()) as SharedObject).bindToContext(); + defaultDataStore.root.set("someDataStore", dataStore.handle); + (channel as ISharedMap).set(testKey, testValue); + }, + ); const container2 = await loader.resolve({ url }, pendingOps); await waitForContainerConnection(container2); @@ -998,17 +1132,22 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { it("resends DDS attach op", async function () { const newMapId = "newMap"; - const pendingOps = await getPendingOps(provider, false, async (_, dataStore) => { - const channel = dataStore.runtime.createChannel( - newMapId, - "https://graph.microsoft.com/types/map", - ); - assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); - - ((await channel.handle.get()) as SharedObject).bindToContext(); - assert.strictEqual(channel.handle.isAttached, true, "Channel should be attached"); - (channel as ISharedMap).set(testKey, testValue); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (_, dataStore) => { + const channel = dataStore.runtime.createChannel( + newMapId, + "https://graph.microsoft.com/types/map", + ); + assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); + + ((await channel.handle.get()) as SharedObject).bindToContext(); + assert.strictEqual(channel.handle.isAttached, true, "Channel should be attached"); + (channel as ISharedMap).set(testKey, testValue); + }, + ); const container2 = await loader.resolve({ url }, pendingOps); await waitForContainerConnection(container2); @@ -1081,15 +1220,12 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { // generate local op assert.strictEqual(string.getText(), "hello"); - string.insertText(5, "; long amount of text that will produce a high index"); + string.insertText(5, " / First op"); // op is submitted on top of first op at some later time (not in the same JS turn, so not batched) await Promise.resolve(); - string.insertText(string.getLength(), ", for testing purposes"); - assert.strictEqual( - string.getText(), - "hello; long amount of text that will produce a high index, for testing purposes", - ); + string.insertText(string.getLength(), " / Second op"); + assert.strictEqual(string.getText(), "hello / First op / Second op"); const stashP = new Promise((resolve) => { container.on("op", (op) => { @@ -1106,19 +1242,25 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); }); provider.opProcessingController.resumeProcessing(container); - const stashedOps = await stashP; - - // when this container tries to apply the second op, it will not have replayed the first - // op yet, because the reference sequence number of the second op is lower than the sequence number - // of the first op - const container2 = await loader.resolve({ url }, stashedOps); + const pendingLocalState = await stashP; + + // Op stream [client ID] at this point -- These are in "savedOps" in the pendingLocalState + // 1: Join op [A] + // 2: "hello" (from test setup) [A] + // 3: Join op [B] + // 4: " / First op" [B] + // + // Stashed Ops (ref seq num is 3) -- These are in ContainerRuntime's PendingStateManager.initialMessages + // 4: "First op" [B] + // _: " / Second op" [B] + + // This container will have to replay the first op even though it was already sequenced, + // since both ops' reference sequence number is lower than the first op's sequence number. + const container2 = await loader.resolve({ url }, pendingLocalState); const defaultDataStore2 = (await container2.getEntryPoint()) as ITestFluidObject; const string2 = await defaultDataStore2.getSharedObject(stringId); await waitForContainerConnection(container2); - assert.strictEqual( - string2.getText(), - "hello; long amount of text that will produce a high index, for testing purposes", - ); + assert.strictEqual(string2.getText(), "hello / First op / Second op"); await provider.ensureSynchronized(); assert.strictEqual(string2.getText(), string1.getText()); }); @@ -1170,12 +1312,17 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { ); it("can make changes offline and resubmit them", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); + }, + ); - const container2 = await loadOffline(provider, { url }, pendingOps); + const container2 = await loadOffline(testContainerConfig, provider, { url }, pendingOps); const dataStore2 = (await container2.container.getEntryPoint()) as ITestFluidObject; const map2 = await dataStore2.getSharedObject(mapId); @@ -1210,10 +1357,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("fails when session time expires using stashed time", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); + }, + ); const pendingState = JSON.parse(pendingOps); assert.ok(pendingState.pendingRuntimeState.sessionExpiryTimerStarted); pendingState.pendingRuntimeState.sessionExpiryTimerStarted = 1; @@ -1225,12 +1377,17 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("can make changes offline and stash them", async function () { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); + }, + ); - const container2 = await loadOffline(provider, { url }, pendingOps); + const container2 = await loadOffline(testContainerConfig, provider, { url }, pendingOps); const dataStore2 = (await container2.container.getEntryPoint()) as ITestFluidObject; const map2 = await dataStore2.getSharedObject(mapId); @@ -1248,7 +1405,12 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { // get stashed ops from this container without connecting const morePendingOps = await container2.container.closeAndGetPendingLocalState?.(); - const container3 = await loadOffline(provider, { url }, morePendingOps); + const container3 = await loadOffline( + testContainerConfig, + provider, + { url }, + morePendingOps, + ); const dataStore3 = (await container3.container.getEntryPoint()) as ITestFluidObject; const map3 = await dataStore3.getSharedObject(mapId); @@ -1289,10 +1451,15 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { { eventName: "fluid:telemetry:Container:WaitBeforeClientLeave_end" }, ], async () => { - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + [...Array(lots).keys()].map((i) => map.set(i.toString(), i)); + }, + ); const container2: IContainerExperimental = await loader.resolve({ url }, pendingOps); const dataStore2 = (await container2.getEntryPoint()) as ITestFluidObject; @@ -1346,7 +1513,7 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { ); it("offline blob upload", async function () { - const container = await loadOffline(provider, { url }); + const container = await loadOffline(testContainerConfig, provider, { url }); const dataStore = (await container.container.getEntryPoint()) as ITestFluidObject; const map = await dataStore.getSharedObject(mapId); @@ -1455,7 +1622,7 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { ["local"], async function () { // upload blob offline so an entry is added to redirect table - const container = await loadOffline(provider, { url }); + const container = await loadOffline(testContainerConfig, provider, { url }); const dataStore = (await container.container.getEntryPoint()) as ITestFluidObject; const map = await dataStore.getSharedObject(mapId); @@ -1467,16 +1634,16 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { // wait for summary with redirect table await provider.ensureSynchronized(); - await waitForSummary(); + await waitForSummary(container1); // should be able to load entirely offline - const stashBlob = await getPendingOps(provider, true); - await loadOffline(provider, { url }, stashBlob); + const stashBlob = await getPendingOps(testContainerConfig, provider, true); + await loadOffline(testContainerConfig, provider, { url }, stashBlob); }, ); it("load offline from stashed ops with pending blob", async function () { - const container = await loadOffline(provider, { url }); + const container = await loadOffline(testContainerConfig, provider, { url }); const dataStore = (await container.container.getEntryPoint()) as ITestFluidObject; const map = await dataStore.getSharedObject(mapId); @@ -1488,7 +1655,12 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { const stashedChanges = await stashedChangesP; - const container3 = await loadOffline(provider, { url }, stashedChanges); + const container3 = await loadOffline( + testContainerConfig, + provider, + { url }, + stashedChanges, + ); const dataStore3 = (await container3.container.getEntryPoint()) as ITestFluidObject; const map3 = await dataStore3.getSharedObject(mapId); @@ -1512,7 +1684,7 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { }); it("stashed changes with blobs", async function () { - const container = await loadOffline(provider, { url }); + const container = await loadOffline(testContainerConfig, provider, { url }); const dataStore = (await container.container.getEntryPoint()) as ITestFluidObject; const map = await dataStore.getSharedObject(mapId); @@ -1545,27 +1717,32 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { const newMapId = "newMap"; let id; // stash attach op - const pendingOps = await getPendingOps(provider, false, async (container, d) => { - const defaultDataStore = (await container.getEntryPoint()) as ITestFluidObject; - const runtime = defaultDataStore.context.containerRuntime; - - const createdDataStore = await runtime.createDataStore(["default"]); - const dataStore = (await createdDataStore.entryPoint.get()) as ITestFluidObject; - id = dataStore.context.id; - - const channel = dataStore.runtime.createChannel( - newMapId, - "https://graph.microsoft.com/types/map", - ); - assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); - - ((await channel.handle.get()) as SharedObject).bindToContext(); - defaultDataStore.root.set("someDataStore", dataStore.handle); - (channel as ISharedMap).set(testKey, testValue); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (container, d) => { + const defaultDataStore = (await container.getEntryPoint()) as ITestFluidObject; + const runtime = defaultDataStore.context.containerRuntime; + + const createdDataStore = await runtime.createDataStore(["default"]); + const dataStore = (await createdDataStore.entryPoint.get()) as ITestFluidObject; + id = dataStore.context.id; + + const channel = dataStore.runtime.createChannel( + newMapId, + "https://graph.microsoft.com/types/map", + ); + assert.strictEqual(channel.handle.isAttached, false, "Channel should be detached"); + + ((await channel.handle.get()) as SharedObject).bindToContext(); + defaultDataStore.root.set("someDataStore", dataStore.handle); + (channel as ISharedMap).set(testKey, testValue); + }, + ); // load offline; new datastore should be accessible - const container2 = await loadOffline(provider, { url }, pendingOps); + const container2 = await loadOffline(testContainerConfig, provider, { url }, pendingOps); { const entryPoint = (await container2.container.getEntryPoint()) as ITestFluidObject; const containerRuntime = entryPoint.context.containerRuntime as ContainerRuntime; @@ -1647,15 +1824,20 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => { // TODO: https://github.com/microsoft/FluidFramework/issues/10729 it("works with summary while offline", async function () { map1.set("test op 1", "test op 1"); - await waitForSummary(); + await waitForSummary(container1); - const pendingOps = await getPendingOps(provider, false, async (c, d) => { - const map = await d.getSharedObject(mapId); - map.set(testKey, testValue); - }); + const pendingOps = await getPendingOps( + testContainerConfig, + provider, + false, // Don't send ops from first container instance before closing + async (c, d) => { + const map = await d.getSharedObject(mapId); + map.set(testKey, testValue); + }, + ); map1.set("test op 2", "test op 2"); - await waitForSummary(); + await waitForSummary(container1); // load container with pending ops, which should resend the op not sent by previous container const container2 = await loader.resolve({ url }, pendingOps);