Skip to content

Commit

Permalink
For read connection, wait until caught up before declaring "connected"
Browse files Browse the repository at this point in the history
  • Loading branch information
markfields committed Mar 5, 2022
1 parent 1e52e54 commit d5e22d7
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 32 deletions.
25 changes: 17 additions & 8 deletions packages/loader/container-loader/src/connectionStateHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface IConnectionStateHandler {
maxClientLeaveWaitTime: number | undefined,
logConnectionIssue: (eventName: string) => void,
connectionStateChanged: () => void,
onCaughtUp: (callback: () => {}) => void,
}

export interface ILocalSequencedClient extends ISequencedClient {
Expand Down Expand Up @@ -188,25 +189,33 @@ export class ConnectionStateHandler {
// Report telemetry after we set client id, but before transitioning to Connected state below!
this.handler.logConnectionStateChangeTelemetry(ConnectionState.Connecting, oldState);

const quorumClients = this.handler.quorumClients();
// Check if we already processed our own join op through delta storage!
// we are fetching ops from storage in parallel to connecting to ordering service
// Given async processes, it's possible that we have already processed our own join message before
// connection was fully established.
// Note that we might be still initializing quorum - connection is established proactively on load!
if ((quorumClients !== undefined && quorumClients.getMember(details.clientId) !== undefined)
|| connectionMode === "read"
) {
assert(!this.prevClientLeftTimer.hasTimer, 0x2a6 /* "there should be no timer for 'read' connections" */);
this.setConnectionState(ConnectionState.Connected);
} else if (connectionMode === "write") {
const alreadyProcessedOwnJoinOp = this.handler.quorumClients()?.getMember(details.clientId) !== undefined;
const writeConnection = connectionMode === "write";
if (writeConnection && !alreadyProcessedOwnJoinOp) {
this.startJoinOpTimer();
} else { // read connection || alreadyProcessedOwnJoinOp
// There should be no timer for 'read' connections,
// and if we processed our Join op we would have processed our previous Leave op prior to that
assert(!this.prevClientLeftTimer.hasTimer, 0x2a6 /* "Unexpected timer state" */);

//* I wonder if I need this check...
//* shouldn't onCaughtUp always fire immediately if we've already processed our join op?
if (writeConnection) {
this.setConnectionState(ConnectionState.Connected);
} else {
this.handler.onCaughtUp(() => this.setConnectionState(ConnectionState.Connected));
}
}
}

private setConnectionState(value: ConnectionState.Disconnected, reason: string);
private setConnectionState(value: ConnectionState.Connected);
private setConnectionState(value: ConnectionState, reason?: string) {
private setConnectionState(value: ConnectionState, reason?: string): void {
if (this.connectionState === value) {
// Already in the desired state - exit early
this.logger.sendErrorEvent({ eventName: "setConnectionStateSame", value });
Expand Down
53 changes: 29 additions & 24 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,29 @@ export enum ConnectionState {
Connected,
}

/** Invoke the callback once deltaManager has processed all ops known at the time this function is called */
function waitForOps(container: IContainer, resolve: (hasCheckpointSequenceNumber: boolean) => void) {
const deltaManager = container.deltaManager;
assert(container.connectionState !== ConnectionState.Disconnected,
0x0cd /* "Container disconnected while waiting for ops!" */);
const hasCheckpointSequenceNumber = deltaManager.hasCheckpointSequenceNumber;

const targetSeqNumber = deltaManager.lastKnownSeqNumber;
assert(deltaManager.lastSequenceNumber <= targetSeqNumber,
0x266 /* "lastKnownSeqNumber should never be below last processed sequence number" */);
if (deltaManager.lastSequenceNumber === targetSeqNumber) {
resolve(hasCheckpointSequenceNumber);
return;
}
const callbackOps = (message: ISequencedDocumentMessage) => {
if (message.sequenceNumber >= targetSeqNumber) {
resolve(hasCheckpointSequenceNumber);
deltaManager.off("op", callbackOps);
}
};
deltaManager.on("op", callbackOps);
};

/**
* Waits until container connects to delta storage and gets up-to-date
* Useful when resolving URIs and hitting 404, due to container being loaded from (stale) snapshot and not being
Expand All @@ -174,43 +197,22 @@ export async function waitContainerToCatchUp(container: IContainer) {
}

return new Promise<boolean>((resolve, reject) => {
const deltaManager = container.deltaManager;

container.on("closed", reject);

const waitForOps = () => {
assert(container.connectionState !== ConnectionState.Disconnected,
0x0cd /* "Container disconnected while waiting for ops!" */);
const hasCheckpointSequenceNumber = deltaManager.hasCheckpointSequenceNumber;

const connectionOpSeqNumber = deltaManager.lastKnownSeqNumber;
assert(deltaManager.lastSequenceNumber <= connectionOpSeqNumber,
0x266 /* "lastKnownSeqNumber should never be below last processed sequence number" */);
if (deltaManager.lastSequenceNumber === connectionOpSeqNumber) {
resolve(hasCheckpointSequenceNumber);
return;
}
const callbackOps = (message: ISequencedDocumentMessage) => {
if (connectionOpSeqNumber <= message.sequenceNumber) {
resolve(hasCheckpointSequenceNumber);
deltaManager.off("op", callbackOps);
}
};
deltaManager.on("op", callbackOps);
};
//* I don't fully understand this comment

// We can leverage DeltaManager's "connect" event here and test for ConnectionState.Disconnected
// But that works only if service provides us checkPointSequenceNumber
// Our internal testing is based on R11S that does not, but almost all tests connect as "write" and
// use this function to catch up, so leveraging our own join op as a fence/barrier
if (container.connectionState === ConnectionState.Connected) {
waitForOps();
waitForOps(container, resolve);
return;
}

const callback = () => {
container.off(connectedEventName, callback);
waitForOps();
waitForOps(container, resolve);
};
container.on(connectedEventName, callback);

Expand Down Expand Up @@ -607,6 +609,9 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
this.propagateConnectionState();
}
},
onCaughtUp: (callback: () => {}) => {
waitForOps(this, callback);
}
},
this.mc.logger,
);
Expand Down

0 comments on commit d5e22d7

Please sign in to comment.