Skip to content

Commit

Permalink
Make the updateExistingSession Atomic (#12669)
Browse files Browse the repository at this point in the history
If two clients join a non-live session from different clusters, they
race to update the database collection with different URLs.
For example:
Tenant A opens document A and calls the getSession method to update the
URLs.
Meanwhile, Tenant B opens document A and hits the race condition: because
Tenant A's write to Cosmos DB has not completed yet, Tenant B may still read
isSessionAlive as false and overwrite the source-of-truth URLs again.

To fix it:
When the service updates the source of truth in the Cosmos DB collection,
it now performs the update atomically, conditioned on isSessionAlive ==
false. Because the check happens at the very last instant (as part of the
database update command itself), it cannot race with another writer.


![MicrosoftTeams-image](https://user-images.githubusercontent.com/66701969/197908085-5552bfb6-f3de-4cfd-9197-768115036182.png)
  • Loading branch information
tianzhu007 authored Nov 22, 2022
1 parent 08be0c2 commit 7b3f64a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import { ISession, NetworkError } from "@fluidframework/server-services-client";
import { IDocument, ICollection } from "@fluidframework/server-services-core";
import { IDocument, ICollection, runWithRetry } from "@fluidframework/server-services-core";
import { getLumberBaseProperties, Lumberjack } from "@fluidframework/server-services-telemetry";

const defaultSessionStickinessDurationMs = 60 * 60 * 1000; // 60 minutes
Expand All @@ -17,6 +17,7 @@ async function createNewSession(
ordererUrl: string,
historianUrl: string,
deltaStreamUrl: string,
tenantId,
documentId,
documentsCollection: ICollection<IDocument>,
lumberjackProperties: Record<string, any>,
Expand All @@ -31,6 +32,7 @@ async function createNewSession(
try {
await documentsCollection.upsert(
{
tenantId,
documentId,
},
{
Expand Down Expand Up @@ -59,10 +61,12 @@ async function updateExistingSession(
deltaStreamUrl: string,
document: IDocument,
existingSession: ISession,
documentId,
documentId: string,
tenantId: string,
documentsCollection: ICollection<IDocument>,
sessionStickinessDurationMs: number,
lumberjackProperties: Record<string, any>,
count: number,
): Promise<ISession> {
let updatedDeli: string | undefined;
let updatedScribe: string | undefined;
Expand All @@ -88,7 +92,7 @@ async function updateExistingSession(
documentLastAccessTime: document.lastAccessTime,
sessionStickyCalculationTimestamp,
sessionStickinessDurationMs,
});
});
if (!isSessionSticky || !sessionHasLocation) {
// Allow session location to be moved.
if (
Expand All @@ -105,7 +109,7 @@ async function updateExistingSession(
// eslint-disable-next-line max-len
oldSessionLocation: { ordererUrl: existingSession.ordererUrl, historianUrl: existingSession.historianUrl, deltaStreamUrl: existingSession.deltaStreamUrl },
newSessionLocation: { ordererUrl, historianUrl, deltaStreamUrl },
});
});
updatedOrdererUrl = ordererUrl;
updatedHistorianUrl = historianUrl;
updatedDeltaStreamUrl = deltaStreamUrl;
Expand Down Expand Up @@ -134,20 +138,53 @@ async function updateExistingSession(
isSessionActive: false,
};
try {
await documentsCollection.upsert(
const result = await documentsCollection.findAndUpdate(
{
tenantId,
documentId,
"session.isSessionAlive": false,
},
{
createTime: document.createTime,
deli: updatedDeli ?? document.deli,
scribe: updatedScribe ?? document.scribe,
documentId: document.documentId,
session: updatedSession,
},
null);
scribe: updatedScribe ?? document.scribe,
tenantId: document.tenantId,
version: document.version,
});
Lumberjack.info(
`The Session ${JSON.stringify(updatedSession)} was updated into the document collection`,
`The original document in updateExistingSession: ${JSON.stringify(result)}`,
lumberjackProperties,
);
// There is no document with isSessionAlive as false. It means this session has been discovered by
// another call, and there is a race condition with different clients writing truth into the cosmosdb
// from different clusters. Thus, let it get the truth from the cosmosdb with isSessionAlive as true.
if (!result.existing) {
    // The conditional update matched nothing, so fall back to reading the document
    // whose session is already marked alive and use its session instead.
    Lumberjack.info(`The document with isSessionAlive as false does not exist`, lumberjackProperties);
    const doc: IDocument = await runWithRetry(
        async () => documentsCollection.findOne({ tenantId, documentId, "session.isSessionAlive": true,}),
        "getDocumentWithAlive",
        3 /* maxRetries */,
        1000 /* retryAfterMs */,
        lumberjackProperties,
        undefined, /* shouldIgnoreError */
        (error) => true, /* shouldRetry */
    );
    // Reject when EITHER the document or its session is missing. The previous `&&` check
    // dereferenced a null document and let a document without a session through,
    // which returned `undefined` as the session.
    if (!doc?.session) {
        Lumberjack.error(
            `Error running getSession from document: ${JSON.stringify(doc)}`,
            lumberjackProperties,
        );
        throw new NetworkError(500, "Error running getSession, please try again");
    }
    return doc.session;
} else {
    Lumberjack.info(
        `The Session ${JSON.stringify(updatedSession)} was updated into the documents collection`,
        lumberjackProperties,
    );
}
} catch (error) {
Lumberjack.error("Error persisting update to existing document session", lumberjackProperties, error);
throw new NetworkError(500, "Failed to persist update to document session");
Expand Down Expand Up @@ -186,6 +223,7 @@ export async function getSession(
documentId: string,
documentsCollection: ICollection<IDocument>,
sessionStickinessDurationMs: number = defaultSessionStickinessDurationMs,
count: number = 0,
): Promise<ISession> {
const lumberjackProperties = getLumberBaseProperties(documentId, tenantId);

Expand All @@ -206,6 +244,7 @@ export async function getSession(
ordererUrl,
historianUrl,
deltaStreamUrl,
tenantId,
documentId,
documentsCollection,
lumberjackProperties,
Expand All @@ -226,9 +265,11 @@ export async function getSession(
document,
existingSession,
documentId,
tenantId,
documentsCollection,
sessionStickinessDurationMs,
lumberjackProperties,
count,
);
return convertSessionToFreshSession(updatedSession, lumberjackProperties);
}
12 changes: 12 additions & 0 deletions server/routerlicious/packages/services-core/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ export interface ICollection<T> {
*/
findOrCreate(query: any, value: T): Promise<{ value: T; existing: boolean; }>;

/**
* Finds the first document matching the query and updates it with the given value.
* Does nothing if no document matches the query.
*
* @param query - filter identifying the document we want to find
* @param value - data to write to the matched document
* @returns An object whose `existing` flag is true when a matching document was found
* (and therefore updated), and false when nothing matched.
* NOTE(review): when a match is found, implementations appear to return the document as it
* was before the update in `value`; what `value` holds when nothing matched looks
* implementation-specific - confirm before relying on it.
*/
findAndUpdate(query: any, value: T): Promise<{
value: T;
existing: boolean;
}>;

/**
* Finds the query in the database. If it exists, update the value to set.
* Throws if query cannot be found.
Expand Down
15 changes: 15 additions & 0 deletions server/routerlicious/packages/services/src/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ export class MongoCollection<T> implements core.ICollection<T>, core.IRetryable
);
}

/**
 * Applies `value` as a `$set` update to a single document matching `query`.
 * No upsert option is passed, so nothing is written when no document matches.
 *
 * @param query - filter used to locate the document
 * @param value - fields to set on the matched document
 * @returns `{ value: <document before the update>, existing: true }` when a document matched;
 * otherwise `{ value: <the value argument>, existing: false }`.
 */
public async findAndUpdate(query: any, value: T): Promise<{ value: T; existing: boolean; }> {
    const { value: previous } = await this.collection.findOneAndUpdate(
        query,
        { $set: value },
        // Ask the driver for the document as it was before the update.
        { returnOriginal: true },
    );

    if (previous) {
        return { value: previous, existing: true };
    }
    // Nothing matched: echo the caller's value back and flag that no document existed.
    return { value, existing: false };
}

private async updateCore(filter: any, set: any, addToSet: any, upsert: boolean): Promise<void> {
const update: any = {};
if (set) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@ describe("Test for TestUtils", () => {
assert.deepStrictEqual(found.value, expected);
assert.strictEqual(found.existing, false);
});
// findAndUpdate on a stored item reports existing=true and hands back the stored item.
it("finds and updates, existing element", async () => {
    const testCollection = new TestCollection([...items]);
    const replacement = { _id: 1, value: "uno", group: "odd" };
    const { value: previous, existing } = await testCollection.findAndUpdate({ _id: 1 }, replacement);
    // The returned value is the item that was stored before the update, not the replacement.
    assert.deepStrictEqual(previous, item1);
    assert.strictEqual(existing, true);
});
// findAndUpdate with a query that matches nothing reports existing=false and a null value.
it("not find, and do nothing, non-existing element", async () => {
    const testCollection = new TestCollection([...items]);
    const unmatchedItem = { _id: 4, value: "four", group: "even" };
    const { value: previous, existing } = await testCollection.findAndUpdate({ _id: 4 }, unmatchedItem);
    assert.deepStrictEqual(previous, null);
    assert.strictEqual(existing, false);
});
it("inserts and finds multiple elements", async () => {
const testCollection = new TestCollection([...items]);
const newItems = [
Expand Down
10 changes: 10 additions & 0 deletions server/routerlicious/packages/test-utils/src/testCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ export class TestCollection implements ICollection<any> {
return { value: this.insertOneInternal(value), existing: false };
}

/**
 * Test double for findAndUpdate: looks up an item matching `query` and, when one is
 * found, removes it and inserts `value` in its place.
 *
 * @param query - filter used to locate the stored item
 * @param value - item to store in place of the matched one
 * @returns The lookup result in `value` (the previously stored item, or the falsy lookup
 * result when nothing matched) and `existing` indicating whether a match was found.
 */
public async findAndUpdate(query: any, value: any): Promise<{ value: any; existing: boolean; }> {
    const previous = this.findOneInternal(query);
    if (previous) {
        // Swap the stored item for the new value and report what was there before.
        this.removeOneInternal(previous);
        this.insertOneInternal(value);
        return { value: previous, existing: true };
    }
    // No match: leave the collection untouched.
    return { value: previous, existing: false };
}

public async insertMany(values: any[], ordered: boolean): Promise<void> {
values.forEach((value) => {
this.collection.push(value);
Expand Down

0 comments on commit 7b3f64a

Please sign in to comment.