Wait to raise Connected event for Read connection until container is caught up #9377

Merged
merged 56 commits into from
Jul 22, 2022
Changes from 55 commits
Commits
93a0576
For read connection, wait until caught up before declaring "connected"
markfields Mar 5, 2022
9cd2bf2
Remove unnecessary read v. write check
markfields Mar 5, 2022
07fe3a0
PR Feedback so far
markfields Apr 28, 2022
e066e4f
Simplify waitContainerToCatchUp.
markfields Apr 29, 2022
ae6daa9
Fix bug where we didn't always wait for catch up before "connected" s…
markfields Apr 29, 2022
e398758
PR feedback
markfields Apr 29, 2022
1d3c732
typo
markfields May 3, 2022
d149e1e
Some comments / renames
markfields May 4, 2022
e28bfb4
More clarity (ish)
markfields May 4, 2022
2f0f295
Note to self
markfields May 4, 2022
80f4704
Introduce CatchUpWaiter class
markfields May 4, 2022
f99386a
Switch to callback instead of event for CatchUpWaiter
markfields May 4, 2022
881d158
Reshuffling stuff
markfields May 4, 2022
74a3fb9
Finalize shape and name of CatchUpMonitor
markfields May 4, 2022
edc009a
ICatchupMonitor interface and Immediate impl for when flight is disabled
markfields May 5, 2022
d8c251e
Merge branch 'main' into connected-event-for-read
markfields May 24, 2022
0c41c00
lock
markfields May 25, 2022
471e700
Adding some comments, some cleanup
markfields May 25, 2022
2b22dc7
Merge remote-tracking branch 'origin/main' into connected-event-for-read
markfields Jun 2, 2022
92b308c
lock files after merge main and install
markfields Jun 2, 2022
c4c47a9
Build fixes post-merge
markfields Jun 2, 2022
6036a1e
Minor updates, and write notes on what tests to implement
markfields Jun 3, 2022
18fa2d0
lock update on install
markfields Jun 3, 2022
865161a
Trying to add some tests.
markfields Jun 3, 2022
bb73c6d
revert changes to waitContainerToCatchUp.
markfields Jun 4, 2022
56236fe
Pass in deltaManager to receivedConnectEvent
markfields Jun 4, 2022
f1ae708
Switch to optional deltaManager parameter
markfields Jun 4, 2022
5826828
0x2a6 test
markfields Jun 4, 2022
5cc95f6
Some cleanup/clarifying refactors
markfields Jun 4, 2022
5712a9c
Write broken test
markfields Jun 4, 2022
4ee6c50
Fix case where assert 0x2a6 fires for write connection
markfields Jun 6, 2022
521bbd5
Fix typo
markfields Jun 6, 2022
4bf6cce
Add a test for when quorum is not yet initialized when "connect" happens
markfields Jun 6, 2022
372ce16
Logging changes
markfields Jun 6, 2022
71d8265
Lint fix and rename
markfields Jun 6, 2022
b75e8ea
Merge branch 'main' into fix-assert-0x2a6
markfields Jun 6, 2022
4a686b8
lock file changes after a clean install
markfields Jun 6, 2022
1d10985
Build fixes post-merge
markfields Jun 6, 2022
de1bddc
Merge branch 'main' into fix-assert-0x2a6
markfields Jun 13, 2022
d384e69
Merge branch 'main' into fix-assert-0x2a6
markfields Jun 16, 2022
a3940d9
PR feedback
markfields Jun 16, 2022
5ab4033
Revert unrelated lock file changes
markfields Jun 16, 2022
9258e23
nit
markfields Jun 16, 2022
7174ecd
Merge remote-tracking branch 'origin/main' into fix-assert-0x2a6
markfields Jun 16, 2022
7623c55
Merge branch 'fix-assert-0x2a6' into connected-event-for-read
markfields Jun 16, 2022
100dcee
Merge remote-tracking branch 'origin/main' into connected-event-for-read
markfields Jun 16, 2022
7b6a7f5
Minor cleanup post-merge
markfields Jun 17, 2022
a8ea8ea
FIx build in tests
markfields Jun 17, 2022
55ec3b9
Unit Tests are green
markfields Jun 17, 2022
25159ab
Minor clean up / fixes
markfields Jun 17, 2022
2c510f6
Revert unrelated changes
markfields Jun 17, 2022
868b3d9
CatchUpMonitor tests
markfields Jun 17, 2022
1022cce
Merge branch 'main' into connected-event-for-read
markfields Jun 17, 2022
6358703
Merge remote-tracking branch 'origin/main' into connected-event-for-read
markfields Jul 21, 2022
5b88b4d
PR Feedback
markfields Jul 21, 2022
4618761
Fix test
markfields Jul 21, 2022
99 changes: 99 additions & 0 deletions packages/loader/container-loader/src/catchUpMonitor.ts
@@ -0,0 +1,99 @@
/*!
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
* Licensed under the MIT License.
*/

import { IDisposable, IEvent } from "@fluidframework/common-definitions";
import { assert, TypedEventEmitter } from "@fluidframework/common-utils";
import { IDeltaManager } from "@fluidframework/container-definitions";
import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions";

/** @see ICatchUpMonitor for usage */
type CaughtUpListener = () => void;

/** @see ICatchUpMonitor for usage */
export interface ICatchUpMonitorEvents extends IEvent {
(event: "caughtUp", listener: CaughtUpListener): void;
}

/** Monitor that emits an event when a Container has caught up to a given point in the op stream */
export interface ICatchUpMonitor extends TypedEventEmitter<ICatchUpMonitorEvents>, IDisposable { }

/**
* Monitors a Container's DeltaManager, notifying listeners when all ops have been processed
* that were known at the time the monitor was created.
*/
export class CatchUpMonitor extends TypedEventEmitter<ICatchUpMonitorEvents> implements ICatchUpMonitor {
private readonly targetSeqNumber: number;
private caughtUp: boolean = false;

private readonly opHandler = (message: Pick<ISequencedDocumentMessage, "sequenceNumber">) => {
if (!this.caughtUp && message.sequenceNumber >= this.targetSeqNumber) {
this.caughtUp = true;
this.emit("caughtUp");
}
};

/**
* Create the CatchUpMonitor, setting the target sequence number to wait for based on DeltaManager's current state.
*/
constructor(
private readonly deltaManager: IDeltaManager<any, any>,
) {
super();

this.targetSeqNumber = this.deltaManager.lastKnownSeqNumber;

assert(this.targetSeqNumber >= this.deltaManager.lastSequenceNumber,
"Cannot wait for seqNumber below last processed sequence number");

this.deltaManager.on("op", this.opHandler);

// Simulate the last processed op to set caughtUp in case we already are
this.opHandler({ sequenceNumber: this.deltaManager.lastSequenceNumber });

// If a listener is added after we are already caught up, notify that new listener immediately
this.on("newListener", (event: string, listener) => {
Contributor

It feels like instead of using events, it's better to use a property that exposes a promise. I.e. something like

public caughtUp: Promise<number>;

A Promise seems better because this event fires once, and a past "firing" is reflected to new listeners - that's exactly what a promise does.

Member Author

That is a great point. I don't remember why I switched to event-based to begin with, but I bet it would work better with Promise. That said, I don't have time to try it out now, and also leaving it will reduce merge conflicts with your follow-on refactor.

if (event === "caughtUp") {
const caughtUpListener = listener as CaughtUpListener;
if (this.caughtUp) {
caughtUpListener();
}
}
});
}

public disposed: boolean = false;
public dispose() {
if (this.disposed) {
return;
}
this.disposed = true;

this.removeAllListeners();
this.deltaManager.off("op", this.opHandler);
}
}

/** Monitor that always notifies listeners immediately */
export class ImmediateCatchUpMonitor extends TypedEventEmitter<ICatchUpMonitorEvents> implements ICatchUpMonitor {
constructor() {
super();
this.on("newListener", (event: string, listener) => {
if (event === "caughtUp") {
const caughtUpListener = listener as CaughtUpListener;
caughtUpListener();
}
});
}

public disposed: boolean = false;
public dispose() {
if (this.disposed) {
return;
}
this.disposed = true;

this.removeAllListeners();
}
}
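
A minimal sketch of the Promise-based alternative suggested in the review thread above; it is not part of this PR. It assumes a hypothetical `OpSource` interface standing in for the three `IDeltaManager` members that `CatchUpMonitor` uses, and a hypothetical `FakeOpSource` test double to drive it. The names `PromiseCatchUpMonitor` and `isCaughtUp` are illustrative only.

```typescript
type OpListener = (op: { sequenceNumber: number }) => void;

// Hypothetical stand-in for the IDeltaManager members used by CatchUpMonitor.
interface OpSource {
    readonly lastSequenceNumber: number; // last op processed
    readonly lastKnownSeqNumber: number; // last op known to exist
    on(event: "op", listener: OpListener): void;
    off(event: "op", listener: OpListener): void;
}

class PromiseCatchUpMonitor {
    /** Resolves with the target sequence number once that op has been processed. */
    public readonly caughtUp: Promise<number>;
    public isCaughtUp = false;
    public disposed = false;
    private readonly opHandler: OpListener;

    constructor(private readonly source: OpSource) {
        const target = source.lastKnownSeqNumber;
        let resolveCaughtUp: (seq: number) => void = () => {};
        // The executor runs synchronously, so resolveCaughtUp is set before it is used.
        this.caughtUp = new Promise<number>((resolve) => { resolveCaughtUp = resolve; });

        this.opHandler = (op) => {
            if (!this.isCaughtUp && op.sequenceNumber >= target) {
                this.isCaughtUp = true;
                this.source.off("op", this.opHandler);
                resolveCaughtUp(target);
            }
        };
        source.on("op", this.opHandler);
        // Simulate the last processed op in case we are already caught up.
        this.opHandler({ sequenceNumber: source.lastSequenceNumber });
    }

    public dispose(): void {
        if (this.disposed) { return; }
        this.disposed = true;
        this.source.off("op", this.opHandler);
    }
}

// Hypothetical test double that replays ops to its listeners.
class FakeOpSource implements OpSource {
    private listeners: OpListener[] = [];
    constructor(public lastSequenceNumber: number, public lastKnownSeqNumber: number) {}
    public get listenerCount(): number { return this.listeners.length; }
    public on(_event: "op", listener: OpListener): void { this.listeners.push(listener); }
    public off(_event: "op", listener: OpListener): void {
        this.listeners = this.listeners.filter((l) => l !== listener);
    }
    public process(sequenceNumber: number): void {
        this.lastSequenceNumber = sequenceNumber;
        // Iterate over a copy: handlers may unregister themselves while running.
        this.listeners.slice().forEach((l) => l({ sequenceNumber }));
    }
}

const fakeSource = new FakeOpSource(3, 5); // processed up to 3, known up to 5
const promiseMonitor = new PromiseCatchUpMonitor(fakeSource);
void promiseMonitor.caughtUp.then((seq) => console.log(`caught up at ${seq}`));
fakeSource.process(4); // still behind the target
fakeSource.process(5); // reaches the target; the then-callback runs on a later microtask
```

With this shape a late subscriber needs no "newListener" replay, since awaiting an already-resolved promise still runs the continuation. One trade-off in this sketch: if `dispose()` is called before catching up, the promise simply stays pending, so a caller would need its own cancellation story.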
70 changes: 55 additions & 15 deletions packages/loader/container-loader/src/connectionStateHandler.ts
@@ -5,11 +5,12 @@

import { ITelemetryLogger, ITelemetryProperties } from "@fluidframework/common-definitions";
import { assert, Timer } from "@fluidframework/common-utils";
- import { IConnectionDetails } from "@fluidframework/container-definitions";
+ import { IConnectionDetails, IDeltaManager } from "@fluidframework/container-definitions";
import { ILocalSequencedClient, IProtocolHandler } from "@fluidframework/protocol-base";
import { ConnectionMode, IQuorumClients } from "@fluidframework/protocol-definitions";
import { PerformanceEvent } from "@fluidframework/telemetry-utils";
import { ConnectionState } from "./connectionState";
import { CatchUpMonitor, ICatchUpMonitor, ImmediateCatchUpMonitor } from "./catchUpMonitor";

/** Constructor parameter type for passing in dependencies needed by the ConnectionStateHandler */
export interface IConnectionStateHandlerInputs {
@@ -36,7 +37,7 @@ const JoinOpTimeoutMs = 45000;
* sequenced or blocked by the server before emitting the new "connected" event and allowing runtime to resubmit ops.
*
* Each connection is assigned a clientId by the service, and the connection is book-ended by a Join and a Leave op
- * generated by the service. Due to the distributed nature of the ordering service, in the case of reconnect we cannot
+ * generated by the service. Due to the distributed nature of the Relay Service, in the case of reconnect we cannot
* make any assumptions about ordering of operations between the old and new connections - i.e. new Join op could
* be sequenced before old Leave op (and some acks from pending ops that were in flight when we disconnected).
*
@@ -47,14 +48,17 @@ const JoinOpTimeoutMs = 45000;
* pending ops can safely be submitted with the new clientId without fear of duplication in the sequenced op stream.
* (B) We process the Join op for the new clientId (identified when the underlying connection was first established),
* indicating the service is ready to sequence ops sent with the new clientId.
* (C) We process all ops known at the time the underlying connection was established (so we are "caught up")
*
* For (A) we give up waiting after some time (same timeout as server uses), and go ahead and transition to Connected.
* For (B) we log telemetry if it takes too long, but still only transition to Connected when the Join op is processed
* and we are added to the Quorum.
* For (C) this is optional behavior, controlled by the parameters of receivedConnectEvent
*/
export class ConnectionStateHandler {
private _connectionState = ConnectionState.Disconnected;
private _pendingClientId: string | undefined;
private catchUpMonitor: ICatchUpMonitor | undefined;
private readonly prevClientLeftTimer: Timer;
private readonly joinOpTimer: Timer;

@@ -106,7 +110,7 @@ export class ConnectionStateHandler {
const quorumClients = this.handler.quorumClients();
const details = {
quorumInitialized: quorumClients !== undefined,
- hasPendingClientId: this.pendingClientId !== undefined,
+ pendingClientId: this.pendingClientId,
inQuorum: quorumClients?.getMember(this.pendingClientId ?? "") !== undefined,
waitingForLeaveOp: this.waitingForLeaveOp,
};
@@ -184,7 +188,10 @@ export class ConnectionStateHandler {
&& !this.waitingForLeaveOp
) {
this.waitEvent?.end({ source });
- this.setConnectionState(ConnectionState.Connected);

+ assert(this.catchUpMonitor !== undefined,
+ "catchUpMonitor should always be set if pendingClientId is set");
+ this.catchUpMonitor.on("caughtUp", this.transitionToConnectedState);
} else {
// Adding this event temporarily so that we can get help debugging if something goes wrong.
this.logger.sendTelemetryEvent({
@@ -210,32 +217,47 @@ export class ConnectionStateHandler {
}

public receivedDisconnectEvent(reason: string) {
- if (this.joinOpTimer.hasTimer) {
- this.stopJoinOpTimer();
- }
this.setConnectionState(ConnectionState.Disconnected, reason);
}

private readonly transitionToConnectedState = () => {
// Defensive measure, we should always be in CatchingUp state when this is called.
if (this._connectionState === ConnectionState.CatchingUp) {
this.setConnectionState(ConnectionState.Connected);
} else {
this.logger.sendTelemetryEvent({
eventName: "cannotTransitionToConnectedState",
connectionState: ConnectionState[this._connectionState],
});
}
};

/**
* The "connect" event indicates the connection to the Relay Service is live.
* However, some additional conditions must be met before we can fully transition to
* "Connected" state. This function handles that interim period, known as "Connecting" state.
* @param connectionMode - Read or Write connection
- * @param details - Connection details returned from the ordering service
+ * @param details - Connection details returned from the Relay Service
* @param deltaManager - DeltaManager to be used for delaying Connected transition until caught up.
* If it's undefined, then don't delay and transition to Connected as soon as Leave/Join op are accounted for
*/
public receivedConnectEvent(
connectionMode: ConnectionMode,
details: IConnectionDetails,
deltaManager?: IDeltaManager<any, any>,
) {
const oldState = this._connectionState;
this._connectionState = ConnectionState.CatchingUp;

const writeConnection = connectionMode === "write";
- assert(writeConnection || !this.handler.shouldClientJoinWrite(),
+ assert(!this.handler.shouldClientJoinWrite() || writeConnection,
0x30a /* shouldClientJoinWrite should imply this is a writeConnection */);
- assert(writeConnection || !this.waitingForLeaveOp,
+ assert(!this.waitingForLeaveOp || writeConnection,
0x2a6 /* "waitingForLeaveOp should imply writeConnection (we need to be ready to flush pending ops)" */);

// Defensive measure in case catchUpMonitor from previous connection attempt wasn't already cleared
this.catchUpMonitor?.dispose();

// Note that this may be undefined since the connection is established proactively on load
// and the quorum may still be under initialization.
const quorumClients: IQuorumClients | undefined = this.handler.quorumClients();
@@ -248,6 +270,11 @@ export class ConnectionStateHandler {
// we know there can no longer be outstanding ops that we sent with the previous client id.
this._pendingClientId = details.clientId;

// We may want to catch up to known ops as of now before transitioning to Connected state
this.catchUpMonitor = deltaManager !== undefined
? new CatchUpMonitor(deltaManager)
: new ImmediateCatchUpMonitor();

// IMPORTANT: Report telemetry after we set _pendingClientId, but before transitioning to Connected state
this.handler.logConnectionStateChangeTelemetry(ConnectionState.CatchingUp, oldState);

@@ -264,9 +291,21 @@ export class ConnectionStateHandler {
this.startJoinOpTimer();
} else if (!this.waitingForLeaveOp) {
// We're not waiting for Join or Leave op (if read-only connection those don't even apply),
- // go ahead and declare the state to be Connected!
- // If we are waiting for Leave op still, do nothing for now, we will transition to Connected later.
- this.setConnectionState(ConnectionState.Connected);
+ // but we do need to wait until we are caught up (to now-known ops) before transitioning to Connected state.
+ this.catchUpMonitor.on("caughtUp", this.transitionToConnectedState);
}
// else - We are waiting for Leave op still, do nothing for now, we will transition to Connected later
}

/** Clear all the state used during the Connecting phase (set in receivedConnectEvent) */
private clearPendingConnectionState() {
this._pendingClientId = undefined;

this.catchUpMonitor?.dispose();
this.catchUpMonitor = undefined;

if (this.joinOpTimer.hasTimer) {
this.stopJoinOpTimer();
}
}

@@ -295,8 +334,9 @@ export class ConnectionStateHandler {
}
this._clientId = this.pendingClientId;
} else if (value === ConnectionState.Disconnected) {
- // Important as we process our own joinSession message through delta request
- this._pendingClientId = undefined;
+ // Clear pending state immediately to prepare for reconnect
+ this.clearPendingConnectionState();

// Only wait for "leave" message if the connected client exists in the quorum because only the write
// client will exist in the quorum and only for those clients we will receive "removeMember" event and
// the client has some unacked ops.
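
The gating described in the class comment above (conditions A, B and C must all hold before "Connected") can be modelled roughly as follows. This is a deliberately simplified sketch, not the real `ConnectionStateHandler`: the names `ConnectedGate`, `ConnectInfo` and `GateState` are hypothetical, timers and telemetry are left out, and unlike the real handler the wait for a Leave op does not carry over across a disconnect.

```typescript
// Simplified, hypothetical model of the Connected-transition gating.
type GateState = "Disconnected" | "CatchingUp" | "Connected";

interface ConnectInfo {
    /** Condition (A): the previous client's Leave op (or its timeout) is still outstanding. */
    waitForLeaveOp: boolean;
    /** Condition (B): a write connection has not yet seen its own Join op. */
    waitForJoinOp: boolean;
    /** Condition (C): ops known at connect time are not all processed yet. */
    waitForCatchUp: boolean;
}

class ConnectedGate {
    public state: GateState = "Disconnected";
    private pending: ConnectInfo | undefined;

    public receivedConnect(info: ConnectInfo): void {
        this.state = "CatchingUp";
        // Copy so the caller's object is never mutated.
        this.pending = {
            waitForLeaveOp: info.waitForLeaveOp,
            waitForJoinOp: info.waitForJoinOp,
            waitForCatchUp: info.waitForCatchUp,
        };
        this.tryTransition();
    }

    public leaveOpProcessedOrTimedOut(): void { this.clear("waitForLeaveOp"); }
    public joinOpProcessed(): void { this.clear("waitForJoinOp"); }
    public caughtUp(): void { this.clear("waitForCatchUp"); }

    public receivedDisconnect(): void {
        this.state = "Disconnected";
        this.pending = undefined; // drop all pending-connection state
    }

    private clear(condition: keyof ConnectInfo): void {
        if (this.pending === undefined) { return; } // not in the CatchingUp phase
        this.pending[condition] = false;
        this.tryTransition();
    }

    private tryTransition(): void {
        const p = this.pending;
        if (p !== undefined && !p.waitForLeaveOp && !p.waitForJoinOp && !p.waitForCatchUp) {
            this.pending = undefined;
            this.state = "Connected";
        }
    }
}
```

In this model a read connection with the feature disabled corresponds to all three flags being `false`, which transitions to "Connected" straight away, while a reconnecting write client with the feature enabled starts with all three set.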
12 changes: 11 additions & 1 deletion packages/loader/container-loader/src/container.ts
@@ -181,6 +185,10 @@ export async function waitContainerToCatchUp(container: IContainer) {
};
container.on("closed", closedCallback);

// Depending on config, transition to "connected" state may include the guarantee
// that all known ops have been processed. If so, we may introduce additional wait here.
Member Author

@markfields markfields Jun 17, 2022

If so, we may introduce additional wait here

What do you all think - is this correct or not? It may not add additional delay since events fire synchronously.

// Waiting for "connected" state in either case gets us at least to our own Join op
// which is a reasonable approximation of "caught up"
const waitForOps = () => {
assert(container.connectionState === ConnectionState.CatchingUp
|| container.connectionState === ConnectionState.Connected,
@@ -543,7 +547,7 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
/**
* Returns true if container is dirty.
* Which means data loss if container is closed at that same moment
- * Most likely that happens when there is no network connection to ordering service
+ * Most likely that happens when there is no network connection to Relay Service
*/
public get isDirty() {
return this._dirtyContainer;
@@ -1551,9 +1555,15 @@ export class Container extends EventEmitterWithErrorHandling<IContainerEvents> i
}
}

const deltaManagerForCatchingUp =
this.mc.config.getBoolean("Fluid.Container.CatchUpBeforeDeclaringConnected") === true ?
this.deltaManager
: undefined;

this.connectionStateHandler.receivedConnectEvent(
this.connectionMode,
details,
deltaManagerForCatchingUp,
);
});
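
The `=== true` comparison in the hunk above makes the new behavior strictly opt-in: an unset flag and an explicit `false` both leave the old behavior in place. A small sketch of that selection, assuming a hypothetical `BooleanConfig` interface in place of `this.mc.config` (with `getBoolean` returning `boolean | undefined`) and a hypothetical helper `selectDeltaManagerForCatchingUp`:

```typescript
// Hypothetical stand-in for the config provider behind `this.mc.config`.
interface BooleanConfig {
    getBoolean(name: string): boolean | undefined;
}

const catchUpFlag = "Fluid.Container.CatchUpBeforeDeclaringConnected";

/** Returns the delta manager only when the flag is explicitly true, otherwise undefined. */
function selectDeltaManagerForCatchingUp<T>(config: BooleanConfig, deltaManager: T): T | undefined {
    return config.getBoolean(catchUpFlag) === true ? deltaManager : undefined;
}

/** Hypothetical helper: builds a config from a plain map of flag values. */
function configFrom(values: { [name: string]: boolean }): BooleanConfig {
    return { getBoolean: (name) => values[name] };
}
```

Passing `undefined` on to `receivedConnectEvent` is what selects `ImmediateCatchUpMonitor` in the handler, so with the flag off the "Connected" transition is not delayed by catch-up.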
