Skip to content

Commit

Permalink
Server Changes in Fluid Server Cluster Discovery Feature (#9685)
Browse files Browse the repository at this point in the history
Server Changes in Fluid Server Cluster Discovery Feature
  • Loading branch information
tianzhu007 authored Apr 8, 2022
1 parent 83b6ac6 commit 86e4ed7
Show file tree
Hide file tree
Showing 19 changed files with 281 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ export class RestGitService {
this.getSummaryCacheKey("container"),
async () => this.get<IWholeFlatSummary>(
`/repos/${this.getRepoPath()}/git/summaries/${encodeURIComponent(sha)}`),
useCache);
useCache,
true);
}

public async updateRef(ref: string, params: IPatchRefParamsExternal): Promise<git.IRef> {
Expand Down Expand Up @@ -362,8 +363,7 @@ export class RestGitService {
useCache);
}

private getStorageRoutingHeaderValue()
{
private getStorageRoutingHeaderValue() {
return `${this.tenantId}:${this.documentId}`;
}

Expand Down Expand Up @@ -431,10 +431,23 @@ export class RestGitService {
}
}

/**
* Caches by the given key.
*/
private async fetchAndCache<T>(key: string, fetch: () => Promise<T>): Promise<T> {
winston.info(`Fetching ${key}`);
Lumberjack.info(`Fetching ${key}`, this.lumberProperties);
const value = await fetch();
if (this.cache) {
this.setCache(key, value);
}
return value;
}

