Skip to content

Commit

Permalink
Client Changes in Fluid Server Cluster Discovery Feature (#9706)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianzhu007 authored Apr 20, 2022
1 parent 4ed03f5 commit 1c8d867
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 39 deletions.
7 changes: 5 additions & 2 deletions api-report/routerlicious-driver.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class DocumentService implements api.IDocumentService {

// @public (undocumented)
export class DocumentStorageService extends DocumentStorageServiceProxy {
constructor(id: string, manager: GitManager, logger: ITelemetryLogger, policies?: IDocumentStorageServicePolicies, driverPolicies?: IRouterliciousDriverPolicies, blobCache?: ICache<ArrayBufferLike>, snapshotTreeCache?: ICache<ISnapshotTreeVersion>);
constructor(id: string, manager: GitManager, logger: ITelemetryLogger, policies?: IDocumentStorageServicePolicies, driverPolicies?: IRouterliciousDriverPolicies, blobCache?: ICache<ArrayBufferLike>, snapshotTreeCache?: ICache<ISnapshotTreeVersion>, noCacheGitManager?: GitManager | undefined);
// (undocumented)
getSnapshotTree(version?: IVersion): Promise<ISnapshotTree | null>;
// (undocumented)
Expand All @@ -85,11 +85,14 @@ export class DocumentStorageService extends DocumentStorageServiceProxy {
get logTailSha(): string | undefined;
// (undocumented)
manager: GitManager;
// (undocumented)
noCacheGitManager?: GitManager | undefined;
}

// @public (undocumented)
export interface IRouterliciousDriverPolicies {
aggregateBlobsSmallerThanBytes: number | undefined;
enableDiscovery?: boolean;
enablePrefetch: boolean;
enableRestLess: boolean;
enableWholeSummaryUpload: boolean;
Expand Down Expand Up @@ -143,7 +146,7 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact
constructor(tokenProvider: ITokenProvider, driverPolicies?: Partial<IRouterliciousDriverPolicies>);
// (undocumented)
createContainer(createNewSummary: ISummaryTree | undefined, resolvedUrl: IResolvedUrl, logger?: ITelemetryBaseLogger, clientIsSummarizer?: boolean): Promise<IDocumentService>;
createDocumentService(resolvedUrl: IResolvedUrl, logger?: ITelemetryBaseLogger, clientIsSummarizer?: boolean): Promise<IDocumentService>;
createDocumentService(resolvedUrl: IResolvedUrl, logger?: ITelemetryBaseLogger, clientIsSummarizer?: boolean, isCreateContainer?: boolean): Promise<IDocumentService>;
// (undocumented)
readonly protocolName = "fluid:";
}
Expand Down
8 changes: 6 additions & 2 deletions packages/drivers/local-driver/src/localDocumentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class LocalDocumentService implements api.IDocumentService {
private readonly innerDocumentService?: api.IDocumentService,
) { }

public dispose() {}
public dispose() { }

/**
* Creates and returns a document storage service for local use.
Expand All @@ -43,7 +43,11 @@ export class LocalDocumentService implements api.IDocumentService {
this.documentId,
new GitManager(new TestHistorian(this.localDeltaConnectionServer.testDbFactory.testDatabase)),
new TelemetryNullLogger(),
{ minBlobSize: 2048 }); // Test blob aggregation.
{ minBlobSize: 2048 }, // Test blob aggregation.
undefined,
undefined,
undefined,
new GitManager(new TestHistorian(this.localDeltaConnectionServer.testDbFactory.testDatabase)));
}

/**
Expand Down
9 changes: 8 additions & 1 deletion packages/drivers/routerlicious-driver/src/documentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ export class DocumentService implements api.IDocumentService {
false,
storageRestWrapper);
const gitManager = new GitManager(historian);
const noCacheHistorian = new Historian(
this.gitUrl,
true,
true,
storageRestWrapper);
const noCacheGitManager = new GitManager(noCacheHistorian);
const documentStorageServicePolicies: api.IDocumentStorageServicePolicies = {
caching: this.driverPolicies.enablePrefetch
? api.LoaderCachingPolicy.Prefetch
Expand All @@ -88,7 +94,8 @@ export class DocumentService implements api.IDocumentService {
documentStorageServicePolicies,
this.driverPolicies,
this.blobCache,
this.snapshotTreeCache);
this.snapshotTreeCache,
noCacheGitManager);
return this.documentStorageService;
}

Expand Down
69 changes: 50 additions & 19 deletions packages/drivers/routerlicious-driver/src/documentServiceFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import {
RateLimiter,
} from "@fluidframework/driver-utils";
import { ChildLogger } from "@fluidframework/telemetry-utils";
import { ISession } from "@fluidframework/server-services-client";
import { DocumentService } from "./documentService";
import { IRouterliciousDriverPolicies } from "./policies";
import { ITokenProvider } from "./tokens";
import { RouterliciousOrdererRestWrapper } from "./restWrapper";
import { convertSummaryToCreateNewSummary } from "./createNewUtils";
import { parseFluidUrl, replaceDocumentIdInPath } from "./urlUtils";
import { parseFluidUrl, replaceDocumentIdInPath, getDiscoveredFluidResolvedUrl } from "./urlUtils";
import { InMemoryCache } from "./cache";
import { pkgVersion as driverVersion } from "./packageVersion";
import { ISnapshotTreeVersion } from "./definitions";
Expand All @@ -33,6 +34,7 @@ const defaultRouterliciousDriverPolicies: IRouterliciousDriverPolicies = {
maxConcurrentStorageRequests: 100,
maxConcurrentOrdererRequests: 100,
aggregateBlobsSmallerThanBytes: undefined,
enableDiscovery: false,
enableWholeSummaryUpload: false,
enableRestLess: true,
};
Expand Down Expand Up @@ -66,7 +68,7 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact
ensureFluidResolvedUrl(resolvedUrl);
assert(!!createNewSummary, 0x204 /* "create empty file not supported" */);
assert(!!resolvedUrl.endpoints.ordererUrl, 0x0b2 /* "Missing orderer URL!" */);
const parsedUrl = parseFluidUrl(resolvedUrl.url);
let parsedUrl = parseFluidUrl(resolvedUrl.url);
if (!parsedUrl.pathname) {
throw new Error("Parsed url should contain tenant and doc Id!!");
}
Expand All @@ -92,15 +94,14 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact
resolvedUrl.endpoints.ordererUrl,
);

// the backend responds with the actual document ID associated with the new container.

// @TODO: Remove returned "string" type when removing back-compat code
const res = await ordererRestWrapper.post<{ id: string, token?: string } | string>(
const res = await ordererRestWrapper.post<{ id: string, token?: string, session?: ISession } | string>(
`/documents/${tenantId}`,
{
summary: convertSummaryToCreateNewSummary(appSummary),
sequenceNumber: documentAttributes.sequenceNumber,
values: quorumValues,
enableDiscovery: this.driverPolicies.enableDiscovery,
generateToken: this.tokenProvider.documentPostCreateCallback !== undefined,
},
);
Expand All @@ -111,13 +112,21 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact

let documentId: string;
let token: string | undefined;

let session: ISession | undefined;
let fluidResolvedUrl: IResolvedUrl;
if (typeof res === "string") {
documentId = res;
} else {
documentId = res.id;
token = res.token;
session = res.session;
}
if (session && this.driverPolicies.enableDiscovery) {
fluidResolvedUrl = getDiscoveredFluidResolvedUrl(resolvedUrl, session);
} else {
fluidResolvedUrl = resolvedUrl;
}
parsedUrl = parseFluidUrl(fluidResolvedUrl.url);

// @TODO: Remove token from the condition, checking the documentPostCreateCallback !== undefined
// is sufficient to determine if the token will be undefined or not.
Expand All @@ -126,7 +135,7 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact
}

parsedUrl.set("pathname", replaceDocumentIdInPath(parsedUrl.pathname, documentId));
const deltaStorageUrl = resolvedUrl.endpoints.deltaStorageUrl;
const deltaStorageUrl = fluidResolvedUrl.endpoints.deltaStorageUrl;
if (!deltaStorageUrl) {
throw new Error(
`All endpoints urls must be provided. [deltaStorageUrl:${deltaStorageUrl}]`);
Expand All @@ -136,16 +145,17 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact

return this.createDocumentService(
{
...resolvedUrl,
...fluidResolvedUrl,
url: parsedUrl.toString(),
id: documentId,
endpoints: {
...resolvedUrl.endpoints,
...fluidResolvedUrl.endpoints,
deltaStorageUrl: parsedDeltaStorageUrl.toString(),
},
},
logger,
clientIsSummarizer,
true,
);
}

Expand All @@ -159,10 +169,40 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact
resolvedUrl: IResolvedUrl,
logger?: ITelemetryBaseLogger,
clientIsSummarizer?: boolean,
isCreateContainer?: boolean,
): Promise<IDocumentService> {
ensureFluidResolvedUrl(resolvedUrl);

const fluidResolvedUrl = resolvedUrl;
const parsedUrl = parseFluidUrl(resolvedUrl.url);
const [, tenantId, documentId] = parsedUrl.pathname.split("/");
if (!documentId || !tenantId) {
throw new Error(
`Couldn't parse documentId and/or tenantId. [documentId:${documentId}][tenantId:${tenantId}]`);
}
const logger2 = ChildLogger.create(logger, "RouterliciousDriver", { all: { driverVersion }});

let fluidResolvedUrl: IResolvedUrl;
if (!isCreateContainer && this.driverPolicies.enableDiscovery) {
const rateLimiter = new RateLimiter(this.driverPolicies.maxConcurrentOrdererRequests);
const ordererRestWrapper = await RouterliciousOrdererRestWrapper.load(
tenantId,
documentId,
this.tokenProvider,
logger2,
rateLimiter,
this.driverPolicies.enableRestLess,
resolvedUrl.endpoints.ordererUrl,
);

// the backend responds with the actual document session associated with the container.
const session: ISession = await ordererRestWrapper.get<ISession>(
`/documents/${tenantId}/session/${documentId}`,
);
fluidResolvedUrl = getDiscoveredFluidResolvedUrl(resolvedUrl, session);
} else {
fluidResolvedUrl = resolvedUrl;
}

const storageUrl = fluidResolvedUrl.endpoints.storageUrl;
const ordererUrl = fluidResolvedUrl.endpoints.ordererUrl;
const deltaStorageUrl = fluidResolvedUrl.endpoints.deltaStorageUrl;
Expand All @@ -171,15 +211,6 @@ export class RouterliciousDocumentServiceFactory implements IDocumentServiceFact
`All endpoints urls must be provided. [ordererUrl:${ordererUrl}][deltaStorageUrl:${deltaStorageUrl}]`);
}

