Skip to content

Commit

Permalink
REFACTOR bulkRemove (#6634)
Browse files Browse the repository at this point in the history
* REFACTOR bulkRemove

* FIX docs
  • Loading branch information
pubkey authored Nov 25, 2024
1 parent 9b845b6 commit bf72ef0
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 93 deletions.
9 changes: 9 additions & 0 deletions docs-src/docs/rx-collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ const result = await myCollection.bulkRemove([
// }
```

Instead of providing the document ids, you can also use the [RxDocument](./rx-document.md) instances. This can have better performance if your code knows them already at the moment of removing them:
```js
const result = await myCollection.bulkRemove([
myRxDocument1,
myRxDocument2,
/* ... */
]);
```

### upsert()
Inserts the document if it does not exist within the collection, otherwise it will overwrite it. Returns the new or overwritten RxDocument.
```js
Expand Down
30 changes: 24 additions & 6 deletions src/rx-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,12 @@ export class RxCollectionBase<
}

async bulkRemove(
ids: string[]
/**
* You can either remove the documents by their ids
* or by directly providing the RxDocument instances
* if you have them already. This improves performance a bit.
*/
idsOrDocs: string[] | RxDocument<RxDocumentType>[]
): Promise<{
success: RxDocument<RxDocumentType, OrmMethods>[];
error: RxStorageWriteError<RxDocumentType>[];
Expand All @@ -447,14 +452,21 @@ export class RxCollectionBase<
* Optimization shortcut,
* do nothing when called with an empty array
*/
if (ids.length === 0) {
if (idsOrDocs.length === 0) {
return {
success: [],
error: []
};
}

const rxDocumentMap = await this.findByIds(ids).exec();
let rxDocumentMap: Map<string, RxDocument<RxDocumentType, OrmMethods>>;
if (typeof idsOrDocs[0] === 'string') {
rxDocumentMap = await this.findByIds(idsOrDocs as string[]).exec();
} else {
rxDocumentMap = new Map();
(idsOrDocs as RxDocument<RxDocumentType, OrmMethods>[]).forEach(d => rxDocumentMap.set(d.primary, d));
}

const docsData: RxDocumentData<RxDocumentType>[] = [];
const docsMap: Map<string, RxDocumentData<RxDocumentType>> = new Map();
Array.from(rxDocumentMap.values()).forEach(rxDocument => {
Expand Down Expand Up @@ -488,7 +500,14 @@ export class RxCollectionBase<
removeDocs,
results
);
const successIds: string[] = success.map(d => d[primaryPath] as string);

const deletedRxDocuments: RxDocument<RxDocumentType, OrmMethods>[] = [];
const successIds: string[] = success.map(d => {
const id = d[primaryPath] as string;
const doc = this._docCache.getCachedRxDocument(d);
deletedRxDocuments.push(doc);
return id;
});

// run hooks
await Promise.all(
Expand All @@ -502,10 +521,9 @@ export class RxCollectionBase<
})
);

const rxDocuments = successIds.map(id => getFromMapOrThrow(rxDocumentMap, id));

return {
success: rxDocuments,
success: deletedRxDocuments,
error: results.error
};
}
Expand Down
82 changes: 11 additions & 71 deletions src/rx-document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
RXJS_SHARE_REPLAY_DEFAULTS,
getProperty,
getFromMapOrCreate,
getFromMapOrThrow,
ensureNotFalsy
} from './plugins/utils/index.ts';
import {
Expand All @@ -34,9 +33,7 @@ import type {
RxDocumentWriteData,
UpdateQuery,
CRDTEntry,
ModifyFunction,
BulkWriteRow,
RxStorageWriteError
ModifyFunction
} from './types/index.d.ts';
import { getDocumentDataOfRxChangeEvent } from './rx-change-event.ts';
import { overwritable } from './overwritable.ts';
Expand Down Expand Up @@ -371,21 +368,24 @@ export const basePrototype = {
* instead deleted documents get flagged with _deleted=true.
*/
async remove(this: RxDocument): Promise<RxDocument> {
const collection = this.collection;
if (this.deleted) {
return Promise.reject(newRxError('DOC13', {
document: this,
id: this.primary
}));
}

const removeResult = await removeRxDocuments([this]);
if (removeResult.errors.length > 0) {
const error = removeResult.errors[0];
const deletedData = getFromMapOrThrow(removeResult.deleteDataById, this.primary);
throwIfIsStorageWriteError(collection, this.primary, deletedData, error);
const removeResult = await this.collection.bulkRemove([this]);
if (removeResult.error.length > 0) {
const error = removeResult.error[0];
throwIfIsStorageWriteError(
this.collection,
this.primary,
this._data,
error
);
}
return removeResult.docs[0];
return removeResult.success[0];
},
incrementalRemove(this: RxDocument): Promise<RxDocument> {
return this.incrementalModify(async (docData) => {
Expand Down Expand Up @@ -537,63 +537,3 @@ function getDocumentProperty(doc: RxDocument, objPath: string): any | null {
}
);
};



/**
* To remove documents we use a bulk operations
* to have a better performance on RxQuery.find().remove();
*/
export async function removeRxDocuments<RxDocType>(
docs: RxDocument<RxDocType>[]
): Promise<{
errors: RxStorageWriteError<RxDocType>[];
docs: RxDocument<RxDocType>[];
deleteDataById: Map<string, RxDocumentData<RxDocType>>;
}> {
if (docs.length === 0) {
throw newRxError('SNH');
}
const collection = docs[0].collection;
const primaryPath = collection.schema.primaryPath;
const writeRows: BulkWriteRow<RxDocType>[] = [];
const docsById = new Map<string, RxDocument<RxDocType>>();
const deleteDataById = new Map<string, RxDocumentData<RxDocType>>();
await Promise.all(
docs.map(async (doc) => {
docsById.set(doc.primary, doc);
const deletedData = flatClone(doc._data);
await collection._runHooks('pre', 'remove', deletedData, doc);
deletedData._deleted = true;
deleteDataById.set(doc.primary, deletedData);
const writeRow = {
previous: doc._data,
document: deletedData
};
writeRows.push(writeRow);
})
);
const writeResult = await collection.storageInstance.bulkWrite(writeRows, 'rx-document-remove');
const errors: RxStorageWriteError<RxDocType>[] = writeResult.error;
const writtenDocsData = getWrittenDocumentsFromBulkWriteResponse(
primaryPath,
writeRows,
writeResult
);
const returnDocs: RxDocument<RxDocType>[] = [];
await Promise.all(
writtenDocsData.map(async (writtenDocData) => {
const id = writtenDocData[primaryPath] as string;
const doc = getFromMapOrThrow(docsById, id);
const deletedData = getFromMapOrThrow(deleteDataById, id);
await collection._runHooks('post', 'remove', deletedData, doc);
returnDocs.push(collection._docCache.getCachedRxDocument(writtenDocData));
})
);

return {
docs: returnDocs,
errors,
deleteDataById
};
}
30 changes: 14 additions & 16 deletions src/rx-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
appendToArray
} from './plugins/utils/index.ts';
import {
newRxError
newRxError,
rxStorageWriteErrorToRxError
} from './rx-error.ts';
import {
runPluginHooks
Expand Down Expand Up @@ -53,7 +54,6 @@ import {

} from './rx-query-helper.ts';
import { RxQuerySingleResult } from './rx-query-single-result.ts';
import { removeRxDocuments } from './rx-document.ts';

let _queryCount = 0;
const newQueryID = function (): number {
Expand Down Expand Up @@ -379,20 +379,18 @@ export class RxQueryBase<
* deletes all found documents
* @return promise with deleted documents
*/
remove(): Promise<RxQueryResult> {
return this
.exec()
.then(docs => {
if (Array.isArray(docs)) {
if (docs.length === 0) {
return [];
} else {
return removeRxDocuments(docs).then(r => r.docs);
}
} else {
return (docs as any).remove();
}
});
async remove(): Promise<RxQueryResult> {
const docs = await this.exec();
if (Array.isArray(docs)) {
const result = await this.collection.bulkRemove(docs);
if (result.error.length > 0) {
throw rxStorageWriteErrorToRxError(result.error[0]);
} else {
return result.success as any;
}
} else {
return (docs as any).remove();
}
}
incrementalRemove(): Promise<RxQueryResult> {
return runQueryUpdateFunction(
Expand Down

0 comments on commit bf72ef0

Please sign in to comment.