Skip to content

Commit

Permalink
Wait to raise Connected event for Read connection until container is …
Browse files Browse the repository at this point in the history
…caught up (#9377)

The desired outcome is that regardless of connection mode (which is an implementation detail - we sometimes use a read connection even if we have write permissions), the semantics of the "connected" event are consistent: it should mean that you're connected and caught up.  This has been the case for write connections but not for read connections.

This change is "dark" meaning no behavior change is expected with this PR, but there's a setting "Fluid.Container.CatchUpBeforeDeclaringConnected" which can be set to true to enable the new behavior.  Once this is tested and permanently enabled, we should be able to deprecate `waitContainerToCatchUp` in favor of just responding to the `connected` event.
  • Loading branch information
markfields authored Jul 22, 2022
1 parent 4beded1 commit 0b5950d
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 18 deletions.
99 changes: 99 additions & 0 deletions packages/loader/container-loader/src/catchUpMonitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { IDisposable, IEvent } from "@fluidframework/common-definitions";
import { assert, TypedEventEmitter } from "@fluidframework/common-utils";
import { IDeltaManager } from "@fluidframework/container-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";

/**
 * Signature of a "caughtUp" listener: takes no arguments and returns nothing.
 * @see ICatchUpMonitor for usage
 */
type CaughtUpListener = () => void;

/**
 * Events raised by an ICatchUpMonitor. The only event is "caughtUp", whose listeners receive no arguments.
 * @see ICatchUpMonitor for usage
 */
export interface ICatchUpMonitorEvents extends IEvent {
(event: "caughtUp", listener: CaughtUpListener): void;
}

/**
 * Monitor that emits an event when a Container has caught up to a given point in the op stream.
 *
 * It is a disposable event emitter whose single event is "caughtUp". Both implementations in this file
 * also invoke a "caughtUp" listener right away if it is registered after the caught-up condition is already met.
 */
export interface ICatchUpMonitor extends TypedEventEmitter<ICatchUpMonitorEvents>, IDisposable { }

/**
 * Watches a Container's DeltaManager and raises "caughtUp" once an op has been processed whose
 * sequence number reaches the DeltaManager's lastKnownSeqNumber as captured at construction time.
 *
 * "caughtUp" is emitted at most once. A "caughtUp" listener registered after that point is invoked
 * immediately instead of waiting for an event that will never fire again.
 */
export class CatchUpMonitor extends TypedEventEmitter<ICatchUpMonitorEvents> implements ICatchUpMonitor {
    /** True once dispose() has run */
    public disposed: boolean = false;

    /** Sequence number that must be processed before we consider ourselves caught up */
    private readonly targetSeqNumber: number;

    /** Latched to true the first time the target sequence number is reached */
    private caughtUp: boolean = false;

    /**
     * Handler for the DeltaManager's "op" event (also called directly by the constructor).
     * Latches caughtUp and emits "caughtUp" the first time the target is reached; no-op afterwards.
     */
    private readonly opHandler = (message: Pick<ISequencedDocumentMessage, "sequenceNumber">) => {
        if (this.caughtUp) {
            // Already notified - never emit twice
            return;
        }

        if (message.sequenceNumber >= this.targetSeqNumber) {
            this.caughtUp = true;
            this.emit("caughtUp");
        }
    };

    /**
     * Build the monitor; the sequence number to wait for is taken from the DeltaManager's state right now.
     * @param deltaManager - source of lastKnownSeqNumber / lastSequenceNumber and of "op" events
     */
    constructor(
        private readonly deltaManager: IDeltaManager<any, any>,
    ) {
        super();

        this.targetSeqNumber = this.deltaManager.lastKnownSeqNumber;

        assert(this.targetSeqNumber >= this.deltaManager.lastSequenceNumber,
            "Cannot wait for seqNumber below last processed sequence number");

        // Track every op processed from here on
        this.deltaManager.on("op", this.opHandler);

        // We may already be at the target: replay the last processed sequence number through the handler
        this.opHandler({ sequenceNumber: this.deltaManager.lastSequenceNumber });

        // Late subscribers must not miss the notification: call them as they register if we're already caught up
        this.on("newListener", (event: string, listener) => {
            if (event !== "caughtUp") {
                return;
            }

            const lateListener = listener as CaughtUpListener;
            if (this.caughtUp) {
                lateListener();
            }
        });
    }

    /** Drop all listeners on this monitor and stop listening to the DeltaManager. Safe to call repeatedly. */
    public dispose() {
        if (!this.disposed) {
            this.disposed = true;

            this.removeAllListeners();
            this.deltaManager.off("op", this.opHandler);
        }
    }
}

/**
 * Catch-up monitor with nothing to wait for: every "caughtUp" listener is invoked
 * as soon as it is registered (from the "newListener" hook).
 */
export class ImmediateCatchUpMonitor extends TypedEventEmitter<ICatchUpMonitorEvents> implements ICatchUpMonitor {
    /** True once dispose() has run */
    public disposed: boolean = false;

    constructor() {
        super();

        // Notify each "caughtUp" subscriber the moment it subscribes; other events are ignored
        this.on("newListener", (event: string, listener) => {
            if (event !== "caughtUp") {
                return;
            }

            const immediateListener = listener as CaughtUpListener;
            immediateListener();
        });
    }

    /** Drop all listeners on this monitor. Safe to call repeatedly. */
    public dispose() {
        if (!this.disposed) {
            this.disposed = true;

            this.removeAllListeners();
        }
    }
}
70 changes: 55 additions & 15 deletions packages/loader/container-loader/src/connectionStateHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

import { ITelemetryLogger, ITelemetryProperties } from "@fluidframework/common-definitions";
import { assert, Timer } from "@fluidframework/common-utils";
import { IConnectionDetails } from "@fluidframework/container-definitions";
import { IConnectionDetails, IDeltaManager } from "@fluidframework/container-definitions";
import { ILocalSequencedClient, IProtocolHandler } from "@fluidframework/protocol-base";
import { ConnectionMode, IQuorumClients } from "@fluidframework/protocol-definitions";
import { PerformanceEvent } from "@fluidframework/telemetry-utils";
import { ConnectionState } from "./connectionState";
import { CatchUpMonitor, ICatchUpMonitor, ImmediateCatchUpMonitor } from "./catchUpMonitor";

/** Constructor parameter type for passing in dependencies needed by the ConnectionStateHandler */
export interface IConnectionStateHandlerInputs {
Expand All @@ -36,7 +37,7 @@ const JoinOpTimeoutMs = 45000;
* sequenced or blocked by the server before emitting the new "connected" event and allowing runtime to resubmit ops.
*
* Each connection is assigned a clientId by the service, and the connection is book-ended by a Join and a Leave op
* generated by the service. Due to the distributed nature of the ordering service, in the case of reconnect we cannot
* generated by the service. Due to the distributed nature of the Relay Service, in the case of reconnect we cannot
* make any assumptions about ordering of operations between the old and new connections - i.e. new Join op could
* be sequenced before old Leave op (and some acks from pending ops that were in flight when we disconnected).
*
Expand All @@ -47,14 +48,17 @@ const JoinOpTimeoutMs = 45000;
* pending ops can safely be submitted with the new clientId without fear of duplication in the sequenced op stream.
* (B) We process the Join op for the new clientId (identified when the underlying connection was first established),
* indicating the service is ready to sequence ops sent with the new clientId.
* (C) We process all ops known at the time the underlying connection was established (so we are "caught up")
*
* For (A) we give up waiting after some time (same timeout as server uses), and go ahead and transition to Connected.
* For (B) we log telemetry if it takes too long, but still only transition to Connected when the Join op is processed
* and we are added to the Quorum.
* For (C) this is optional behavior, controlled by the parameters of receivedConnectEvent
*/
export class ConnectionStateHandler {
private _connectionState = ConnectionState.Disconnected;
private _pendingClientId: string | undefined;
private catchUpMonitor: ICatchUpMonitor | undefined;
private readonly prevClientLeftTimer: Timer;
private readonly joinOpTimer: Timer;

Expand Down Expand Up @@ -106,7 +110,7 @@ export class ConnectionStateHandler {
const quorumClients = this.handler.quorumClients();
const details = {
quorumInitialized: quorumClients !== undefined,
hasPendingClientId: this.pendingClientId !== undefined,
pendingClientId: this.pendingClientId,
inQuorum: quorumClients?.getMember(this.pendingClientId ?? "") !== undefined,
waitingForLeaveOp: this.waitingForLeaveOp,
};
Expand Down Expand Up @@ -184,7 +188,10 @@ export class ConnectionStateHandler {
&& !this.waitingForLeaveOp
) {
this.waitEvent?.end({ source });
this.setConnectionState(ConnectionState.Connected);

assert(this.catchUpMonitor !== undefined,
"catchUpMonitor should always be set if pendingClientId is set");
this.catchUpMonitor.on("caughtUp", this.transitionToConnectedState);
} else {
// Adding this event temporarily so that we can get help debugging if something goes wrong.
this.logger.sendTelemetryEvent({
Expand All @@ -210,32 +217,47 @@ export class ConnectionStateHandler {
}

public receivedDisconnectEvent(reason: string) {
if (this.joinOpTimer.hasTimer) {
this.stopJoinOpTimer();
}
this.setConnectionState(ConnectionState.Disconnected, reason);
}

/**
 * Listener attached to the catch-up monitor's "caughtUp" event.
 * Moves the handler from CatchingUp to Connected; in any other state it only logs telemetry.
 */
private readonly transitionToConnectedState = () => {
    if (this._connectionState !== ConnectionState.CatchingUp) {
        // Not expected - we should always be in CatchingUp state here. Be defensive: log and leave state untouched.
        this.logger.sendTelemetryEvent({
            eventName: "cannotTransitionToConnectedState",
            connectionState: ConnectionState[this._connectionState],
        });
        return;
    }

    this.setConnectionState(ConnectionState.Connected);
};

/**
* The "connect" event indicates the connection to the Relay Service is live.
* However, some additional conditions must be met before we can fully transition to
* "Connected" state. This function handles that interim period, known as "Connecting" state.
* @param connectionMode - Read or Write connection
* @param details - Connection details returned from the ordering service
* @param details - Connection details returned from the Relay Service
* @param deltaManager - DeltaManager to be used for delaying Connected transition until caught up.
* If it's undefined, then don't delay and transition to Connected as soon as Leave/Join op are accounted for
*/
public receivedConnectEvent(
connectionMode: ConnectionMode,
details: IConnectionDetails,
deltaManager?: IDeltaManager<any, any>,
) {
const oldState = this._connectionState;
this._connectionState = ConnectionState.CatchingUp;

const writeConnection = connectionMode === "write";
assert(writeConnection || !this.handler.shouldClientJoinWrite(),
assert(!this.handler.shouldClientJoinWrite() || writeConnection,
0x30a /* shouldClientJoinWrite should imply this is a writeConnection */);
assert(writeConnection || !this.waitingForLeaveOp,
assert(!this.waitingForLeaveOp || writeConnection,
0x2a6 /* "waitingForLeaveOp should imply writeConnection (we need to be ready to flush pending ops)" */);

// Defensive measure in case catchUpMonitor from previous connection attempt wasn't already cleared
this.catchUpMonitor?.dispose();

// Note that this may be undefined since the connection is established proactively on load
// and the quorum may still be under initialization.
const quorumClients: IQuorumClients | undefined = this.handler.quorumClients();
Expand All @@ -248,6 +270,11 @@ export class ConnectionStateHandler {
// we know there can no longer be outstanding ops that we sent with the previous client id.
this._pendingClientId = details.clientId;

// We may want to catch up to known ops as of now before transitioning to Connected state
this.catchUpMonitor = deltaManager !== undefined
? new CatchUpMonitor(deltaManager)
: new ImmediateCatchUpMonitor();

// IMPORTANT: Report telemetry after we set _pendingClientId, but before transitioning to Connected state
this.handler.logConnectionStateChangeTelemetry(ConnectionState.CatchingUp, oldState);

Expand All @@ -264,9 +291,21 @@ export class ConnectionStateHandler {
this.startJoinOpTimer();
} else if (!this.waitingForLeaveOp) {
// We're not waiting for Join or Leave op (if read-only connection those don't even apply),
// go ahead and declare the state to be Connected!
// If we are waiting for Leave op still, do nothing for now, we will transition to Connected later.
this.setConnectionState(ConnectionState.Connected);
// but we do need to wait until we are caught up (to now-known ops) before transitioning to Connected state.
this.catchUpMonitor.on("caughtUp", this.transitionToConnectedState);
}
// else - We are waiting for Leave op still, do nothing for now, we will transition to Connected later
}

/**
 * Reset everything that receivedConnectEvent sets up for an in-progress connection:
 * the pending client id, the catch-up monitor, and the join op timer (if running).
 */
private clearPendingConnectionState() {
    this._pendingClientId = undefined;

    // Dispose the monitor (if any) before dropping our reference to it
    const monitor = this.catchUpMonitor;
    if (monitor !== undefined) {
        monitor.dispose();
    }
    this.catchUpMonitor = undefined;

    const joinOpTimerRunning = this.joinOpTimer.hasTimer;
    if (joinOpTimerRunning) {
        this.stopJoinOpTimer();
    }
}

Expand Down Expand Up @@ -295,8 +334,9 @@ export class ConnectionStateHandler {
}
this._clientId = this.pendingClientId;
} else if (value === ConnectionState.Disconnected) {
// Important as we process our own joinSession message through delta request
this._pendingClientId = undefined;
// Clear pending state immediately to prepare for reconnect
this.clearPendingConnectionState();

// Only wait for "leave" message if the connected client exists in the quorum because only the write
// client will exist in the quorum and only for those clients we will receive "removeMember" event and
// the client has some unacked ops.
Expand Down
12 changes: 11 additions & 1 deletion packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ export async function waitContainerToCatchUp(container: IContainer) {
};
container.on("closed", closedCallback);

// Depending on config, transition to "connected" state may include the guarantee
// that all known ops have been processed. If so, we may introduce additional wait here.
// Waiting for "connected" state in either case gets us at least to our own Join op
// which is a reasonable approximation of "caught up"
const waitForOps = () => {
assert(container.connectionState === ConnectionState.CatchingUp
|| container.connectionState === ConnectionState.Connected,
Expand Down Expand Up @@ -543,7 +547,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
/**
* Returns true if container is dirty.
* Which means data loss if container is closed at that same moment
* Most likely that happens when there is no network connection to ordering service
* Most likely that happens when there is no network connection to Relay Service
*/
public get isDirty() {
return this._dirtyContainer;
Expand Down Expand Up @@ -1551,9 +1555,15 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}
}

const deltaManagerForCatchingUp =
this.mc.config.getBoolean("Fluid.Container.CatchUpBeforeDeclaringConnected") === true ?
this.deltaManager
: undefined;

this.connectionStateHandler.receivedConnectEvent(
this.connectionMode,
details,
deltaManagerForCatchingUp,
);
});

Expand Down
Loading

0 comments on commit 0b5950d

Please sign in to comment.