Skip to content

Commit

Permalink
Resume to-device message queue after resumed sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Anderas committed Nov 30, 2022
1 parent ad16b26 commit c5bc715
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/ToDeviceMessageQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ limitations under the License.
*/

import { logger } from "./logger";
import { MatrixError, MatrixClient } from "./matrix";
import { MatrixError, MatrixClient, ClientEvent } from "./matrix";
import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage";
import { MatrixScheduler } from "./scheduler";
import { SyncState } from "./sync";

const MAX_BATCH_SIZE = 20;

Expand All @@ -37,12 +38,14 @@ export class ToDeviceMessageQueue {
public start(): void {
this.running = true;
this.sendQueue();
this.client.on(ClientEvent.Sync, this.onResumedSync);
}

public stop(): void {
this.running = false;
if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
this.retryTimeout = null;
this.client.removeListener(ClientEvent.Sync, this.onResumedSync);
}

public async queueBatch(batch: ToDeviceBatch): Promise<void> {
Expand Down Expand Up @@ -128,4 +131,15 @@ export class ToDeviceMessageQueue {

await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId);
}

/**
* Listen to sync state changes and automatically resend any pending events
* once syncing is resumed
*/
private onResumedSync = (state: SyncState | null, oldState: SyncState | null): void => {
if (state === SyncState.Syncing && oldState !== SyncState.Syncing) {
logger.info(`Resuming queue after resumed sync`);
this.sendQueue();
}
};
}

0 comments on commit c5bc715

Please sign in to comment.