const parsedUrl = parseFluidUrl(fluidResolvedUrl.url);
const [, tenantId, documentId] = parsedUrl.pathname.split("/");
if (!documentId || !tenantId) {
throw new Error(
`Couldn't parse documentId and/or tenantId. [documentId:${documentId}][tenantId:${tenantId}]`);
}

const logger2 = ChildLogger.create(logger, "RouterliciousDriver", { all: { driverVersion }});

return new DocumentService(
fluidResolvedUrl,
ordererUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,18 @@ export class DocumentStorageService extends DocumentStorageServiceProxy {
policies: IDocumentStorageServicePolicies,
driverPolicies?: IRouterliciousDriverPolicies,
blobCache?: ICache<ArrayBufferLike>,
snapshotTreeCache?: ICache<ISnapshotTreeVersion>): IDocumentStorageService {
snapshotTreeCache?: ICache<ISnapshotTreeVersion>,
noCacheGitManager?: GitManager): IDocumentStorageService {
const storageService = driverPolicies?.enableWholeSummaryUpload ?
new WholeSummaryDocumentStorageService(
id,
manager,
logger,
policies,
driverPolicies,
blobCache,
snapshotTreeCache,
noCacheGitManager,
) :
new ShreddedSummaryDocumentStorageService(
id,
Expand All @@ -70,7 +73,8 @@ export class DocumentStorageService extends DocumentStorageServiceProxy {
policies: IDocumentStorageServicePolicies = {},
driverPolicies?: IRouterliciousDriverPolicies,
blobCache?: ICache<ArrayBufferLike>,
snapshotTreeCache?: ICache<ISnapshotTreeVersion>) {
snapshotTreeCache?: ICache<ISnapshotTreeVersion>,
public noCacheGitManager?: GitManager) {
super(DocumentStorageService.loadInternalDocumentStorageService(
id,
manager,
Expand All @@ -79,6 +83,7 @@ export class DocumentStorageService extends DocumentStorageServiceProxy {
driverPolicies,
blobCache,
snapshotTreeCache,
noCacheGitManager,
));
}

Expand Down
5 changes: 5 additions & 0 deletions packages/drivers/routerlicious-driver/src/policies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ export interface IRouterliciousDriverPolicies {
* Default: false
*/
enableWholeSummaryUpload: boolean;
/**
* Enable service endpoint discovery when creating or joining a session.
* Default: false
*/
enableDiscovery?: boolean;
/**
* Enable using RestLess which avoids CORS preflight requests.
* Default: true
Expand Down
31 changes: 31 additions & 0 deletions packages/drivers/routerlicious-driver/src/urlUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* Licensed under the MIT License.
*/

import { IFluidResolvedUrl } from "@fluidframework/driver-definitions";
import URLParse from "url-parse";
import { ISession } from "@fluidframework/server-services-client";

export const parseFluidUrl = (fluidUrl: string): URLParse => {
return new URLParse(fluidUrl, true);
Expand All @@ -17,3 +19,32 @@ export const parseFluidUrl = (fluidUrl: string): URLParse => {
*/
export const replaceDocumentIdInPath = (urlPath: string, documentId: string): string =>
urlPath.split("/").slice(0, -1).concat([documentId]).join("/");

export const getDiscoveredFluidResolvedUrl = (resolvedUrl: IFluidResolvedUrl, session: ISession): IFluidResolvedUrl => {
if (session) {
const discoveredOrdererUrl = new URLParse(session.ordererUrl);
const deltaStorageUrl = new URLParse(resolvedUrl.endpoints.deltaStorageUrl);
deltaStorageUrl.set("host", discoveredOrdererUrl.host);

const discoveredStorageUrl = new URLParse(session.historianUrl);
const storageUrl = new URLParse(resolvedUrl.endpoints.storageUrl);
storageUrl.set("host", discoveredStorageUrl.host);

const parsedUrl = parseFluidUrl(resolvedUrl.url);
const discoveredResolvedUrl: IFluidResolvedUrl = {
endpoints: {
deltaStorageUrl: deltaStorageUrl.toString(),
ordererUrl: session.ordererUrl,
storageUrl: storageUrl.toString(),
},
id: resolvedUrl.id,
tokens: resolvedUrl.tokens,
type: resolvedUrl.type,
url: new URLParse(`fluid://${discoveredOrdererUrl.host}${parsedUrl.pathname}`).toString(),
};

return discoveredResolvedUrl;
} else {
return resolvedUrl;
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
IDocumentStorageService,
ISummaryContext,
IDocumentStorageServicePolicies,
} from "@fluidframework/driver-definitions";
} from "@fluidframework/driver-definitions";
import {
ICreateBlobResponse,
ISnapshotTree,
Expand All @@ -31,6 +31,7 @@ import {
import { PerformanceEvent } from "@fluidframework/telemetry-utils";
import { ICache, InMemoryCache } from "./cache";
import { ISnapshotTreeVersion } from "./definitions";
import { IRouterliciousDriverPolicies } from "./policies";

const latestSnapshotId: string = "latest";

Expand All @@ -47,8 +48,10 @@ export class WholeSummaryDocumentStorageService implements IDocumentStorageServi
protected readonly manager: GitManager,
protected readonly logger: ITelemetryLogger,
public readonly policies: IDocumentStorageServicePolicies = {},
private readonly driverPolicies?: IRouterliciousDriverPolicies,
private readonly blobCache: ICache<ArrayBufferLike> = new InMemoryCache(),
private readonly snapshotTreeCache: ICache<ISnapshotTreeVersion> = new InMemoryCache()) {
private readonly snapshotTreeCache: ICache<ISnapshotTreeVersion> = new InMemoryCache(),
protected readonly noCacheGitManager?: GitManager) {
this.summaryUploadManager = new WholeSummaryUploadManager(manager);
}

Expand All @@ -64,7 +67,9 @@ export class WholeSummaryDocumentStorageService implements IDocumentStorageServi
// Fetch latest summary, cache it, and return its id.
if (this.firstVersionsCall && count === 1) {
this.firstVersionsCall = false;
const { id: _id, snapshotTree } = await this.fetchAndCacheSnapshotTree(latestSnapshotId);
const { id: _id, snapshotTree } = !this.driverPolicies?.enableDiscovery ?
await this.fetchAndCacheSnapshotTree(latestSnapshotId, false) :
await this.fetchAndCacheSnapshotTree(latestSnapshotId, true);
return [{
id: _id,
treeId: snapshotTree.id!,
Expand Down Expand Up @@ -170,7 +175,7 @@ export class WholeSummaryDocumentStorageService implements IDocumentStorageServi
);
}

private async fetchAndCacheSnapshotTree(versionId: string): Promise<ISnapshotTreeVersion> {
private async fetchAndCacheSnapshotTree(versionId: string, disableCache?: boolean): Promise<ISnapshotTreeVersion> {
const cachedSnapshotTreeVersion = await this.snapshotTreeCache.get(versionId);
if (cachedSnapshotTreeVersion !== undefined) {
return { id: cachedSnapshotTreeVersion.id, snapshotTree: cachedSnapshotTreeVersion.snapshotTree };
Expand All @@ -183,7 +188,9 @@ export class WholeSummaryDocumentStorageService implements IDocumentStorageServi
treeId: versionId,
},
async (event) => {
const response = await this.manager.getSummary(versionId);
const response = disableCache && this.noCacheGitManager !== undefined ?
await this.noCacheGitManager.getSummary(versionId) :
await this.manager.getSummary(versionId);
event.end({
size: response.trees[0]?.entries.length,
});
Expand All @@ -194,7 +201,7 @@ export class WholeSummaryDocumentStorageService implements IDocumentStorageServi
const wholeFlatSummaryId: string = wholeFlatSummary.id;
const snapshotTreeId = normalizedWholeSummary.snapshotTree.id;
assert(snapshotTreeId !== undefined, 0x275 /* "Root tree should contain the id" */);
const snapshotTreeVersion = { id: wholeFlatSummaryId , snapshotTree: normalizedWholeSummary.snapshotTree };
const snapshotTreeVersion = { id: wholeFlatSummaryId, snapshotTree: normalizedWholeSummary.snapshotTree };

const cachePs: Promise<any>[] = [
this.snapshotTreeCache.put(
Expand Down
Loading

0 comments on commit 1c8d867

Please sign in to comment.