/**
* Deletes the given key from the cache. Will log any errors with the cache.
*/
private deleteFromCache(key: string): void {
private deleteFromCache(key: string): void {
if (this.cache) {
// Attempt to delete the key from Redis - log any errors but don't fail
this.cache.delete(key).catch((error) => {
Expand All @@ -444,7 +457,10 @@ export class RestGitService {
}
}

private async resolve<T>(key: string, fetch: () => Promise<T>, useCache: boolean): Promise<T> {
private async resolve<T>(key: string,
fetch: () => Promise<T>,
useCache: boolean,
resolvingSummary: boolean = false): Promise<T> {
if (this.cache && useCache) {
// Attempt to grab the value from the cache. Log any errors but don't fail the request
const cachedValue: T | undefined = await this.cache.get<T>(key).catch((error) => {
Expand All @@ -460,15 +476,18 @@ export class RestGitService {
}

// Value is not cached - fetch it with the provided function and then cache the value
winston.info(`Fetching ${key}`);
Lumberjack.info(`Fetching ${key}`, this.lumberProperties);
const value = await fetch();
this.setCache(key, value);
return this.fetchAndCache(key, fetch);
}

return value;
} else {
return fetch();
if (resolvingSummary) {
/**
* We set the useCache flag as false when we fetch the summary at the first time. We need to
* get the summary from the storage, and update the cache. If not, the following calls with
* useCache enabled might read the outdated summary from cache in case of the cluster change.
*/
return this.fetchAndCache(key, fetch);
}
return fetch();
}

private getSummaryCacheKey(type: IWholeSummaryPayloadType): string {
Expand Down
10 changes: 10 additions & 0 deletions server/routerlicious/api-report/server-services-client.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,16 @@ export interface IPatchRefParamsExternal extends resources.IPatchRefParams {
config?: IExternalWriterConfig;
}

// @public (undocumented)
export interface ISession {
// (undocumented)
historianUrl: string;
// (undocumented)
isSessionAlive: boolean;
// (undocumented)
ordererUrl: string;
}

// @public (undocumented)
export interface ISummaryTree extends ISummaryTree_2 {
// (undocumented)
Expand Down
3 changes: 3 additions & 0 deletions server/routerlicious/packages/lambdas/src/deli/lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ export class DeliLambda extends TypedEventEmitter<IDeliLambdaEvents> implements
public handler(rawMessage: IQueuedMessage) {
// In cases where we are reprocessing messages we have already checkpointed exit early
if (rawMessage.offset <= this.logOffset) {
Lumberjack.info(`rawMessage.offset: ${rawMessage.offset} <= this.logOffset: ${this.logOffset}`,
getLumberBaseProperties(this.documentId, this.tenantId));

this.updateCheckpointMessages(rawMessage);

if (this.checkpointInfo.currentKafkaCheckpointMessage) {
Expand Down
30 changes: 24 additions & 6 deletions server/routerlicious/packages/lambdas/src/deli/lambdaFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
IProducer,
IServiceConfiguration,
ITenantManager,
LambdaCloseType,
MongoManager,
} from "@fluidframework/server-services-core";
import { generateServiceProtocolEntries } from "@fluidframework/protocol-base";
Expand Down Expand Up @@ -55,8 +56,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
private readonly tenantManager: ITenantManager,
private readonly forwardProducer: IProducer,
private readonly reverseProducer: IProducer,
private readonly serviceConfiguration: IServiceConfiguration,
globalDbMongoManager?: MongoManager) {
private readonly serviceConfiguration: IServiceConfiguration) {
super();
}

Expand Down Expand Up @@ -124,7 +124,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
lastCheckpoint.logOffset = -1;
lastCheckpoint.epoch = leaderEpoch;
context.log?.info(`Deli checkpoint from summary:
${ JSON.stringify(lastCheckpoint)}`, { messageMetaData });
${JSON.stringify(lastCheckpoint)}`, { messageMetaData });
}
} else {
lastCheckpoint = JSON.parse(dbObject.deli);
Expand Down Expand Up @@ -152,7 +152,7 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
const checkpointManager = createDeliCheckpointManagerFromCollection(tenantId, documentId, this.collection);

// Should the lambda reaize that term has flipped to send a no-op message at the beginning?
return new DeliLambda(
const deliLambda = new DeliLambda(
context,
tenantId,
documentId,
Expand All @@ -164,14 +164,32 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
this.serviceConfiguration,
sessionMetric,
sessionStartMetric);

deliLambda.on("close", (closeType) => {
const handler = async () => {
if ((closeType === LambdaCloseType.ActivityTimeout || closeType === LambdaCloseType.Error)) {
const query = { documentId, tenantId, session: { $exists: true } };
const data = { "session.isSessionAlive": false };
await this.collection.update(query, data, null);
context.log?.info(`Marked isSessionAlive as false for closeType: ${JSON.stringify(closeType)}`,
{ messageMetaData });
}
};
handler().catch((e) => {
context.log?.error(`Failed to handle isSessionAlive with exception ${e}`
, { messageMetaData });
});
});

return deliLambda;
}

private logSessionFailureMetrics(
sessionMetric: Lumber<LumberEventName.SessionResult> | undefined,
sessionStartMetric: Lumber<LumberEventName.StartSessionResult> | undefined,
errMsg: string) {
sessionMetric?.error(errMsg);
sessionStartMetric?.error(errMsg);
sessionMetric?.error(errMsg);
sessionStartMetric?.error(errMsg);
}

public async dispose(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
MongoManager,
IThrottler,
ICache,
ICollection,
IDocument,
} from "@fluidframework/server-services-core";
import { json, urlencoded } from "body-parser";
import compression from "compression";
Expand Down Expand Up @@ -49,7 +51,7 @@ export function create(
appTenants: IAlfredTenant[],
operationsDbMongoManager: MongoManager,
producer: IProducer,
globalDbMongoManager?: MongoManager) {
documentsCollection: ICollection<IDocument>) {
// Maximum REST request size
const requestSize = config.get("alfred:restJsonSize");

Expand Down Expand Up @@ -111,7 +113,7 @@ export function create(
storage,
producer,
appTenants,
globalDbMongoManager);
documentsCollection);

app.use("/public", cors(), express.static(path.join(__dirname, "../../public")));
app.use(routes.api);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

import * as crypto from "crypto";
import {
IDocument,
IDocumentStorage,
IThrottler,
ITenantManager,
ICache,
MongoManager,
ICollection,
} from "@fluidframework/server-services-core";
import {
verifyStorageToken,
Expand All @@ -20,10 +21,10 @@ import {
} from "@fluidframework/server-services-utils";
import { Router } from "express";
import winston from "winston";
import { IAlfredTenant } from "@fluidframework/server-services-client";
import { IAlfredTenant, ISession } from "@fluidframework/server-services-client";
import { Provider } from "nconf";
import { v4 as uuid } from "uuid";
import { Constants, handleResponse } from "../../../utils";
import { Constants, handleResponse, getSession } from "../../../utils";

export function create(
storage: IDocumentStorage,
Expand All @@ -32,9 +33,10 @@ export function create(
singleUseTokenCache: ICache,
config: Provider,
tenantManager: ITenantManager,
globalDbMongoManager?: MongoManager): Router {
documentsCollection: ICollection<IDocument>): Router {
const router: Router = Router();

const ordererUrl = config.get("worker:serverUrl");
const historianUrl = config.get("worker:blobStorageUrl");
// Whether to enforce server-generated document ids in create doc flow
const enforceServerGeneratedDocumentId: boolean = config.get("alfred:enforceServerGeneratedDocumentId") ?? false;

Expand All @@ -61,7 +63,7 @@ export function create(
(error) => {
response.status(400).json(error);
});
});
});

/**
* Creates a new document with initial summary.
Expand All @@ -74,7 +76,7 @@ export function create(
singleUseTokenCache,
}),
throttle(throttler, winston, commonThrottleOptions),
(request, response, next) => {
async (request, response, next) => {
// Tenant and document
const tenantId = getParam(request.params, "tenantId");
// If enforcing server generated document id, ignore id parameter
Expand All @@ -95,26 +97,53 @@ export function create(
sequenceNumber,
1,
crypto.randomBytes(4).toString("hex"),
ordererUrl,
historianUrl,
values);

// Generate creation token given a jwt from header
const authorizationHeader = request.header("Authorization");
const tokenRegex = /Basic (.+)/;
const tokenMatch = tokenRegex.exec(authorizationHeader);
const token = tokenMatch[1];
const enableDiscovery: boolean = request.body.enableDiscovery ?? false;

const tenantKeyP = tenantManager.getKey(tenantId);

handleResponse(Promise.all([createP, tenantKeyP]).then(([_, key]) => {
// @TODO: Modify it to return an object only, it returns string for back-compat.
return generateToken
? {
id,
token: getCreationToken(token, key, id),
}
: id;
}), response, undefined, 201);
// Handle backwards compatibility for older driver versions.
// TODO: remove condition once old drivers are phased out and all clients can handle object response
const clientAcceptsObjectResponse = enableDiscovery === true || generateToken === true;
if (clientAcceptsObjectResponse) {
const responseBody = { id, token: undefined, session: undefined };
if (generateToken) {
// Generate creation token given a jwt from header
const authorizationHeader = request.header("Authorization");
const tokenRegex = /Basic (.+)/;
const tokenMatch = tokenRegex.exec(authorizationHeader);
const token = tokenMatch[1];
const tenantKey = await tenantManager.getKey(tenantId);
responseBody.token = getCreationToken(token, tenantKey, id);
}
if (enableDiscovery) {
// Session information
const session: ISession = {
ordererUrl,
historianUrl,
isSessionAlive: false,
};
responseBody.session = session;
}
handleResponse(createP.then(() => responseBody), response, undefined, 201);
} else {
handleResponse(createP.then(() => id), response, undefined, 201);
}
});

/**
* Get the session information.
*/
router.get(
"/:tenantId/session/:id",
verifyStorageToken(tenantManager, config),
throttle(throttler, winston, commonThrottleOptions),
async (request, response, next) => {
const documentId = getParam(request.params, "id");
const tenantId = getParam(request.params, "tenantId");
const session = getSession(ordererUrl, historianUrl, tenantId, documentId, documentsCollection);
handleResponse(session, response, undefined, 200);
});
return router;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import {
ICache,
ICollection,
IDocument,
IDocumentStorage,
IProducer,
ITenantManager,
Expand All @@ -28,7 +30,7 @@ export function create(
operationsDbMongoManager: MongoManager,
producer: IProducer,
appTenants: IAlfredTenant[],
globalDbMongoManager?: MongoManager): Router {
documentsCollection: ICollection<IDocument>): Router {
const router: Router = Router();
const deltasRoute = deltas.create(config, tenantManager, operationsDbMongoManager, appTenants, throttler);
const documentsRoute = documents.create(
Expand All @@ -38,7 +40,7 @@ export function create(
singleUseTokenCache,
config,
tenantManager,
globalDbMongoManager);
documentsCollection);
const apiRoute = api.create(config, producer, tenantManager, storage, throttler);

router.use(cors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {
MongoManager,
IThrottler,
ICache,
ICollection,
IDocument,
} from "@fluidframework/server-services-core";
import { Router } from "express";
import { Provider } from "nconf";
Expand All @@ -30,7 +32,7 @@ export function create(
storage: IDocumentStorage,
producer: IProducer,
appTenants: IAlfredTenant[],
globalDbMongoManager?: MongoManager) {
documentsCollection: ICollection<IDocument>) {
return {
api: api.create(
config,
Expand All @@ -41,7 +43,7 @@ export function create(
operationsDbMongoManager,
producer,
appTenants,
globalDbMongoManager,
documentsCollection,
),
};
}
Loading

0 comments on commit 86e4ed7

Please sign in to comment.