Skip to content

Commit

Permalink
Add new hooks in the replication write (#4754)
Browse files Browse the repository at this point in the history
* add new hooks in the replication write

* Fix build not passing

* Fix code style
  • Loading branch information
jonathan-neugber authored Jun 2, 2023
1 parent 73a69eb commit a73c91a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
16 changes: 15 additions & 1 deletion src/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,21 @@ export const HOOKS: { [k: string]: any[]; } = {
* runs after a database has been removed
* @async
*/
postRemoveRxDatabase: []
postRemoveRxDatabase: [],

/**
* runs before the replication writes the rows to master
* but before the rows have been modified
* @async
*/
preReplicationMasterWrite: [],

/**
* runs after the replication has been sent to the server
* but before the new documents have been handled
* @async
*/
preReplicationMasterWriteDocumentsHandle: [],
};

export function runPluginHooks(hookKey: string, obj: any) {
Expand Down
22 changes: 22 additions & 0 deletions src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ import {
import { addRxPlugin } from '../../plugin';
import { hasEncryption } from '../../rx-storage-helper';
import { overwritable } from '../../overwritable';
import {
runAsyncPluginHooks
} from '../../hooks';


export const REPLICATION_STATE_BY_COLLECTION: WeakMap<RxCollection, RxReplicationState<any, any>[]> = new WeakMap();
Expand Down Expand Up @@ -240,6 +243,12 @@ export class RxReplicationState<RxDocType, CheckpointType> {
return [];
}
let done = false;

await runAsyncPluginHooks('preReplicationMasterWrite', {
rows,
collection: this.collection
});

const useRows = await Promise.all(
rows.map(async (row) => {
row.newDocumentState = await pushModifier(row.newDocumentState);
Expand All @@ -257,6 +266,13 @@ export class RxReplicationState<RxDocType, CheckpointType> {
);

let result: WithDeleted<RxDocType>[] = null as any;

// In case all the rows have been filtered and nothing has to be sent
if (useRows.length === 0) {
done = true;
result = [];
}

while (!done && !this.isStopped()) {
try {
result = await this.push.handler(useRows);
Expand Down Expand Up @@ -290,6 +306,12 @@ export class RxReplicationState<RxDocType, CheckpointType> {
if (this.isStopped()) {
return [];
}

await runAsyncPluginHooks('preReplicationMasterWriteDocumentsHandle', {
result,
collection: this.collection
});

const conflicts = handlePulledDocuments(this.collection, this.deletedField, ensureNotFalsy(result));
return conflicts;
}
Expand Down
12 changes: 11 additions & 1 deletion src/types/rx-plugin.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import type {
RxDatabase,
RxDatabaseCreator,
RxDocument,
RxStorage
RxStorage,
RxReplicationWriteToMasterRow,
WithDeleted
} from '../types';
import type { RxSchema } from '../rx-schema';

Expand Down Expand Up @@ -128,5 +130,13 @@ export interface RxPlugin {
preCreateRxStorageInstance?: RxPluginHooks<RxStorageInstanceCreationParams<any, any>>;
preMigrateDocument?: RxPluginHooks<any>;
postMigrateDocument?: RxPluginHooks<any>;
preReplicationMasterWrite?: RxPluginHooks<{
rows: RxReplicationWriteToMasterRow<any>[];
collection: RxCollection;
}>;
preReplicationMasterWriteDocumentsHandle?: RxPluginHooks<{
result: WithDeleted<any>[];
collection: RxCollection;
}>;
};
}

0 comments on commit a73c91a

Please sign in to comment.