Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server Changes in Fluid Server Cluster Discovery Feature #9685

Merged
merged 36 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2fd9a5e
FRS-Echo-Openapis-POC
tianzhu007 Jul 29, 2021
b801558
Update openapi.json
tianzhu007 Jul 30, 2021
aa136b6
Update openapi.json
tianzhu007 Jul 30, 2021
41e35af
Upload openapi.yaml
tianzhu007 Aug 3, 2021
48b352f
Update openapi.json
tianzhu007 Aug 4, 2021
ef18375
Update openapi.json
tianzhu007 Aug 4, 2021
f5302a9
Update openapi.json
tianzhu007 Aug 4, 2021
007588a
Update openapi.json
tianzhu007 Aug 5, 2021
46e2b32
Remove openapi
tianzhu007 Nov 17, 2021
b4ae673
Merge branch 'microsoft:main' into main
tianzhu007 Dec 30, 2021
32559e6
Merge branch 'microsoft:main' into main
tianzhu007 Feb 5, 2022
14176fb
Merge branch 'microsoft:main' into main
tianzhu007 Feb 7, 2022
dcdec98
Merge branch 'microsoft:main' into main
tianzhu007 Mar 3, 2022
a03e2fa
Merge branch 'microsoft:main' into main
tianzhu007 Mar 4, 2022
16bc082
Merge branch 'microsoft:main' into main
tianzhu007 Mar 7, 2022
f9d82cb
Merge branch 'microsoft:main' into main
tianzhu007 Mar 10, 2022
cc3ffeb
Merge branch 'microsoft:main' into main
tianzhu007 Mar 30, 2022
336015a
Add routing feature for server side
tianzhu007 Mar 30, 2022
924c7e5
Remove unnecessary code and command
tianzhu007 Mar 31, 2022
e342ab4
Add logs when creating session
tianzhu007 Apr 1, 2022
fefc3ae
Update the end flow, and cache method
tianzhu007 Apr 2, 2022
12d2622
Refactor the code as false
tianzhu007 Apr 4, 2022
b0ddcca
Refactor the sessionHelper.ts
tianzhu007 Apr 4, 2022
8410bf1
Update the comment
tianzhu007 Apr 4, 2022
ac8e259
Update the fetchAndCache
tianzhu007 Apr 4, 2022
7240b0b
Change the db update method
tianzhu007 Apr 5, 2022
42aebca
Merge branch 'microsoft:main' into main
tianzhu007 Apr 5, 2022
18b943b
Merge branch 'main' into Enable-Routing-Work-Server
tianzhu007 Apr 5, 2022
51b5a21
Update json files
tianzhu007 Apr 5, 2022
2962e36
Change the partitionActivitiyTimeout
tianzhu007 Apr 5, 2022
a86f968
Update close session
tianzhu007 Apr 5, 2022
b574e9e
Remove the extra quote
tianzhu007 Apr 5, 2022
6ee4f68
Format the if else block
tianzhu007 Apr 5, 2022
ecef313
Refactor the docuemnts.ts
tianzhu007 Apr 5, 2022
4e4dcb5
Refactor the sessionHelper.ts
tianzhu007 Apr 7, 2022
1bba9e2
Change the comments
tianzhu007 Apr 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ export class RestGitService {
this.getSummaryCacheKey("container"),
async () => this.get<IWholeFlatSummary>(
`/repos/${this.getRepoPath()}/git/summaries/${encodeURIComponent(sha)}`),
useCache);
useCache,
true);
}

public async updateRef(ref: string, params: IPatchRefParamsExternal): Promise<git.IRef> {
Expand Down Expand Up @@ -362,8 +363,7 @@ export class RestGitService {
useCache);
}

private getStorageRoutingHeaderValue()
{
private getStorageRoutingHeaderValue() {
return `${this.tenantId}:${this.documentId}`;
}

Expand Down Expand Up @@ -431,10 +431,23 @@ export class RestGitService {
}
}

/**
* Caches by the given key.
*/
private async fetchAndCache<T>(key: string, fetch: () => Promise<T>): Promise<T> {
winston.info(`Fetching ${key}`);
Lumberjack.info(`Fetching ${key}`, this.lumberProperties);
const value = await fetch();
if (this.cache) {
this.setCache(key, value);
}
return value;
}

