Skip to content

Commit

Permalink
Refactor LiveObjects to maintain internal state sync state
Browse files Browse the repository at this point in the history
  • Loading branch information
VeskeR committed Jan 31, 2025
1 parent dc90742 commit 98d66e2
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,34 @@ import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage, StateOperationAction } from './statemessage';
import { SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';

enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
export enum LiveObjectsEvent {
syncing = 'syncing',
synced = 'synced',
}

export enum LiveObjectsState {
initialized = 'initialized',
syncing = 'syncing',
synced = 'synced',
}

const StateToEventsMap: Record<LiveObjectsState, LiveObjectsEvent | undefined> = {
initialized: undefined,
syncing: LiveObjectsEvent.syncing,
synced: LiveObjectsEvent.synced,
};

type BatchCallback = (batchContext: BatchContext) => void;

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
private _state: LiveObjectsState;
// composition over inheritance since we cannot import class directly into plugin code.
// instead we obtain a class type from the client
private _eventEmitter: EventEmitter;
private _liveObjectsPool: LiveObjectsPool;
private _syncLiveObjectsDataPool: SyncLiveObjectsDataPool;
private _syncInProgress: boolean;
private _currentSyncId: string | undefined;
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: StateMessage[];
Expand All @@ -36,10 +49,10 @@ export class LiveObjects {
constructor(channel: RealtimeChannel) {
this._channel = channel;
this._client = channel.client;
this._state = LiveObjectsState.initialized;
this._eventEmitter = new this._client.EventEmitter(this._client.logger);
this._liveObjectsPool = new LiveObjectsPool(this);
this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this);
this._syncInProgress = true;
this._bufferedStateOperations = [];
}

Expand All @@ -51,9 +64,9 @@ export class LiveObjects {
async getRoot<T extends API.LiveMapType = API.DefaultRoot>(): Promise<LiveMap<T>> {
this.throwIfMissingStateSubscribeMode();

// SYNC is currently in progress, wait for SYNC sequence to finish
if (this._syncInProgress) {
await this._eventEmitter.once(LiveObjectsEvents.SyncCompleted);
// if we're not synced yet, wait for SYNC sequence to finish before returning root
if (this._state !== LiveObjectsState.synced) {
await this._eventEmitter.once(LiveObjectsEvent.synced);
}

return this._liveObjectsPool.get(ROOT_OBJECT_ID) as LiveMap<T>;
Expand Down Expand Up @@ -183,7 +196,7 @@ export class LiveObjects {
* @internal
*/
handleStateMessages(stateMessages: StateMessage[]): void {
if (this._syncInProgress) {
if (this._state !== LiveObjectsState.synced) {
// The client receives state messages in realtime over the channel concurrently with the SYNC sequence.
// Some of the incoming state messages may have already been applied to the state objects described in
// the SYNC sequence, but others may not; therefore we must buffer these messages so that we can apply
Expand Down Expand Up @@ -274,7 +287,7 @@ export class LiveObjects {
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
this._syncInProgress = true;
this._stateChange(LiveObjectsState.syncing);
}

private _endSync(): void {
Expand All @@ -287,8 +300,7 @@ export class LiveObjects {
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
this._currentSyncCursor = undefined;
this._syncInProgress = false;
this._eventEmitter.emit(LiveObjectsEvents.SyncCompleted);
this._stateChange(LiveObjectsState.synced);
}

private _parseSyncChannelSerial(syncChannelSerial: string | null | undefined): {
Expand Down Expand Up @@ -407,4 +419,18 @@ export class LiveObjects {
throw new this._client.ErrorInfo(`"${expectedMode}" channel mode must be set for this operation`, 40160, 400);
}
}

private _stateChange(state: LiveObjectsState): void {
if (this._state === state) {
return;
}

this._state = state;
const event = StateToEventsMap[state];
if (!event) {
return;
}

this._eventEmitter.emit(event);
}
}

0 comments on commit 98d66e2

Please sign in to comment.