feat(presence): Add support for signal batching (microsoft#23075)
## Summary of changes

Presence updates are grouped together and throttled to prevent flooding
the network with messages when presence values are rapidly updated. This
means the presence infrastructure will not immediately broadcast updates
but will broadcast them after a configurable delay.

The `allowableUpdateLatencyMs` property configures how long a local
update may be delayed under normal circumstances, enabling batching with
other updates. The default `allowableUpdateLatencyMs` is **60
milliseconds** but may be (1) specified during configuration of a States
Workspace or Value Manager and/or (2) updated later using the `controls`
member of Workspace or Value Manager. States Workspace configuration
applies when a Value Manager does not have its own setting.
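
The precedence described above can be sketched as follows. This is only an illustration of the lookup order (Value Manager setting, then States Workspace setting, then the 60 ms default); `resolveAllowableLatency`, `LatencySettings`, and the constant name are hypothetical and are not part of the `@fluidframework/presence` API.

```ts
// Hypothetical helper, not part of @fluidframework/presence.
// It only illustrates the order in which settings are consulted.
const DEFAULT_ALLOWABLE_UPDATE_LATENCY_MS = 60;

interface LatencySettings {
	// undefined means "no setting at this level".
	allowableUpdateLatencyMs?: number | undefined;
}

function resolveAllowableLatency(
	valueManager: LatencySettings,
	workspace: LatencySettings,
): number {
	// `??` (not `||`) so that an explicit 0 ("send as soon as possible") is respected.
	return (
		valueManager.allowableUpdateLatencyMs ??
		workspace.allowableUpdateLatencyMs ??
		DEFAULT_ALLOWABLE_UPDATE_LATENCY_MS
	);
}

const workspaceSettings: LatencySettings = { allowableUpdateLatencyMs: 200 };
console.log(resolveAllowableLatency({ allowableUpdateLatencyMs: 100 }, workspaceSettings)); // 100
console.log(resolveAllowableLatency({}, workspaceSettings)); // 200
console.log(resolveAllowableLatency({}, {})); // 60
```

Setting a Value Manager's value back to `undefined` makes the lookup fall through to the workspace setting again, which matches the "reset" step in the example later in this message.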

Notifications are never queued; they effectively always have an
`allowableUpdateLatencyMs` of 0. However, they may be batched with other
updates that were already queued.

Note that due to throttling, clients receiving updates may not see every
value set by another client. For example, with `Latest*ValueManagers`, the
only value sent is the one current at the time the outgoing batched message
is sent. Earlier values set by the client will not be broadcast or seen by
other clients.
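
A minimal sketch of why intermediate values are lost, assuming a single "latest value" slot that is overwritten until the batch is sent. `LatestSlot` is a hypothetical stand-in for illustration only and does not reflect how the real value managers are implemented.

```ts
// Hypothetical model of a "latest value" slot; names are illustrative only.
class LatestSlot<T> {
	private pending: { value: T } | undefined;
	// Values that were actually "broadcast" by this model.
	public readonly sent: T[] = [];

	// Setting a value overwrites whatever was pending; nothing is sent yet.
	public set(value: T): void {
		this.pending = { value };
	}

	// Stands in for the moment the outgoing batched message is sent.
	public flush(): void {
		if (this.pending === undefined) {
			return;
		}
		this.sent.push(this.pending.value);
		this.pending = undefined;
	}
}

const slot = new LatestSlot<{ num: number }>();
slot.set({ num: 1 });
slot.set({ num: 2 });
slot.set({ num: 3 });
slot.flush();

// Only the value current at flush time was sent; 1 and 2 were never broadcast.
console.log(slot.sent.length, slot.sent[0]?.num); // 1 3
```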

### Example

You can configure the batching and throttling behavior using the
`allowableUpdateLatencyMs` property as in the following example:

```ts
// Configure a states workspace
const stateWorkspace = presence.getStates("app:v1states",
	{
		// This value manager has an allowable latency of 100ms.
		position: Latest({ x: 0, y: 0 }, { allowableUpdateLatencyMs: 100 }),
		// This value manager uses the workspace default.
		count: Latest({ num: 0 }),
	},
	// Specify the default for all value managers in this workspace to 200ms,
	// overriding the default value of 60ms.
	{ allowableUpdateLatencyMs: 200 }
);

// Temporarily set count updates to send as soon as possible
const countState = stateWorkspace.props.count;
countState.controls.allowableUpdateLatencyMs = 0;
countState.local = { num: 5000 };

// Reset the update latency to the workspace default
countState.controls.allowableUpdateLatencyMs = undefined;
```

## Test cases

1. Signals are sent immediately when the allowable latency is 0.
1. Signals are batched when they are queued before the send deadline.
   (The send deadline is set to now + allowable latency if and only if a new
   message is queued, i.e. no message was previously queued and this is the
   first message put in the queue.)
1. Signals from LVMs with different allowable latencies are batched
   correctly.
1. A queued signal is sent with an immediate signal (i.e. if a signal is
   queued and an immediate message comes in, they are merged and sent
   immediately).
1. Multiple workspaces: messages from multiple workspaces are batched and
   queued together.
1. Notifications are included: they are sent immediately.
The test cases themselves include inline comments with the time and
expected deadline for each operation. Signals are also numbered inline
to make it easier to match up with the expected signals.
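
The timing rules these cases exercise can be modeled with an explicit clock, which is roughly how the inline time/deadline comments in the tests read. The sketch below is a simplified, hypothetical model (`BatchingModel` and its methods are invented for illustration); it follows the deadline logic in this commit's `enqueueMessage` but tracks update names rather than merging real datastore content.

```ts
// Hypothetical, simplified model of the batching rules; not the real PresenceDatastoreManager.
class BatchingModel {
	private queued: string[] = [];
	// Time at which the pending timer fires; undefined means no timer is pending.
	private deadline: number | undefined;
	public readonly sentBatches: { at: number; updates: string[] }[] = [];

	// Record an update made at time `now` with the given allowable latency.
	public update(now: number, name: string, allowableLatencyMs: number): void {
		this.advanceTo(now);
		this.queued.push(name); // queuing is unconditional; sending is what gets scheduled
		const thisDeadline = now + allowableLatencyMs;
		if (this.deadline !== undefined && thisDeadline >= this.deadline) {
			return; // the pending timer fires early enough to cover this update
		}
		if (allowableLatencyMs > 0) {
			this.deadline = thisDeadline; // schedule the send, or pull it in
		} else {
			this.send(now); // immediate: flush everything queued so far
		}
	}

	// Fire the pending timer if its deadline has been reached.
	public advanceTo(now: number): void {
		if (this.deadline !== undefined && now >= this.deadline) {
			this.send(this.deadline);
		}
	}

	private send(at: number): void {
		this.deadline = undefined;
		if (this.queued.length === 0) {
			return;
		}
		this.sentBatches.push({ at, updates: this.queued });
		this.queued = [];
	}
}

const model = new BatchingModel();
model.update(1000, "position", 100); // first in queue: deadline 1100
model.update(1020, "count", 200); // 1220 >= 1100: rides the existing timer
model.update(1030, "cursor", 50); // 1080 < 1100: deadline pulled in to 1080
model.advanceTo(1080); // timer fires: one batch with all three updates
model.update(1090, "selection", 100); // queued, deadline 1190
model.update(1095, "notification", 0); // immediate: sent together with "selection"
console.log(model.sentBatches.length); // 2
```

The last two calls correspond to the "queued signal is sent with immediate signal" case: the zero-latency update is not delayed, and the already queued update goes out in the same batch.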

## Known gaps

LatestMapValueManager is not tested.

---------

Co-authored-by: Jason Hartman <[email protected]>
tylerbutler and jason-ha authored Nov 26, 2024
1 parent f7be965 commit abde76d
Showing 7 changed files with 1,307 additions and 25 deletions.
45 changes: 45 additions & 0 deletions .changeset/shy-files-jam.md
@@ -0,0 +1,45 @@
---
"@fluidframework/presence": minor
---
---
"section": feature
---

Presence updates are now batched and throttled

Presence updates are grouped together and throttled to prevent flooding the network with messages when presence values are rapidly updated. This means the presence infrastructure will not immediately broadcast updates but will broadcast them after a configurable delay.

The `allowableUpdateLatencyMs` property configures how long a local update may be delayed under normal circumstances, enabling batching with other updates. The default `allowableUpdateLatencyMs` is **60 milliseconds** but may be (1) specified during configuration of a [States Workspace](https://github.com/microsoft/FluidFramework/tree/main/packages/framework/presence#value-managers#states-workspace) or [Value Manager](https://github.com/microsoft/FluidFramework/tree/main/packages/framework/presence#value-managers#value-managers) and/or (2) updated later using the `controls` member of Workspace or Value Manager. [States Workspace](https://github.com/microsoft/FluidFramework/tree/main/packages/framework/presence#value-managers#states-workspace) configuration applies when a Value Manager does not have its own setting.

Notifications are never queued; they effectively always have an `allowableUpdateLatencyMs` of 0. However, they may be batched with other updates that were already queued.

Note that due to throttling, clients receiving updates may not see updates for all values set by another. For example,
with `Latest*ValueManagers`, the only value sent is the value at the time the outgoing batched message is sent. Previous
values set by the client will not be broadcast or seen by other clients.

#### Example

You can configure the batching and throttling behavior using the `allowableUpdateLatencyMs` property as in the following example:

```ts
// Configure a states workspace
const stateWorkspace = presence.getStates("app:v1states",
{
// This value manager has an allowable latency of 100ms.
position: Latest({ x: 0, y: 0 }, { allowableUpdateLatencyMs: 100 }),
// This value manager uses the workspace default.
count: Latest({ num: 0 }),
},
// Specify the default for all value managers in this workspace to 200ms,
// overriding the default value of 60ms.
{ allowableUpdateLatencyMs: 200 }
);

// Temporarily set count updates to send as soon as possible
const countState = stateWorkspace.props.count;
countState.controls.allowableUpdateLatencyMs = 0;
countState.local = { num: 5000 };

// Reset the update latency to the workspace default
countState.controls.allowableUpdateLatencyMs = undefined;
```
39 changes: 37 additions & 2 deletions packages/framework/presence/README.md
@@ -128,9 +128,44 @@ Notifications API is partially implemented. All messages are always broadcast ev

Notifications are fundamentally unreliable at this time as there are no built-in acknowledgements nor retained state. To prevent most common loss of notifications, always check for connection before sending.

### Throttling
### Throttling/batching

Throttling is not yet implemented. `BroadcastControls` exists in the API to provide control over throttling of value updates, but throttling is not yet implemented. It is recommended that `BroadcastControls.allowableUpdateLatencyMs` use is considered and specified to light up once support is added.
Presence updates are grouped together and throttled to prevent flooding the network with messages when presence values are rapidly updated. This means the presence infrastructure will not immediately broadcast updates but will broadcast them after a configurable delay.

The `allowableUpdateLatencyMs` property configures how long a local update may be delayed under normal circumstances, enabling batching with other updates. The default `allowableUpdateLatencyMs` is **60 milliseconds** but may be (1) specified during configuration of a [States Workspace](#states-workspace) or [Value Manager](#value-managers) and/or (2) updated later using the `controls` member of Workspace or Value Manager. [States Workspace](#states-workspace) configuration applies when a Value Manager does not have its own setting.

Notifications are never queued; they effectively always have an `allowableUpdateLatencyMs` of 0. However, they may be batched with other updates that were already queued.

Note that due to throttling, clients receiving updates may not see updates for all values set by another. For example,
with `Latest*ValueManagers`, the only value sent is the value at the time the outgoing batched message is sent. Previous
values set by the client will not be broadcast or seen by other clients.

#### Example

You can configure the batching and throttling behavior using the `allowableUpdateLatencyMs` property as in the following example:

```ts
// Configure a states workspace
const stateWorkspace = presence.getStates("app:v1states",
{
// This value manager has an allowable latency of 100ms.
position: Latest({ x: 0, y: 0 }, { allowableUpdateLatencyMs: 100 }),
// This value manager uses the workspace default.
count: Latest({ num: 0 }),
},
// Specify the default for all value managers in this workspace to 200ms,
// overriding the default value of 60ms.
{ allowableUpdateLatencyMs: 200 }
);

// Temporarily set count updates to send as soon as possible
const countState = stateWorkspace.props.count;
countState.controls.allowableUpdateLatencyMs = 0;
countState.local = { num: 5000 };

// Reset the update latency to the workspace default
countState.controls.allowableUpdateLatencyMs = undefined;
```

<!-- AUTO-GENERATED-CONTENT:START (README_FOOTER) -->

159 changes: 137 additions & 22 deletions packages/framework/presence/src/presenceDatastoreManager.ts
@@ -9,6 +9,7 @@ import type { ITelemetryLoggerExt } from "@fluidframework/telemetry-utils/intern

import type { ClientConnectionId } from "./baseTypes.js";
import type { BroadcastControlSettings } from "./broadcastControls.js";
import { brandedObjectEntries } from "./internalTypes.js";
import type { IEphemeralRuntime } from "./internalTypes.js";
import type { ClientSessionId, ISessionClient } from "./presence.js";
import type {
@@ -17,8 +18,13 @@ import type {
PresenceStatesInternal,
ValueElementMap,
} from "./presenceStates.js";
import { createPresenceStates, mergeUntrackedDatastore } from "./presenceStates.js";
import {
createPresenceStates,
mergeUntrackedDatastore,
mergeValueDirectory,
} from "./presenceStates.js";
import type { SystemWorkspaceDatastore } from "./systemWorkspace.js";
import { TimerManager } from "./timerManager.js";
import type {
PresenceStates,
PresenceStatesSchema,
@@ -93,6 +99,43 @@ export interface PresenceDatastoreManager {
processSignal(message: IExtensionMessage, local: boolean): void;
}

function mergeGeneralDatastoreMessageContent(
base: GeneralDatastoreMessageContent | undefined,
newData: GeneralDatastoreMessageContent,
): GeneralDatastoreMessageContent {
// This function-local "datastore" will hold the merged message data.
const queueDatastore = base ?? {};

// Merge the current data with the existing data, if any exists.
// Iterate over the current message data; individual items are workspaces.
for (const [workspaceName, workspaceData] of Object.entries(newData)) {
// Initialize the merged data as the queued datastore entry for the workspace.
// Since the key might not exist, create an empty object in that case. It will
// be set explicitly after the loop.
const mergedData = queueDatastore[workspaceName] ?? {};

// Iterate over each value manager and its data, merging it as needed.
for (const valueManagerKey of Object.keys(workspaceData)) {
for (const [clientSessionId, value] of brandedObjectEntries(
workspaceData[valueManagerKey],
)) {
mergedData[valueManagerKey] ??= {};
const oldData = mergedData[valueManagerKey][clientSessionId];
mergedData[valueManagerKey][clientSessionId] = mergeValueDirectory(
oldData,
value,
0, // local values do not need a time shift
);
}
}

// Store the merged data in the function-local queue workspace. The whole contents of this
// datastore will be sent as the message data.
queueDatastore[workspaceName] = mergedData;
}
return queueDatastore;
}

/**
* Manages singleton datastore for all Presence.
*/
@@ -101,7 +144,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
private averageLatency = 0;
private returnedMessages = 0;
private refreshBroadcastRequested = false;

private readonly timer = new TimerManager();
private readonly workspaces = new Map<
string,
PresenceWorkspaceEntry<PresenceStatesSchema>
@@ -162,27 +205,17 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
return;
}

const clientConnectionId = this.runtime.clientId;
assert(clientConnectionId !== undefined, 0xa59 /* Client connected without clientId */);
const currentClientToSessionValueState =
this.datastore["system:presence"].clientToSessionId[clientConnectionId];

const updates: GeneralDatastoreMessageContent[InternalWorkspaceAddress] = {};
for (const [key, value] of Object.entries(states)) {
updates[key] = { [this.clientSessionId]: value };
}
this.localUpdate({
// Always send current connection mapping for some resiliency against
// lost signals. This ensures that client session id found in `updates`
// (which is this client's client session id) is always represented in
// system workspace of recipient clients.
"system:presence": {
clientToSessionId: {
[clientConnectionId]: { ...currentClientToSessionValueState },
},

this.enqueueMessage(
{
[internalWorkspaceAddress]: updates,
},
[internalWorkspaceAddress]: updates,
});
options,
);
};

const entry = createPresenceStates(
@@ -200,14 +233,96 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
return entry.public;
}

private localUpdate(data: DatastoreMessageContent): void {
const content = {
/**
* The combined contents of all queued updates. Will be undefined when no messages are queued.
*/
private queuedData: GeneralDatastoreMessageContent | undefined;

/**
* Enqueues a new message to be sent. The message may be queued or may be sent immediately depending on the state of
* the send timer, other messages in the queue, the configured allowed latency, etc.
*/
private enqueueMessage(
data: GeneralDatastoreMessageContent,
options: RuntimeLocalUpdateOptions,
): void {
// Merging the message with any queued messages effectively queues the message.
// It is OK to queue all incoming messages as long as when we send, we send the queued data.
this.queuedData = mergeGeneralDatastoreMessageContent(this.queuedData, data);

const { allowableUpdateLatencyMs } = options;
const now = Date.now();
const thisMessageDeadline = now + allowableUpdateLatencyMs;

if (
// If the timer has not expired, we can short-circuit because the timer will fire
// and cover this update. In other words, queuing this will be fast enough to
// meet its deadline, because a timer is already scheduled to fire before its deadline.
!this.timer.hasExpired() &&
// If the deadline for this message is later than the overall send deadline, then
// we can exit early since a timer will take care of sending it.
thisMessageDeadline >= this.timer.expireTime
) {
return;
}

// Either we need to send this message immediately, or we need to schedule a timer
// to fire at the send deadline that will take care of it.

// Note that timeoutInMs === allowableUpdateLatency, but the calculation is done this way for clarity.
const timeoutInMs = thisMessageDeadline - now;
const scheduleForLater = timeoutInMs > 0;

if (scheduleForLater) {
// Schedule the queued messages to be sent at the updateDeadline
this.timer.setTimeout(this.sendQueuedMessage.bind(this), timeoutInMs);
} else {
this.sendQueuedMessage();
}
}

/**
* Send any queued signal immediately. Does nothing if no message is queued.
*/
private sendQueuedMessage(): void {
this.timer.clearTimeout();

if (this.queuedData === undefined) {
return;
}

// Check for connectivity before sending updates.
if (!this.runtime.connected) {
// Clear the queued data since we're disconnected. We don't want messages
// to queue infinitely while disconnected.
this.queuedData = undefined;
return;
}

const clientConnectionId = this.runtime.clientId;
assert(clientConnectionId !== undefined, 0xa59 /* Client connected without clientId */);
const currentClientToSessionValueState =
this.datastore["system:presence"].clientToSessionId[clientConnectionId];

const newMessage = {
sendTimestamp: Date.now(),
avgLatency: this.averageLatency,
// isComplete: false,
data,
data: {
// Always send current connection mapping for some resiliency against
// lost signals. This ensures that client session id found in `updates`
// (which is this client's client session id) is always represented in
// system workspace of recipient clients.
"system:presence": {
clientToSessionId: {
[clientConnectionId]: { ...currentClientToSessionValueState },
},
},
...this.queuedData,
},
} satisfies DatastoreUpdateMessage["content"];
this.runtime.submitSignal(datastoreUpdateMessageType, content);
this.queuedData = undefined;
this.runtime.submitSignal(datastoreUpdateMessageType, newMessage);
}

private broadcastAllKnownState(): void {
7 changes: 6 additions & 1 deletion packages/framework/presence/src/presenceStates.ts
@@ -146,7 +146,12 @@ function isValueDirectory<
return "items" in value;
}

function mergeValueDirectory<
/**
* Merge a value directory.
*
* @internal
*/
export function mergeValueDirectory<
T,
TValueState extends
| InternalTypes.ValueRequiredState<T>