/**
* Deletes the given key from the cache. Will log any errors with the cache.
*/
private deleteFromCache(key: string): void {
private deleteFromCache(key: string): void {
if (this.cache) {
// Attempt to delete the key from Redis - log any errors but don't fail
this.cache.delete(key).catch((error) => {
Expand All @@ -444,7 +457,10 @@ export class RestGitService {
}
}

private async resolve<T>(key: string, fetch: () => Promise<T>, useCache: boolean): Promise<T> {
private async resolve<T>(key: string,
fetch: () => Promise<T>,
useCache: boolean,
resolvingSummary: boolean = false): Promise<T> {
if (this.cache && useCache) {
// Attempt to grab the value from the cache. Log any errors but don't fail the request
const cachedValue: T | undefined = await this.cache.get<T>(key).catch((error) => {
Expand All @@ -460,15 +476,18 @@ export class RestGitService {
}

// Value is not cached - fetch it with the provided function and then cache the value
winston.info(`Fetching ${key}`);
Lumberjack.info(`Fetching ${key}`, this.lumberProperties);
const value = await fetch();
this.setCache(key, value);
return this.fetchAndCache(key, fetch);
}

return value;
} else {
return fetch();
if (resolvingSummary) {
tianzhu007 marked this conversation as resolved.
Show resolved Hide resolved
/**
* We set the useCache flag as false when we fetch the summary at the first time. We need to
* get the summary from the strage, and update the cache. If not, the following calls with
tianzhu007 marked this conversation as resolved.
Show resolved Hide resolved
* useCache enabled might read the outdated summary from cache in case of the historian service change.
tianzhu007 marked this conversation as resolved.
Show resolved Hide resolved
*/
return this.fetchAndCache(key, fetch);
}
return fetch();
}

tianzhu007 marked this conversation as resolved.
Show resolved Hide resolved
private getSummaryCacheKey(type: IWholeSummaryPayloadType): string {
Expand Down
10 changes: 10 additions & 0 deletions server/routerlicious/api-report/server-services-client.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,16 @@ export interface IPatchRefParamsExternal extends resources.IPatchRefParams {
config?: IExternalWriterConfig;
}

// @public (undocumented)
export interface ISession {
// (undocumented)
historianUrl: string;
// (undocumented)
isSessionAlive: boolean;
// (undocumented)
ordererUrl: string;
}

// @public (undocumented)
export interface ISummaryTree extends ISummaryTree_2 {
// (undocumented)
Expand Down
3 changes: 3 additions & 0 deletions server/routerlicious/packages/lambdas/src/deli/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements
public handler(rawMessage: IQueuedMessage) {
// In cases where we are reprocessing messages we have already checkpointed exit early
if (rawMessage.offset <= this.logOffset) {
Lumberjack.info(`rawMessage.offset: ${rawMessage.offset} <= this.logOffset: ${this.logOffset}`,
getLumberBaseProperties(this.documentId, this.tenantId));

this.updateCheckpointMessages(rawMessage);

if (this.checkpointInfo.currentKafkaCheckpointMessage) {
Expand Down
30 changes: 24 additions & 6 deletions server/routerlicious/packages/lambdas/src/deli/lambdaFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
IProducer,
IServiceConfiguration,
ITenantManager,
LambdaCloseType,
MongoManager,
} from "@fluidframework/server-services-core";
import { generateServiceProtocolEntries } from "@fluidframework/protocol-base";
Expand Down Expand Up @@ -55,8 +56,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
private readonly tenantManager: ITenantManager,
private readonly forwardProducer: IProducer,
private readonly reverseProducer: IProducer,
private readonly serviceConfiguration: IServiceConfiguration,
globalDbMongoManager?: MongoManager) {
tianzhu007 marked this conversation as resolved.
Show resolved Hide resolved
private readonly serviceConfiguration: IServiceConfiguration) {
super();
}

Expand Down Expand Up @@ -124,7 +124,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
lastCheckpoint.logOffset = -1;
lastCheckpoint.epoch = leaderEpoch;
context.log?.info(`Deli checkpoint from summary:
${ JSON.stringify(lastCheckpoint)}`, { messageMetaData });
${JSON.stringify(lastCheckpoint)}`, { messageMetaData });
}
} else {
lastCheckpoint = JSON.parse(dbObject.deli);
Expand Down Expand Up @@ -152,7 +152,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
const checkpointManager = createDeliCheckpointManagerFromCollection(tenantId, documentId, this.collection);

// Should the lambda reaize that term has flipped to send a no-op message at the beginning?
return new DeliLambda(
const deliLambda = new DeliLambda(
context,
tenantId,
documentId,
Expand All @@ -164,14 +164,32 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
this.serviceConfiguration,
sessionMetric,
sessionStartMetric);

deliLambda.on("close", (closeType) => {
const handler = async () => {
if ((closeType === LambdaCloseType.ActivityTimeout || closeType === LambdaCloseType.Error)) {
const query = { documentId, tenantId, session: { $exists: true } };
const data = { "session.isSessionAlive": false };
await this.collection.update(query, data, null);
context.log?.info(`Marked isSessionAlive as false for closeType: ${JSON.stringify(closeType)}`,
{ messageMetaData });
}
};
handler().catch((e) => {
context.log?.error(`Failed to handle isSessionAlive with exception ${e}`
, { messageMetaData });
});
});

return deliLambda;
}

private logSessionFailureMetrics(
sessionMetric: Lumber<LumberEventName.SessionResult> | undefined,
sessionStartMetric: Lumber<LumberEventName.StartSessionResult> | undefined,
errMsg: string) {
sessionMetric?.error(errMsg);
sessionStartMetric?.error(errMsg);
sessionMetric?.error(errMsg);
sessionStartMetric?.error(errMsg);
}

public async dispose(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
MongoManager,
IThrottler,
ICache,
ICollection,
IDocument,
} from "@fluidframework/server-services-core";
import { json, urlencoded } from "body-parser";
import compression from "compression";
Expand Down Expand Up @@ -49,7 +51,7 @@ export function create(
appTenants: IAlfredTenant[],
operationsDbMongoManager: MongoManager,
producer: IProducer,
globalDbMongoManager?: MongoManager) {
documentsCollection: ICollection<IDocument>) {
// Maximum REST request size
const requestSize = config.get("alfred:restJsonSize");

Expand Down Expand Up @@ -111,7 +113,7 @@ export function create(
storage,
producer,
appTenants,
globalDbMongoManager);
documentsCollection);

app.use("/public", cors(), express.static(path.join(__dirname, "../../public")));
app.use(routes.api);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

import * as crypto from "crypto";
import {
IDocument,
IDocumentStorage,
IThrottler,
ITenantManager,
ICache,
MongoManager,
ICollection,
} from "@fluidframework/server-services-core";
import {
verifyStorageToken,
Expand All @@ -20,10 +21,10 @@ import {
} from "@fluidframework/server-services-utils";
import { Router } from "express";
import winston from "winston";
import { IAlfredTenant } from "@fluidframework/server-services-client";
import { IAlfredTenant, ISession } from "@fluidframework/server-services-client";
import { Provider } from "nconf";
import { v4 as uuid } from "uuid";
import { Constants, handleResponse } from "../../../utils";
import { Constants, handleResponse, getSession } from "../../../utils";

export function create(
storage: IDocumentStorage,
Expand All @@ -32,9 +33,10 @@ export function create(
singleUseTokenCache: ICache,
config: Provider,
tenantManager: ITenantManager,
globalDbMongoManager?: MongoManager): Router {
documentsCollection: ICollection<IDocument>): Router {
const router: Router = Router();

const ordererUrl = config.get("worker:serverUrl");
const historianUrl = config.get("worker:blobStorageUrl");
// Whether to enforce server-generated document ids in create doc flow
const enforceServerGeneratedDocumentId: boolean = config.get("alfred:enforceServerGeneratedDocumentId") ?? false;

Expand All @@ -61,7 +63,7 @@ export function create(
(error) => {
response.status(400).json(error);
});
});
});

/**
* Creates a new document with initial summary.
Expand All @@ -74,7 +76,7 @@ export function create(
singleUseTokenCache,
}),
throttle(throttler, winston, commonThrottleOptions),
(request, response, next) => {
async (request, response, next) => {
// Tenant and document
const tenantId = getParam(request.params, "tenantId");
// If enforcing server generated document id, ignore id parameter
Expand All @@ -95,26 +97,53 @@ export function create(
sequenceNumber,
1,
crypto.randomBytes(4).toString("hex"),
ordererUrl,
historianUrl,
values);

// Generate creation token given a jwt from header
const authorizationHeader = request.header("Authorization");
const tokenRegex = /Basic (.+)/;
const tokenMatch = tokenRegex.exec(authorizationHeader);
const token = tokenMatch[1];
const enableDiscovery: boolean = request.body.enableDiscovery ?? false;

const tenantKeyP = tenantManager.getKey(tenantId);

handleResponse(Promise.all([createP, tenantKeyP]).then(([_, key]) => {
// @TODO: Modify it to return an object only, it returns string for back-compat.
return generateToken
? {
id,
token: getCreationToken(token, key, id),
}
: id;
}), response, undefined, 201);
// Handle backwards compatibility for older driver versions.
// TODO: remove condition once old drivers are phased out and all clients can handle object response
const clientAcceptsObjectResponse = enableDiscovery === true || generateToken === true;
if (clientAcceptsObjectResponse) {
const responseBody = { id, token: undefined, session: undefined };
if (generateToken) {
// Generate creation token given a jwt from header
const authorizationHeader = request.header("Authorization");
const tokenRegex = /Basic (.+)/;
const tokenMatch = tokenRegex.exec(authorizationHeader);
const token = tokenMatch[1];
const tenantKey = await tenantManager.getKey(tenantId);
responseBody.token = getCreationToken(token, tenantKey, id);
}
if (enableDiscovery) {
// Session information
const session: ISession = {
ordererUrl,
historianUrl,
isSessionAlive: false,
};
responseBody.session = session;
}
handleResponse(createP.then(() => responseBody), response, undefined, 201);
} else {
handleResponse(createP.then(() => id), response, undefined, 201);
}
});

/**
* Get the session information.
*/
router.get(
"/:tenantId/session/:id",
verifyStorageToken(tenantManager, config),
throttle(throttler, winston, commonThrottleOptions),
async (request, response, next) => {
const documentId = getParam(request.params, "id");
const tenantId = getParam(request.params, "tenantId");
const session = getSession(ordererUrl, historianUrl, tenantId, documentId, documentsCollection);
handleResponse(session, response, undefined, 200);
});
return router;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import {
ICache,
ICollection,
IDocument,
IDocumentStorage,
IProducer,
ITenantManager,
Expand All @@ -28,7 +30,7 @@ export function create(
operationsDbMongoManager: MongoManager,
producer: IProducer,
appTenants: IAlfredTenant[],
globalDbMongoManager?: MongoManager): Router {
documentsCollection: ICollection<IDocument>): Router {
const router: Router = Router();
const deltasRoute = deltas.create(config, tenantManager, operationsDbMongoManager, appTenants, throttler);
const documentsRoute = documents.create(
Expand All @@ -38,7 +40,7 @@ export function create(
singleUseTokenCache,
config,
tenantManager,
globalDbMongoManager);
documentsCollection);
const apiRoute = api.create(config, producer, tenantManager, storage, throttler);

router.use(cors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {
MongoManager,
IThrottler,
ICache,
ICollection,
IDocument,
} from "@fluidframework/server-services-core";
import { Router } from "express";
import { Provider } from "nconf";
Expand All @@ -30,7 +32,7 @@ export function create(
storage: IDocumentStorage,
producer: IProducer,
appTenants: IAlfredTenant[],
globalDbMongoManager?: MongoManager) {
documentsCollection: ICollection<IDocument>) {
return {
api: api.create(
config,
Expand All @@ -41,7 +43,7 @@ export function create(
operationsDbMongoManager,
producer,
appTenants,
globalDbMongoManager,
documentsCollection,
),
};
}
Loading